This guide shows you how to add a caller-visible async operation to the Rust orders-service. When you finish, callers can start Orders.Process, watch typed progress updates, and receive the terminal result.

Before starting, complete Jobs: Rust.

1. Add Orders.Process to the contract manifest

Update contracts/orders_service.manifest.json to add an operations section:

"operations": {
  "Orders.Process": {
    "version": "v1",
    "input": { "schema": { "$ref": "ProcessOrderRequest" } },
    "progress": { "schema": { "$ref": "ProcessOrderProgress" } },
    "output": { "schema": { "$ref": "ProcessOrderResult" } },
    "capabilities": {
      "call": ["orders.write"],
      "observe": ["orders.read"],
      "cancel": ["orders.write"],
      "control": ["orders.write"]
    },
    "cancel": true
  }
}

Each capability list gates one kind of action: call starts the operation; observe gates get, wait, and watch; cancel gates cancellation; and control gates named post-start signals. If the operation has no named signals, control can be omitted. If observe is omitted, it defaults to the call list; an explicit empty observe list ("observe": []) makes get, wait, and watch available to any authenticated caller.
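The defaulting rule for observe can be sketched as a small model. This is not the generated facade or the runtime's authorization code: OperationCapabilities, effective_observe, and may_observe are hypothetical names, and the sketch assumes a caller must hold every capability in the list.

```rust
// Illustrative model only. These types are hypothetical and are not the
// generated facade or the runtime's real authorization code.

/// Capability lists as declared under "capabilities" in the manifest.
/// `None` means the key was omitted; `Some(vec![])` is an explicit empty list.
#[derive(Debug, Clone, Default)]
pub struct OperationCapabilities {
    pub call: Vec<String>,
    pub observe: Option<Vec<String>>,
    pub cancel: Option<Vec<String>>,
    pub control: Option<Vec<String>>,
}

impl OperationCapabilities {
    /// Capabilities required for get/wait/watch.
    /// An omitted `observe` falls back to `call`; an explicit empty list
    /// stays empty.
    pub fn effective_observe(&self) -> &[String] {
        match &self.observe {
            Some(list) => list.as_slice(),
            None => self.call.as_slice(),
        }
    }
}

/// Returns true when an authenticated caller holding `granted` may observe.
/// Assumption: every listed capability is required. An empty list therefore
/// admits any authenticated caller.
pub fn may_observe(caps: &OperationCapabilities, granted: &[&str]) -> bool {
    caps.effective_observe()
        .iter()
        .all(|required| granted.iter().any(|g| *g == required.as_str()))
}
```

In this model, a manifest that omits observe admits only callers holding the call capabilities, while "observe": [] admits a caller with no capabilities at all.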

Regenerate the participant facade after updating the manifest.

2. Implement the operation handler

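Rust operation handlers use a service-owned operation runtime and an OperationControl handle. register_operation takes four handlers, in order: start, get, wait, and cancel. The start handler accepts the operation under an operation id, marks the operation running, publishes any initial progress, and then either completes it directly or attaches completion to service-owned async work. In the example, completion happens in a spawned task.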

let runtime = InMemoryOperationRuntime::new("orders-service");
let operations = runtime.operation::<orders_participant::operations::OrdersProcess>();

router.register_operation::<orders_participant::operations::OrdersProcess, _, _, _, _, _, _, _, _>(
    {
        let operations = operations.clone();
        move |_ctx, input| {
            let operations = operations.clone();
            async move {
                let operation_id = format!("orders-process-{}", input.order_id);
                let accepted = operations.accept(operation_id.clone()).await?;
                let control = operations.control(operation_id).await?;

                control.started().await?;
                control
                    .progress(ProcessOrderProgress {
                        step: "queued".into(),
                        message: "Fulfillment started".into(),
                    })
                    .await?;

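                // Run fulfillment in the background so the start call can
                // return `accepted` without waiting for it. Errors from
                // complete(...) and fail(...) are discarded with `let _ =`;
                // consider logging them.
                tokio::spawn(async move {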
                    let result = run_fulfillment(input).await;
                    match result {
                        Ok(output) => {
                            let _ = control.complete(output).await;
                        }
                        Err(error) => {
                            let _ = control.fail(OperationFailure {
                                message: error.to_string(),
                            }).await;
                        }
                    }
                });

                Ok(accepted)
            }
        }
    },
    {
        let operations = operations.clone();
        move |_ctx, operation_id| {
            let operations = operations.clone();
            async move { operations.get(operation_id).await }
        }
    },
    {
        let operations = operations.clone();
        move |_ctx, operation_id| {
            let operations = operations.clone();
            async move { operations.wait(operation_id).await }
        }
    },
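    // Cancel handler. This example rejects cancellation as unsupported. An
    // operation whose manifest declares "cancel": true needs a handler that
    // propagates cancellation to the underlying work instead.
    |_ctx, _operation_id| async move {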
        Err(ServerError::OperationUnsupportedControl {
            operation: "Orders.Process".into(),
            action: "cancel".into(),
        })
    },
);

For service-owned async work that can drive the operation to a terminal state, use OperationControl::attach(...). The attached task must complete, fail, or cancel the operation before it returns; otherwise attach(...) returns an error because the operation is still non-terminal.

control
    .attach(async {
        let output = run_fulfillment(input).await?;
        control.complete(output).await?;
        Ok::<(), ServerError>(())
    })
    .await?;
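The rule that attached work must leave the operation in a terminal state can be illustrated with a simplified, synchronous model. ModelControl, State, and AttachError are hypothetical names; the real OperationControl is async, and its error type and exact behavior may differ.

```rust
// Simplified, synchronous model of the attach rule. All names here are
// hypothetical stand-ins, not the real OperationControl API.
use std::cell::RefCell;

#[derive(Debug, Clone, PartialEq)]
pub enum State {
    Running,
    Completed(String),
    Failed(String),
    Cancelled,
}

impl State {
    /// Every state except `Running` is terminal in this model.
    pub fn is_terminal(&self) -> bool {
        !matches!(self, State::Running)
    }
}

#[derive(Debug, PartialEq)]
pub enum AttachError {
    /// The attached work itself returned an error.
    Work(String),
    /// The attached work returned without resolving the operation.
    StillNonTerminal,
}

pub struct ModelControl {
    state: RefCell<State>,
}

impl ModelControl {
    pub fn new() -> Self {
        Self { state: RefCell::new(State::Running) }
    }

    pub fn complete(&self, output: &str) {
        *self.state.borrow_mut() = State::Completed(output.to_string());
    }

    pub fn fail(&self, message: &str) {
        *self.state.borrow_mut() = State::Failed(message.to_string());
    }

    pub fn cancel(&self) {
        *self.state.borrow_mut() = State::Cancelled;
    }

    pub fn state(&self) -> State {
        self.state.borrow().clone()
    }

    /// Runs `work`, then checks that it drove the operation to a terminal
    /// state before returning.
    pub fn attach<F>(&self, work: F) -> Result<(), AttachError>
    where
        F: FnOnce(&ModelControl) -> Result<(), String>,
    {
        work(self).map_err(AttachError::Work)?;
        if self.state.borrow().is_terminal() {
            Ok(())
        } else {
            Err(AttachError::StillNonTerminal)
        }
    }
}
```

In this model, work that calls complete, fail, or cancel makes attach return Ok(()), while work that returns Ok(()) without resolving the operation yields AttachError::StillNonTerminal.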

3. The job workers — no changes needed

The workers from Jobs: Rust run unchanged. They report progress via job.progress(...) and return Ok(result). If a job owns the terminal operation path, persist the operation id in the job payload and resume the service-owned OperationControl by id when the worker completes.
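A minimal sketch of carrying the operation id in the job payload follows. FulfillmentJobPayload, ModelRuntime, and on_job_finished are hypothetical names that stand in for the job payload type and the service-owned runtime. The "first terminal result wins" rule is an assumption made here to keep a retried job from overwriting an earlier result; it is not documented runtime behavior.

```rust
// Illustrative model only. These types are hypothetical stand-ins for the
// job payload and the service-owned operation runtime.
use std::collections::HashMap;

/// Job payload that carries the operation id alongside the job input, so the
/// worker's result can be routed back to the right operation.
#[derive(Debug, Clone, PartialEq)]
pub struct FulfillmentJobPayload {
    pub operation_id: String,
    pub order_id: String,
}

/// Stand-in for the runtime: one terminal result slot per operation id.
#[derive(Default)]
pub struct ModelRuntime {
    terminal: HashMap<String, Result<String, String>>,
}

impl ModelRuntime {
    /// Resolves the operation by id. Assumption: the first terminal result
    /// wins, and later attempts return false without changing anything.
    pub fn resolve(&mut self, operation_id: &str, result: Result<String, String>) -> bool {
        if self.terminal.contains_key(operation_id) {
            return false;
        }
        self.terminal.insert(operation_id.to_string(), result);
        true
    }

    pub fn terminal(&self, operation_id: &str) -> Option<&Result<String, String>> {
        self.terminal.get(operation_id)
    }
}

/// Called when a worker finishes: looks the operation up by the id stored in
/// the payload and resolves it with the job's result.
pub fn on_job_finished(
    runtime: &mut ModelRuntime,
    payload: &FulfillmentJobPayload,
    job_result: Result<String, String>,
) -> bool {
    runtime.resolve(&payload.operation_id, job_result)
}
```

The design point is that the worker never needs a live handle: the id in the payload is enough to find the operation again after a restart or retry.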

4. Caller side

let client = orders_participant::connect_service(...).await?;
let op_ref = client.orders()
    .operation::<OrdersProcessOperation>()
    .start(ProcessOrderRequest { order_id: "ord_123".into() })
    .await?;

// Wait for terminal result
let terminal = op_ref.wait().await?;
println!("{:?}", terminal.output);

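// Or watch live progress.
// `stream.next()` needs a StreamExt trait in scope (for example
// `futures::StreamExt`, assuming watch() returns a compatible Stream).
let mut stream = op_ref.watch();
while let Some(event) = stream.next().await {
    match event {
        OperationEvent::Progress { snapshot } => println!("{:?}", snapshot.progress),
        OperationEvent::Completed { snapshot } => { println!("{:?}", snapshot.output); break; }
        // All other events are ignored here, so after a non-success terminal
        // event the loop keeps running until the stream ends.
        _ => {}
    }
}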
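The watch loop above breaks only on Completed. A caller that also needs to react to failure or cancellation can fold the event stream into a single outcome, as in the sketch below. ModelEvent and Outcome are hypothetical types used for illustration; the real OperationEvent variants other than Progress and Completed are not shown in this guide, so check the generated facade for their names.

```rust
// Illustrative model only. `ModelEvent` and `Outcome` are hypothetical; they
// are not the facade's OperationEvent type.

#[derive(Debug, Clone, PartialEq)]
pub enum ModelEvent {
    Progress(String),
    Completed(String),
    Failed(String),
    Cancelled,
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    Completed(String),
    Failed(String),
    Cancelled,
    /// The event source ended before any terminal event arrived.
    StreamEnded,
}

/// Consumes events in order, reporting progress through `on_progress`, and
/// stops at the first terminal event. Events after it are not consumed.
pub fn drain_until_terminal<I>(events: I, mut on_progress: impl FnMut(&str)) -> Outcome
where
    I: IntoIterator<Item = ModelEvent>,
{
    for event in events {
        match event {
            ModelEvent::Progress(step) => on_progress(&step),
            ModelEvent::Completed(output) => return Outcome::Completed(output),
            ModelEvent::Failed(message) => return Outcome::Failed(message),
            ModelEvent::Cancelled => return Outcome::Cancelled,
        }
    }
    Outcome::StreamEnded
}
```

Matching every variant explicitly, rather than using a catch-all arm, lets the compiler flag the loop if a new event kind is added later.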

5. Cancellation

let snapshot = op_ref.cancel().await?;

The cancellation signal flows through the operation control path. The service runtime rejects cancellation for non-cancelable operations; cancelable operations should propagate cancellation to any service-private job or task and resolve the operation with a cancelled terminal snapshot.
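One common way to propagate cancellation into service-private work is a shared flag that the work checks between steps. The sketch below uses only the standard library. CancelFlag, Terminal, and run_steps are hypothetical names, and the real runtime may deliver cancellation through a different mechanism.

```rust
// Illustrative model only. `CancelFlag`, `Terminal`, and `run_steps` are
// hypothetical; they show cooperative cancellation, not the runtime's API.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Debug, PartialEq)]
pub enum Terminal {
    Completed(u32),
    Cancelled { steps_done: u32 },
}

/// Cloneable flag shared between the cancel handler and the running work.
#[derive(Clone, Default)]
pub struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    /// Called from the cancel path.
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    /// Polled by the work between steps.
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Runs up to `total_steps` units of work, checking the flag before each one
/// and resolving to a cancelled terminal value as soon as it is set.
pub fn run_steps(total_steps: u32, flag: &CancelFlag) -> Terminal {
    for done in 0..total_steps {
        if flag.is_cancelled() {
            return Terminal::Cancelled { steps_done: done };
        }
        // One unit of real work would run here.
    }
    Terminal::Completed(total_steps)
}
```

In a service, the cancel handler would hold one clone of the flag and the job or task another; the work then maps Terminal::Cancelled onto the operation's cancelled terminal snapshot.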