Publishing RabbitMQ messages from PostgreSQL
One quite useful capability is being able to publish to a message broker such as RabbitMQ directly from a database like PostgreSQL. This article shows how that can be done using some simple Node.js bridge code, so that whenever a row is inserted into a table in PostgreSQL, a message is published to RabbitMQ in a safe and reliable manner.
Requirements
The features that we will be using have been widely available for several years in PostgreSQL, RabbitMQ, and Node.js, so unless you plan to run this on an extremely old version of any of them, compatibility is unlikely to be an issue.
The queue_message table
The basic unit of operation is going to be the PostgreSQL queue_message table. Every row inserted into queue_message will be turned into a corresponding message published to a RabbitMQ exchange. When the message has been published successfully, it will be deleted from the table.
There is no reason to be fancy with the table design; we are simply going to give it exactly the columns required for the purpose at hand. We are going to use the publish method in the Node.js library amqplib, which can be used to communicate with RabbitMQ. The publish method in amqplib takes all of the below table columns directly as parameters, except the id column, which we will use as the primary key.
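A minimal sketch of what the table could look like follows. The exact column types are assumptions on my part; exchange, routing_key, and content mirror the publish(exchange, routingKey, content) parameters of amqplib:

```sql
-- One row per message to publish. Columns other than id map directly
-- onto the parameters of amqplib's publish method.
CREATE TABLE queue_message (
  id BIGSERIAL PRIMARY KEY,
  exchange TEXT NOT NULL,
  routing_key TEXT NOT NULL,
  content BYTEA NOT NULL
);
```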
Whenever a row is inserted into queue_message we need to call PostgreSQL's pg_notify. This can be done with a basic insert trigger on the queue_message table as shown below. I wrote about broadcasting row updates previously on this website.
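A sketch of such a trigger, sending the id of the inserted row as a JSON payload on the queue_message_notify channel:

```sql
-- Send the new row's id as JSON on the queue_message_notify channel.
CREATE FUNCTION queue_message_notify() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('queue_message_notify', json_build_object('id', NEW.id)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER queue_message_notify
AFTER INSERT ON queue_message
FOR EACH ROW EXECUTE PROCEDURE queue_message_notify();
```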
Listening for notifications with Node.js
This is where we break out Node.js. We are going to use the pg-listen package to listen on the queue_message_notify channel. As can be seen in the definition above, we send the ID of the inserted row out on the channel as a JSON-formatted message. As soon as the ID is received by our listener, it will be used to read the entire row from the database.
When reading the row from the database, the most important thing to do is to lock it using the FOR UPDATE clause in PostgreSQL. This ensures that only the current connection/transaction will be able to manipulate it, which is crucial for our use case. If we did not lock the row, every concurrent listener on queue_message_notify would publish its own copy of the message to RabbitMQ, which is definitely not what we want. We want one row to equal exactly one published message.
PostgreSQL has an option (NOWAIT) to raise an error when a particular row cannot be locked, but we don’t want that either. From the point of view of one running listener, not being able to acquire a lock on a particular row probably means another listener locked it first. So we will use the SKIP LOCKED clause to safely ignore unlockable rows. The query every listener uses to select a row for publishing then becomes the following:
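A sketch of that query, using the column names assumed earlier:

```sql
-- Lock a single row by id; if another listener already holds a lock
-- on it, SKIP LOCKED makes the query return zero rows instead of waiting.
SELECT id, exchange, routing_key, content
FROM queue_message
WHERE id = $1
FOR UPDATE SKIP LOCKED;
```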
Putting everything together then results in the following simple Node.js “table listener”, listener.js:
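The following is a minimal sketch of what listener.js could look like. It assumes that services.js (shown in the next section) exports connectPg(), connectAmqp(), and createSubscriber() helpers; those names are my own:

```javascript
// listener.js -- bridges queue_message rows to RabbitMQ.
const services = require('./services');

async function run() {
  const pool = await services.connectPg();
  const amqpConnection = await services.connectAmqp();
  const subscriber = await services.createSubscriber();

  // React to new rows as they are inserted.
  subscriber.notifications.on('queue_message_notify', ({ id }) => {
    popOneQueueMessage(pool, amqpConnection, id).catch(console.error);
  });
  await subscriber.listenTo('queue_message_notify');
  console.log('listening on queue_message_notify');

  // The listener might not have been running when rows were inserted,
  // so check for outstanding messages on startup.
  const { rows } = await pool.query('SELECT id FROM queue_message');
  for (const { id } of rows) {
    await popOneQueueMessage(pool, amqpConnection, id).catch(console.error);
  }
}

// Lock one queue_message row, publish it on a confirm channel, and
// delete it. The delete only becomes permanent once the transaction
// commits, i.e. after RabbitMQ has confirmed the publish.
async function popOneQueueMessage(pool, amqpConnection, id) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    const { rows } = await client.query(
      `SELECT id, exchange, routing_key, content
       FROM queue_message WHERE id = $1 FOR UPDATE SKIP LOCKED`,
      [id],
    );
    if (rows.length === 0) {
      // Another listener beat us to this row (or it is already gone).
      await client.query('ROLLBACK');
      return false;
    }
    const row = rows[0];
    const channel = await amqpConnection.createConfirmChannel();
    channel.publish(row.exchange, row.routing_key, row.content);
    await channel.waitForConfirms();
    await channel.close();
    await client.query('DELETE FROM queue_message WHERE id = $1', [row.id]);
    await client.query('COMMIT');
    return true;
  } catch (err) {
    // Roll back, "returning" the row to the table for another attempt.
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
```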
Most of the interesting things happen in popOneQueueMessage() at the bottom of listener.js. The operation of this method is basically the following:
- Begin a transaction
- Attempt to lock a row in queue_message by ID
- Open a RabbitMQ confirmation channel and publish the message to it
- Wait for the message to have been confirmed
- Delete the row from queue_message
- Commit the transaction, making the delete permanent
- If an error happens, roll the transaction back, “returning” the row to queue_message for another listener to handle
When the listener is started (in the run() method) it also automatically checks for any existing queue messages and tries to “pop” them one by one from the queue. The listener might not have been running when a row was inserted into queue_message, so it needs to check immediately on start whether there are any outstanding messages to publish.
Connection setup
I like to make a services.js file where connection setup can happen, so that this tedious bit of code is kept out of immediate view. Here it is, for reference:
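A sketch of what services.js might contain; the helper names and the AMQP_URL environment variable are assumptions on my part:

```javascript
// services.js -- connection helpers shared by listener.js and test.js.
const { Pool } = require('pg');
const amqp = require('amqplib');
const createPostgresSubscriber = require('pg-listen');

// pg reads the usual libpq environment variables (PGHOST and so on)
// when no explicit configuration is given.
async function connectPg() {
  const pool = new Pool();
  await pool.query('SELECT 1'); // fail fast if unreachable
  console.log('connected to postgres');
  return pool;
}

async function connectAmqp() {
  const connection = await amqp.connect(process.env.AMQP_URL || 'amqp://localhost');
  console.log('connected to amqp');
  return connection;
}

// A dedicated pg-listen connection for LISTEN/NOTIFY.
async function createSubscriber() {
  const subscriber = createPostgresSubscriber();
  await subscriber.connect();
  console.log('pgListen connected to postgres');
  return subscriber;
}

module.exports = { connectPg, connectAmqp, createSubscriber };
```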
Note that the pg library will use the libpq environment variables (such as PGHOST and so on) by default, so we don’t have to define where to connect in case we are just using the default database of the current user.
Testing everything
Now we are going to test the listener to make sure that it works. I set up a basic testing script in Node.js that spawns a definable number of listeners and inserts some random rows in an incrementing series into queue_message. It lets the listeners handle the messages and then checks that every message was published to RabbitMQ exactly once. See test.js below:
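A sketch of what test.js could look like; the test exchange name, the message count, and the fixed wait before checking the results are assumptions on my part:

```javascript
// test.js -- spawns listeners, inserts rows, and verifies that every
// message arrives in RabbitMQ exactly once.
const { fork } = require('child_process');
const services = require('./services');

const NUM_LISTENERS = 2;
const NUM_MESSAGES = 100;

async function main() {
  const pool = await services.connectPg();
  const amqpConnection = await services.connectAmqp();
  const channel = await amqpConnection.createChannel();

  // Bind a throwaway queue so we can observe what the listeners publish.
  await channel.assertExchange('queue_message_test', 'fanout', { durable: false });
  const { queue } = await channel.assertQueue('', { exclusive: true });
  await channel.bindQueue(queue, 'queue_message_test', '');

  const received = [];
  await channel.consume(queue, (msg) => {
    received.push(msg.content.toString());
    channel.ack(msg);
  });

  // Spawn the listeners as child processes, echoing their output.
  const listeners = [];
  for (let i = 0; i < NUM_LISTENERS; i += 1) {
    const child = fork('./listener.js', [], { silent: true });
    child.stdout.on('data', (data) => process.stdout.write(`${i} out: ${data}`));
    listeners.push(child);
    console.log(`spawned listener #${i}`);
  }

  // Insert an incrementing series of messages.
  for (let n = 0; n < NUM_MESSAGES; n += 1) {
    await pool.query(
      `INSERT INTO queue_message (exchange, routing_key, content)
       VALUES ($1, $2, $3)`,
      ['queue_message_test', '', Buffer.from(String(n))],
    );
  }

  // Give the listeners some time to drain the table, then kill them.
  await new Promise((resolve) => setTimeout(resolve, 5000));
  listeners.forEach((child, i) => {
    child.kill();
    console.log(`killed listener #${i}`);
  });

  const unique = new Set(received);
  console.log(received.length === NUM_MESSAGES
    ? `pass: all ${NUM_MESSAGES} messages received`
    : `fail: received ${received.length} of ${NUM_MESSAGES} messages`);
  console.log(unique.size === received.length
    ? 'pass: no duplicates received'
    : 'fail: duplicates received');
  process.exit(0);
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```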
Running this, we can see that it works:
```
$ node test.js
> connected to postgres
> connected to amqp
> spawned listener #0
> spawned listener #1
> 1 out: connected to postgres
> 0 out: connected to postgres
> 1 out: connected to amqp
> 0 out: connected to amqp
> 0 out: pgListen connected to postgres
> 1 out: pgListen connected to postgres
> 1 out: listening on queue_message_notify
> 0 out: listening on queue_message_notify
> killed listener #0
> killed listener #1
> pass: all 100 messages received
> pass: no duplicates received
```
Wrapping up
Here are some thoughts on how the above code can be extended to be better and/or more useful.
Saving published messages
Rows inserted into queue_message are immediately discarded once published, but we can make some very simple modifications to store the rows permanently instead. Adding a boolean column is_published that is set to TRUE instead of deleting the just-published row would solve this. Naming it published_at and making it a timestamp instead would work too, if we are interested in recording the time the message was published.
We would then select only rows where is_published equals FALSE, or where published_at IS NULL, when we need to pop from queue_message. Finally we will update is_published to TRUE, or set the time stamp, instead of running a DELETE query against the message row.
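A sketch of those modifications, assuming the published_at variant:

```sql
-- Select only rows that have not been published yet.
SELECT id, exchange, routing_key, content
FROM queue_message
WHERE id = $1 AND published_at IS NULL
FOR UPDATE SKIP LOCKED;

-- Mark the row as published instead of deleting it.
UPDATE queue_message SET published_at = now() WHERE id = $1;
```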
Expiring messages after a number of attempts
One problem with the above listener.js is what happens if a message fails to be routed in RabbitMQ, or if there is some other problem within the try block. What happens then is that the transaction rolls back and the queue_message row gets “returned” to the table. And what then? Not much, actually. Since the return does not cause the table trigger to run, the row will remain there until the next listener.js instance is launched (remember that the queue_message table is checked on every startup), at which point a new attempt is made.
I briefly experimented with adding a retry integer column to the table and extending the definition of the trigger so that it would also run whenever this column was updated. Whenever the message failed to publish, the retry column would be incremented, which caused the trigger to run and the message to be retried. This sort-of solved the problem, but it depends on the message being publishable at all, even if only eventually. If the message was meant for an exchange that does not exist, or if it contains some other semantic error, the only correct way to handle it is to log it and then discard it completely. How to solve this in a good way is going to depend on the application.
Receiving messages
Expanding on “Saving published messages”, we could then extend the code in listener.js to receive messages from RabbitMQ as well. This could be used to create a more concrete service for a specific purpose, for instance a service that triggers the fetching of a URL whenever a row is inserted, storing the result in the same row. If we create a table that looks like the following:
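(A sketch; the table name and the exact response_* columns are assumptions on my part.)

```sql
-- One row per URL to fetch; the response_* columns start out empty
-- and are filled in once the download completes.
CREATE TABLE fetch_url (
  id BIGSERIAL PRIMARY KEY,
  url TEXT NOT NULL,
  response_at TIMESTAMPTZ,
  response_status INTEGER,
  response_body BYTEA
);
```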
We can then use this table for our queue instead of queue_message. In listener.js, the popOneQueueMessage() method would be changed to SELECT unfetched rows where response_at IS NULL, which are then downloaded with the result written into the response_* columns. Downloading a URL becomes as simple as the following:
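A sketch of that insert, using the table assumed above:

```sql
-- Queue a download; the returned id can later be used to look up the response.
INSERT INTO fetch_url (url)
VALUES ('https://example.com/')
RETURNING id;
```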
You can then query the table with the returned ID for the response. There are lots of possibilities like this.
Source code
The source code for all of the above examples is available here: https://github.com/tedeh/publishing-rabbitmq-messages-from-postgresql.
Alternative solutions
There is actually an unofficial RabbitMQ plugin available on GitHub for a similar use case to this: gmr/pgsql-listen-exchange. I briefly investigated it, but it seems too generic for me: it listens to all Postgres NOTIFY calls instead of just the ones that are required. You could solve this with additional routing of the correct notifications within RabbitMQ itself, but I prefer the solution I have presented above.
If I required a stand-alone solution that was not part of an existing deployment of Node.js applications, I would probably investigate the use of a plugin such as that one first.