Outbox
The outbox pattern records side-effect intent in the same database transaction as the business write. A separate worker drains those records after commit and delivers events or jobs with retries.
Use an outbox when an event or job must not be lost after the database commit: notifications, integrations, billing syncs, audit streams, search indexing, or other workflow-critical side effects.
bun add @beignet/core
Why it exists
After-commit publishing avoids one class of bug: listeners, jobs, and mail do not see data that later rolls back. It still leaves a different failure window:
- The database transaction commits.
- The process starts publishing the event or dispatching the job.
- The process crashes or the provider call fails.
- The business record is durable, but the side effect is lost.
The outbox closes that gap by writing the event or job to an outbox_messages
table inside the transaction. Delivery becomes a retryable background workflow.
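As a minimal sketch of why one transaction closes the gap, the toy model below commits the business row and the outbox row together or not at all. `ToyDb`, its `transaction` helper, and the row shapes are hypothetical in-memory stand-ins for illustration, not Beignet's unit of work or outbox adapter.

```typescript
// Hypothetical in-memory stand-ins; not Beignet's implementation.
type ToyOutboxRow = {
  id: number;
  kind: "event" | "job";
  name: string;
  payload: unknown;
  status: "pending";
};
type ToyTables = {
  posts: { slug: string; published: boolean }[];
  outbox_messages: ToyOutboxRow[];
};

// Toy database: transaction() commits a working copy only if the callback returns.
class ToyDb {
  private tables: ToyTables = { posts: [], outbox_messages: [] };

  transaction<T>(work: (tx: ToyTables) => T): T {
    // Work on a deep copy so a thrown error leaves committed state untouched.
    const draft = JSON.parse(JSON.stringify(this.tables)) as ToyTables;
    const result = work(draft);
    this.tables = draft; // commit: both tables become visible together
    return result;
  }

  snapshot(): ToyTables {
    return JSON.parse(JSON.stringify(this.tables)) as ToyTables;
  }
}

function publishPost(db: ToyDb, slug: string, failBeforeCommit = false): void {
  db.transaction((tx) => {
    tx.posts.push({ slug, published: true });
    // The side-effect intent is written next to the business row.
    tx.outbox_messages.push({
      id: tx.outbox_messages.length + 1,
      kind: "event",
      name: "post.published",
      payload: { slug },
      status: "pending",
    });
    if (failBeforeCommit) {
      throw new Error("simulated failure inside the transaction");
    }
  });
}

const toyDb = new ToyDb();
publishPost(toyDb, "hello-world");
try {
  publishPost(toyDb, "doomed", true);
} catch {
  // Rolled back: neither the post nor its outbox row was committed.
}
```

After both calls, the committed state holds one post and one pending outbox row. The failed call leaves no post without a message and no message without a post.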
Outbox delivery is at least once, not exactly once. If the worker delivers a message and crashes before marking it delivered, the message may be delivered again. Use Idempotency inside listeners or job handlers when duplicate delivery would be harmful.
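One way to tolerate duplicate delivery is to deduplicate on a stable message ID inside the handler. The sketch below assumes each delivered message carries such an ID. The `DeliveredMessage` shape, the `processedIds` set, and `handleWelcomeEmail` are hypothetical names, and a real handler would need to persist the processed ID atomically with its own effect.

```typescript
// Hypothetical message shape; only the stable id matters for deduplication.
type DeliveredMessage = {
  id: string;
  name: string;
  payload: { email: string };
};

// Stand-in for a durable "processed message ids" table.
const processedIds = new Set<string>();
const sentEmails: string[] = [];

function handleWelcomeEmail(message: DeliveredMessage): "handled" | "skipped" {
  // A redelivered message is recognized by its id and ignored.
  if (processedIds.has(message.id)) return "skipped";
  sentEmails.push(message.payload.email);
  processedIds.add(message.id);
  return "handled";
}

const welcomeMessage: DeliveredMessage = {
  id: "msg_1",
  name: "user.registered",
  payload: { email: "a@example.com" },
};

const firstDelivery = handleWelcomeEmail(welcomeMessage);
// Simulates redelivery after a worker crashed before marking the message delivered.
const secondDelivery = handleWelcomeEmail(welcomeMessage);
```

The second call returns `"skipped"` and no second email is recorded, so the duplicate delivery is harmless.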
Use Workflow primitives to decide whether the side effect should be modeled as an event, job, notification, idempotent command, schedule, or outbox record, and see side effects after commit for the rule the outbox makes durable. Payment workflows often pair verified webhooks with outbox-backed entitlement, notification, or integration side effects; see Payments and billing.
Core API
Use @beignet/core/outbox for typed messages, registries, memory test storage,
and the drain worker:
import {
createMemoryOutbox,
defineOutboxRegistry,
drainOutbox,
} from "@beignet/core/outbox";
The app-facing port is intentionally small:
import type { OutboxPort } from "@beignet/core/outbox";
export type AppPorts = {
outbox: OutboxPort;
};
Production adapters implement:
| Operation | Purpose |
|---|---|
| enqueue(...) | Store a pending event or job |
| claimBatch(...) | Atomically claim available messages with a lease |
| markDelivered(...) | Ack a claimed message with its claim token |
| markFailed(...) | Retry or dead-letter a claimed message |
Ack and fail operations require the current claimToken. This prevents an old
worker from acking a message after its lease expired and another worker claimed
it.
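The claim-token rule can be illustrated with a small in-memory model. This is a sketch under stated assumptions: `ToyLeaseOutbox`, the explicit `now` and `leaseMs` parameters, and the token format are hypothetical and do not reflect the real `OutboxPort` signatures.

```typescript
type Claimable = {
  id: string;
  status: "pending" | "claimed" | "delivered";
  claimToken: string | null;
  leaseExpiresAt: number; // epoch ms; only meaningful while claimed
};

// Hypothetical in-memory outbox that models leases and claim tokens.
class ToyLeaseOutbox {
  private rows: Claimable[] = [];
  private nextToken = 0;

  enqueue(id: string): void {
    this.rows.push({ id, status: "pending", claimToken: null, leaseExpiresAt: 0 });
  }

  // Claims pending rows and rows whose lease expired, issuing a fresh token each time.
  claimBatch(now: number, leaseMs: number): Claimable[] {
    const claimed: Claimable[] = [];
    for (const row of this.rows) {
      const leaseExpired = row.status === "claimed" && row.leaseExpiresAt <= now;
      if (row.status === "pending" || leaseExpired) {
        this.nextToken += 1;
        row.status = "claimed";
        row.claimToken = `claim_${this.nextToken}`;
        row.leaseExpiresAt = now + leaseMs;
        claimed.push({ ...row });
      }
    }
    return claimed;
  }

  // The ack succeeds only when the caller holds the current claim token.
  markDelivered(id: string, claimToken: string): boolean {
    const row = this.rows.find((candidate) => candidate.id === id);
    if (!row || row.status !== "claimed" || row.claimToken !== claimToken) {
      return false;
    }
    row.status = "delivered";
    row.claimToken = null;
    return true;
  }
}

const leaseOutbox = new ToyLeaseOutbox();
leaseOutbox.enqueue("msg_1");

// Worker A claims at t=0 with a 1s lease, then stalls.
const [slowClaim] = leaseOutbox.claimBatch(0, 1_000);
// At t=2s the lease has expired, so worker B claims the same message.
const [freshClaim] = leaseOutbox.claimBatch(2_000, 1_000);

// Worker A wakes up and tries to ack with its old token: rejected.
const staleAck = leaseOutbox.markDelivered("msg_1", slowClaim?.claimToken ?? "");
// Worker B acks with the current token: accepted.
const freshAck = leaseOutbox.markDelivered("msg_1", freshClaim?.claimToken ?? "");
```

Here `staleAck` is `false` and `freshAck` is `true`: the stale worker cannot overwrite the outcome recorded by the worker that currently owns the message.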
Transaction-scoped recording
Keep the existing Beignet event API in use cases. The outbox should sit behind
tx.events, not introduce a parallel use-case workflow:
const publishPostUseCase = useCase
.command("posts.publish")
.input(PublishPostInput)
.output(PostOutput)
.emits([PostPublished])
.run(async ({ ctx, input, events }) =>
ctx.ports.uow.transaction(async (tx) => {
const post = await tx.posts.publish(input.slug);
await events.record(tx.events, PostPublished, {
postId: post.id,
slug: post.slug,
publishedAt: post.publishedAt,
});
return post;
}),
);
Wire tx.events with createOutboxEventRecorder(...) in production:
import { createOutboxEventRecorder } from "@beignet/core/outbox";
import {
createDrizzleSqliteOutboxPort,
createDrizzleSqliteUnitOfWork,
} from "@beignet/provider-db-drizzle/sqlite";
uow: createDrizzleSqliteUnitOfWork({
db: ports.db.db,
createTransactionPorts: (tx) => {
const outbox = createDrizzleSqliteOutboxPort(tx);
return {
posts: createPostRepository(tx),
audit: createAuditLog(tx),
events: createOutboxEventRecorder(outbox),
outbox,
};
},
});
This keeps .emits(...) and events.record(...) enforcement intact while
moving durability into infrastructure.
Transactional job enqueueing
Use createOutboxJobDispatcher(...) when a use case should enqueue a job
inside the same transaction as the business write. Wire it as the
transaction-scoped jobs port the same way createOutboxEventRecorder(...)
backs tx.events above, then use the normal job dispatcher shape:
await ctx.ports.uow.transaction(async (tx) => {
const appointment = await tx.appointments.create(input);
await tx.jobs.dispatch(SendAppointmentReminderJob, {
appointmentId: appointment.id,
});
return appointment;
});
Use this for direct transactional job enqueueing. For event-driven workflows, prefer recording an event and letting a listener enqueue the job during outbox drain.
Registry and draining
The worker needs an explicit registry of typed events and jobs it may deliver.
New apps do not ship server/outbox.ts; create it by hand when a feature first
needs durable delivery:
import { defineOutboxRegistry } from "@beignet/core/outbox";
import { createServiceActor } from "@beignet/core/ports";
import type { AppContext } from "@/app-context";
import { postEvents } from "@/features/posts/domain/events";
import { postJobs } from "@/features/posts/jobs";
import { server } from "@/server";
export const outboxRegistry = defineOutboxRegistry({
events: postEvents,
jobs: postJobs,
});
export async function createOutboxDrainContext(): Promise<AppContext> {
return server.createServiceContext({
actor: createServiceActor("beignet-outbox"),
});
}
export async function stopOutboxDrainContext(): Promise<void> {
await server.stop();
}
server.createServiceContext(...) builds a
service context for the drain worker.
Once server/outbox.ts exists, beignet make event and beignet make job
append new feature registries to it, and beignet doctor warns about feature
events and jobs the registry cannot deliver; beignet doctor --fix registers
them.
Drain messages from a cron route, worker process, queue consumer, or scheduled
task. In Next.js apps, prefer createOutboxDrainRoute(...) so the outbox runs
as a bounded serverless invocation instead of a provider startup loop:
// app/api/cron/outbox/drain/route.ts
import { createOutboxDrainRoute } from "@beignet/next";
import { env } from "@/lib/env";
import { server } from "@/server";
import { outboxRegistry } from "@/server/outbox";
export const runtime = "nodejs";
export const { GET, POST } = createOutboxDrainRoute({
server,
registry: outboxRegistry,
secret: env.CRON_SECRET,
batchSize: 100,
});
For non-Next runtimes, call drainOutbox(...) from the host's bounded
background entrypoint. Do not start setInterval polling from provider
lifecycle hooks in serverless apps.
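A bounded entrypoint can be sketched as a loop with a batch cap and a time budget instead of an open-ended timer. The `drainOnce` callback here is a hypothetical wrapper that drains one batch and reports how many messages it handled. The return value of `drainOutbox(...)` is not described on this page, so treat that contract as an assumption.

```typescript
// Hypothetical contract: drain one batch and resolve with the number of messages handled.
type DrainOnce = () => Promise<number>;

type BoundedDrainOptions = {
  maxBatches: number; // hard cap on batches per invocation
  deadlineMs: number; // wall-clock budget for the whole invocation
  now?: () => number; // injectable clock, useful in tests
};

async function drainBounded(
  drainOnce: DrainOnce,
  options: BoundedDrainOptions,
): Promise<number> {
  const now = options.now ?? Date.now;
  const startedAt = now();
  let total = 0;
  for (let batch = 0; batch < options.maxBatches; batch++) {
    // Stop before starting another batch once the time budget is spent.
    if (now() - startedAt >= options.deadlineMs) break;
    const handled = await drainOnce();
    total += handled;
    // An empty batch means nothing is currently available to deliver.
    if (handled === 0) break;
  }
  return total;
}

// Usage with a fake drain that reports 3, then 2, then 0 messages.
const fakeBatchSizes = [3, 2, 0];
const fakeDrainOnce: DrainOnce = async () => fakeBatchSizes.shift() ?? 0;
const boundedDrainTotal = drainBounded(fakeDrainOnce, {
  maxBatches: 10,
  deadlineMs: 5_000,
});
```

The invocation always terminates: it stops on an empty batch, on the batch cap, or when the deadline passes, which suits cron-triggered and serverless hosts.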
For a local, CI, or worker-hosted drain, use the same server/outbox.ts module
with the CLI:
beignet outbox drain --batch-size 100
The CLI loads outboxRegistry, creates the app context through
createOutboxDrainContext(...), drains one batch, records instrumentation,
then calls stopOutboxDrainContext(...) when present.
See Deployment for the difference between cron routes, worker-hosted drains, and command-based drains.
drainOutbox(...) claims available messages, validates payloads against the
registry, publishes events through eventBus, dispatches jobs through jobs,
then marks each message delivered. Failed deliveries are retried with backoff
until maxAttempts, then dead-lettered.
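The sequence above can be sketched as a single-batch loop: look the message up in a registry, validate the payload, deliver, and ack only after delivery succeeds. `drainBatch`, `PayloadValidator`, and the report shape are hypothetical names for illustration. The real worker's handling of unregistered or invalid messages is not specified here, so this sketch simply reports them as failed.

```typescript
type PendingMessage = {
  id: string;
  kind: "event" | "job";
  name: string;
  payload: unknown;
};

// Hypothetical registry: message name mapped to a payload validator.
type PayloadValidator = (payload: unknown) => boolean;

type DrainReport = {
  delivered: string[];
  failed: { id: string; reason: string }[];
};

function drainBatch(
  batch: PendingMessage[],
  registry: Map<string, PayloadValidator>,
  deliver: (message: PendingMessage) => void,
): DrainReport {
  const report: DrainReport = { delivered: [], failed: [] };
  for (const message of batch) {
    try {
      const validate = registry.get(message.name);
      if (!validate) {
        throw new Error(`unregistered message: ${message.name}`);
      }
      if (!validate(message.payload)) {
        throw new Error(`invalid payload for ${message.name}`);
      }
      deliver(message); // publish the event or dispatch the job
      report.delivered.push(message.id); // ack only after delivery succeeded
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      report.failed.push({ id: message.id, reason }); // left for retry or dead-lettering
    }
  }
  return report;
}

const toyRegistry = new Map<string, PayloadValidator>([
  ["post.published", (p) => typeof p === "object" && p !== null && "postId" in p],
]);

const drainReport = drainBatch(
  [
    { id: "m1", kind: "event", name: "post.published", payload: { postId: "p1" } },
    { id: "m2", kind: "event", name: "post.archived", payload: {} },
  ],
  toyRegistry,
  () => {
    // A real worker would publish to the event bus or dispatch the job here.
  },
);
```

In this run `m1` is delivered and `m2` is reported as failed because its name is missing from the registry, which is why the registry has to list every event and job the worker may deliver.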
When you pass ctx.ports.devtools or another instrumentation port to
drainOutbox(...), Beignet records first-class outbox events for delivered,
retried, and dead-lettered messages, including attempt counts, retry timing,
and a redacted error summary. createOutboxDrainRoute(...) passes the drain
request's requestId and traceId into those rows so the devtools request
view can expand into the messages delivered by that cron invocation.
Outbox delivery uses the same retry vocabulary as
jobs. A retried message is marked pending again with a future availableAt
computed from the backoff, maxAttempts caps total delivery attempts, and a
dead-lettered message is in the terminal outbox state and is no longer retried
automatically.
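The retry vocabulary can be read as a small state transition applied after each failed delivery. The sketch below is an assumption-laden model, not Beignet's code: the `dead_lettered` status name, the `RetryPolicy` shape, and the exponential delay are all hypothetical.

```typescript
type RetryableMessage = {
  id: string;
  attempts: number; // delivery attempts made so far
  status: "pending" | "dead_lettered"; // hypothetical status names
  availableAt: number; // epoch ms before which the message is not claimable
};

type RetryPolicy = {
  maxAttempts: number;
  delayMs: (attempts: number) => number;
};

// Assumed policy for illustration: exponential backoff from 1s, capped at 60s.
const examplePolicy: RetryPolicy = {
  maxAttempts: 3,
  delayMs: (attempts) => Math.min(60_000, 1_000 * 2 ** (attempts - 1)),
};

// Computes the next state of a message whose delivery attempt just failed.
function afterFailedAttempt(
  message: RetryableMessage,
  policy: RetryPolicy,
  now: number,
): RetryableMessage {
  const attempts = message.attempts + 1;
  if (attempts >= policy.maxAttempts) {
    // Terminal state: no further automatic retries.
    return { ...message, attempts, status: "dead_lettered" };
  }
  // Back to pending, but not claimable until the backoff has elapsed.
  return {
    ...message,
    attempts,
    status: "pending",
    availableAt: now + policy.delayMs(attempts),
  };
}

const freshMessage: RetryableMessage = {
  id: "m1",
  attempts: 0,
  status: "pending",
  availableAt: 0,
};
const afterFirst = afterFailedAttempt(freshMessage, examplePolicy, 0);
const afterSecond = afterFailedAttempt(afterFirst, examplePolicy, 1_000);
const afterThird = afterFailedAttempt(afterSecond, examplePolicy, 3_000);
```

With `maxAttempts: 3`, the first failure schedules a retry at 1,000 ms, the second at 3,000 ms, and the third failure moves the message to the terminal state.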
For job messages, enqueueJob(...) and createOutboxJobDispatcher(...) use
the job definition's retry policy by default. Customize
the retry delay for outbox delivery when the worker needs an override:
await drainOutbox({
outbox,
registry,
eventBus,
jobs,
retryDelayMs: ({ message }) => Math.min(60_000, 1000 * message.attempts),
});
Drizzle SQLite
@beignet/provider-db-drizzle includes a durable outbox adapter on its
/sqlite subpath:
bun add @beignet/provider-db-drizzle
import {
createDrizzleSqliteOutboxPort,
createDrizzleSqliteOutboxSetupStatements,
} from "@beignet/provider-db-drizzle/sqlite";
const outbox = createDrizzleSqliteOutboxPort(db);
Beignet does not hide migrations. Add the setup statements to your app-owned migration/bootstrap flow:
for (const statement of createDrizzleSqliteOutboxSetupStatements()) {
await client.execute(statement);
}
The default table is outbox_messages; pass
{ tableName: "app_outbox_messages" } to both the setup statements and the
port to override it.
Testing
Use the memory adapter in use-case tests:
import {
createMemoryOutbox,
createOutboxEventRecorder,
} from "@beignet/core/outbox";
const outbox = createMemoryOutbox();
const uow = createNoopUnitOfWork(() => ({
posts,
events: createOutboxEventRecorder(outbox),
outbox,
}));
Then assert pending messages or drain them:
expect(outbox.messages).toMatchObject([
{
kind: "event",
name: "post.published",
status: "pending",
},
]);
Use direct in-memory event recorders and inline jobs when durability is not the behavior under test.
When not to use it
Do not force every side effect through the outbox.
Use direct after-commit event publishing or inline jobs for low-stakes local workflows, tests, and single-process development. Use the outbox when losing the side effect would create user-visible, financial, compliance, or workflow correctness issues.