This guide adds background job processing to the TypeScript orders-service from Tutorial: Write a service. By the end the service will enqueue fulfillment jobs when an order is created and process them with retry, progress tracking, and event emission on completion.

Before starting, complete the TypeScript path in Tutorial: Write a service.

What are jobs?

Jobs are service-private background work units. Callers never interact with your jobs directly — they are internal to your service. For caller-visible async workflows, see Operations: TypeScript.

1. Add top-level jobs to the contract

Jobs participation is declared in the contract’s top-level jobs map. Job schemas are registered by name in the contract’s schemas map and referenced by that name in queue definitions. Update contracts/orders_service.ts:

import {
  ReserveInventoryPayloadSchema,
  ReserveInventoryResultSchema,
  ChargePaymentPayloadSchema,
  ChargePaymentResultSchema,
  NotifyWarehousePayloadSchema,
  NotifyWarehouseResultSchema,
} from "./schemas.ts";

// In the first defineServiceContract argument, add to the schemas map:
{
  schemas: {
    // ... existing order schemas ...
    ReserveInventoryPayload: ReserveInventoryPayloadSchema,
    ReserveInventoryResult: ReserveInventoryResultSchema,
    ChargePaymentPayload: ChargePaymentPayloadSchema,
    ChargePaymentResult: ChargePaymentResultSchema,
    NotifyWarehousePayload: NotifyWarehousePayloadSchema,
    NotifyWarehouseResult: NotifyWarehouseResultSchema,
  },
}

// In the returned contract body:
resources: {
  kv: {
    orders: {
      purpose: "Store order records",
      schema: ref.schema("Order"),
      history: 1,
      ttlMs: 0,
    },
  },
},
jobs: {
  reserveInventory: {
    payload: { schema: "ReserveInventoryPayload" },
    result: { schema: "ReserveInventoryResult" },
    maxDeliver: 5,
    backoffMs: [5000, 30000, 120000],
  },
  chargePayment: {
    payload: { schema: "ChargePaymentPayload" },
    result: { schema: "ChargePaymentResult" },
    maxDeliver: 5,
    ackWaitMs: 600000,
  },
  notifyWarehouse: {
    payload: { schema: "NotifyWarehousePayload" },
    result: { schema: "NotifyWarehouseResult" },
    maxDeliver: 3,
  },
},

2. Declare job schemas

Add to schemas.ts:

export const ReserveInventoryPayloadSchema = Type.Object({
  orderId: Type.String(),
  items: Type.Array(Type.Object({ productId: Type.String(), quantity: Type.Number() })),
});
export type ReserveInventoryPayload = Static<typeof ReserveInventoryPayloadSchema>;

export const ReserveInventoryResultSchema = Type.Object({ reserved: Type.Boolean() });
export type ReserveInventoryResult = Static<typeof ReserveInventoryResultSchema>;

export const ChargePaymentPayloadSchema = Type.Object({
  orderId: Type.String(),
  customerId: Type.String(),
  amountCents: Type.Number(),
});
export type ChargePaymentPayload = Static<typeof ChargePaymentPayloadSchema>;

export const ChargePaymentResultSchema = Type.Object({ chargeId: Type.String() });
export type ChargePaymentResult = Static<typeof ChargePaymentResultSchema>;

export const NotifyWarehousePayloadSchema = Type.Object({
  orderId: Type.String(),
  items: Type.Array(Type.Object({ productId: Type.String(), quantity: Type.Number() })),
});
export type NotifyWarehousePayload = Static<typeof NotifyWarehousePayloadSchema>;

export const NotifyWarehouseResultSchema = Type.Object({ notified: Type.Boolean() });
export type NotifyWarehouseResult = Static<typeof NotifyWarehouseResultSchema>;

3. Create jobs from the Orders.Create RPC handler

Update the Orders.Create handler in main.ts to enqueue a fulfillment job after saving the order:

await app.handle.rpc.orders.create(async ({ input, client, deps }) => {
  const orderId = deps.id();
  const order = {
    orderId,
    customerId: input.customerId,
    status: "pending",
    items: input.items,
    createdAt: deps.clock().toISOString(),
  };
  await client.kv.orders.put(orderId, order).orThrow();

  const job = await client.jobs.reserveInventory.create({
    orderId,
    items: input.items,
  });
  if (job.isErr()) throw job.error;

  return Result.ok({ orderId, status: "pending" });
});

4. Register job workers

Register workers for each job type before calling service.wait(). Handler registration is synchronous; the connected service owns worker startup and shutdown as part of its lifecycle:

Use the service.jobs handle returned by TrellisService.connect(...), or the bound app.jobs wrapper when job handlers need application dependencies. Jobs bindings are Trellis runtime internals, not values service code should fetch with Trellis.Bindings.Get or pass into Trellis constructors.

