Beignet API reference

    Module @beignet/provider-event-bus-memory

    @beignet/provider-event-bus-memory

    In-memory EventBusPort adapter for Beignet applications.

    Use it for tests, local development, and single-process apps. Distributed systems should adapt a queue, stream, outbox, or message broker behind the same EventBusPort interface.

    bun add @beignet/provider-event-bus-memory
    
    import { defineEvent } from "@beignet/core/events";
    import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";
    import { z } from "zod";

    // Define your domain events
    const UserRegistered = defineEvent("user.registered", {
      payload: z.object({
        userId: z.string(),
        email: z.string().email(),
      }),
    });

    // Create the event bus
    const eventBus = createInMemoryEventBus();

    // Subscribe to events
    const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
      console.log(`User registered: ${payload.email}`);
      // Send welcome email, update analytics, etc.
    });

    // Publish events
    await eventBus.publish(UserRegistered, {
      userId: "123",
      email: "user@example.com",
    });

    // Unsubscribe when done
    unsubscribe();

    To install the bus through a provider, pass createInMemoryEventBusProvider() when creating the server:

    import { createNextServer } from "@beignet/next";
    import { createInMemoryEventBusProvider } from "@beignet/provider-event-bus-memory";
    import { appPorts } from "@/infra/app-ports";
    import { routes } from "@/server/routes";

    export const server = await createNextServer({
      ports: appPorts,
      providers: [createInMemoryEventBusProvider()],
      createContext: ({ ports }) => ({
        ports,
      }),
      routes,
    });

    Use createInMemoryEventBus() directly when you want to manually assign an event bus under ports.

    To record published events under the eventBus watcher, pass a provider instrumentation target when creating the bus directly:

    // `ports` is your application's ports object
    const eventBus = createInMemoryEventBus({
      instrumentation: ports,
    });

    Define events once, subscribe to them during application setup, and record them from use cases:

    import { defineEvent } from "@beignet/core/events";
    import { z } from "zod";

    const OrderPlaced = defineEvent("order.placed", {
      payload: z.object({
        orderId: z.string(),
        total: z.number(),
      }),
    });

    // Subscribe to events in your application setup
    ctx.ports.eventBus.subscribe(OrderPlaced, async (payload) => {
      // Send order confirmation email.
      // `customer` is assumed to be looked up elsewhere (not shown);
      // the payload itself carries only orderId and total.
      await ctx.ports.mailer.send({
        to: customer.email,
        subject: "Order Confirmation",
        text: `Your order ${payload.orderId} has been placed!`,
      });
    });

    // `useCase`, `PlaceOrderInput`, and `OrderOutput` come from your application code
    const placeOrder = useCase
      .command("orders.place")
      .input(PlaceOrderInput)
      .output(OrderOutput)
      .emits([OrderPlaced])
      .run(async ({ ctx, input, events }) => {
        return ctx.ports.uow.transaction(async (tx) => {
          const order = await tx.orders.create(input);

          // Record the event inside the same transaction as the write
          await events.record(tx.events, OrderPlaced, {
            orderId: order.id,
            total: order.total,
          });

          return order;
        });
      });

    Publish a domain event with a typed payload.

    await eventBus.publish(UserRegistered, {
      userId: "123",
      email: "user@example.com",
    });

    By default, the in-memory bus awaits handlers so local development and tests are deterministic. Handler errors are rethrown unless onHandlerError is provided. Use delivery: "fire-and-forget" when you intentionally want detached in-process delivery.
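    The difference between the two delivery modes can be illustrated with a small stand-in bus. This is a sketch for intuition only, not the library's implementation: createSketchBus, the "awaited" mode name, and the choice to continue with the remaining handlers after onHandlerError runs are all assumptions made for this example.

```typescript
// Illustrative stand-in only; not the library's implementation.
type Handler<T> = (payload: T) => void | Promise<void>;

interface SketchBusOptions {
  // "awaited" is a hypothetical name for the default mode.
  delivery?: "awaited" | "fire-and-forget";
  onHandlerError?: (error: unknown) => void;
}

function createSketchBus<T>(options: SketchBusOptions = {}) {
  const handlers: Handler<T>[] = [];

  // Call handlers one at a time in subscription order. A failure goes to
  // onHandlerError when configured; otherwise it propagates to the caller.
  async function deliver(payload: T): Promise<void> {
    for (const handler of handlers) {
      try {
        await handler(payload);
      } catch (error) {
        if (!options.onHandlerError) throw error;
        options.onHandlerError(error);
      }
    }
  }

  return {
    subscribe(handler: Handler<T>): void {
      handlers.push(handler);
    },
    async publish(payload: T): Promise<void> {
      if (options.delivery === "fire-and-forget") {
        // Detached: schedule delivery on a later microtask and return at once.
        // With no onHandlerError, this sketch silently drops the failure.
        void Promise.resolve()
          .then(() => deliver(payload))
          .catch(() => {});
        return;
      }
      // Awaited: the caller resumes only after every handler has settled.
      await deliver(payload);
    },
  };
}
```

    In the awaited mode, a test can assert on handler side effects immediately after await publish(...). In the detached mode, the same assertion may run before any handler has been called, which is why the awaited default suits local development and tests.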

    Subscribe to a domain event. Returns an unsubscribe function.

    const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
      console.log(`New user: ${payload.email}`);
    });

    // Later, when you want to stop listening:
    unsubscribe();
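
    One common way to implement a subscribe call that returns its own unsubscribe function is a per-event handler registry with a closure over the registered handler. The sketch below shows that pattern in isolation; createRegistry, Listener, and handlerCount are hypothetical names, and the library's internals may be organized differently.

```typescript
// Illustrative registry; names are hypothetical, not the library's internals.
type Listener = (payload: unknown) => void;

function createRegistry() {
  // A Set per event name preserves insertion (subscription) order.
  const listenersByEvent = new Map<string, Set<Listener>>();

  return {
    subscribe(eventName: string, listener: Listener): () => void {
      let listeners = listenersByEvent.get(eventName);
      if (!listeners) {
        listeners = new Set();
        listenersByEvent.set(eventName, listeners);
      }
      listeners.add(listener);
      const registered = listeners;
      // The returned closure removes exactly this listener.
      // Calling it more than once is harmless.
      return () => {
        registered.delete(listener);
      };
    },
    handlerCount(eventName: string): number {
      return listenersByEvent.get(eventName)?.size ?? 0;
    },
  };
}
```

    Because the closure captures the listener it registered, callers never need to keep a reference to the handler function just to remove it later. Note that a Set stores the same function reference only once, so this sketch treats a repeated subscription of the identical function as a single entry.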

    The event bus provides full type safety:

    import type { EventBusPort } from "@beignet/core/ports";
    import { definePorts } from "@beignet/core/ports";
    import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";

    // Type-safe ports definition
    const appPorts = definePorts({
      eventBus: createInMemoryEventBus() as EventBusPort,
      // ... other ports
    });

    type AppPorts = typeof appPorts;

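    The in-memory event bus is well suited to testing, because awaited delivery means handlers have already run by the time the code under test returns:

    import { describe, expect, it, mock } from "bun:test";
    import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";

    // `UserRegistered` and `registerUser` come from your application code
    describe("User Registration", () => {
      it("should publish UserRegistered event", async () => {
        const eventBus = createInMemoryEventBus();
        const handler = mock(() => {});

        eventBus.subscribe(UserRegistered, handler);

        // Hypothetical context wiring: registerUser must publish on this bus
        const ctx = { ports: { eventBus } };

        // Perform registration
        await registerUser(ctx, { email: "test@example.com" });

        expect(handler).toHaveBeenCalledWith({
          userId: expect.any(String),
          email: "test@example.com",
        });
      });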
    });

    Characteristics:

    • Awaited by default: publish(...) waits for subscribed handlers unless delivery: "fire-and-forget" is configured
    • In-process: Events are only delivered within the same process
    • Memory-only: No persistence; events are lost if the process exits or crashes
    • Order: Handlers are called in the order they were subscribed
    • Multiple handlers: Multiple handlers can subscribe to the same event
    • Error handling: Handler errors reject publish(...) unless onHandlerError is configured

    Good for:

    • Single-process applications
    • Development and testing
    • Simple event-driven workflows
    • Decoupling application components

    Not suitable for:

    • Distributed systems
    • Event persistence requirements
    • Guaranteed delivery needs
    • Cross-service communication

    For production distributed systems, implement EventBusPort with a proper message broker.
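
    As a rough sketch of what such an adapter might look like, the example below wraps a generic broker client behind a publish/subscribe interface shaped like the calls shown earlier. Every name in it is hypothetical: SketchEvent and SketchEventBusPort stand in for the real types in @beignet/core, whose exact shape may differ, and BrokerClient stands in for whichever client library you use. Acknowledgements, retries, and dead-lettering are omitted.

```typescript
// All names below are hypothetical stand-ins for illustration.
interface SketchEvent<T> {
  name: string;
  parse(input: unknown): T; // stands in for schema validation
}

interface SketchEventBusPort {
  publish<T>(event: SketchEvent<T>, payload: T): Promise<void>;
  subscribe<T>(
    event: SketchEvent<T>,
    handler: (payload: T) => void | Promise<void>,
  ): () => void;
}

// Minimal shape of a broker client; swap in your real client library.
interface BrokerClient {
  send(topic: string, body: string): Promise<void>;
  onMessage(topic: string, listener: (body: string) => void): () => void;
}

function createBrokerEventBus(broker: BrokerClient): SketchEventBusPort {
  return {
    async publish(event, payload) {
      // Serialize the payload and use the event name as the topic.
      await broker.send(event.name, JSON.stringify(payload));
    },
    subscribe(event, handler) {
      // Validate each incoming message before it reaches the handler.
      // Handler failures are not handled here; a real adapter needs a policy.
      return broker.onMessage(event.name, (body) => {
        void handler(event.parse(JSON.parse(body)));
      });
    },
  };
}
```

    Application code that depends only on the port interface does not change when the in-memory bus is swapped for an adapter like this one, although delivery is no longer awaited in-process and handlers must tolerate duplicates and delays.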

    License: MIT

    Interfaces

    InMemoryEventBusOptions
    InMemoryEventBusProviderOptions
    InMemoryEventBusProviderPorts

    Type Aliases

    InMemoryEventBusDelivery

    Functions

    createInMemoryEventBus
    createInMemoryEventBusProvider