Outbox

The outbox pattern records side-effect intent in the same database transaction as the business write. A separate worker drains those records after commit and delivers events or jobs with retries.

Use an outbox when an event or job must not be lost after the database commit: notifications, integrations, billing syncs, audit streams, search indexing, or other workflow-critical side effects.

bun add @beignet/core

Why it exists

After-commit publishing avoids one class of bug: listeners, jobs, and mail do not see data that later rolls back. It still leaves a different failure window:

  1. The database transaction commits.
  2. The process starts publishing the event or dispatching the job.
  3. The process crashes or the provider call fails.
  4. The business record is durable, but the side effect is lost.

The outbox closes that gap by writing the event or job to an outbox_messages table inside the transaction. Delivery becomes a retryable background workflow.
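The atomicity argument can be sketched with a tiny in-memory model. This is only an illustration under stated assumptions: the Db type, transaction(...) helper, and publishPost(...) function are hypothetical stand-ins, not Beignet APIs. The point is that the business row and the outbox row are written through the same transaction, so they commit or roll back together.

```typescript
// Hypothetical in-memory stand-in for a database; not Beignet's API.
type OutboxRow = {
  id: number;
  name: string;
  payload: unknown;
  status: "pending" | "delivered";
};
type Db = { posts: string[]; outboxMessages: OutboxRow[] };

// Run `work` against a copy of the state and swap it in only on success,
// which mimics commit and rollback.
function transaction<T>(db: Db, work: (tx: Db) => T): T {
  const tx: Db = {
    posts: [...db.posts],
    outboxMessages: [...db.outboxMessages],
  };
  const result = work(tx); // a throw here leaves `db` untouched (rollback)
  db.posts = tx.posts;
  db.outboxMessages = tx.outboxMessages;
  return result;
}

function publishPost(db: Db, slug: string, failBeforeCommit = false): void {
  transaction(db, (tx) => {
    // Business write and outbox write share one transaction.
    tx.posts.push(slug);
    tx.outboxMessages.push({
      id: tx.outboxMessages.length + 1,
      name: "post.published",
      payload: { slug },
      status: "pending",
    });
    if (failBeforeCommit) throw new Error("simulated failure before commit");
  });
}

const db: Db = { posts: [], outboxMessages: [] };
publishPost(db, "hello-world");
try {
  publishPost(db, "broken", true);
} catch {
  // Rolled back: neither the post nor its outbox row was stored.
}
```

After both calls the model holds one post and one pending outbox row. A crash after commit can no longer lose the side effect, because the intent to deliver is already durable.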

Outbox delivery is at-least-once, not exactly-once. If the worker delivers a message and crashes before marking it delivered, the message may be delivered again on a later drain. Make listeners and job handlers idempotent (see Idempotency) when duplicate delivery would be harmful.
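One common way to tolerate duplicate delivery is to remember which message ids a handler has already processed. The sketch below assumes each delivered message carries a stable id. The DeliveredMessage shape and handleInvoiceCharged(...) are hypothetical names for illustration, and the in-memory Set stands in for a durable store such as a table with a unique key.

```typescript
// Hypothetical message shape and handler; names are illustrative only.
type DeliveredMessage = {
  id: string;
  name: string;
  payload: { invoiceId: string };
};

// In production this would be durable storage, not process memory.
const processedMessageIds = new Set<string>();
const chargedInvoices: string[] = [];

function handleInvoiceCharged(
  message: DeliveredMessage,
): "processed" | "skipped" {
  // A redelivered message is recognized by id and ignored.
  if (processedMessageIds.has(message.id)) return "skipped";

  chargedInvoices.push(message.payload.invoiceId); // the side effect
  processedMessageIds.add(message.id);
  return "processed";
}

const message: DeliveredMessage = {
  id: "msg-1",
  name: "invoice.charged",
  payload: { invoiceId: "inv-42" },
};

const firstDelivery = handleInvoiceCharged(message);
const secondDelivery = handleInvoiceCharged(message); // simulated redelivery
```

In a real handler the duplicate check and the side effect should be recorded atomically where possible, otherwise a crash between the two steps reopens the same window.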

Core API

Use @beignet/core/outbox for typed messages, registries, memory test storage, and the drain worker:

import {
  createMemoryOutbox,
  defineOutboxRegistry,
  drainOutbox,
} from "@beignet/core/outbox";

The app-facing port is intentionally small:

import type { OutboxPort } from "@beignet/core/outbox";

export type AppPorts = {
  outbox: OutboxPort;
};

Production adapters implement:

Operation            Purpose
enqueue(...)         Store a pending event or job
claimBatch(...)      Atomically claim available messages with a lease
markDelivered(...)   Ack a claimed message with its claim token
markFailed(...)      Retry or dead-letter a claimed message

Ack and fail operations require the current claimToken. This prevents a stale worker from acking a message after its lease has expired and another worker has claimed it.
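The claim-token rule can be illustrated with a small in-memory model. Everything here is an assumption made for the sketch: the Row shape, the token format, and the claim(...) and markDelivered(...) functions are hypothetical, and a real adapter would enforce the same checks with atomic database updates.

```typescript
// Hypothetical in-memory model of leases and claim tokens.
type Row = {
  id: number;
  status: "pending" | "delivered";
  claimToken: string | null;
  leaseExpiresAt: number; // epoch milliseconds
};

let tokenCounter = 0;

// Claim the row unless it is already delivered or another lease is active.
function claim(row: Row, now: number, leaseMs: number): string | null {
  const leaseActive = row.claimToken !== null && row.leaseExpiresAt > now;
  if (row.status !== "pending" || leaseActive) return null;

  tokenCounter += 1;
  row.claimToken = `claim-${tokenCounter}`;
  row.leaseExpiresAt = now + leaseMs;
  return row.claimToken;
}

// Ack only when the caller still holds the current claim token.
function markDelivered(row: Row, claimToken: string): boolean {
  if (row.claimToken !== claimToken) return false; // stale token: ignore
  row.status = "delivered";
  row.claimToken = null;
  return true;
}

const row: Row = {
  id: 1,
  status: "pending",
  claimToken: null,
  leaseExpiresAt: 0,
};

const workerAToken = claim(row, 0, 30_000); // worker A claims at t = 0
const workerBToken = claim(row, 31_000, 30_000); // lease expired, B claims
const staleAck = markDelivered(row, workerAToken!); // rejected
const freshAck = markDelivered(row, workerBToken!); // accepted
```

Worker A's late ack is rejected because the row now carries worker B's token, so only the current lease holder can change the message state.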

Transaction-scoped recording

Keep the existing Beignet event API in use cases. The outbox should sit behind tx.events, not introduce a parallel use-case workflow:

const publishPostUseCase = useCase
  .command("posts.publish")
  .input(PublishPostInput)
  .output(PostOutput)
  .emits([PostPublished])
  .run(async ({ ctx, input, events }) =>
    ctx.ports.uow.transaction(async (tx) => {
      const post = await tx.posts.publish(input.slug);

      await events.record(tx.events, PostPublished, {
        postId: post.id,
        slug: post.slug,
        publishedAt: post.publishedAt,
      });

      return post;
    }),
  );

Wire tx.events with createOutboxEventRecorder(...) in production:

import { createOutboxEventRecorder } from "@beignet/core/outbox";
import {
  createDrizzleTursoOutboxPort,
  createDrizzleTursoUnitOfWork,
} from "@beignet/provider-drizzle-turso";

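// Fragment of the app's ports definition.
uow: createDrizzleTursoUnitOfWork({
  db: ports.db.db,
  createTransactionPorts: (tx) => {
    // Build the outbox port from the transaction handle so recorded events
    // commit or roll back together with the business write.
    const outbox = createDrizzleTursoOutboxPort(tx);

    return {
      posts: createPostRepository(tx),
      audit: createAuditLog(tx),
      events: createOutboxEventRecorder(outbox),
      outbox,
    };
  },
}),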

This keeps .emits(...) and events.record(...) enforcement intact while moving durability into infrastructure.

Transactional job enqueueing

Use createOutboxJobDispatcher(...) when a use case should enqueue a job inside the same transaction as the business write:

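import { createOutboxJobDispatcher } from "@beignet/core/outbox";
import {
  createDrizzleTursoOutboxPort,
  createDrizzleTursoUnitOfWork,
} from "@beignet/provider-drizzle-turso";

// Fragment of the app's ports definition.
uow: createDrizzleTursoUnitOfWork({
  db: ports.db.db,
  createTransactionPorts: (tx) => {
    // One outbox port per transaction, so enqueued jobs commit with the write.
    const outbox = createDrizzleTursoOutboxPort(tx);

    return {
      appointments: createAppointmentRepository(tx),
      jobs: createOutboxJobDispatcher(outbox),
      outbox,
    };
  },
}),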

Then use the normal job dispatcher shape:

await ctx.ports.uow.transaction(async (tx) => {
  const appointment = await tx.appointments.create(input);

  await tx.jobs.dispatch(SendAppointmentReminderJob, {
    appointmentId: appointment.id,
  });

  return appointment;
});

Use this for direct transactional job enqueueing. For event-driven workflows, prefer recording an event and letting a listener enqueue the job during outbox drain.

Registry and draining

The worker needs an explicit registry of typed events and jobs it may deliver:

import { defineOutboxRegistry } from "@beignet/core/outbox";
import { postEvents } from "@/features/posts/domain/events";
import { postJobs } from "@/features/posts/jobs";

export const outboxRegistry = defineOutboxRegistry({
  events: postEvents,
  jobs: postJobs,
});

Drain messages from a cron route, worker process, queue consumer, or scheduled task:

import { drainOutbox } from "@beignet/core/outbox";
import { outboxRegistry } from "@/server/outbox";

await drainOutbox({
  outbox: ctx.ports.outbox,
  registry: outboxRegistry,
  eventBus: ctx.ports.eventBus,
  jobs: ctx.ports.jobs,
  batchSize: 100,
  leaseMs: 30_000,
});

drainOutbox(...) claims available messages, validates payloads against the registry, publishes events through eventBus, dispatches jobs through jobs, then marks each message delivered. Failed deliveries are retried with backoff until maxAttempts, then dead-lettered.
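The control flow described above can be approximated with a deliberately simplified loop. This is not the drainOutbox(...) implementation: the Message shape, the drainOnce(...) function, and the "dead" status name are assumptions for illustration, and leases, claim tokens, and backoff delays are left out to keep the sketch short.

```typescript
// Hypothetical simplified drain loop; not the drainOutbox implementation.
type Status = "pending" | "delivered" | "dead";
type Message = {
  id: number;
  name: string;
  payload: unknown;
  attempts: number;
  status: Status;
};

function drainOnce(
  messages: Message[],
  knownNames: Set<string>,
  deliver: (message: Message) => void,
  maxAttempts: number,
): void {
  for (const message of messages) {
    if (message.status !== "pending") continue;
    message.attempts += 1;
    try {
      // Validate against the registry before delivering.
      if (!knownNames.has(message.name)) {
        throw new Error(`unregistered message: ${message.name}`);
      }
      deliver(message);
      message.status = "delivered";
    } catch {
      // Stay pending for another attempt until the budget is spent.
      if (message.attempts >= maxAttempts) message.status = "dead";
    }
  }
}

const queue: Message[] = [
  { id: 1, name: "post.published", payload: { slug: "hello" }, attempts: 0, status: "pending" },
  { id: 2, name: "unknown.event", payload: {}, attempts: 0, status: "pending" },
];
const deliveredIds: number[] = [];
const registeredNames = new Set(["post.published"]);

// Three passes stand in for three scheduled drain runs.
for (let pass = 0; pass < 3; pass += 1) {
  drainOnce(queue, registeredNames, (message) => deliveredIds.push(message.id), 3);
}
```

In this model the registered message is delivered on the first pass, while the unregistered one fails on every pass and is dead-lettered once it reaches three attempts.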

Customize retry delay when needed:

await drainOutbox({
  outbox,
  registry,
  eventBus,
  jobs,
  retryDelayMs: ({ message }) => Math.min(60_000, 1000 * message.attempts),
});
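The example above grows the delay linearly. If exponential backoff with a cap suits the workload better, a helper along these lines could be used instead. The function name, the base and cap defaults, and the assumption that attempts starts at 1 for the first retry are all hypothetical; only the ({ message }) => number callback shape comes from the example above.

```typescript
// Hypothetical helper: capped exponential backoff in milliseconds.
function exponentialRetryDelayMs(
  attempts: number,
  baseMs = 1_000,
  capMs = 60_000,
): number {
  // Treat the first attempt as exponent 0 so the first delay equals baseMs.
  const exponent = Math.max(0, attempts - 1);
  return Math.min(capMs, baseMs * 2 ** exponent);
}

const delays = [1, 2, 3, 4, 5, 6, 7, 8].map((attempts) =>
  exponentialRetryDelayMs(attempts),
);
// delays: 1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000
```

It could then be passed as retryDelayMs: ({ message }) => exponentialRetryDelayMs(message.attempts). Adding random jitter is a common refinement when many messages fail at the same moment.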

Drizzle and Turso

@beignet/provider-drizzle-turso includes a durable outbox adapter:

bun add @beignet/provider-drizzle-turso

import {
  createDrizzleTursoOutboxPort,
  createDrizzleTursoOutboxSetupStatements,
} from "@beignet/provider-drizzle-turso";

const outbox = createDrizzleTursoOutboxPort(db);

Beignet does not hide migrations. Add the setup statements to your app-owned migration/bootstrap flow:

for (const statement of createDrizzleTursoOutboxSetupStatements()) {
  await client.execute(statement);
}

The default table is outbox_messages. You can override it:

createDrizzleTursoOutboxSetupStatements({
  tableName: "app_outbox_messages",
});

createDrizzleTursoOutboxPort(tx, {
  tableName: "app_outbox_messages",
});

Testing

Use the memory adapter in use-case tests:

import {
  createMemoryOutbox,
  createOutboxEventRecorder,
} from "@beignet/core/outbox";

const outbox = createMemoryOutbox();

const uow = createNoopUnitOfWork(() => ({
  posts,
  events: createOutboxEventRecorder(outbox),
  outbox,
}));

Then assert pending messages or drain them:

expect(outbox.messages).toMatchObject([
  {
    kind: "event",
    name: "post.published",
    status: "pending",
  },
]);

Use direct in-memory event recorders and inline jobs when durability is not the behavior under test.

When not to use it

Do not force every side effect through the outbox.

Use direct after-commit event publishing or inline jobs for low-stakes local workflows, tests, and single-process development. Use the outbox when losing the side effect would create user-visible, financial, compliance, or workflow correctness issues.