Metered Command Primitives
Status: implemented (library primitives in atomo_server) · Layer: Atomo core (atomo_server::metered + atomo_server::jobs) · Pull trigger: a consumer exposing a metered command (a credit/billing debit, a rate-limited public command, a metered-API gateway) that needs the whole command to commit or roll back as one unit.
A metered command is one that must, atomically: reserve some budget, spend a one-time grant (a single-use token), and start durable work (a job). If any step fails, none must take effect — a half-applied command that consumes a token without enqueuing work, or charges a budget for a job that never ran, is a correctness bug.
Atomo provides three composable primitives for this. They own no business policy: scopes, purposes, limits, windows, and units are all caller-supplied opaque values. Nothing here knows what is being metered, who the caller is, or what the work does.
The primitives
1. Transactional job enqueue — JobStore::enqueue_tx
JobStore::enqueue runs on its own pooled connection, so it cannot be part of a larger atomic operation. enqueue_tx takes a caller-supplied &mut PgConnection (pass &mut *tx) so the enqueue commits or rolls back with the caller's other writes:
let (job_id, created) = job_store
    .enqueue_tx(&mut tx, queue, kind, payload, Some(idem_key), max_attempts, priority, tenant_id)
    .await?;

It is idempotent on (queue, idempotency_key) exactly like enqueue: a second enqueue for the same key returns the existing id with created = false. It emits no event, because a rolled-back transaction must not leak a phantom Job Created event. After the transaction commits, call job_store.emit_enqueued(&job_id, queue, kind) when created is true.
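The idempotency and deferred-event rules can be illustrated with a small in-memory model. This is only a sketch: SketchJobs and events_after_commit are hypothetical names, a HashMap stands in for the unique (queue, idempotency_key) constraint, and nothing here touches Postgres or the real JobStore.

```rust
use std::collections::HashMap;

/// In-memory stand-in for a job table with a unique (queue, idempotency_key) key.
/// Hypothetical type for illustration; not part of atomo_server.
#[derive(Default)]
struct SketchJobs {
    by_key: HashMap<(String, String), u64>,
    next_id: u64,
}

impl SketchJobs {
    /// Returns (job_id, created). A repeated key yields the existing id
    /// with created = false instead of inserting a second job.
    fn enqueue(&mut self, queue: &str, idem_key: &str) -> (u64, bool) {
        let key = (queue.to_string(), idem_key.to_string());
        if let Some(&id) = self.by_key.get(&key) {
            return (id, false);
        }
        self.next_id += 1;
        self.by_key.insert(key, self.next_id);
        (self.next_id, true)
    }
}

/// Events to publish once the surrounding transaction has committed:
/// one event for a newly created job, none for an idempotent replay.
fn events_after_commit(job_id: u64, created: bool) -> Vec<String> {
    if created {
        vec![format!("Job Created {job_id}")]
    } else {
        Vec::new()
    }
}
```

Keeping the event decision outside enqueue mirrors the split described above: the enqueue itself stays silent, and the caller publishes only after a successful commit.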
2. Expiring single-use token store — metered::ExpiringTokenStore
A one-time grant: mint an opaque token now, redeem it exactly once before it expires. Only the SHA-256 of the token is stored; the plaintext is returned once at mint and is not recoverable from the database (same discipline as worker tokens).
// At grant time (e.g. after an upload is accepted):
let token = tokens.mint("upload", &subject, json!({ "assetId": id }), 3600).await?; // plaintext, shown once
// Inside the command transaction — succeeds at most once:
if let Some(grant) = tokens.consume(&mut tx, "upload", &subject, &presented).await? {
    let asset_id = grant.payload; // the opaque payload attached at mint
}

- purpose namespaces token kinds; subject is an opaque caller-chosen scope (e.g. a session identifier). Consumption requires both to match.
- Single use is enforced by a single UPDATE … WHERE consumed_at IS NULL … RETURNING: concurrent redeemers race for one row lock, so exactly one wins.
- consume takes &mut PgConnection and must run inside the caller's transaction, so a rolled-back command leaves the token unspent.
- purge_expired() reclaims expired rows; wire it to a scheduler under your retention policy.
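The redemption rules (matching purpose and subject, not expired, not yet consumed) can be modelled in memory as follows. This is a sketch under loud assumptions: SketchTokens, Row, and digest are hypothetical names; the token is supplied by the caller rather than generated; time is a caller-supplied number of seconds; and DefaultHasher replaces SHA-256 purely to avoid a dependency, so it is not suitable for real token storage.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Stand-in digest. The real store keeps a SHA-256 of the token; DefaultHasher
/// is used only to keep this sketch std-only and is NOT cryptographically secure.
fn digest(token: &str) -> u64 {
    let mut h = DefaultHasher::new();
    token.hash(&mut h);
    h.finish()
}

/// One stored grant, keyed by the digest of its plaintext token.
struct Row {
    purpose: String,
    subject: String,
    payload: String,
    expires_at: u64,          // seconds on a caller-supplied clock (assumption)
    consumed_at: Option<u64>, // None until the grant is redeemed
}

/// Hypothetical in-memory model of the token table.
#[derive(Default)]
struct SketchTokens {
    rows: HashMap<u64, Row>,
}

impl SketchTokens {
    /// Stores only the digest of `token`, never the plaintext.
    fn mint(&mut self, token: &str, purpose: &str, subject: &str, payload: &str, now: u64, ttl_secs: u64) {
        self.rows.insert(
            digest(token),
            Row {
                purpose: purpose.to_string(),
                subject: subject.to_string(),
                payload: payload.to_string(),
                expires_at: now + ttl_secs,
                consumed_at: None,
            },
        );
    }

    /// Redeems at most once: the check and the consumed_at write happen together.
    fn consume(&mut self, purpose: &str, subject: &str, presented: &str, now: u64) -> Option<String> {
        let row = self.rows.get_mut(&digest(presented))?;
        let redeemable = row.consumed_at.is_none()
            && row.purpose == purpose
            && row.subject == subject
            && now < row.expires_at;
        if !redeemable {
            return None;
        }
        row.consumed_at = Some(now);
        Some(row.payload.clone())
    }
}
```

In this model the exclusive &mut self borrow plays the role that the row lock plays in the database: no second redeemer can observe consumed_at as unset once the first has written it.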
3. Integer-unit budget ledger — metered::BudgetLedger
An append-only ledger of signed integer amounts (the unit is the caller's — micro-USD, credits, tokens) charged against an opaque scope. Capacity is the windowed sum over a recent interval.
// Inside the command transaction — reserve only if it keeps the window within budget:
if ledger.try_reserve(&mut tx, "global:hourly", cost_micros, hourly_limit, 3600).await? {
    // within budget: proceed
}
let spent = ledger.windowed_sum("global:hourly", 3600).await?; // read-only, for reporting

try_reserve takes a transaction-scoped advisory lock keyed by scope, recomputes the windowed sum, and inserts the reservation only if windowed_sum + amount <= limit. The advisory lock serializes concurrent reservations on the same scope, so two callers can never both pass a check that only one budget allows (verified by a concurrency test). The lock releases on commit/rollback.
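The check-then-insert rule can be sketched with an in-memory ledger. The names SketchLedger and Entry are hypothetical, time is a caller-supplied number of seconds, and the exact window boundary (an entry counts while created_at + window > now) is an assumption of this sketch rather than documented behaviour.

```rust
/// One append-only ledger entry: a signed amount charged to a scope at a time.
struct Entry {
    scope: String,
    amount: i64,
    created_at: u64, // seconds on a caller-supplied clock (assumption)
}

/// Hypothetical in-memory model of the budget ledger.
#[derive(Default)]
struct SketchLedger {
    entries: Vec<Entry>,
}

impl SketchLedger {
    /// Sum of amounts for `scope` whose age is less than `window_secs`.
    fn windowed_sum(&self, scope: &str, window_secs: u64, now: u64) -> i64 {
        self.entries
            .iter()
            .filter(|e| e.scope == scope && e.created_at + window_secs > now)
            .map(|e| e.amount)
            .sum()
    }

    /// Appends the reservation only if windowed_sum + amount <= limit.
    /// The &mut self borrow stands in for the per-scope advisory lock:
    /// nothing else can insert between the check and the append.
    fn try_reserve(&mut self, scope: &str, amount: i64, limit: i64, window_secs: u64, now: u64) -> bool {
        if self.windowed_sum(scope, window_secs, now) + amount > limit {
            return false;
        }
        self.entries.push(Entry {
            scope: scope.to_string(),
            amount,
            created_at: now,
        });
        true
    }
}
```

Because entries are never updated or deleted here, capacity returns on its own as old reservations age out of the window.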
Composing a metered command
The three primitives combine in one transaction so the command is all-or-nothing:
let mut tx = pool.begin().await?;
if !ledger.try_reserve(&mut tx, &scope, cost, limit, window).await? {
    return /* over budget */;
}
let Some(grant) = tokens.consume(&mut tx, "upload", &subject, &presented).await? else {
    return /* invalid/used token */;
};
let (job_id, created) = job_store
    .enqueue_tx(&mut tx, &queue, &kind, payload(grant.payload), Some(&idem), attempts, 0, tenant)
    .await?;
tx.commit().await?; // budget, token, and job commit together, or none do
if created { job_store.emit_enqueued(&job_id, &queue, &kind); }

The result/cost of the work is reconciled separately when the job completes: workers report completion through the job lifecycle (/jobs complete/fail) and the resulting Job event, so a read of command status causes no state change. (Reconciling a metered command's actual cost back into the ledger via the completion event is a consumer-owned projection.)
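The all-or-nothing property can be demonstrated without a database by staging changes on a copy and installing the copy only when every step succeeds. This is a deliberately simplified sketch: State, Rejected, and metered_command are hypothetical names, the budget is a single running total rather than a windowed ledger, and cloning the state stands in for a real transaction.

```rust
/// Combined operational state. Clone lets the sketch stage changes on a copy.
#[derive(Clone, Default, Debug, PartialEq)]
struct State {
    reserved: i64,     // budget already reserved (simplified: no window)
    token_spent: bool, // whether the single-use grant has been redeemed
    jobs: Vec<String>, // kinds of the jobs enqueued so far
}

/// Reasons the command is refused before anything is committed.
#[derive(Debug, PartialEq)]
enum Rejected {
    OverBudget,
    TokenUnavailable,
}

/// Runs reserve, consume, enqueue against a staged copy of the state and
/// installs that copy only if all three steps succeed.
fn metered_command(state: &mut State, cost: i64, limit: i64, kind: &str) -> Result<usize, Rejected> {
    let mut tx = state.clone(); // "begin": work on a private copy

    // Step 1: reserve budget.
    if tx.reserved + cost > limit {
        return Err(Rejected::OverBudget); // copy dropped = rollback
    }
    tx.reserved += cost;

    // Step 2: spend the single-use grant.
    if tx.token_spent {
        return Err(Rejected::TokenUnavailable); // the reservation above is discarded too
    }
    tx.token_spent = true;

    // Step 3: enqueue the job.
    tx.jobs.push(kind.to_string());
    let job_index = tx.jobs.len() - 1;

    *state = tx; // "commit": all three effects become visible together
    Ok(job_index)
}
```

A second call that would fit the budget but presents an already-spent token returns Err(Rejected::TokenUnavailable) and leaves the state exactly as it was, including the budget it had provisionally reserved.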
Boundary (what this is not)
- No business policy. These primitives do not define what a token grants, what a budget protects, or who may call. A consumer composes them with its own schema, routes, and rules.
- No HTTP surface. They are library APIs, intentionally not network-exposed: the atomic guarantees require composing them inside one transaction with consumer-owned writes, which only a same-process or same-database consumer can do. Exposing each primitive as a standalone endpoint would break atomicity across the network boundary.
- Not event-sourced (v1). The token and ledger tables are operational state, not part of the event log. The job remains event-sourced.
Storage
Two idempotently-created tables (self-initialized at boot like the other operational stores):
- expiring_tokens (token_sha256 PK, purpose, subject, payload, expires_at, consumed_at, created_at)
- budget_ledger (id PK, scope, amount, created_at), with a (scope, created_at) window index
Configuration
ATOMO_ENABLE_METERED_COMMANDS (ServerConfig::enable_metered_commands, default false) controls whether the two tables are created at boot. Set it to true to make the primitives available; leave it off to skip them on deployments that don't use the feature. There are no other env vars — scopes, purposes, limits, windows, and units are all passed by the caller at the call site.
Tests
- Unit (atomo_server::metered): token hashing is stable/opaque; advisory-key derivation is deterministic and scope-specific.
- Integration (crates/atomo_server/tests/metered_store.rs, Postgres-gated): single-use consumption; scope/expiry rejection; windowed budget enforcement; concurrent reservations never over-commit; and transactional atomicity: a rolled-back command consumes no token, reserves no budget, and enqueues no job, while a committed one does all three.
DATABASE_URL=postgres:///atomo_test \
cargo test -p atomo_server --test metered_store -- --ignored --test-threads=1
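Postgres advisory locks are keyed by integers, so a string scope has to be mapped to a key before try_reserve can lock on it. The document does not say how atomo_server derives that key; the sketch below uses 64-bit FNV-1a purely as an illustrative assumption, and advisory_key is a hypothetical name. It shows the two properties the unit tests check: the same scope always yields the same key, and different scopes yield different keys (up to hash collisions).

```rust
/// Maps a scope string to a signed 64-bit key using FNV-1a.
/// Illustrative only: the hash actually used by the library is not documented here.
fn advisory_key(scope: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    let mut hash = OFFSET_BASIS;
    for byte in scope.as_bytes() {
        hash ^= u64::from(*byte);        // mix in one byte
        hash = hash.wrapping_mul(PRIME); // spread it across the word
    }
    hash as i64 // reinterpret the bits as a signed 64-bit key
}
```

Any stable hash would serve; the only hard requirement is that the derivation never changes between processes or releases, since two servers disagreeing on the key would stop serializing against each other.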