Outbox
The outbox pattern records side-effect intent in the same database transaction as the business write. A separate worker drains those records after commit and delivers events or jobs with retries.
Use an outbox when an event or job must not be lost after the database commit: notifications, integrations, billing syncs, audit streams, search indexing, or other workflow-critical side effects.
bun add @beignet/core
Why it exists
After-commit publishing avoids one class of bug: listeners, jobs, and mail do not see data that later rolls back. It still leaves a different failure window:
- The database transaction commits.
- The process starts publishing the event or dispatching the job.
- The process crashes or the provider call fails.
- The business record is durable, but the side effect is lost.
The outbox closes that gap by writing the event or job to an outbox_messages
table inside the transaction. Delivery becomes a retryable background workflow.
Outbox delivery is at least once, not exactly once. If the worker delivers a message and crashes before marking it delivered, the message may be delivered again. Use Idempotency inside listeners or job handlers when duplicate delivery would be harmful.
Core API
Use @beignet/core/outbox for typed messages, registries, memory test storage,
and the drain worker:
import {
createMemoryOutbox,
defineOutboxRegistry,
drainOutbox,
} from "@beignet/core/outbox";
The app-facing port is intentionally small:
import type { OutboxPort } from "@beignet/core/outbox";
export type AppPorts = {
outbox: OutboxPort;
};
Production adapters implement:
| Operation | Purpose |
|---|---|
enqueue(...) | Store a pending event or job |
claimBatch(...) | Atomically claim available messages with a lease |
markDelivered(...) | Ack a claimed message with its claim token |
markFailed(...) | Retry or dead-letter a claimed message |
Ack and fail operations require the current claimToken. This prevents an old
worker from acking a message after its lease expired and another worker claimed
it.
Transaction-scoped recording
Keep the existing Beignet event API in use cases. The outbox should sit behind
tx.events, not introduce a parallel use-case workflow:
const publishPostUseCase = useCase
.command("posts.publish")
.input(PublishPostInput)
.output(PostOutput)
.emits([PostPublished])
.run(async ({ ctx, input, events }) =>
ctx.ports.uow.transaction(async (tx) => {
const post = await tx.posts.publish(input.slug);
await events.record(tx.events, PostPublished, {
postId: post.id,
slug: post.slug,
publishedAt: post.publishedAt,
});
return post;
}),
);
Wire tx.events with createOutboxEventRecorder(...) in production:
import { createOutboxEventRecorder } from "@beignet/core/outbox";
import {
createDrizzleTursoOutboxPort,
createDrizzleTursoUnitOfWork,
} from "@beignet/provider-drizzle-turso";
uow: createDrizzleTursoUnitOfWork({
db: ports.db.db,
createTransactionPorts: (tx) => {
const outbox = createDrizzleTursoOutboxPort(tx);
return {
posts: createPostRepository(tx),
audit: createAuditLog(tx),
events: createOutboxEventRecorder(outbox),
outbox,
};
},
});
This keeps .emits(...) and events.record(...) enforcement intact while
moving durability into infrastructure.
Transactional job enqueueing
Use createOutboxJobDispatcher(...) when a use case should enqueue a job
inside the same transaction as the business write:
import { createOutboxJobDispatcher } from "@beignet/core/outbox";
uow: createDrizzleTursoUnitOfWork({
db: ports.db.db,
createTransactionPorts: (tx) => {
const outbox = createDrizzleTursoOutboxPort(tx);
return {
appointments: createAppointmentRepository(tx),
jobs: createOutboxJobDispatcher(outbox),
outbox,
};
},
});
Then use the normal job dispatcher shape:
await ctx.ports.uow.transaction(async (tx) => {
const appointment = await tx.appointments.create(input);
await tx.jobs.dispatch(SendAppointmentReminderJob, {
appointmentId: appointment.id,
});
return appointment;
});
Use this for direct transactional job enqueueing. For event-driven workflows, prefer recording an event and letting a listener enqueue the job during outbox drain.
Registry and draining
The worker needs an explicit registry of typed events and jobs it may deliver:
import { defineOutboxRegistry } from "@beignet/core/outbox";
import { postEvents } from "@/features/posts/domain/events";
import { postJobs } from "@/features/posts/jobs";
export const outboxRegistry = defineOutboxRegistry({
events: postEvents,
jobs: postJobs,
});
Drain messages from a cron route, worker process, queue consumer, or scheduled task:
import { drainOutbox } from "@beignet/core/outbox";
import { outboxRegistry } from "@/server/outbox";
await drainOutbox({
outbox: ctx.ports.outbox,
registry: outboxRegistry,
eventBus: ctx.ports.eventBus,
jobs: ctx.ports.jobs,
batchSize: 100,
leaseMs: 30_000,
});
drainOutbox(...) claims available messages, validates payloads against the
registry, publishes events through eventBus, dispatches jobs through jobs,
then marks each message delivered. Failed deliveries are retried with backoff
until maxAttempts, then dead-lettered.
Customize retry delay when needed:
await drainOutbox({
outbox,
registry,
eventBus,
jobs,
retryDelayMs: ({ message }) => Math.min(60_000, 1000 * message.attempts),
});
Drizzle and Turso
@beignet/provider-drizzle-turso includes a durable outbox adapter:
bun add @beignet/provider-drizzle-turso
import {
createDrizzleTursoOutboxPort,
createDrizzleTursoOutboxSetupStatements,
} from "@beignet/provider-drizzle-turso";
const outbox = createDrizzleTursoOutboxPort(db);
Beignet does not hide migrations. Add the setup statements to your app-owned migration/bootstrap flow:
for (const statement of createDrizzleTursoOutboxSetupStatements()) {
await client.execute(statement);
}
The default table is outbox_messages. You can override it:
createDrizzleTursoOutboxSetupStatements({
tableName: "app_outbox_messages",
});
createDrizzleTursoOutboxPort(tx, {
tableName: "app_outbox_messages",
});
Testing
Use the memory adapter in use-case tests:
import {
createMemoryOutbox,
createOutboxEventRecorder,
} from "@beignet/core/outbox";
const outbox = createMemoryOutbox();
const uow = createNoopUnitOfWork(() => ({
posts,
events: createOutboxEventRecorder(outbox),
outbox,
}));
Then assert pending messages or drain them:
expect(outbox.messages).toMatchObject([
{
kind: "event",
name: "post.published",
status: "pending",
},
]);
Use direct in-memory event recorders and inline jobs when durability is not the behavior under test.
When not to use it
Do not force every side effect through the outbox.
Use direct after-commit event publishing or inline jobs for low-stakes local workflows, tests, and single-process development. Use the outbox when losing the side effect would create user-visible, financial, compliance, or workflow correctness issues.