This guide adds background job processing to the Rust orders-service from Tutorial: Write a service. By the end, the service enqueues fulfillment jobs when an order is created and processes them with retries, progress tracking, and event emission on completion.

Before starting, complete the Rust path in Tutorial: Write a service.

What are jobs?

Jobs are service-private background work units. For caller-visible async workflows, see Operations: Rust.

1. Add top-level jobs to the contract manifest

Update contracts/orders_service.manifest.json to add a top-level jobs map with the three fulfillment queue types:

"resources": {
  "kv": {
    "orders": { "purpose": "Store order records", "history": 1, "ttlMs": 0 }
  }
},
"jobs": {
  "reserve-inventory": {
    "payload": { "schema": "ReserveInventoryPayload" },
    "result": { "schema": "ReserveInventoryResult" },
    "maxDeliver": 5,
    "backoffMs": [5000, 30000, 120000]
  },
  "charge-payment": {
    "payload": { "schema": "ChargePaymentPayload" },
    "result": { "schema": "ChargePaymentResult" },
    "maxDeliver": 5,
    "ackWaitMs": 600000
  },
  "notify-warehouse": {
    "payload": { "schema": "NotifyWarehousePayload" },
    "result": { "schema": "NotifyWarehouseResult" },
    "maxDeliver": 3
  }
}

Regenerate the participant facade after updating the manifest.

2. Create jobs from the Orders.Create handler

The participant facade exposes service.jobs(), which returns a JobsFacade with one typed JobQueue per declared queue. The facade converts kebab-case manifest queue names to snake_case Rust method names: "reserve-inventory" → reserve_inventory(), "charge-payment" → charge_payment(), and so on.
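The naming rule above can be sketched as a plain string transformation. This is only an illustration: queue_method_name is a hypothetical helper, not part of the generated facade, and the real generator may apply extra rules (for example around reserved words) that are not described here.

```rust
/// Hypothetical helper illustrating the naming rule described above:
/// every `-` in a manifest queue name becomes `_` in the Rust method name.
/// The real code generator may handle more cases than this sketch does.
fn queue_method_name(queue: &str) -> String {
    queue.replace('-', "_")
}
```

With this rule, the three queues in the manifest map to reserve_inventory(), charge_payment(), and notify_warehouse().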

Update the Orders.Create handler:

service
    .handle()
    .rpc()
    .orders()
    .create(|_ctx, req| async move {
        let order_id = ulid::Ulid::new().to_string();
        // save order...

        service
            .jobs()
            .reserve_inventory()
            .create(ReserveInventoryPayload {
                order_id: order_id.clone(),
                items: req.items,
            })
            .await?;

        Ok(CreateOrderResponse { order_id, status: "pending".into() })
    });

3. Register job workers

Use handle(handler) on each JobQueue to register a worker. Complete all registrations before calling start_workers():

service
    .jobs()
    .reserve_inventory()
    .handle(|job: ActiveJob<ReserveInventoryPayload, ReserveInventoryResult>| async move {
        tracing::info!(
            request_id = %job.context().request_id,
            trace_id = %job.context().trace_id,
            "reserve inventory job started",
        );
        job.progress(JobProgress {
            step: Some("reserving".into()),
            message: Some("Checking inventory".into()),
            ..Default::default()
        })
        .await?;

        if job.is_cancelled() {
            return Err(JobsError::Message { message: "cancelled".to_string() });
        }

        // reserve inventory...

        service
            .jobs()
            .charge_payment()
            .create(ChargePaymentPayload {
                order_id: job.payload().order_id.clone(),
                customer_id: "cust_placeholder".into(),
                amount_cents: 1000,
            })
            .await?;

        Ok(ReserveInventoryResult { reserved: true })
    })
    .await?;

service
    .jobs()
    .charge_payment()
    .handle(|job: ActiveJob<ChargePaymentPayload, ChargePaymentResult>| async move {
        job.progress(JobProgress {
            step: Some("charging".into()),
            message: Some("Processing payment".into()),
            ..Default::default()
        })
        .await?;

        if job.is_cancelled() {
            return Err(JobsError::Message { message: "cancelled".to_string() });
        }

        let charge_id = format!("ch_{}", ulid::Ulid::new());

        service
            .jobs()
            .notify_warehouse()
            .create(NotifyWarehousePayload {
                order_id: job.payload().order_id.clone(),
                items: vec![],
            })
            .await?;

        Ok(ChargePaymentResult { charge_id })
    })
    .await?;

service
    .jobs()
    .notify_warehouse()
    .handle(|job: ActiveJob<NotifyWarehousePayload, NotifyWarehouseResult>| async move {
        job.progress(JobProgress {
            step: Some("notifying".into()),
            message: Some("Notifying warehouse".into()),
            ..Default::default()
        })
        .await?;

        if job.is_cancelled() {
            return Err(JobsError::Message { message: "cancelled".to_string() });
        }

        service
            .events()
            .publish::<orders_participant::events::OrdersShipped>(OrderShippedEvent {
                order_id: job.payload().order_id.clone(),
                customer_id: "cust_placeholder".into(),
                shipped_at: chrono::Utc::now().to_rfc3339(),
            })
            .await?;

        Ok(NotifyWarehouseResult { notified: true })
    })
    .await?;

Every active job exposes immutable job.context() with request_id, trace_id, traceparent, and optional tracestate. The runtime copies this context to all job lifecycle events and NATS headers, so include request_id or trace_id in service logs when you need to follow work across RPCs, events, operations, and jobs.
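When only the traceparent value is at hand (for example in a log line from another system), the trace ID can be recovered from it. The sketch below assumes the W3C Trace Context version 00 layout (version-traceid-parentid-flags, lowercase hex). trace_id_from_traceparent is a hypothetical helper written for this guide, not a Trellis API; inside a handler, job.context().trace_id already gives you the value directly.

```rust
/// Hypothetical helper: extracts the trace-id field from a W3C
/// `traceparent` value in the version 00 layout
/// `version-traceid-parentid-flags`. Returns `None` for malformed input.
fn trace_id_from_traceparent(value: &str) -> Option<&str> {
    let parts: Vec<&str> = value.split('-').collect();
    if parts.len() != 4 {
        return None;
    }
    // Each field is fixed-width lowercase hex.
    let is_hex = |s: &str, len: usize| {
        s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
    };
    if !(is_hex(parts[0], 2)
        && is_hex(parts[1], 32)
        && is_hex(parts[2], 16)
        && is_hex(parts[3], 2))
    {
        return None;
    }
    // An all-zero trace-id is invalid per the specification.
    if parts[1].bytes().all(|b| b == b'0') {
        return None;
    }
    Some(parts[1])
}
```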

4. Start workers

Call start_workers() after all handle() registrations:

let worker_host = service.jobs().start_workers().await?;

Hold the worker_host handle for the lifetime of the service. Stop it during shutdown:

worker_host.stop().await?;

5. Long-running jobs

Call job.heartbeat().await periodically to extend the ack deadline for jobs that may run longer than ackWaitMs:

job.heartbeat().await?;

6. Retry and failure

Returning Err(...) from a handler triggers a JetStream NAK, and the job is redelivered according to the backoffMs schedule. After maxDeliver attempts, the job moves to the dead queue. Use job.is_redelivery() and job.redelivery_count() to adjust behavior on redelivery.
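To reason about how maxDeliver and backoffMs interact, it can help to model the schedule as a pure function. The sketch below is a simplified model, not the runtime's implementation. RetryDecision and after_failure are hypothetical names. It assumes that a schedule shorter than the delivery budget reuses its last entry, and it falls back to a delay of 0 when no schedule is declared.

```rust
/// Outcome of a failed delivery in this simplified model.
#[derive(Debug, PartialEq)]
enum RetryDecision {
    /// Redeliver after the given delay in milliseconds.
    RetryAfterMs(u64),
    /// The delivery budget is exhausted; the job is dead.
    Dead,
}

/// `failed_deliveries` counts deliveries that have failed so far
/// (1 after the first failure).
fn after_failure(failed_deliveries: u32, max_deliver: u32, backoff_ms: &[u64]) -> RetryDecision {
    if failed_deliveries >= max_deliver {
        return RetryDecision::Dead;
    }
    // Assumption: once the schedule runs out, its last entry is reused.
    let last = backoff_ms.len().saturating_sub(1);
    let idx = (failed_deliveries as usize).saturating_sub(1).min(last);
    // Assumption: with no schedule at all, this sketch uses a delay of 0.
    RetryDecision::RetryAfterMs(backoff_ms.get(idx).copied().unwrap_or(0))
}
```

Under these assumptions, the reserve-inventory queue from the manifest (maxDeliver 5, backoffMs [5000, 30000, 120000]) waits 5 s, 30 s, 120 s, and 120 s after the first four failures. The fifth failure exhausts the budget.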

7. Keyed concurrency and queue depth

Use keyed concurrency when one logical unit of work must have at most one active job across all service instances. Queues without keyConcurrency keep normal work-queue behavior. The existing numeric concurrency option still controls how many worker tasks run for the queue; keyConcurrency controls the active limit for a derived logical key.

"jobs": {
  "sync-tickets": {
    "payload": { "schema": "SyncTicketsPayload" },
    "result": { "schema": "SyncTicketsResult" },
    "concurrency": 4,
    "keyConcurrency": {
      "key": ["zendesk", "/origin", "tickets"],
      "maxActive": 1,
      "stalePolicy": "fail-stale"
    },
    "queue": {
      "maxQueuedPerKey": 1,
      "whenFull": "reject"
    }
  }
}

The default keyed policy is conservative: maxActive: 1, stalePolicy: "fail-stale", maxQueuedPerKey: 0, and whenFull: "reject". That means Trellis fails a duplicate create by default instead of silently coalescing or replacing work. Use the generated strict create(...) method when you need a new job or a typed failure; use the generated policy-aware submit(...) method when the service needs to inspect accepted, rejected, coalesced, or replaced outcomes.
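The effect of maxActive, maxQueuedPerKey, and whenFull: "reject" can be illustrated with a small in-memory model. This is a sketch of the admission rule only, under the assumption that a finished job hands its active slot to a queued job for the same key. KeyedQueue, SubmitOutcome, and the key strings are hypothetical and do not reflect Trellis internals. Coalescing, replacement, and stale-job handling are left out.

```rust
use std::collections::HashMap;

/// Simplified outcome; the real API also reports coalesced and replaced.
#[derive(Debug, PartialEq)]
enum SubmitOutcome {
    Accepted,
    Rejected,
}

#[derive(Default)]
struct KeyState {
    active: u32,
    queued: u32,
}

/// Hypothetical in-memory model of per-key admission with `whenFull: "reject"`.
struct KeyedQueue {
    max_active: u32,
    max_queued_per_key: u32,
    keys: HashMap<String, KeyState>,
}

impl KeyedQueue {
    fn new(max_active: u32, max_queued_per_key: u32) -> Self {
        Self { max_active, max_queued_per_key, keys: HashMap::new() }
    }

    /// Admits a job for `key` if an active or queued slot is free.
    fn submit(&mut self, key: &str) -> SubmitOutcome {
        let state = self.keys.entry(key.to_string()).or_default();
        if state.active < self.max_active {
            state.active += 1;
            SubmitOutcome::Accepted
        } else if state.queued < self.max_queued_per_key {
            state.queued += 1;
            SubmitOutcome::Accepted
        } else {
            SubmitOutcome::Rejected
        }
    }

    /// Marks one active job for `key` as finished.
    fn complete(&mut self, key: &str) {
        if let Some(state) = self.keys.get_mut(key) {
            if state.queued > 0 {
                // A queued job takes over the freed active slot.
                state.queued -= 1;
            } else if state.active > 0 {
                state.active -= 1;
            }
        }
    }
}
```

With the defaults (maxActive 1, maxQueuedPerKey 0), a second submit for the same key is rejected until the first job completes. Raising maxQueuedPerKey to 1, as in the sync-tickets example, lets one follow-up job wait behind the active one.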

Even with keyed concurrency, keep service-owned checkpoint commits idempotent. Trellis prevents duplicate active work; your service database should still guard durable state and external side effects.
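One way to keep checkpoint commits idempotent is to make the commit a monotonic compare-and-set, so a redelivered or replayed job cannot move state backwards or apply the same step twice. The sketch below uses a hypothetical in-memory CheckpointStore as a stand-in for the service database. In a real service the same check would live in a transaction or conditional write.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a service-owned checkpoint table.
#[derive(Default)]
struct CheckpointStore {
    /// Highest committed cursor per logical key.
    cursors: HashMap<String, u64>,
}

impl CheckpointStore {
    /// Commits `cursor` for `key` only if it moves the checkpoint forward.
    /// Returns `true` when state changed and `false` for a replayed or
    /// out-of-date commit, which the caller can treat as a no-op.
    fn commit(&mut self, key: &str, cursor: u64) -> bool {
        match self.cursors.get(key) {
            Some(&current) if current >= cursor => false,
            _ => {
                self.cursors.insert(key.to_string(), cursor);
                true
            }
        }
    }
}
```

A handler that is redelivered after a partial run can call commit again with the same cursor and skip side effects whenever it returns false.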

Next steps