This guide adds background job processing to the Rust orders-service from Tutorial: Write a service. By the end, the service enqueues fulfillment jobs when an order is created and processes them with retries, progress tracking, and event emission on completion.
Before starting, complete the Rust path in Tutorial: Write a service.
What are jobs?
Jobs are service-private background work units. For caller-visible async workflows, see Operations: Rust.
1. Add top-level jobs to the contract manifest
Update contracts/orders_service.manifest.json to add a top-level jobs map with the three fulfillment queue types:
"resources": {
  "kv": {
    "orders": { "purpose": "Store order records", "history": 1, "ttlMs": 0 }
  }
},
"jobs": {
  "reserve-inventory": {
    "payload": { "schema": "ReserveInventoryPayload" },
    "result": { "schema": "ReserveInventoryResult" },
    "maxDeliver": 5,
    "backoffMs": [5000, 30000, 120000]
  },
  "charge-payment": {
    "payload": { "schema": "ChargePaymentPayload" },
    "result": { "schema": "ChargePaymentResult" },
    "maxDeliver": 5,
    "ackWaitMs": 600000
  },
  "notify-warehouse": {
    "payload": { "schema": "NotifyWarehousePayload" },
    "result": { "schema": "NotifyWarehouseResult" },
    "maxDeliver": 3
  }
}
Regenerate the participant facade after updating the manifest.
2. Create jobs from the Orders.Create handler
The participant facade exposes service.jobs() returning a JobsFacade with one typed JobQueue per declared queue. The facade converts kebab-case manifest queue names to snake_case Rust method names: "reserve-inventory" → reserve_inventory(), "charge-payment" → charge_payment(), etc.
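The naming rule above can be sketched as a one-line conversion. This is only an illustration of the mapping described in this guide; `queue_method_name` is a hypothetical helper, not part of the generated facade, and the real generator may handle cases (such as reserved words) that this sketch ignores.

```rust
/// Converts a kebab-case queue name from the manifest into the snake_case
/// method name described in this guide. Hypothetical helper for illustration.
fn queue_method_name(queue: &str) -> String {
    queue.replace('-', "_")
}

fn main() {
    // The three queues declared in the manifest in step 1.
    for queue in ["reserve-inventory", "charge-payment", "notify-warehouse"] {
        println!("{queue} -> {}()", queue_method_name(queue));
    }
}
```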
Update the Orders.Create handler:
service
    .handle()
    .rpc()
    .orders()
    .create(|_ctx, req| async move {
        let order_id = ulid::Ulid::new().to_string();
        // save order...
        service
            .jobs()
            .reserve_inventory()
            .create(ReserveInventoryPayload {
                order_id: order_id.clone(),
                items: req.items,
            })
            .await?;
        Ok(CreateOrderResponse { order_id, status: "pending".into() })
    });
3. Register job workers
Use handle(handler) on each JobQueue to register a worker. Complete every registration before calling start_workers():
service
    .jobs()
    .reserve_inventory()
    .handle(|job: ActiveJob<ReserveInventoryPayload, ReserveInventoryResult>| async move {
        // Log the correlation IDs so this job can be traced back to the RPC.
        tracing::info!(
            request_id = %job.context().request_id,
            trace_id = %job.context().trace_id,
            "reserve inventory job started",
        );
        job.progress(JobProgress {
            step: Some("reserving".into()),
            message: Some("Checking inventory".into()),
            ..Default::default()
        })
        .await?;
        if job.is_cancelled() {
            return Err(JobsError::Message { message: "cancelled".to_string() });
        }
        // reserve inventory...
        // Enqueue the next step of the fulfillment pipeline.
        service
            .jobs()
            .charge_payment()
            .create(ChargePaymentPayload {
                order_id: job.payload().order_id.clone(),
                customer_id: "cust_placeholder".into(),
                amount_cents: 1000,
            })
            .await?;
        Ok(ReserveInventoryResult { reserved: true })
    })
    .await?;
service
    .jobs()
    .charge_payment()
    .handle(|job: ActiveJob<ChargePaymentPayload, ChargePaymentResult>| async move {
        job.progress(JobProgress {
            step: Some("charging".into()),
            message: Some("Processing payment".into()),
            ..Default::default()
        })
        .await?;
        if job.is_cancelled() {
            return Err(JobsError::Message { message: "cancelled".to_string() });
        }
        let charge_id = format!("ch_{}", ulid::Ulid::new());
        // Enqueue the final step of the fulfillment pipeline.
        service
            .jobs()
            .notify_warehouse()
            .create(NotifyWarehousePayload {
                order_id: job.payload().order_id.clone(),
                items: vec![],
            })
            .await?;
        Ok(ChargePaymentResult { charge_id })
    })
    .await?;
service
    .jobs()
    .notify_warehouse()
    .handle(|job: ActiveJob<NotifyWarehousePayload, NotifyWarehouseResult>| async move {
        job.progress(JobProgress {
            step: Some("notifying".into()),
            message: Some("Notifying warehouse".into()),
            ..Default::default()
        })
        .await?;
        if job.is_cancelled() {
            return Err(JobsError::Message { message: "cancelled".to_string() });
        }
        // Emit the completion event once the warehouse has been notified.
        service
            .events()
            .publish::<orders_participant::events::OrdersShipped>(OrderShippedEvent {
                order_id: job.payload().order_id.clone(),
                customer_id: "cust_placeholder".into(),
                shipped_at: chrono::Utc::now().to_rfc3339(),
            })
            .await?;
        Ok(NotifyWarehouseResult { notified: true })
    })
    .await?;
Every active job exposes an immutable job.context() with request_id, trace_id, traceparent, and an optional tracestate. The runtime copies this context to all
job lifecycle events and NATS headers, so include request_id or trace_id in
service logs when you need to follow work across RPCs, events, operations, and
jobs.
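If a log pipeline only carries the traceparent value, the trace ID can be recovered from it. The sketch below assumes that the traceparent field follows the W3C Trace Context format (version 00: `version-traceid-parentid-flags`), which this guide does not state explicitly. `TraceParent` and `parse_traceparent` are hypothetical names for illustration, not Trellis APIs.

```rust
/// Fields of a W3C `traceparent` value (version 00).
#[derive(Debug, PartialEq)]
struct TraceParent<'a> {
    trace_id: &'a str,
    parent_id: &'a str,
    sampled: bool,
}

/// True when `s` is exactly `len` lowercase hex characters.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// Parses a version-00 traceparent; returns None for anything malformed.
fn parse_traceparent(value: &str) -> Option<TraceParent<'_>> {
    let mut parts = value.split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let parent_id = parts.next()?;
    let flags = parts.next()?;
    // Version 00 has exactly four dash-separated fields.
    if version != "00" || parts.next().is_some() {
        return None;
    }
    if !is_lower_hex(trace_id, 32) || !is_lower_hex(parent_id, 16) || !is_lower_hex(flags, 2) {
        return None;
    }
    // All-zero IDs are invalid in the W3C format.
    if trace_id.bytes().all(|b| b == b'0') || parent_id.bytes().all(|b| b == b'0') {
        return None;
    }
    let flag_bits = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceParent { trace_id, parent_id, sampled: (flag_bits & 0x01) == 1 })
}

fn main() {
    // Example value taken from the W3C Trace Context specification.
    let header = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
    println!("{:?}", parse_traceparent(header));
}
```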
4. Start workers
Call start_workers() after all handle() registrations:
let worker_host = service.jobs().start_workers().await?;
Hold the worker_host handle for the lifetime of the service. Stop it during shutdown:
worker_host.stop().await?;
5. Long-running jobs
Call job.heartbeat().await periodically to extend the ack deadline for jobs that may run longer than ackWaitMs:
job.heartbeat().await?;
6. Retry and failure
Returning Err(...) from a handler triggers a JetStream NAK, and the job is redelivered according to the backoffMs schedule. After maxDeliver attempts, the job moves to the dead queue. Use job.is_redelivery() and job.redelivery_count() to adjust behavior on redelivery.
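To make the interaction between maxDeliver and backoffMs concrete, here is a small model of what happens after each failed delivery. It is a sketch under two assumptions that this guide does not confirm: when the schedule is shorter than maxDeliver, the last delay is reused (as JetStream consumers do), and an empty schedule means immediate redelivery. `NextStep` and `next_step` are hypothetical names, not Trellis APIs.

```rust
/// What happens after a handler returns Err on a given delivery.
#[derive(Debug, PartialEq)]
enum NextStep {
    RetryAfterMs(u64),
    Dead,
}

/// `delivery` is 1-based: 1 is the first attempt, 2 the first redelivery.
fn next_step(delivery: u32, max_deliver: u32, backoff_ms: &[u64]) -> NextStep {
    if delivery >= max_deliver {
        // All attempts are used up; the job moves to the dead queue.
        return NextStep::Dead;
    }
    // Assumption: clamp to the last entry when the schedule runs out.
    let last = backoff_ms.len().saturating_sub(1);
    let idx = (delivery.saturating_sub(1) as usize).min(last);
    // Assumption: an empty schedule is modeled as a 0 ms delay.
    NextStep::RetryAfterMs(backoff_ms.get(idx).copied().unwrap_or(0))
}

fn main() {
    // Values from the reserve-inventory queue declared in step 1.
    let backoff = [5000, 30000, 120000];
    for delivery in 1..=5 {
        println!("failed delivery {delivery}: {:?}", next_step(delivery, 5, &backoff));
    }
}
```

Under these assumptions, the reserve-inventory queue waits 5 s, 30 s, 120 s, and 120 s between its five attempts before the job is moved to the dead queue.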
7. Keyed concurrency and queue depth
Use keyed concurrency when one logical unit of work must have at most one active
job across all service instances. Queues without keyConcurrency keep normal
work-queue behavior. The existing numeric concurrency option still controls how
many worker tasks run for the queue; keyConcurrency controls the active limit
for a derived logical key.
"jobs": {
  "sync-tickets": {
    "payload": { "schema": "SyncTicketsPayload" },
    "result": { "schema": "SyncTicketsResult" },
    "concurrency": 4,
    "keyConcurrency": {
      "key": ["zendesk", "/origin", "tickets"],
      "maxActive": 1,
      "stalePolicy": "fail-stale"
    },
    "queue": {
      "maxQueuedPerKey": 1,
      "whenFull": "reject"
    }
  }
}
The default keyed policy is conservative: maxActive: 1, stalePolicy: "fail-stale", maxQueuedPerKey: 0, and whenFull: "reject".
That means Trellis fails a duplicate create by default instead of silently
coalescing or replacing work. Use the generated strict create(...) method when
you need a new job or a typed failure; use the generated policy-aware submit(...) method when the service needs to inspect accepted, rejected, coalesced, or replaced outcomes.
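The admission decision for one logical key can be modeled roughly as follows. This is a simplified sketch, not the Trellis implementation: only the "reject" value of whenFull appears in this guide, so the `Coalesce` and `Replace` variants are assumed names chosen to match the coalesced and replaced outcomes, and all types and functions here are hypothetical.

```rust
/// Assumed whenFull values; only "reject" is documented in this guide.
#[derive(Debug, Clone, Copy, PartialEq)]
enum WhenFull {
    Reject,
    Coalesce,
    Replace,
}

/// The four outcomes that submit(...) is described as reporting.
#[derive(Debug, PartialEq)]
enum Outcome {
    Accepted,
    Rejected,
    Coalesced,
    Replaced,
}

struct KeyPolicy {
    max_active: u32,
    max_queued_per_key: u32,
    when_full: WhenFull,
}

impl Default for KeyPolicy {
    /// The conservative defaults listed in this guide.
    fn default() -> Self {
        KeyPolicy { max_active: 1, max_queued_per_key: 0, when_full: WhenFull::Reject }
    }
}

/// Decides what happens to a new job for a key that currently has
/// `active` running jobs and `queued` waiting jobs.
fn admit(policy: &KeyPolicy, active: u32, queued: u32) -> Outcome {
    if active < policy.max_active || queued < policy.max_queued_per_key {
        return Outcome::Accepted;
    }
    match policy.when_full {
        WhenFull::Reject => Outcome::Rejected,
        WhenFull::Coalesce => Outcome::Coalesced,
        WhenFull::Replace => Outcome::Replaced,
    }
}

fn main() {
    let defaults = KeyPolicy::default();
    println!("idle key: {:?}", admit(&defaults, 0, 0));
    println!("duplicate while active: {:?}", admit(&defaults, 1, 0));
    let coalescing = KeyPolicy { when_full: WhenFull::Coalesce, ..KeyPolicy::default() };
    println!("duplicate with assumed coalesce policy: {:?}", admit(&coalescing, 1, 0));
}
```

With the defaults, the second job for a busy key is rejected, which corresponds to the typed failure that the strict create(...) method is described as returning.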
Even with keyed concurrency, keep service-owned checkpoint commits idempotent. Trellis prevents duplicate active work; your service database should still guard durable state and external side effects.
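One way to keep a checkpoint commit idempotent is to write only when the new cursor advances the stored one, so a redelivered or replayed job cannot move state backwards or apply the same step twice. The sketch below uses an in-memory map as a stand-in for the service database; `CheckpointStore`, the key string, and the numeric cursor are all hypothetical, and a real service would perform the same comparison inside a database transaction or conditional write.

```rust
use std::collections::HashMap;

/// In-memory stand-in for a service-owned table of sync checkpoints.
#[derive(Default)]
struct CheckpointStore {
    latest: HashMap<String, u64>,
}

impl CheckpointStore {
    /// Commits `cursor` for `key` only if it advances the stored value.
    /// Returns true when state changed and false for a replayed commit.
    fn commit(&mut self, key: &str, cursor: u64) -> bool {
        match self.latest.get(key) {
            Some(&current) if cursor <= current => false,
            _ => {
                self.latest.insert(key.to_string(), cursor);
                true
            }
        }
    }
}

fn main() {
    let mut store = CheckpointStore::default();
    let key = "zendesk/acme/tickets"; // hypothetical derived key
    println!("first commit applied: {}", store.commit(key, 10));
    // A redelivered job repeats the same commit; nothing changes.
    println!("replayed commit applied: {}", store.commit(key, 10));
    println!("next commit applied: {}", store.commit(key, 11));
}
```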
Next steps
- Operations: Rust, to expose a caller-visible Orders.Process operation backed by these jobs
- Administer Jobs, to query and manage these jobs as an operator