Publishing RabbitMQ messages from PostgreSQL
A very useful capability is being able to publish something to a message broker such as RabbitMQ directly from a database like PostgreSQL. This article will show how this can be done using some simple Node.js bridge code, so that whenever a row is inserted into a table in PostgreSQL, a message is published to RabbitMQ in a safe and reliable manner.
The features we will be using have been widely available for several years in all of the above software, so unless you plan to run this on an extremely old version of PostgreSQL, RabbitMQ, or Node.js, compatibility is unlikely to be an issue.
The basic unit of operation is going to be the PostgreSQL queue_message table. Every row inserted into queue_message will be turned into a corresponding message published to a RabbitMQ exchange. When the message has been published successfully, it will be deleted from the table.
There is no reason to be fancy with the table design; we are simply going to give it exactly the columns required for the purpose at hand. We are going to use the publish method in the Node.js library amqplib, which can be used to communicate with RabbitMQ. The publish method in amqplib takes all of the below table columns directly as parameters, except the id column, which we will use as the primary key.
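As a sketch, such a table could look like the following (the column names mirror the parameters of amqplib's publish(exchange, routingKey, content, options); the exact types here are my own choice):

```sql
CREATE TABLE queue_message (
  id bigserial PRIMARY KEY,
  exchange text NOT NULL,      -- RabbitMQ exchange to publish to
  routing_key text NOT NULL,   -- routing key within the exchange
  content bytea NOT NULL,      -- message body, passed as a Buffer to amqplib
  options jsonb NOT NULL DEFAULT '{}'  -- extra publish options, if any
);
```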
Whenever a row is inserted into queue_message we need to perform a PostgreSQL pg_notify. This can be done with a basic insert trigger on the queue_message table, as shown below. I have previously written about broadcasting row updates on this website.
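A sketch of what that trigger could look like — the function simply forwards the ID of the new row as a JSON-formatted message on the queue_message_notify channel:

```sql
CREATE FUNCTION queue_message_notify() RETURNS trigger AS $$
BEGIN
  -- send the ID of the inserted row as JSON on the notification channel
  PERFORM pg_notify('queue_message_notify', json_build_object('id', NEW.id)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER queue_message_notify
AFTER INSERT ON queue_message
FOR EACH ROW EXECUTE PROCEDURE queue_message_notify();
```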
Listening for notifications with Node.js
This is where we break out Node.js. We are going to use the pg-listen package to listen on the queue_message_notify channel. As can be seen in the trigger definition above, we send the ID of the inserted row out on the channel as a JSON-formatted message. As soon as the ID is received by our listener, it will be used to read the entire row from the database.
When reading the row from the database, the most important thing is to lock it using the FOR UPDATE clause in PostgreSQL. This ensures that only the current connection/transaction will be able to manipulate it, which is crucial for our use case. If we did not lock the row, every concurrent listener on queue_message_notify would go on to publish just as many messages to RabbitMQ, which is definitely not what we want. We want one row to equal one message published.
PostgreSQL has an option to raise an error when a particular row cannot be locked, but we don't want that either. From the point of view of one running listener, not being able to acquire a lock on a particular row probably means another listener locked it before us. So we will use the SKIP LOCKED clause to safely ignore unlockable rows. The query used by every listener to select a row for publishing then becomes the following:
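Run inside a transaction, the query could look like this:

```sql
SELECT *
FROM queue_message
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED;
```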
Putting everything together then results in the following simple Node.js "table listener":
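A sketch of what listen.js could look like — the services.js helpers and exact option handling are assumptions of mine; see the linked repository for the real thing:

```javascript
// listen.js — sketch of the table listener
const createSubscriber = require('pg-listen');
const { getDb, getAmqp } = require('./services'); // hypothetical connection helpers

async function run() {
  const db = await getDb();
  const amqp = await getAmqp();

  // pg-listen uses the libpq environment variables by default
  const subscriber = createSubscriber();
  subscriber.notifications.on('queue_message_notify', async () => {
    // a row was just inserted; try to pop it
    await popOneQueueMessage(db, amqp);
  });

  await subscriber.connect();
  await subscriber.listenTo('queue_message_notify');
  console.log('listening on queue_message_notify');

  // drain any rows that were inserted while no listener was running
  while (await popOneQueueMessage(db, amqp)) {}
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
```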
Most of the interesting things happen in popOneQueueMessage() at the bottom of listen.js. The operation of this method is basically the following:
- Begin a transaction
- Attempt to lock a row in queue_message
- Open a RabbitMQ confirm channel and publish the message to it
- Wait for the message to be confirmed
- Delete the row from queue_message
- Commit the transaction, making the delete permanent
- If an error happens, roll the transaction back, "returning" the row to queue_message for another listener to handle
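These steps could be sketched roughly like so, assuming db is a pg connection pool and amqp a connected amqplib connection (both names are my own):

```javascript
// sketch of popOneQueueMessage(); returns false when there was nothing to do
async function popOneQueueMessage(db, amqp) {
  const client = await db.connect();
  try {
    await client.query('BEGIN');

    // lock one row, skipping rows other listeners have already locked
    const { rows } = await client.query(
      'SELECT * FROM queue_message ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED'
    );
    if (rows.length === 0) {
      await client.query('ROLLBACK');
      return false; // empty table, or another listener beat us to the row
    }
    const msg = rows[0];

    // a confirm channel lets us wait until RabbitMQ has accepted the message
    const channel = await amqp.createConfirmChannel();
    channel.publish(msg.exchange, msg.routing_key, msg.content, msg.options);
    await channel.waitForConfirms();
    await channel.close();

    await client.query('DELETE FROM queue_message WHERE id = $1', [msg.id]);
    await client.query('COMMIT'); // makes the delete permanent
    return true;
  } catch (err) {
    // rolling back "returns" the row to the table for another attempt
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}
```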
When the listener is started (in the run() method) it also automatically checks for any existing queue messages and tries to "pop" them one by one from the queue. The listener might not have been running when a row was inserted into queue_message, so it needs to check immediately on startup whether there are any outstanding messages to publish.
I like to make a services.js file where connection setup can happen, so that this tedious bit of code is kept out of immediate view. Here it is, for reference:
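A sketch of what services.js could contain — the helper names and the AMQP_URL fallback are assumptions of mine:

```javascript
// services.js — connection setup kept out of the way
const { Pool } = require('pg');
const amqplib = require('amqplib');

let db;
async function getDb() {
  if (!db) {
    // pg reads PGHOST, PGUSER, PGDATABASE, etc. from the environment
    db = new Pool();
    await db.query('SELECT 1'); // fail fast if unreachable
    console.log('connected to postgres');
  }
  return db;
}

let amqp;
async function getAmqp() {
  if (!amqp) {
    amqp = await amqplib.connect(process.env.AMQP_URL || 'amqp://localhost');
    console.log('connected to amqp');
  }
  return amqp;
}

module.exports = { getDb, getAmqp };
```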
Note that the pg library will use the libpq environment variables (such as PGHOST and so on) by default, so we don't have to define where to connect if we are just using the default database of the current user.
Now we are going to test the listener to make sure that it works. I set up a basic testing script in Node.js that spawns a definable number of listeners and inserts some random rows in an incrementing series into queue_message. It lets the listeners handle the messages and then checks that every message was published to RabbitMQ exactly once. See test.js:
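A rough sketch of such a test script — the exchange name, timings, and message count here are my own placeholders:

```javascript
// test.js — sketch: spawn listeners, insert rows, verify exactly-once delivery
const { fork } = require('child_process');
const { getDb, getAmqp } = require('./services'); // hypothetical helpers

const LISTENERS = 2;
const MESSAGES = 100;

async function main() {
  const db = await getDb();
  const amqp = await getAmqp();

  // collect everything published to a throwaway queue bound to a test exchange
  const channel = await amqp.createChannel();
  await channel.assertExchange('test-exchange', 'topic');
  const { queue } = await channel.assertQueue('', { exclusive: true });
  await channel.bindQueue(queue, 'test-exchange', '#');
  const received = [];
  await channel.consume(queue, (m) => received.push(m.content.toString()), { noAck: true });

  const listeners = [];
  for (let i = 0; i < LISTENERS; i += 1) {
    const child = fork('./listen.js', { silent: true });
    child.stdout.on('data', (d) => process.stdout.write(`${i} out: ${d}`));
    listeners.push(child);
    console.log(`spawned listener #${i}`);
  }

  for (let i = 0; i < MESSAGES; i += 1) {
    await db.query(
      'INSERT INTO queue_message (exchange, routing_key, content) VALUES ($1, $2, $3)',
      ['test-exchange', 'test', Buffer.from(String(i))]
    );
  }

  await new Promise((resolve) => setTimeout(resolve, 2000)); // let listeners drain
  listeners.forEach((child, i) => {
    child.kill();
    console.log(`killed listener #${i}`);
  });

  console.log(received.length === MESSAGES
    ? `pass: all ${MESSAGES} messages received`
    : `fail: got ${received.length} of ${MESSAGES}`);
  console.log(new Set(received).size === received.length
    ? 'pass: no duplicates received'
    : 'fail: duplicates received');
  process.exit(0);
}

main();
```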
Running this, we can see that it works:
```
$ node test.js
connected to postgres
connected to amqp
spawned listener #0
spawned listener #1
1 out: connected to postgres
0 out: connected to postgres
1 out: connected to amqp
0 out: connected to amqp
0 out: pgListen connected to postgres
1 out: pgListen connected to postgres
1 out: listening on queue_message_notify
0 out: listening on queue_message_notify
killed listener #0
killed listener #1
pass: all 100 messages received
pass: no duplicates received
```
Here are some thoughts on how the above code can be extended to be better and/or more useful.
Saving published messages
Rows inserted into queue_message are immediately discarded, but with some very simple modifications we can store the rows permanently instead. Adding a boolean column is_published that is set to TRUE instead of deleting the just-published row would solve this. Naming it published_at and making it a timestamp instead would work too, if we are interested in recording the time the message was published.
We would then select only rows where is_published is not TRUE (or where published_at IS NULL) when we need to pop from queue_message. Finally, we update is_published to TRUE, or set the time stamp, instead of running a DELETE query against the message row.
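Using the published_at variant, the changes could be sketched like this:

```sql
ALTER TABLE queue_message ADD COLUMN published_at timestamptz;

-- pop only rows that have not been published yet
SELECT * FROM queue_message
WHERE published_at IS NULL
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- instead of DELETE, mark the row as published
UPDATE queue_message SET published_at = now() WHERE id = $1;
```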
Expiring messages after a number of attempts
One problem with the above listen.js is what happens if a message fails to be routed in RabbitMQ, or if there is some other problem within the try block. What happens then is that the transaction rolls back and the queue_message row gets "returned" to the table. And what then? Not much, actually. Since the return does not cause the table trigger to run, the row will remain there until the next listen.js instance is launched (remember that the queue_message table is checked on every startup), at which point a new attempt is made.
I briefly experimented with adding a retry integer column to the table and extending the trigger definition so that it would also run whenever this column was updated. Whenever the message failed to publish, the retry column would be incremented, which caused the trigger to run and the message to be retried. This sort of solved the problem, but it depends on the message being publishable at all, even if only eventually. If the message was meant for an exchange that does not exist, or if it contains some other semantic error, the only correct thing to do is to log it and then discard it completely. How to solve this in a good way is going to depend on the application.
Expanding on "Saving published messages", we could then extend the code in listen.js to receive messages from RabbitMQ as well. This could be used to create a more concrete service for a specific purpose, for instance a service that triggers the fetching of a URL whenever a row is inserted, storing the result in the same row. If we create a table that looks like the following:
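A sketch of such a table (the name and the exact response_* columns are my own placeholders):

```sql
CREATE TABLE queue_message_url (
  id bigserial PRIMARY KEY,
  url text NOT NULL,          -- the URL to fetch
  response_at timestamptz,    -- NULL until the URL has been fetched
  response_status integer,    -- HTTP status of the response
  response_body text          -- body of the response
);
```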
We can then use this table for our queue instead of queue_message. The popOneQueueMessage() method would be changed to SELECT unfetched rows where response_at IS NULL, which are then downloaded and the result written into the response_* columns. Downloading a URL becomes as simple as the following:
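Using the sketched table above, that is a single insert:

```sql
INSERT INTO queue_message_url (url)
VALUES ('https://example.com/')
RETURNING id;
```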
You can then query the table with the returned ID for the response. There are lots of possibilities like this.
The source code for all of the above examples is available here: https://github.com/tedeh/publishing-rabbitmq-messages-from-postgresql.
There is actually an unofficial RabbitMQ plugin available on GitHub for a similar use case: gmr/pgsql-listen-exchange. I briefly investigated it, but it seems too generic for me. It listens to all Postgres NOTIFY calls instead of just the ones that are required. You could solve this with additional routing of the correct notifications within RabbitMQ itself, but I prefer the solution presented above.
If I required a stand-alone solution that was not part of an existing deployment of Node.js applications I would probably investigate the use of a plugin such as that one first.