This guide shows you how to add a caller-visible async operation to the orders-service. By the end, callers will be able to start Orders.Process, watch typed progress updates, and receive the terminal result — without needing to know anything about the service’s internal jobs.
Before starting, complete Jobs: TypeScript.
Jobs vs operations
Jobs are strictly internal to a service. An operation is a public contract surface for async work. Callers interact with operations; your internal jobs remain hidden. One operation can be backed by many jobs, or by direct logic — that is an implementation detail.
1. Add Orders.Process to the contract
Update contracts/orders_service.ts to add an operations section:
import {
ProcessOrderRequestSchema,
ProcessOrderProgressSchema,
ProcessOrderResultSchema,
} from "./schemas.ts";
// In the first defineServiceContract argument, add to the schemas map:
{
schemas: {
// ... existing schemas ...
ProcessOrderRequest: ProcessOrderRequestSchema,
ProcessOrderProgress: ProcessOrderProgressSchema,
ProcessOrderResult: ProcessOrderResultSchema,
},
}
// In the returned contract body, make sure these local capabilities are declared
// in the top-level capabilities map before referencing them from the operation:
capabilities: {
"orders.read": { /* ... */ },
"orders.write": { /* ... */ },
},
operations: {
"Orders.Process": {
version: "v1",
input: { schema: "ProcessOrderRequest" },
progress: { schema: "ProcessOrderProgress" },
output: { schema: "ProcessOrderResult" },
capabilities: {
call: ["orders.write"],
observe: ["orders.read"],
cancel: ["orders.write"],
control: ["orders.write"],
},
cancel: true,
},
}, call starts the operation, observe gates get/wait/watch, cancel gates
cancellation, and control gates named post-start signals. If you do not declare
signals, control can be omitted; it is shown here to make the four operation
capability gates explicit. If observe is omitted, it defaults to call; an
explicit empty observe list makes watching available to authenticated callers.
Add the schemas to schemas.ts:
export const ProcessOrderRequestSchema = Type.Object({
orderId: Type.String(),
});
export type ProcessOrderRequest = Static<typeof ProcessOrderRequestSchema>;
export const ProcessOrderProgressSchema = Type.Object({
step: Type.String(),
message: Type.String(),
});
export type ProcessOrderProgress = Static<typeof ProcessOrderProgressSchema>;
export const ProcessOrderResultSchema = Type.Object({
orderId: Type.String(),
status: Type.String(),
});
export type ProcessOrderResult = Static<typeof ProcessOrderResultSchema>; 2. Implement the operation handler
The operation handler calls op.started(), optionally publishes initial progress, creates a job, and awaits op.attach(job.value). The framework then automatically wires the job’s terminal result through to the operation:
await service.handle.operation.orders.process(async ({ input, op }) => {
const started = await op.started();
if (started.isErr()) return Result.err(started.error);
const progress = await op.progress({
step: "queued",
message: "Fulfillment started",
});
if (progress.isErr()) return Result.err(progress.error);
const job = await service.jobs.reserveInventory.create({
orderId: input.orderId,
items: [],
});
if (job.isErr()) return Result.err(job.error);
return await op.attach(job.value);
}); op.attach(job.value) blocks until the job chain completes. Pass job.value — the unwrapped JobRef from the Result<JobRef>. The framework extracts the typed output from the terminal job snapshot to resolve the caller’s OperationRef.
If the attached queue uses keyed concurrency, create(...) may return a typed
expected failure when another active or queued job already owns the same key. In
that case, translate the failure into operation progress or a terminal operation
error that is meaningful to the caller. If you intentionally want to report
policy outcomes such as coalesced or replaced, use the job queue’s
policy-aware submit(...) helper and attach only the JobRef returned by an accepted or replaced outcome. Do not attach a caller’s operation to a job that
was merely reported as already running unless your service has explicitly built a
shared operation-progress model.
Deferring terminal completion
Most handlers complete by returning Result.ok(output), op.attach(job), or a
terminal error. If another durable control path will complete the operation later,
return op.defer() instead of keeping the handler promise open forever. This is
the pattern used by review-gated flows such as device activation: the first
handler records progress, then an admin decision completes or rejects the same
operation id.
await service.handle.operation.orders.process(async ({ input, op }) => {
const job = await service.jobs.reserveInventory.create({
operationId: op.id,
orderId: input.orderId,
items: [],
});
if (job.isErr()) return Result.err(job.error);
const progress = await op.progress({
step: "pending_review",
message: `Order ${input.orderId} is waiting for approval`,
});
if (progress.isErr()) return Result.err(progress.error);
return op.defer();
}); The job or review handler resumes the operation through the public
operation-scoped service control helper. It only needs the durable operation id;
it does not need the original in-memory op object and should not reach into
private service internals.
service.jobs.reserveInventory.handle(async ({ job }) => {
const op = await service.handle.operation.orders.process
.control(job.payload.operationId)
.orThrow();
await op.progress({
step: "fulfilling",
message: "Inventory reserved; preparing shipment",
}).orThrow();
await op.complete({
orderId: job.payload.orderId,
status: "ready_to_ship",
}).orThrow();
return Result.ok({ completed: true });
}); 3. The job worker — no changes needed
For op.attach(job) flows, the job workers from Jobs: TypeScript run unchanged. They report progress via job.progress(...) and return Result.ok(result). The op.attach() wires their results to the operation automatically.
For op.defer() flows, the worker owns the operation terminal path and should use service.handle.operation.<group>.<leaf>.control(operationId) to publish caller-visible progress,
completion, failure, or cancellation after the original handler has returned.
4. Caller side
A caller starts the operation and observes it:
// Start
const ref = await client.operation.orders.process.start({ orderId: "ord_123" });
if (ref.isErr()) throw ref.error;
const op = ref.value;
// Wait for final result (most common)
const result = await op.wait();
if (result.isErr()) throw result.error;
console.log(result.value.output); // { orderId, status }
// Or watch live progress events
for await (const event of op.watch()) {
if (event.type === "progress") {
console.log(event.snapshot.progress); // { step, message }
}
if (event.type === "completed") {
console.log(event.snapshot.output);
break;
}
} 5. Cancellation
A caller can cancel in-flight operations:
const snapshot = await op.cancel(); The cancellation signal flows to the active job worker via job.cancelled. The worker checks it cooperatively:
service.jobs.reserveInventory.handle(async ({ job }) => {
if (job.cancelled) {
return Result.err({ type: "JobCancelledError", message: "Cancelled" });
}
// ...
}); When the job returns a cancelled error, op.attach() resolves with a cancelled TerminalOperation.
6. Transfer-capable operations
Use a transfer-capable operation when the async workflow starts with caller-sent bytes, such as a document upload that the service stores and then processes. The operation remains the caller-visible workflow; byte-transfer progress is separate from business progress reported with op.progress(...).
Declare the transfer on the operation contract. The store names a service-owned resources.store binding, while key and contentType are JSON Pointers into the operation input:
operations: {
"Documents.Files.Upload": {
version: "v1",
input: ref.schema("FilesUploadRequest"),
progress: ref.schema("FilesUploadProgress"),
output: ref.schema("FilesUploadResult"),
transfer: {
direction: "send",
store: "uploads",
key: "/key",
contentType: "/contentType",
expiresInMs: 60_000,
},
capabilities: {
call: ["documents.write"],
observe: ["documents.read"],
},
},
}, Callers attach the bytes before starting the operation. Use transfer callbacks for progress bars that track bytes, and use operation progress for domain milestones such as stored, queued, or processed:
const upload = await client.operation.documents.filesUpload
.input({
key: "incoming/report.pdf",
contentType: "application/pdf",
})
.transfer(fileBytes)
.onTransfer((event) => {
console.log("bytes sent", event.transfer.transferredBytes);
})
.onProgress((event) => {
console.log(event.progress.stage, event.progress.message);
})
.start()
.orThrow();
const completed = await upload.wait().orThrow(); On the provider side, transfer.completed() is the bridge from transport to store-backed processing. Once it resolves, the staged bytes are durable in the service store named by the contract. Process them directly, or enqueue a private job if follow-up work should retry or outlive the current process:
await service.handle.operation.documents.filesUpload(
async ({ input, op, transfer }) => {
const transferred = await transfer.completed();
if (transferred.isErr()) return Result.err(transferred.error);
const stored = await op.progress({
stage: "stored",
message: `Stored ${transferred.value.size} bytes`,
});
if (stored.isErr()) return Result.err(stored.error);
const job = await service.jobs.processUpload.create({
key: input.key,
size: transferred.value.size,
});
if (job.isErr()) return Result.err(job.error);
return await op.attach(job.value);
},
); For post-transfer store access and cleanup, see Store resources: TypeScript. Keep raw store bindings service-local; if a caller needs to receive stored bytes later, expose a contract-owned receive transfer grant instead of handing out store access.
Development loop
- Update operation schemas and contract
operationssection - Rebuild contract artifacts and SDK
- Upgrade the contract in Trellis
- Restart the service