This guide adds background job processing to the TypeScript orders-service from Tutorial: Write a service. By the end, the service enqueues fulfillment jobs when an order is created and processes them with retries, progress tracking, and an event emitted on completion.
Before starting, complete the TypeScript path in Tutorial: Write a service.
What are jobs?
Jobs are service-private background work units. Callers never interact with your jobs directly — they are internal to your service. For caller-visible async workflows, see Operations: TypeScript.
1. Add top-level jobs to the contract
Jobs participation is declared in the contract’s top-level jobs map. Job schemas are registered by name in the contract’s schemas map and referenced by that name in queue definitions. Update contracts/orders_service.ts:
import {
  ReserveInventoryPayloadSchema,
  ReserveInventoryResultSchema,
  ChargePaymentPayloadSchema,
  ChargePaymentResultSchema,
  NotifyWarehousePayloadSchema,
  NotifyWarehouseResultSchema,
} from "./schemas.ts";
// In the first defineServiceContract argument, add to the schemas map:
{
  schemas: {
    // ... existing order schemas ...
    ReserveInventoryPayload: ReserveInventoryPayloadSchema,
    ReserveInventoryResult: ReserveInventoryResultSchema,
    ChargePaymentPayload: ChargePaymentPayloadSchema,
    ChargePaymentResult: ChargePaymentResultSchema,
    NotifyWarehousePayload: NotifyWarehousePayloadSchema,
    NotifyWarehouseResult: NotifyWarehouseResultSchema,
  },
}
// In the returned contract body:
resources: {
  kv: {
    orders: {
      purpose: "Store order records",
      schema: ref.schema("Order"),
      history: 1,
      ttlMs: 0,
    },
  },
},
jobs: {
  reserveInventory: {
    payload: { schema: "ReserveInventoryPayload" },
    result: { schema: "ReserveInventoryResult" },
    maxDeliver: 5,
    backoffMs: [5000, 30000, 120000],
  },
  chargePayment: {
    payload: { schema: "ChargePaymentPayload" },
    result: { schema: "ChargePaymentResult" },
    maxDeliver: 5,
    ackWaitMs: 600000,
  },
  notifyWarehouse: {
    payload: { schema: "NotifyWarehousePayload" },
    result: { schema: "NotifyWarehouseResult" },
    maxDeliver: 3,
  },
},
2. Declare job schemas
Add to schemas.ts:
export const ReserveInventoryPayloadSchema = Type.Object({
  orderId: Type.String(),
  items: Type.Array(Type.Object({ productId: Type.String(), quantity: Type.Number() })),
});
export type ReserveInventoryPayload = Static<typeof ReserveInventoryPayloadSchema>;
export const ReserveInventoryResultSchema = Type.Object({ reserved: Type.Boolean() });
export type ReserveInventoryResult = Static<typeof ReserveInventoryResultSchema>;
export const ChargePaymentPayloadSchema = Type.Object({
  orderId: Type.String(),
  customerId: Type.String(),
  amountCents: Type.Number(),
});
export type ChargePaymentPayload = Static<typeof ChargePaymentPayloadSchema>;
export const ChargePaymentResultSchema = Type.Object({ chargeId: Type.String() });
export type ChargePaymentResult = Static<typeof ChargePaymentResultSchema>;
export const NotifyWarehousePayloadSchema = Type.Object({
  orderId: Type.String(),
  items: Type.Array(Type.Object({ productId: Type.String(), quantity: Type.Number() })),
});
export type NotifyWarehousePayload = Static<typeof NotifyWarehousePayloadSchema>;
export const NotifyWarehouseResultSchema = Type.Object({ notified: Type.Boolean() });
export type NotifyWarehouseResult = Static<typeof NotifyWarehouseResultSchema>;
3. Create jobs from the Orders.Create RPC handler
Update the Orders.Create handler in main.ts to enqueue a fulfillment job after saving the order:
await app.handle.rpc.orders.create(async ({ input, client, deps }) => {
  const orderId = deps.id();
  const order = {
    orderId,
    customerId: input.customerId,
    status: "pending",
    items: input.items,
    createdAt: deps.clock().toISOString(),
  };
  await client.kv.orders.put(orderId, order).orThrow();
  const job = await client.jobs.reserveInventory.create({
    orderId,
    items: input.items,
  });
  if (job.isErr()) throw job.error;
  return Result.ok({ orderId, status: "pending" });
});
4. Register job workers
Use the service.jobs handle returned by TrellisService.connect(...), or the bound app.jobs wrapper when job handlers need application dependencies. Jobs bindings are Trellis runtime internals; service code should not fetch them with Trellis.Bindings.Get or pass them into Trellis constructors.
Register a worker for each job type before calling service.wait(). Handler registration is synchronous; the connected service owns worker startup and shutdown as part of its lifecycle:
app.jobs.reserveInventory.handle(async ({ job, client, deps }) => {
  console.info(
    `reserveInventory job ${job.ref.id} request=${job.context.requestId} trace=${job.context.traceId}`,
  );
  await job.progress({ step: "reserving", message: "Checking inventory" });
  if (job.cancelled) {
    return Result.err({ type: "JobCancelledError", message: "Cancelled" });
  }
  // reserve inventory...
  const reserved = true;
  if (!reserved) {
    return Result.err({ type: "InventoryError", message: "Items not available" });
  }
  deps.log.info("inventory reserved", { orderId: job.payload.orderId });
  const chargeJob = await client.jobs.chargePayment.create({
    orderId: job.payload.orderId,
    customerId: "cust_placeholder",
    amountCents: 1000,
  });
  if (chargeJob.isErr()) throw chargeJob.error;
  return Result.ok({ reserved: true });
});
app.jobs.chargePayment.handle(async ({ job, client, deps }) => {
  await job.progress({ step: "charging", message: "Processing payment" });
  if (job.cancelled) {
    return Result.err({ type: "JobCancelledError", message: "Cancelled" });
  }
  // charge payment...
  const chargeId = `ch_${deps.id()}`;
  const notifyJob = await client.jobs.notifyWarehouse.create({
    orderId: job.payload.orderId,
    items: [],
  });
  if (notifyJob.isErr()) throw notifyJob.error;
  return Result.ok({ chargeId });
});
app.jobs.notifyWarehouse.handle(async ({ job, client, deps }) => {
  await job.progress({ step: "notifying", message: "Notifying warehouse" });
  if (job.cancelled) {
    return Result.err({ type: "JobCancelledError", message: "Cancelled" });
  }
  // notify warehouse...
  await client.event.orders.shipped.publish({
    orderId: job.payload.orderId,
    customerId: "cust_placeholder",
    shippedAt: deps.clock().toISOString(),
  });
  return Result.ok({ notified: true });
});
Every active job exposes immutable job.context with requestId, traceId, traceparent, and optional tracestate. The runtime copies this context to all job lifecycle events and NATS headers, so include requestId or traceId in service logs when you need to follow work across RPCs, events, operations, and jobs.
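To make the logging advice concrete, here is a minimal sketch of a helper that prefixes log lines with the job's correlation identifiers and, optionally, inspects traceparent. The names JobContextLike, parseTraceparent, and formatJobLog are hypothetical and not part of Trellis; only the field names come from the description above. The parser assumes traceparent follows the W3C Trace Context layout (version-traceid-parentid-flags).

```typescript
// Hypothetical shape mirroring the fields described above; not the Trellis type.
interface JobContextLike {
  requestId: string;
  traceId: string;
  traceparent: string;
  tracestate?: string;
}

interface ParsedTraceparent {
  version: string;
  traceId: string;
  parentId: string;
  sampled: boolean;
}

// Parse a W3C Trace Context `traceparent` value; returns null when malformed.
function parseTraceparent(value: string): ParsedTraceparent | null {
  const match = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(value);
  if (match === null) return null;
  const [, version, traceId, parentId, flags] = match;
  // Bit 0 of the flags byte is the "sampled" flag.
  const sampled = (parseInt(flags, 16) & 0x01) === 1;
  return { version, traceId, parentId, sampled };
}

// Build a log line that carries the identifiers needed to follow the work.
function formatJobLog(
  jobName: string,
  jobId: string,
  ctx: JobContextLike,
  message: string,
): string {
  return `${jobName} job=${jobId} request=${ctx.requestId} trace=${ctx.traceId} ${message}`;
}
```

Inside a handler, a call such as formatJobLog("reserveInventory", job.ref.id, job.context, "inventory reserved") would produce one searchable line per step, assuming job.context is structurally compatible with the hypothetical interface.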
5. Run the service
Call service.wait() after all handle() registrations. The service starts registered workers, keeps them running, and stops them when service.stop() or process shutdown runs:
await service.wait();
6. Retry and failure
If a job handler returns Result.err(...), Trellis NAKs the JetStream message and the job retries according to the backoffMs schedule declared in the contract. After maxDeliver attempts, the job moves to the dead queue for operator review.
To disable retries for a specific job type, set maxDeliver: 1 in that job's contract queue declaration.
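The interaction between maxDeliver and backoffMs can be sketched as a small function that lists the delay before each redelivery of a job that keeps failing. This is an illustration, not Trellis code: redeliveryDelaysMs is a hypothetical helper, and it assumes that when there are more redeliveries than schedule entries the last entry is reused, which is how JetStream consumer backoff behaves but should be confirmed for Trellis.

```typescript
// Hypothetical helper: delays (ms) before each redelivery of a repeatedly failing job.
// Assumption: once the schedule is exhausted, the last backoff entry is reused.
function redeliveryDelaysMs(maxDeliver: number, backoffMs: readonly number[]): number[] {
  if (!Number.isInteger(maxDeliver) || maxDeliver < 1) {
    throw new RangeError("maxDeliver must be a positive integer");
  }
  if (backoffMs.length === 0) {
    throw new RangeError("backoffMs must contain at least one delay");
  }
  const delays: number[] = [];
  // Attempt 1 is the initial delivery; attempts 2..maxDeliver are redeliveries.
  for (let redelivery = 0; redelivery < maxDeliver - 1; redelivery++) {
    const index = Math.min(redelivery, backoffMs.length - 1);
    delays.push(backoffMs[index]);
  }
  return delays;
}

// Worst-case time spent waiting between attempts before the job is dead-queued.
function totalBackoffMs(maxDeliver: number, backoffMs: readonly number[]): number {
  return redeliveryDelaysMs(maxDeliver, backoffMs).reduce((sum, ms) => sum + ms, 0);
}
```

Under that assumption, the reserveInventory declaration (maxDeliver: 5, backoffMs: [5000, 30000, 120000]) yields four redeliveries with delays of 5 s, 30 s, 120 s, and 120 s, and maxDeliver: 1 yields none.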
7. Long-running jobs
If a job may run longer than the configured ackWaitMs (default 5 minutes), call job.heartbeat() periodically to extend the deadline:
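app.jobs.chargePayment.handle(async ({ job, deps }) => {
  // Placeholder loop flag; replace it with a real check on your unit of work.
  let processing = true;
  while (processing) {
    // Extend the ack deadline before it expires.
    await job.heartbeat();
    // continue work, then clear the flag when finished
    processing = false;
  }
  const chargeId = `ch_${deps.id()}`;
  return Result.ok({ chargeId });
});
8. Keyed concurrency and queue depth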
Use keyed concurrency when one logical unit of work must have at most one active
job across all service instances. Existing queues without keyConcurrency keep
the normal work-queue behavior. The existing numeric concurrency option still
controls how many worker tasks run for the queue; keyConcurrency controls the
active limit for a derived logical key.
jobs: {
  syncTickets: {
    payload: { schema: "SyncTicketsPayload" },
    result: { schema: "SyncTicketsResult" },
    concurrency: 4,
    keyConcurrency: {
      key: ["zendesk", "/origin", "tickets"],
      maxActive: 1,
      stalePolicy: "fail-stale",
    },
    queue: {
      maxQueuedPerKey: 1,
      whenFull: "reject",
    },
  },
}
The default keyed policy is conservative: maxActive: 1, stalePolicy: "fail-stale", maxQueuedPerKey: 0, and whenFull: "reject". That means Trellis fails a duplicate create by default instead of silently coalescing or replacing work. Opt into whenFull: "coalesce" for scheduler ticks that should compress into an existing queued/running job, or whenFull: "replace-oldest" when newer queued payloads supersede older queued payloads.
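The policies described above can be illustrated with a small in-memory model of per-key admission. This is a sketch for reasoning about the options, not the Trellis implementation: KeyedAdmission and its outcome shapes are hypothetical, the key is treated as an already-derived string, and falling back to a rejection when "replace-oldest" finds nothing queued is an assumption.

```typescript
type WhenFull = "reject" | "coalesce" | "replace-oldest";

interface KeyPolicy {
  maxActive: number;
  maxQueuedPerKey: number;
  whenFull: WhenFull;
}

type Outcome =
  | { kind: "accepted"; id: string }
  | { kind: "rejected"; key: string }
  | { kind: "coalesced"; existing: string }
  | { kind: "replaced"; id: string; replacedId: string };

// Hypothetical in-memory model; a real runtime would track this durably.
class KeyedAdmission {
  private readonly policy: KeyPolicy;
  private readonly slots = new Map<string, { active: string[]; queued: string[] }>();
  private nextId = 1;

  constructor(policy: KeyPolicy) {
    if (policy.maxActive < 1) throw new RangeError("maxActive must be at least 1");
    this.policy = policy;
  }

  submit(key: string): Outcome {
    let slot = this.slots.get(key);
    if (slot === undefined) {
      slot = { active: [], queued: [] };
      this.slots.set(key, slot);
    }
    const { maxActive, maxQueuedPerKey, whenFull } = this.policy;
    // Room to run immediately for this key.
    if (slot.active.length < maxActive) {
      const id = this.newId();
      slot.active.push(id);
      return { kind: "accepted", id };
    }
    // Room to wait behind the active work for this key.
    if (slot.queued.length < maxQueuedPerKey) {
      const id = this.newId();
      slot.queued.push(id);
      return { kind: "accepted", id };
    }
    // The key is full: apply the whenFull policy.
    if (whenFull === "coalesce") {
      // Fold into the newest queued job, or the running one if nothing is queued.
      const existing = slot.queued[slot.queued.length - 1] ?? slot.active[0];
      return { kind: "coalesced", existing };
    }
    if (whenFull === "replace-oldest" && slot.queued.length > 0) {
      const replacedId = slot.queued.shift() as string;
      const id = this.newId();
      slot.queued.push(id);
      return { kind: "replaced", id, replacedId };
    }
    // "reject", or "replace-oldest" with nothing queued (assumed fallback).
    return { kind: "rejected", key };
  }

  private newId(): string {
    return `job_${this.nextId++}`;
  }
}
```

With the default policy in this model, a second submit for the same key is rejected while a submit for a different key is accepted, which matches the "fails a duplicate create" behavior described above.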
Use create(payload) when you need a new job or a typed failure. Use submit(payload) when the caller needs to inspect whether the runtime accepted,
rejected, coalesced, or replaced work:
const submitted = await service.jobs.syncTickets.submit({ origin: "foo" });
if (submitted.isErr()) return Result.err(submitted.error);
switch (submitted.value.kind) {
  case "accepted":
    console.info("sync job accepted", submitted.value.ref.id);
    break;
  case "rejected":
    console.info("sync already active or queued", submitted.value.key);
    break;
  case "coalesced":
    console.info("sync coalesced", submitted.value.existing);
    break;
  case "replaced":
    console.info("sync job replaced older queued work", submitted.value.ref.id);
    break;
}
Even with keyed concurrency, keep service-owned checkpoint commits idempotent. Trellis prevents duplicate active work; your service database should still guard durable state and external side effects.
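One way to picture an idempotent checkpoint commit is a store that applies a commit only once per job and only when the cursor moves forward. The sketch below is an in-memory stand-in with hypothetical names (CheckpointStore, commit); a real service would more likely enforce the same two guards with a database transaction and a unique constraint.

```typescript
interface Checkpoint {
  cursor: number;
  lastJobId: string;
}

// Hypothetical in-memory store; stands in for a service-owned database table.
class CheckpointStore {
  private readonly checkpoints = new Map<string, Checkpoint>();
  private readonly appliedJobs = new Set<string>();

  // Returns true when the commit was applied, false when it was a no-op.
  commit(key: string, jobId: string, cursor: number): boolean {
    // Guard 1: a redelivered job that already committed must not commit again.
    if (this.appliedJobs.has(jobId)) return false;
    // Guard 2: never move the checkpoint backwards or re-apply the same position.
    const current = this.checkpoints.get(key);
    if (current !== undefined && cursor <= current.cursor) return false;
    this.checkpoints.set(key, { cursor, lastJobId: jobId });
    this.appliedJobs.add(jobId);
    return true;
  }

  get(key: string): Checkpoint | undefined {
    return this.checkpoints.get(key);
  }
}
```

A handler that retries after a partial failure can then call commit again with the same job id and cursor without changing the stored state.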
Development loop
- Update job schemas and the contract’s top-level jobs map
- Rebuild contract artifacts: deno task prepare
- Review and accept or reject the pending service authority change in Console if queues changed; use trellis svc <service-id> authority plan list and authority plan show <PLAN_ID> only for local or automation fallback
- Restart the service
Next steps
- Operations: TypeScript - expose a caller-visible Orders.Process operation backed by these jobs
- Administer Jobs - query and manage these jobs as an operator