Durable Jobs & External Workers
Atomo's action system is event-driven: great for short, safe hooks in the data path, but the wrong tool for long-running, side-effect-heavy work (calling flaky external APIs, browser automation, ffmpeg/image pipelines, minute-long polling, large binaries).
For that work Atomo provides a durable job queue plus external workers: the server owns an event-sourced job lifecycle; a trusted worker process (any language, full native ecosystem) pulls jobs, does the messy I/O, and reports results back. The server is the brain; the worker is the hands.
For the why and the full design, see External Workers & Blob Storage.
The shape
 enqueue                                 lease (X-Worker-Token)
 POST /jobs ─────────┐                   ┌──────────────────────────┐
 workflow Job step ──┤                   │ your worker process      │
 JobStore::enqueue ──┤                   │ @atomo-cc/worker-sdk     │
                     │                   │ Playwright · ffmpeg · …  │
                     ▼                   └──────────────────────┬───┘
 ┌────────────────────────────┐                                 │
 │ atomo-server               │◀── complete / fail / heartbeat ─┘
 │ jobs (events + projection) │
 └────────────────────────────┘
               ▲
               │ poll: GET /jobs/{id}
1. Mint a worker token (admin)
Workers authenticate with a worker token — a credential distinct from user logins, scoped to specific queues. An admin mints one (the plaintext is shown once):
curl -X POST http://localhost:3000/jobs/workers \
-H "authorization: Bearer $ADMIN_JWT" -H 'content-type: application/json' \
-d '{"name":"media-worker","queues":["media-gen"]}'
# → 201 { "id": "...", "token": "wkr_…", "queues": ["media-gen"] }
The token can only lease the queues it was minted for (["*"] allows any queue). Store it as a secret for the worker; the server keeps only its SHA-256 hash. Admins can GET /jobs/workers to list tokens (metadata only) and DELETE /jobs/workers/{id} to revoke one; requests made with a revoked token are rejected with 401 immediately.
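The token bookkeeping described above can be illustrated with a small in-memory model. It is a hypothetical sketch, not Atomo's code: WorkerTokenStore, the random suffix after wkr_, and the three-valued check result are assumptions. It shows why keeping only the SHA-256 digest is enough to authenticate a presented token, and how the ["*"] wildcard widens a token's queue scope.

```typescript
import { createHash, randomBytes } from "node:crypto";

// Hypothetical server-side record: metadata only, never the plaintext token.
interface WorkerTokenRecord {
  id: string;
  name: string;
  queues: string[]; // ["*"] allows any queue
  revoked: boolean;
}

type LeaseCheck = "ok" | "unauthenticated" | "queue-not-allowed";

function sha256Hex(value: string): string {
  return createHash("sha256").update(value, "utf8").digest("hex");
}

class WorkerTokenStore {
  // Keyed by SHA-256(token): a presented token is hashed, then looked up.
  private byHash = new Map<string, WorkerTokenRecord>();

  // Returns the plaintext exactly once; only its digest is retained.
  mint(id: string, name: string, queues: string[]): string {
    const token = "wkr_" + randomBytes(24).toString("hex"); // assumed token format
    this.byHash.set(sha256Hex(token), { id, name, queues, revoked: false });
    return token;
  }

  revoke(id: string): void {
    this.byHash.forEach((rec) => {
      if (rec.id === id) rec.revoked = true;
    });
  }

  // May the bearer of `token` lease from `queue`?
  check(token: string, queue: string): LeaseCheck {
    const rec = this.byHash.get(sha256Hex(token));
    if (!rec || rec.revoked) return "unauthenticated"; // unknown or revoked: 401
    const allowed = rec.queues.includes("*") || rec.queues.includes(queue);
    return allowed ? "ok" : "queue-not-allowed";
  }
}
```

Because the tokens are long random strings, a leaked digest cannot practically be turned back into a usable token, while hashing the presented token still finds the right record.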
2. Enqueue work
There are five ways to enqueue a job, all idempotent when you pass an idempotencyKey:
From an app/UI (HTTP). Any authenticated user can enqueue; the job is stamped with the caller's tenant.
curl -X POST http://localhost:3000/jobs \
-H "authorization: Bearer $JWT" -H 'content-type: application/json' \
-d '{"queue":"media-gen","kind":"video.generate","payload":{"prompt":"a calm sea"}}'
# → 201 { "id": "..." }
From GraphQL. The enqueueJob mutation requires an authenticated request, stamps the request's tenant on the job, and returns the job id:
mutation { enqueueJob(queue: "media-gen", kind: "video.generate") }
From a workflow (no code). Add a Job step; the new job id lands in the workflow context as job_id:
{ "name": "kick-off-render",
"action": { "Job": { "queue": "media-gen", "kind": "video.generate",
"payload": { "prompt": "…" }, "idempotency_key": "deal-42" } } }
From an action. The action dispatcher enqueues a job automatically when the action's event conditions match.
From Rust (in-process): JobStore::enqueue(queue, kind, payload, idempotency_key, max_attempts, priority, tenant_id).
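From TypeScript, the HTTP path is one authenticated POST. The helper below is a hypothetical sketch, not an Atomo client: it assumes the POST /jobs body accepts an idempotencyKey field next to queue, kind and payload, and that any 2xx response carries the { "id": ... } shown above. fetchImpl is injectable so the helper can be exercised without a server.

```typescript
// Hypothetical request shape: queue/kind/payload come from the curl example;
// idempotencyKey as a body field is an assumption.
interface EnqueueRequest {
  queue: string;
  kind: string;
  payload?: unknown;
  idempotencyKey?: string; // same key => same job, not a second one
}

// POST /jobs as the calling user; resolves to the new (or existing) job id.
async function enqueueJob(
  baseUrl: string,
  jwt: string,
  req: EnqueueRequest,
  fetchImpl: typeof fetch = fetch,
): Promise<string> {
  const res = await fetchImpl(`${baseUrl}/jobs`, {
    method: "POST",
    headers: { authorization: `Bearer ${jwt}`, "content-type": "application/json" },
    body: JSON.stringify(req),
  });
  if (!res.ok) throw new Error(`enqueue failed: HTTP ${res.status}`);
  const { id } = (await res.json()) as { id: string };
  return id;
}
```

Calling it twice with the same idempotencyKey should return the same job id rather than enqueue a second job, which is what makes retrying the enqueue call itself safe.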
3. Run a worker
Use @atomo-cc/worker-sdk: you write one handler per job kind, and the SDK runs the lease → heartbeat → complete/fail loop for you.
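You never write that loop yourself, but a sketch makes the contract concrete. It is not the SDK's source: LeasedJob, JobTransport, runOnce and the 10-second heartbeat default are hypothetical stand-ins for the worker endpoints in the Jobs API reference. Each cycle leases one job, heartbeats while the handler runs, then reports complete, or fail with retryable set to false when the handler threw NonRetryableError.

```typescript
// Hypothetical transport standing in for the worker endpoints (X-Worker-Token auth).
interface LeasedJob {
  id: string;
  kind: string;
  payload: unknown;
  leaseToken: string; // echoed back so the server can reject stale holders
}

interface JobTransport {
  lease(queues: string[]): Promise<LeasedJob | null>;
  heartbeat(job: LeasedJob): Promise<void>;
  complete(job: LeasedJob, result: unknown): Promise<void>;
  fail(job: LeasedJob, error: string, retryable: boolean): Promise<void>;
}

// Local stand-in for the SDK's NonRetryableError.
class NonRetryableError extends Error {}

type Handler = (job: LeasedJob) => Promise<unknown>;

// One lease -> heartbeat -> complete/fail cycle; resolves false when the queues were empty.
async function runOnce(
  t: JobTransport,
  queues: string[],
  handlers: Record<string, Handler>,
  heartbeatMs = 10_000, // assumed; must stay well inside the lease window
): Promise<boolean> {
  const job = await t.lease(queues);
  if (!job) return false;

  // Heartbeat periodically while the handler works.
  const timer = setInterval(() => {
    t.heartbeat(job).catch(() => {}); // a 409 here means the lease was lost
  }, heartbeatMs);

  let outcome: { ok: true; result: unknown } | { ok: false; error: unknown };
  try {
    const handler = handlers[job.kind];
    // Sketch choice: an unknown kind can never succeed, so don't retry it.
    if (!handler) throw new NonRetryableError(`no handler for kind ${job.kind}`);
    outcome = { ok: true, result: await handler(job) };
  } catch (error) {
    outcome = { ok: false, error };
  } finally {
    clearInterval(timer); // stop heartbeating before reporting
  }

  if (outcome.ok) {
    await t.complete(job, outcome.result);
  } else {
    const e = outcome.error;
    const message = e instanceof Error ? e.message : String(e);
    await t.fail(job, message, !(e instanceof NonRetryableError)); // retryable unless NonRetryableError
  }
  return true;
}
```

With the SDK, the whole cycle reduces to registering a handler per kind: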
import { createWorker, NonRetryableError } from "@atomo-cc/worker-sdk";
// runProviderPipeline and upload are your own functions, not part of the SDK.
const worker = createWorker({
  url: "http://localhost:3000",
  token: process.env.ATOMO_WORKER_TOKEN!, // the wkr_… token minted in step 1
  queues: ["media-gen"], // must be covered by the token's queues
  concurrency: 4,
});
worker.on("video.generate", async ({ job, signal, progress }) => {
  // typeof null === "object", so reject null explicitly; a malformed payload will never succeed, so don't retry it
  if (job.payload === null || typeof job.payload !== "object") throw new NonRetryableError("bad payload");
  await progress({ percent: 0.1, message: "calling provider" }); // live update (see below)
  const mp4 = await runProviderPipeline(job.payload, { signal }); // your native code
  return { assetId: await upload(mp4) }; // → stored as the job result
});
worker.start();
process.on("SIGTERM", () => worker.stop());
Live progress (optional)
Call ctx.progress({ percent?, message?, data? }) from a handler to publish an ephemeral update. It is fanned out over realtime on the channel job:{id} and is not persisted. A UI that enqueued the job can subscribe to that channel over /realtime/ws to show a live progress bar:
// jwt: the user's login token; jobId: the id returned when the job was enqueued.
const ws = new WebSocket("ws://localhost:3000/realtime/ws?token=" + jwt);
ws.onopen = () => ws.send(JSON.stringify({ op: "subscribe", channel: `job:${jobId}` }));
ws.onmessage = (e) => {
  const m = JSON.parse(e.data); /* { jobId, percent, message, data } */
  // update your progress bar from m.percent / m.message
};
4. Poll the result
curl http://localhost:3000/jobs/<id> -H "authorization: Bearer $JWT"
# → { "id", "queue", "kind", "status": "succeeded", "attempts": 1,
# "maxAttempts": 5, "result": { "assetId": "…" }, "error": null }
status is one of queued · leased · succeeded · dead. A tenant-bound user only sees their own tenant's jobs.
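If your app polls instead of subscribing to job:{id}, a small helper can wait for a terminal status. A sketch, assuming the JobView fields shown above and that succeeded and dead are the only terminal states (queued and leased mean keep waiting); waitForJob, its interval and timeout defaults, and the injectable fetchImpl are illustrative, not part of an Atomo SDK.

```typescript
type JobStatus = "queued" | "leased" | "succeeded" | "dead";

// Mirrors the GET /jobs/{id} response shown above.
interface JobView {
  id: string;
  queue: string;
  kind: string;
  status: JobStatus;
  attempts: number;
  maxAttempts: number;
  result: unknown;
  error: unknown;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Poll GET /jobs/{id} until the job is succeeded or dead, or until timeoutMs elapses.
async function waitForJob(
  baseUrl: string,
  jwt: string,
  id: string,
  { intervalMs = 1000, timeoutMs = 60_000, fetchImpl = fetch } = {},
): Promise<JobView> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const res = await fetchImpl(`${baseUrl}/jobs/${encodeURIComponent(id)}`, {
      headers: { authorization: `Bearer ${jwt}` },
    });
    if (!res.ok) throw new Error(`GET /jobs/${id}: HTTP ${res.status}`);
    const job = (await res.json()) as JobView;
    if (job.status === "succeeded" || job.status === "dead") return job; // terminal
    if (Date.now() + intervalMs > deadline) throw new Error(`timed out waiting for job ${id}`);
    await sleep(intervalMs); // still queued or leased
  }
}
```

A dead job resolves rather than throws, so callers should check status and read error before trusting result.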
Semantics you can rely on
- At-least-once delivery. A job can run more than once (e.g. its lease expired after the handler finished but before it reported completion), so make handlers idempotent. idempotencyKey only makes enqueue idempotent; it does not deduplicate execution.
- Leases & crash recovery. A leased job has a visibility window; if the worker dies, the lease expires and a background sweep (every ATOMO_JOB_RECLAIM_INTERVAL seconds, default 30) returns the job to the queue. Each lease carries a unique token, so a revived zombie worker can't complete a job that was already re-leased: its heartbeat, complete, and fail calls get 409.
- Retry / backoff / dead-letter. A failed attempt is retried with exponential backoff until maxAttempts is reached, then dead-lettered (status: "dead", never silently dropped). Throw NonRetryableError (or report retryable: false) to dead-letter immediately.
- Concurrent workers. Dispatch uses Postgres SELECT … FOR UPDATE SKIP LOCKED, so many workers lease disjoint jobs from one database without contention.
- Event-sourced. Every transition is emitted as a Job model event (audit, history, replay), which is the operational advantage over a mutable status column for flaky pipelines.
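The lease-token fencing and retry rules above fit in a small in-memory state machine. This is a sketch, not Atomo's implementation (which keeps state in Postgres and emits each transition as a Job event): the InMemoryQueue class, the 200/409 return codes standing in for HTTP responses, and the backoff schedule of baseMs · 2^(attempt-1) capped at five minutes are all assumptions. Time is passed in explicitly so each transition is easy to follow.

```typescript
import { randomUUID } from "node:crypto";

type Status = "queued" | "leased" | "succeeded" | "dead";

interface Job {
  id: string;
  status: Status;
  attempts: number;
  maxAttempts: number;
  availableAt: number; // earliest time the job may be leased (backoff)
  leaseToken?: string;
  leaseExpiresAt?: number;
}

// Assumed schedule: 1s, 2s, 4s, ... capped at 5 minutes.
const backoffMs = (attempt: number, baseMs = 1000, capMs = 300_000): number =>
  Math.min(capMs, baseMs * 2 ** (attempt - 1));

class InMemoryQueue {
  jobs = new Map<string, Job>();
  private readonly leaseMs: number;

  constructor(leaseMs = 30_000) {
    this.leaseMs = leaseMs;
  }

  enqueue(id: string, maxAttempts = 5): void {
    this.jobs.set(id, { id, status: "queued", attempts: 0, maxAttempts, availableAt: 0 });
  }

  // Lease the first available job; a fresh token fences out any earlier holder.
  lease(now: number): { id: string; token: string } | null {
    for (const job of this.jobs.values()) {
      if (job.status === "queued" && job.availableAt <= now) {
        job.status = "leased";
        job.attempts += 1;
        job.leaseToken = randomUUID();
        job.leaseExpiresAt = now + this.leaseMs;
        return { id: job.id, token: job.leaseToken };
      }
    }
    return null;
  }

  // The background sweep: expired leases go back to the queue.
  reclaim(now: number): void {
    for (const job of this.jobs.values()) {
      if (job.status === "leased" && (job.leaseExpiresAt ?? 0) <= now) job.status = "queued";
    }
  }

  private holds(id: string, token: string): Job | null {
    const job = this.jobs.get(id);
    return job && job.status === "leased" && job.leaseToken === token ? job : null;
  }

  complete(id: string, token: string): 200 | 409 {
    const job = this.holds(id, token);
    if (!job) return 409; // stale or unknown lease
    job.status = "succeeded";
    return 200;
  }

  fail(id: string, token: string, now: number, retryable = true): 200 | 409 {
    const job = this.holds(id, token);
    if (!job) return 409;
    if (!retryable || job.attempts >= job.maxAttempts) {
      job.status = "dead"; // dead-lettered, never dropped
    } else {
      job.status = "queued";
      job.availableAt = now + backoffMs(job.attempts);
    }
    return 200;
  }
}
```

A zombie that still holds the first token gets 409 because re-leasing issued a new token, which is the fencing the leases bullet describes.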
Security boundary
A worker is trusted relative to the sandbox but still least-privilege: its token is scoped to specific queues, distinct from user JWTs, and revocable. The action system is untouched — an action can enqueue a job but never becomes a worker.
See also
- Jobs API (REST): endpoint reference
- Workflows API: the Job step
- External Workers & Blob Storage: the design
- Multi-tenant: jobs carry tenant_id