app.jobs.reserveInventory.handle(async ({ job, client, deps }) => {
  console.info(
    `reserveInventory job ${job.ref.id} request=${job.context.requestId} trace=${job.context.traceId}`,
  );
  await job.progress({ step: "reserving", message: "Checking inventory" });

  if (job.cancelled) {
    return Result.err({ type: "JobCancelledError", message: "Cancelled" });
  }

  // reserve inventory...
  const reserved = true;

  if (!reserved) {
    return Result.err({ type: "InventoryError", message: "Items not available" });
  }

  deps.log.info("inventory reserved", { orderId: job.payload.orderId });
  const chargeJob = await client.jobs.chargePayment.create({
    orderId: job.payload.orderId,
    customerId: "cust_placeholder",
    amountCents: 1000,
  });
  if (chargeJob.isErr()) throw chargeJob.error;

  return Result.ok({ reserved: true });
});

app.jobs.chargePayment.handle(async ({ job, client, deps }) => {
  await job.progress({ step: "charging", message: "Processing payment" });

  if (job.cancelled) {
    return Result.err({ type: "JobCancelledError", message: "Cancelled" });
  }

  // charge payment...
  const chargeId = `ch_${deps.id()}`;

  const notifyJob = await client.jobs.notifyWarehouse.create({
    orderId: job.payload.orderId,
    items: [],
  });
  if (notifyJob.isErr()) throw notifyJob.error;

  return Result.ok({ chargeId });
});

app.jobs.notifyWarehouse.handle(async ({ job, client, deps }) => {
  await job.progress({ step: "notifying", message: "Notifying warehouse" });

  if (job.cancelled) {
    return Result.err({ type: "JobCancelledError", message: "Cancelled" });
  }

  // notify warehouse...

  await client.event.orders.shipped.publish({
    orderId: job.payload.orderId,
    customerId: "cust_placeholder",
    shippedAt: deps.clock().toISOString(),
  });

  return Result.ok({ notified: true });
});

Every active job exposes immutable job.context with requestId, traceId, traceparent, and optional tracestate. The runtime copies this context to all job lifecycle events and NATS headers, so include requestId or traceId in service logs when you need to follow work across RPCs, events, operations, and jobs.

5. Run the service

Call service.wait() after all handle() registrations. The service starts registered workers, keeps them running, and stops them when service.stop() or process shutdown runs:

await service.wait();

6. Retry and failure

If a job handler returns Result.err(...), Trellis NAKs the JetStream message and the job retries according to the backoffMs schedule declared in the contract. After maxDeliver attempts, the job moves to the dead queue for operator review.

To prevent retry for a specific job type, set maxDeliver: 1 in the contract queue declaration.

7. Long-running jobs

If a job may run longer than the configured ackWaitMs (default 5 minutes), call job.heartbeat() periodically to extend the deadline:

app.jobs.chargePayment.handle(async ({ job }) => {
  while (processing) {
    await job.heartbeat();
    // continue work
  }
  return Result.ok({ chargeId });
});

8. Keyed concurrency and queue depth

Use keyed concurrency when one logical unit of work must have at most one active job across all service instances. Existing queues without keyConcurrency keep the normal work-queue behavior. The existing numeric concurrency option still controls how many worker tasks run for the queue; keyConcurrency controls the active limit for a derived logical key.

jobs: {
  syncTickets: {
    payload: { schema: "SyncTicketsPayload" },
    result: { schema: "SyncTicketsResult" },
    concurrency: 4,
    keyConcurrency: {
      key: ["zendesk", "/origin", "tickets"],
      maxActive: 1,
      stalePolicy: "fail-stale",
    },
    queue: {
      maxQueuedPerKey: 1,
      whenFull: "reject",
    },
  },
}

The default keyed policy is conservative: maxActive: 1, stalePolicy: "fail-stale", maxQueuedPerKey: 0, and whenFull: "reject". That means Trellis fails a duplicate create by default instead of silently coalescing or replacing work. Opt into whenFull: "coalesce" for scheduler ticks that should compress into an existing queued/running job, or whenFull: "replace-oldest" when newer queued payloads supersede older queued payloads.

Use create(payload) when you need a new job or a typed failure. Use submit(payload) when the caller needs to inspect whether the runtime accepted, rejected, coalesced, or replaced work:

const submitted = await service.jobs.syncTickets.submit({ origin: "foo" });
if (submitted.isErr()) return Result.err(submitted.error);

switch (submitted.value.kind) {
  case "accepted":
    console.info("sync job accepted", submitted.value.ref.id);
    break;
  case "rejected":
    console.info("sync already active or queued", submitted.value.key);
    break;
  case "coalesced":
    console.info("sync coalesced", submitted.value.existing);
    break;
  case "replaced":
    console.info("sync job replaced older queued work", submitted.value.ref.id);
    break;
}

Even with keyed concurrency, keep service-owned checkpoint commits idempotent. Trellis prevents duplicate active work; your service database should still guard durable state and external side effects.

Development loop

  1. Update job schemas and the contract’s top-level jobs map
  2. Rebuild contract artifacts: deno task prepare
  3. Review and accept or reject the pending service authority change in Console if queues changed; use trellis svc <service-id> authority plan list and authority plan show <PLAN_ID> only for local or automation fallback
  4. Restart the service

Next steps