This guide shows you how to add a caller-visible async operation to the Rust orders-service. By the end of this guide, callers will be able to start Orders.Process, watch typed progress updates, and receive the terminal result.
Before starting, complete Jobs: Rust.
1. Add Orders.Process to the contract manifest
Update contracts/orders_service.manifest.json to add an operations section:
"operations": {
"Orders.Process": {
"version": "v1",
"input": { "schema": { "$ref": "ProcessOrderRequest" } },
"progress": { "schema": { "$ref": "ProcessOrderProgress" } },
"output": { "schema": { "$ref": "ProcessOrderResult" } },
"capabilities": {
"call": ["orders.write"],
"observe": ["orders.read"],
"cancel": ["orders.write"],
"control": ["orders.write"]
},
"cancel": true
}
} call starts the operation, observe gates get/wait/watch, cancel gates
cancellation, and control gates named post-start signals. If the operation has
no named signals, control can be omitted. If observe is omitted, it defaults
to call; an explicit empty observe list makes watching available to
authenticated callers.
Regenerate the participant facade after updating the manifest.
2. Implement the operation handler
Rust operation handlers use a service-owned operation runtime and an OperationControl handle. The handler accepts an operation id, marks the
operation running, publishes any initial progress, and then either completes it
directly or attaches completion to service-owned async work.
let runtime = InMemoryOperationRuntime::new("orders-service");
let operations = runtime.operation::<orders_participant::operations::OrdersProcess>();
router.register_operation::<orders_participant::operations::OrdersProcess, _, _, _, _, _, _, _, _>(
{
let operations = operations.clone();
move |_ctx, input| {
let operations = operations.clone();
async move {
let operation_id = format!("orders-process-{}", input.order_id);
let accepted = operations.accept(operation_id.clone()).await?;
let control = operations.control(operation_id).await?;
control.started().await?;
control
.progress(ProcessOrderProgress {
step: "queued".into(),
message: "Fulfillment started".into(),
})
.await?;
tokio::spawn(async move {
let result = run_fulfillment(input).await;
match result {
Ok(output) => {
let _ = control.complete(output).await;
}
Err(error) => {
let _ = control.fail(OperationFailure {
message: error.to_string(),
}).await;
}
}
});
Ok(accepted)
}
}
},
{
let operations = operations.clone();
move |_ctx, operation_id| {
let operations = operations.clone();
async move { operations.get(operation_id).await }
}
},
{
let operations = operations.clone();
move |_ctx, operation_id| {
let operations = operations.clone();
async move { operations.wait(operation_id).await }
}
},
|_ctx, _operation_id| async move {
Err(ServerError::OperationUnsupportedControl {
operation: "Orders.Process".into(),
action: "cancel".into(),
})
},
); For service-owned async work that can drive the operation to a terminal state,
use OperationControl::attach(...). The attached task must complete, fail, or
cancel the operation before it returns; otherwise attach(...) returns an error
because the operation is still non-terminal.
control
.attach(async {
let output = run_fulfillment(input).await?;
control.complete(output).await?;
Ok::<(), ServerError>(())
})
.await?; 3. The job workers — no changes needed
The workers from Jobs: Rust run unchanged. They report
progress via job.progress(...) and return Ok(result). If a job owns the
terminal operation path, persist the operation id in the job payload and resume
the service-owned OperationControl by id when the worker completes.
4. Caller side
let client = orders_participant::connect_service(...).await?;
let op_ref = client.orders()
.operation::<OrdersProcessOperation>()
.start(ProcessOrderRequest { order_id: "ord_123".into() })
.await?;
// Wait for terminal result
let terminal = op_ref.wait().await?;
println!("{:?}", terminal.output);
// Or watch live progress
let mut stream = op_ref.watch();
while let Some(event) = stream.next().await {
match event {
OperationEvent::Progress { snapshot } => println!("{:?}", snapshot.progress),
OperationEvent::Completed { snapshot } => { println!("{:?}", snapshot.output); break; }
_ => {}
}
} 5. Cancellation
let snapshot = op_ref.cancel().await?; The cancellation signal flows through the operation control path. The service
runtime rejects cancellation for non-cancelable operations; cancelable operations
should propagate cancellation to any service-private job or task and resolve the
operation with a cancelled terminal snapshot.