Publishing RabbitMQ messages from PostgreSQL

A quite useful capability is being able to publish messages to a message broker such as RabbitMQ directly from a database like PostgreSQL. This article shows how this can be done with some simple Node.js bridge code, so that whenever a row is inserted into a table in PostgreSQL, a message is published to RabbitMQ in a safe and reliable manner.

Requirements

The features we will be using have been widely available for several years in PostgreSQL, RabbitMQ, and Node.js, so unless you plan to run this on an extremely old version of any of them, compatibility is unlikely to be an issue.

The queue_message table

The basic unit of operation is going to be the PostgreSQL queue_message table. Every row inserted into queue_message will be turned into a corresponding message published to a RabbitMQ exchange. When the message has been published successfully, it will be deleted from the table.

There is no reason to be fancy with the table design; we are simply going to give it exactly the columns required for the purpose at hand. We are going to use the publish method of the Node.js library amqplib to communicate with RabbitMQ. The publish method takes all of the below table columns directly as parameters, except the id column, which we will use as the primary key.

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE IF NOT EXISTS queue_message (
  id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
  exchange text NOT NULL,
  routing_key text NOT NULL,
  content jsonb NOT NULL DEFAULT '{}',
  options jsonb NOT NULL DEFAULT '{}'
);

Whenever a row is inserted into queue_message, we need to perform a PostgreSQL pg_notify call. This can be done with a basic insert trigger on the queue_message table as shown below. I wrote about broadcasting row updates previously on this website.

CREATE OR REPLACE FUNCTION queue_message_notify() RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify(
      'queue_message_notify',
      jsonb_build_object('id', NEW.id::text)::text
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER do_notify
AFTER INSERT ON queue_message
FOR EACH ROW EXECUTE PROCEDURE queue_message_notify();

Listening for notifications with Node.js

This is where we break out Node.js. We are going to use the pg-listen package to listen on the queue_message_notify channel. As can be seen in the trigger definition above, we send the ID of the inserted row out on the channel as a JSON-formatted message. As soon as the ID is received by our listener, it is used to read the entire row from the database.

When reading the row from the database, the most important thing to do is to lock it using the FOR UPDATE clause in PostgreSQL. This ensures that only the current connection/transaction will be able to manipulate the row, which is crucial for our use case. If we did not lock it, every concurrent listener on queue_message_notify would go on to publish the message to RabbitMQ, producing as many duplicates as there are listeners, which is definitely not what we want. We want one row to equal one published message.

PostgreSQL has an option (NOWAIT) to raise an error when a particular row cannot be locked, but we don’t want that either. From the point of view of one running listener, not being able to acquire a lock for a particular row probably means another listener locked the row before us. So we will use the SKIP LOCKED clause to safely ignore rows we cannot lock. The query used by every listener to select a row for publishing then becomes the following:

SELECT * FROM queue_message WHERE id = $1 FOR UPDATE SKIP LOCKED;

Putting everything together then results in the following simple Node.js “table listener”, listen.js:

const services = require('./services');

run().catch(err => {
  console.error(err);
});

async function run () {
  const pg = await services.pg();
  const amqp = await services.amqp();
  const pgSubscriber = await services.pgListen();

  await pgSubscriber.listenTo('queue_message_notify');
  pgSubscriber.events.on('notification', ({channel, payload}) => {
    const { id } = payload;
    popOneQueueMessage({pg, amqp}, id).catch(err => {
      console.error(err);
    });
  });
  console.log('listening on queue_message_notify');

  const result = await pg.query(`SELECT id, content FROM queue_message`);
  for (const row of result.rows) {
    await popOneQueueMessage({pg, amqp}, row.id);
  }
}

async function popOneQueueMessage (deps, id) {
  const { pg, amqp } = deps;
  const pool = await pg.connect();

  try {
    await pool.query('BEGIN');

    const result = await pool.query({
      text: 'SELECT * FROM queue_message WHERE id = $1 FOR UPDATE SKIP LOCKED',
      values: [id],
    });

    const row = result.rows[0];
    if (!row) {
      // no unlocked row found; end the open transaction before returning
      await pool.query('ROLLBACK');
      return false;
    }
    const { exchange, routing_key, content, options } = row;

    const ch = await amqp.createConfirmChannel();

    // an error handler is required, otherwise an emitted channel error would crash the program
    ch.on('error', err => {
      console.error(err);
    });

    const str = JSON.stringify(content);
    await ch.publish(exchange, routing_key, Buffer.from(str), options);
    await ch.waitForConfirms();
    await ch.close();

    await pool.query({
      text: 'DELETE FROM queue_message WHERE id = $1',
      values: [id],
    });

    await pool.query('COMMIT');
    return true;
  } catch (err) {
    console.error(err);
    await pool.query('ROLLBACK');
    return false;
  } finally {
    await pool.release();
  }
}

Most of the interesting things happen in popOneQueueMessage() at the bottom of listen.js. The operation of this function is basically the following:

  1. Begin a transaction
  2. Attempt to lock a row in queue_message by ID
  3. Open a RabbitMQ confirmation channel and publish the message to it
  4. Wait for the message to have been confirmed
  5. Delete the row from queue_message
  6. Commit the transaction, making the delete permanent
  7. If an error happens, roll the transaction back, “returning” the row to queue_message for another listener to handle

When the listener is started (in the run() function), it also automatically checks for any existing queue messages and tries to “pop” them one by one from the queue. The listener might not have been running when a row was inserted into queue_message, so it needs to check immediately on startup whether there are any outstanding messages to publish.

Connection setup

I like to make a services.js file where connection setup can happen so that this tedious bit of code is kept from immediate view. Here it is, for reference:

const amqplib = require('amqplib');
const pgListen = require('pg-listen');
const pg = require('pg');

const AMQP_URL = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672';

exports.pg = async function () {
  const pgClient = new pg.Pool();
  process.on('exit', () => pgClient.end());
  console.log('connected to postgres');
  return pgClient;
};

exports.amqp = async function () {
  const amqp = await amqplib.connect(AMQP_URL);
  amqp.on('error', err => {
    console.error('amqp error');
    console.error(err.message);
  });
  console.log('connected to amqp');
  return amqp;
};

exports.pgListen = async function () {
  const pgListenClient = pgListen();
  process.on('exit', () => pgListenClient.close());
  await pgListenClient.connect();
  console.log('pgListen connected to postgres');
  return pgListenClient;
};

Note that the pg library will use the libpq environment variables (such as PGHOST and so on) by default, so we don’t have to define where to connect in case we are just using the default database of the current user.

Testing everything

Now we are going to test the listener to make sure that it works. I set up a basic test script in Node.js that spawns a configurable number of listeners and inserts rows with incrementing values into queue_message, with a small random delay between inserts. It lets the listeners handle the messages and then checks that every message was published to RabbitMQ exactly once. See test.js below:

const assert = require('assert');
const { spawn } = require('child_process');
const delay = require('delay');
const services = require('./services');

const LISTENERS = parseInt(process.env.LISTENERS, 10) || 2;
const PUBLISH_COUNT = parseInt(process.env.PUBLISH_COUNT, 10) || 100;
const JITTER = parseInt(process.env.JITTER, 10) || 7;
const AFTER_PUBLISH_DELAY = parseInt(process.env.AFTER_PUBLISH_DELAY, 10) || PUBLISH_COUNT * JITTER;

test();

async function test () {
  const pg = await services.pg();
  const amqp = await services.amqp();

  const ch = await amqp.createChannel();

  // setup a queue
  await ch.assertQueue('queue_message_test_log');
  await ch.purgeQueue('queue_message_test_log');
  await ch.bindQueue('queue_message_test_log', 'amq.direct', 'queue_message_test');

  const received = {}; // every message value stored here
  await ch.consume('queue_message_test_log', msg => {
    const json = JSON.parse(msg.content);
    assert.ok(typeof json.value === 'number');
    received[json.value] = (received[json.value] || 0) + 1;
  });

  // spawn sub-processes "listeners"
  const children = [];
  for (let i = 0; i < LISTENERS; i++) {
    const child = spawn('node', ['./listen.js']);
    child.stdout.on('data', data => {
      process.stdout.write(`${i} out: ${data}`);
    });
    child.stderr.on('data', data => {
      process.stderr.write(`${i} err: ${data}`);
    });
    console.log(`spawned listener #${i}`);
    children.push(child);
  }

  await pg.query(`CREATE TEMP SEQUENCE queue_message_test_seq MINVALUE 0 START WITH 0`);
  await pg.query('TRUNCATE queue_message');

  for (let i = 0; i < PUBLISH_COUNT; i++) {
    await pg.query(`
      INSERT INTO queue_message (exchange, routing_key, content)
      VALUES (
        'amq.direct',
        'queue_message_test',
        jsonb_build_object('value', nextval('queue_message_test_seq'))
      );
    `);
    await delay(Math.random() * JITTER);
  }

  // we wait a fixed amount of time here to give every listener a chance to finish
  await delay(AFTER_PUBLISH_DELAY);

  for (const i in children) {
    const child = children[i];
    child.kill();
    console.log(`killed listener #${i}`);
  }

  // check what was received
  const duplicates = [];
  const unreceived = [];
  for (let i = 0; i < PUBLISH_COUNT; i++) {
    if (typeof received[i] === 'undefined') {
      unreceived.push(i);
    } else if (received[i] > 1) {
      duplicates.push(i);
    }
  }
  assert.ok(unreceived.length === 0, `error: did not receive ${unreceived.join(', ')}`);
  console.log('pass: all messages received');
  assert.ok(duplicates.length === 0, `error: duplicates received ${duplicates.join(', ')}`);
  console.log('pass: no duplicates received');
  process.exit();
}

Running this, we can see that it works:

$ node test.js
> connected to postgres
> connected to amqp
> spawned listener #0
> spawned listener #1
> 1 out: connected to postgres
> 0 out: connected to postgres
> 1 out: connected to amqp
> 0 out: connected to amqp
> 0 out: pgListen connected to postgres
> 1 out: pgListen connected to postgres
> 1 out: listening on queue_message_notify
> 0 out: listening on queue_message_notify
> killed listener #0
> killed listener #1
> pass: all messages received
> pass: no duplicates received

Wrapping up

Here are some thoughts on how the above code can be extended to be better and/or more useful.

Saving published messages

Rows inserted into queue_message are deleted as soon as they have been published, but we can make some very simple modifications to keep them permanently instead. Adding a boolean column is_published that is set to TRUE instead of deleting the just-published row would solve this. Naming it published_at and making it a timestamp instead would work too, if we are interested in recording the time the message was published.

We would then select only rows where is_published is FALSE, or where published_at IS NULL, when we need to pop from queue_message. Finally, we update is_published to TRUE, or set the timestamp, instead of running a DELETE query against the message row. A sketch of the timestamp variant is shown below.
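
A minimal sketch of that variant, assuming we add a published_at column (the is_published boolean works the same way):

ALTER TABLE queue_message ADD COLUMN published_at timestamp;

-- selecting a row for publishing now skips rows that were already published
SELECT * FROM queue_message
WHERE id = $1 AND published_at IS NULL
FOR UPDATE SKIP LOCKED;

-- instead of DELETE, mark the row as published
UPDATE queue_message SET published_at = now() WHERE id = $1;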

Expiring messages after a number of attempts

One problem with the above listen.js is what happens if a message fails to be routed in RabbitMQ, or if there is some other problem within the try block. What happens then is that the transaction rolls back and the queue_message row gets “returned” to the table. And what then? Not much, actually. Since the rollback does not cause the table trigger to fire, the row will remain there until the next listen.js instance is launched (remember that the queue_message table is checked on every startup), at which point a new attempt is made.

I briefly experimented with adding a retry integer column to the table and extending the trigger definition so that it would also fire whenever this column was updated. Whenever the message failed to publish, the retry column would be incremented, which caused the trigger to run and the message to be retried, as sketched below. This sort of solved the problem, but it depends on the assumption that the message can be published at all, even if only eventually. If the message was meant for an exchange that does not exist, or if it contains some other semantic error, the only correct thing to do is to log it and then discard it completely. How to solve this in a good way will depend on the application.
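
Roughly, the schema and trigger changes for that experiment would look like the following sketch (note that the retry update has to happen outside the rolled-back transaction, for example on a separate connection):

ALTER TABLE queue_message ADD COLUMN retry integer NOT NULL DEFAULT 0;

-- fire the notification on insert and whenever retry is incremented
DROP TRIGGER IF EXISTS do_notify ON queue_message;
CREATE TRIGGER do_notify
AFTER INSERT OR UPDATE OF retry ON queue_message
FOR EACH ROW EXECUTE PROCEDURE queue_message_notify();

-- run by a listener after a failed publish attempt
UPDATE queue_message SET retry = retry + 1 WHERE id = $1;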

Receiving messages

Expanding on “Saving published messages”, we could extend the code in listen.js to receive messages from RabbitMQ as well. This could be used to create a more concrete service for a specific purpose, for instance a service that triggers the fetching of a URL whenever a row is inserted, storing the result in the same row. If we create a table that looks like the following:

CREATE TABLE url_fetch (
  id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
  url text NOT NULL,
  response_at timestamp,
  response_code integer,
  response_body text
);

We can then use this table for our queue instead of queue_message. Using listen.js, the popOneQueueMessage() function would be changed to SELECT unfetched rows (those where response_at IS NULL), download the URL, and write the result into the response_* columns. Downloading a URL then becomes as simple as the following:

INSERT INTO url_fetch (url) VALUES ('https://tedeh.net') RETURNING id;

You can then query the table with the returned ID for the response. There are lots of possibilities like this.
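
As a minimal sketch, assuming the built-in fetch() of Node.js 18+ and the same pg pool used in listen.js, the adapted pop function (named popOneUrlFetch here purely for illustration) could look something like this:

async function popOneUrlFetch (deps, id) {
  const { pg } = deps;
  const client = await pg.connect();

  try {
    await client.query('BEGIN');

    // lock one unfetched row, skipping rows locked by other listeners
    const result = await client.query({
      text: 'SELECT * FROM url_fetch WHERE id = $1 AND response_at IS NULL FOR UPDATE SKIP LOCKED',
      values: [id],
    });

    const row = result.rows[0];
    if (!row) {
      await client.query('ROLLBACK');
      return false;
    }

    // download the URL and store the result in the same row
    const response = await fetch(row.url);
    const body = await response.text();

    await client.query({
      text: `UPDATE url_fetch
             SET response_at = now(), response_code = $2, response_body = $3
             WHERE id = $1`,
      values: [id, response.status, body],
    });

    await client.query('COMMIT');
    return true;
  } catch (err) {
    console.error(err);
    await client.query('ROLLBACK');
    return false;
  } finally {
    client.release();
  }
}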

Source code

The source code for all of the above examples is available here: https://github.com/tedeh/publishing-rabbitmq-messages-from-postgresql.

Alternative solutions

There is actually an unofficial RabbitMQ plugin available on GitHub for a use case similar to this one: gmr/pgsql-listen-exchange. I briefly investigated it, but it seems too generic for me: it listens to all PostgreSQL NOTIFY calls instead of just the ones that are required. You could solve this with additional routing of the notifications within RabbitMQ itself, but I prefer the solution presented above.

If I required a stand-alone solution that was not part of an existing deployment of Node.js applications I would probably investigate the use of a plugin such as that one first.