In a recent project, I ran into a problem: whenever RabbitMQ briefly went offline, my newly created work items would get “stuck” in the database as PROCESSING, but they never actually made it onto the queue. The user would see “we’re working on it,” but nothing ever happened. In this post, I’ll walk through the root cause, why publishing directly to RabbitMQ in your HTTP handler can fail, and how the Transactional Outbox pattern fixed it for good.
Here’s the workflow I originally had in my API:
const ds = await db.dataSource.create({
  data: { status: 'PROCESSING', /* … */ }
});

await sendToQueue('dataSourceQueue', {
  dataSourceId: ds.id,
  /* … */
});

res.status(202).json({ message: 'OK' });
When RabbitMQ was healthy, this worked. But the moment the broker was down or flaky:

- The database write had already committed, so the record sat at PROCESSING.
- sendToQueue would throw an error.

Result: the user saw "PROCESSING", but the job never ran and I had no clear way to retry it.
You might try catching the error and rolling back, or resetting the record back to PENDING. The problem is you have two separate systems (database + message broker) with no built-in way to undo both steps together. Even with retries, it’s hard to coordinate exactly when your code should attempt to re‑enqueue the payload.
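To see why, here’s a minimal sketch of that compensation attempt, reusing the sendToQueue helper from above (the exact try/catch shape is my illustration, not code from the project):

try {
  await sendToQueue('dataSourceQueue', { dataSourceId: ds.id });
} catch (err) {
  // Compensate by resetting the record so something can retry it later.
  // But this update is itself a call that can fail, and if the process
  // crashes between sendToQueue and this line, the row is stuck at
  // PROCESSING anyway.
  await db.dataSource.update({
    where: { id: ds.id },
    data: { status: 'PENDING' },
  });
}

And even when the reset succeeds, nothing is scheduled to pick the PENDING row back up.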
The Outbox pattern makes your database write and your intent to publish atomic. You introduce a second table in your database, the outbox, which stores messages to be sent to the broker; an outbox row is inserted in the same transaction as your business data. A separate background worker then reads from the outbox and sends the messages to RabbitMQ.
CREATE TABLE outbox (
  id             UUID        NOT NULL DEFAULT gen_random_uuid(),
  aggregate_type TEXT        NOT NULL,
  aggregate_id   UUID        NOT NULL,
  topic          TEXT        NOT NULL,
  payload        JSONB       NOT NULL,
  attempts       SMALLINT    NOT NULL DEFAULT 0,
  processed      BOOLEAN     NOT NULL DEFAULT FALSE,
  created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
  processed_at   TIMESTAMPTZ NULL,
  updated_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
  PRIMARY KEY (id)
);
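One addition I’d suggest (my own tweak, not strictly part of the pattern): the worker below constantly queries for unprocessed rows, so a partial index keeps that scan cheap as the outbox grows:

CREATE INDEX outbox_unprocessed_idx
  ON outbox (created_at)
  WHERE processed = FALSE;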
With the table in place, the handler creates the dataSource row with status PROCESSING and an outbox row recording the same payload, all inside a single transaction:
await db.$transaction(async tx => {
  const ds = await tx.dataSource.create({
    data: { status: 'PROCESSING', /* … */ }
  });
  await tx.outbox.create({
    data: {
      aggregateId: ds.id,
      topic: 'dataSourceQueue',
      aggregateType: 'DataSource',
      payload: { dataSourceId: ds.id, /* … */ },
      /* … */
    }
  });
});
res.status(202).json({
  message: 'Your work is queued for processing.',
  status: 'PROCESSING'
});
A separate worker periodically scans outbox WHERE processed=false
and attempts to publish each message:
const entries = await db.outbox.findMany({ where: { processed: false } });
for (const e of entries) {
  try {
    // Publish and wait for the broker's confirm before marking the row done.
    channel.sendToQueue(e.topic, Buffer.from(JSON.stringify(e.payload)), {
      persistent: true,
    });
    await channel.waitForConfirms();
    await db.outbox.update({
      where: { id: e.id },
      data: { processed: true, processedAt: new Date() },
    });
  } catch {
    // Leave processed=false so the next scan retries, but record the attempt.
    await db.outbox.update({
      where: { id: e.id },
      data: { attempts: { increment: 1 } },
    });
  }
}
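Note that waitForConfirms only exists on a channel opened in confirm mode. Here’s a minimal sketch of how the worker could be wired up with amqplib; the connection URL, the 5-second poll interval, and the publishPendingOutboxEntries name (wrapping the scan above) are my placeholders:

const amqp = require('amqplib');

async function startOutboxWorker() {
  const conn = await amqp.connect('amqp://localhost');
  // Confirm channel: the broker acknowledges each publish, which is
  // what waitForConfirms() waits on.
  const channel = await conn.createConfirmChannel();
  await channel.assertQueue('dataSourceQueue', { durable: true });

  // Simple polling loop; one scan at a time so overlapping ticks can't
  // publish the same outbox row twice.
  for (;;) {
    await publishPendingOutboxEntries(channel);
    await new Promise(resolve => setTimeout(resolve, 5000));
  }
}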
Once the broker confirms a publish, the row is flipped to processed = true and is never sent again. Your existing consumer can remain unchanged:
channel.consume('dataSourceQueue', async msg => {
  const { dataSourceId, /* … */ } = JSON.parse(msg.content.toString());
  try {
    await doWork(/* … */);
    await db.dataSource.update({ where: { id: dataSourceId }, data: { status: 'COMPLETED' } });
    channel.ack(msg);
  } catch (err) {
    await db.dataSource.update({ where: { id: dataSourceId }, data: { status: 'ERROR' } });
    // requeue=false: a poison message is dropped (or dead-lettered if a
    // DLX is configured) instead of looping forever.
    channel.nack(msg, false, false);
  }
});
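One caveat: the outbox gives you at-least-once delivery. If the worker publishes and then crashes before marking the row processed, the same message goes out twice, so the consumer should tolerate duplicates. A cheap guard at the top of the consumer callback, assuming the status field from earlier (my sketch):

// Skip work that already finished, so a duplicate delivery is simply acked.
const existing = await db.dataSource.findUnique({ where: { id: dataSourceId } });
if (existing?.status === 'COMPLETED') {
  channel.ack(msg);
  return;
}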