A high-performance, PostgreSQL-native distributed job queue and workflow engine, written in Rust.
Durable background jobs and real DAG workflows on a database you already run — no Redis, no broker, no separate state store.
QueueFlow Core runs durable background jobs and real DAG workflows directly on PostgreSQL — any
plain PostgreSQL 13+ (RDS, Cloud SQL, Azure, your laptop); no extensions. The jobs table is the
queue: workers claim rows with FOR UPDATE SKIP LOCKED and own them via lease tokens. Handlers can
live in the server binary (Rust) or in any language via the HTTP worker protocol; the OpenAPI spec
is generated from the code and ships with every release.
- Workflows as a real DAG orchestrator — dependency gating, context propagation, per-step failure policies, cycle detection, a builder DSL, and a Mermaid diagram endpoint.
- Thoroughly tested — 92 tests; the entire engine, scheduler, HTTP API, and Rust client run against a deterministic in-memory adapter with no database, plus opt-in Postgres integration tests.
- Durable retries & scheduling — a retry (or a `run_at` job) is just a row whose `scheduled_at` lies in the future, so delays survive restarts.
- Delivery safety — lease tokens guard every outcome write (no stale worker can overwrite a finished or cancelled job), and a janitor reclaims expired leases through the normal retry policy, so a crashed worker consumes retry budget instead of crash-looping.
- Ports & adapters — the engine depends on the `JobStore` trait; Postgres and in-memory adapters implement it (the store doubles as the queue).
- Code-generated OpenAPI — the spec is generated from the code (utoipa), so it can't drift; it is emitted by `queueflow spec`.
- Ergonomic API — durations are plain integer seconds; statuses and backoff are exhaustive enums, so illegal states are unrepresentable.
- Multi-tenancy — tenant isolation enforced on every job/workflow endpoint.
- Polyglot workers — a lease/heartbeat/complete/fail HTTP protocol lets handlers run in any language, with the same retry/DLQ/workflow semantics as in-process handlers.
- Idempotent enqueue — an `Idempotency-Key` header makes client retries safe (no duplicate jobs).
- Clients — a native Rust crate (`queueflow-client`), a hand-written TypeScript SDK, and self-serve generation for anything else from the released spec.
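The delivery-safety rule can be modelled in a few lines of `std`-only Rust. This is a hypothetical sketch, not QueueFlow's implementation: `JobRow`, the `u64` lease token (the HTTP examples show UUID-style tokens), and the `claim` / `complete` / `reclaim_expired` methods are illustrative names, and a real adapter would perform these checks in SQL against the jobs table. It also assumes `max_retries` counts retries after the first attempt. Two properties are shown: an outcome write is accepted only from the current lease holder of a running job, and reclaiming an expired lease spends one attempt, so a job whose worker keeps crashing eventually dead-letters.

```rust
// Hypothetical, simplified model of lease-guarded outcome writes.
// Names and types are illustrative; they are not QueueFlow's API.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Pending,
    Running,
    Completed,
    Dead, // retry budget exhausted (dead-lettered)
}

#[derive(Debug)]
struct JobRow {
    status: Status,
    lease_token: Option<u64>,
    lease_expires_at: u64, // seconds, from some clock
    attempts: u32,
    max_retries: u32, // assumed: retries allowed after the first attempt
}

impl JobRow {
    fn new(max_retries: u32) -> Self {
        JobRow { status: Status::Pending, lease_token: None, lease_expires_at: 0, attempts: 0, max_retries }
    }

    /// Claim a pending job with a fresh token; every claim counts as one attempt.
    fn claim(&mut self, token: u64, now: u64, lease_secs: u64) -> bool {
        if self.status != Status::Pending {
            return false;
        }
        self.status = Status::Running;
        self.lease_token = Some(token);
        self.lease_expires_at = now + lease_secs;
        self.attempts += 1;
        true
    }

    /// Outcome write: only the current lease holder of a running job may finish it.
    fn complete(&mut self, token: u64) -> bool {
        if self.status == Status::Running && self.lease_token == Some(token) {
            self.status = Status::Completed;
            self.lease_token = None;
            true
        } else {
            false // stale or foreign token: the write is rejected
        }
    }

    /// Janitor: an expired lease goes back through the retry policy,
    /// so a crashing worker burns retry budget instead of looping forever.
    fn reclaim_expired(&mut self, now: u64) {
        if self.status == Status::Running && now >= self.lease_expires_at {
            self.lease_token = None;
            self.status = if self.attempts > self.max_retries { Status::Dead } else { Status::Pending };
        }
    }
}

fn main() {
    let mut job = JobRow::new(1);
    assert!(job.claim(7, 0, 60)); // worker A leases with token 7
    job.reclaim_expired(61); // A crashed; the lease expired, so the job is requeued
    assert_eq!(job.status, Status::Pending);
    assert!(job.claim(8, 61, 60)); // worker B takes over with token 8
    assert!(!job.complete(7)); // A's late write is rejected
    assert!(job.complete(8));
    println!("{:?}", job.status);
}
```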
- 🐘 PostgreSQL native — durable, transactional, at-least-once delivery on any plain Postgres 13+; no extensions, no extra infrastructure.
- 🔀 Workflows as DAGs — declare steps and `depends_on`; the engine schedules each step once its dependencies complete, threads results through a shared context, and aggregates the final status.
- ♻️ Durable retries — typed backoff (fixed / linear / exponential, capped, with jitter) scheduled in the row, so retries outlive restarts.
- 🪦 Dead-letter queue — exhausted, non-retryable, and unhandled jobs land somewhere you can inspect and replay.
- 🧪 Built to be tested — a ports-and-adapters core means the whole system runs in memory, deterministically, with a controllable clock.
- 📜 Code-generated OpenAPI — the spec is derived from the handlers and attached to every release; generate a client for any language against it.
- 📈 Observable — Prometheus metrics, structured JSON logs (`tracing`), health/readiness probes.
- 🛑 Graceful shutdown — drains in-flight work on `SIGINT`/`SIGTERM`.
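The typed backoff described above can be sketched as an exhaustive enum. This is an illustration under assumptions, not QueueFlow's definitions: the variant names, the `base` / `step` / `cap` fields, and passing jitter in as a caller-supplied fraction (the standard library has no RNG) are all hypothetical. The jittered delay would be added to the current time and stored in the row's `scheduled_at`, which is what lets a retry survive a restart.

```rust
/// Hypothetical backoff shapes; every duration is integer seconds.
#[derive(Debug, Clone, Copy)]
enum Backoff {
    Fixed { delay: u64 },
    Linear { base: u64, step: u64 },
    Exponential { base: u64 },
}

/// Hypothetical retry policy: a backoff shape plus a cap on any single delay.
#[derive(Debug, Clone, Copy)]
struct RetryPolicy {
    backoff: Backoff,
    cap: u64,
}

impl RetryPolicy {
    /// Delay before retry number `retry` (1 = first retry), before jitter.
    fn delay_secs(&self, retry: u32) -> u64 {
        let n = u64::from(retry.max(1));
        let raw = match self.backoff {
            Backoff::Fixed { delay } => delay,
            // base, base + step, base + 2*step, ...
            Backoff::Linear { base, step } => base.saturating_add(step.saturating_mul(n - 1)),
            // base, 2*base, 4*base, ... (saturating, so huge retry counts cannot overflow)
            Backoff::Exponential { base } => {
                let factor = 1u64.checked_shl((n - 1) as u32).unwrap_or(u64::MAX);
                base.saturating_mul(factor)
            }
        };
        raw.min(self.cap)
    }

    /// "Full jitter": scale the capped delay by `fraction` in [0, 1],
    /// which the caller draws from whatever RNG it uses.
    fn jittered_secs(&self, retry: u32, fraction: f64) -> u64 {
        (self.delay_secs(retry) as f64 * fraction.clamp(0.0, 1.0)).round() as u64
    }

    /// The value a retry would store in the row's `scheduled_at`.
    fn next_scheduled_at(&self, now: u64, retry: u32, fraction: f64) -> u64 {
        now.saturating_add(self.jittered_secs(retry, fraction))
    }
}

fn main() {
    let policy = RetryPolicy { backoff: Backoff::Exponential { base: 2 }, cap: 60 };
    let delays: Vec<u64> = (1..=6).map(|r| policy.delay_secs(r)).collect();
    println!("{delays:?}"); // [2, 4, 8, 16, 32, 60]
    println!("{}", policy.next_scheduled_at(1_000, 3, 0.5)); // 1004
}
```

Making the shape an enum means a policy can only be one of the three forms, and the cap is applied in one place regardless of which form is chosen.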
```text
queueflow-core/ Cargo workspace
├── crates/
│ ├── queueflow-core/ library: domain, ports, adapters, engine, workflow
│ │ ├── domain.rs Job / Workflow / config / status enums
│ │ ├── ports.rs JobStore (storage + claim queue) + Clock traits
│ │ ├── adapters/
│ │ │ ├── memory.rs in-memory store (tests / local dev)
│ │ │ ├── clock.rs SystemClock + TestClock
│ │ │ └── postgres/ PostgreSQL adapter + LISTEN/NOTIFY hub (feature "postgres")
│ │ ├── engine/ enqueue, worker loop, durable retry, DLQ, janitor
│ │ ├── workflow/ DAG validation, builder DSL, scheduler
│ │ └── api.rs object-safe JobApi facade
│ ├── queueflow-api/ axum router + utoipa OpenAPI
│ ├── queueflow-client/ native Rust client (REST + remote worker runtime)
│ └── queueflow-server/ the `queueflow` binary (serve / spec / migrate / CLI)
├── migrations/ sqlx migrations (plain-SQL claim-queue schema)
├── sdk-configs/ openapi-generator configs (on-demand python/go)
├── scripts/generate-sdks.sh drives openapi-generator
└── Makefile
```
The key design choice is ports & adapters: queueflow-core is written against the JobStore trait
(which doubles as the claim queue), so the engine, the workflow scheduler, and the HTTP API can be
exercised end-to-end against a fast, deterministic in-memory adapter. In production the same code runs
on the PostgreSQL adapter.
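To make the ports-and-adapters idea concrete, here is a deliberately tiny, hypothetical version of the pattern using only `std`. The `Clock` and `Store` traits, `TestClock`, `MemStore`, and `process_once` below are illustrative stand-ins that are much simpler than (and not the same as) QueueFlow's `JobStore` and `Clock` ports. The point is that engine logic written against traits can be driven by a controllable clock, so a job whose `scheduled_at` is in the future is provably invisible until the test advances time, with no sleeping and no database.

```rust
use std::cell::Cell;

/// Port: the source of "now", in seconds.
trait Clock {
    fn now(&self) -> u64;
}

/// Test adapter: time moves only when the test advances it.
struct TestClock(Cell<u64>);

impl TestClock {
    fn advance(&self, secs: u64) {
        self.0.set(self.0.get() + secs);
    }
}

impl Clock for TestClock {
    fn now(&self) -> u64 {
        self.0.get()
    }
}

struct Job {
    id: u32,
    scheduled_at: u64, // a delayed job or a retry is just a future timestamp
}

/// Port: storage that doubles as the queue.
trait Store {
    fn insert(&mut self, job: Job);
    /// Remove and return the earliest-scheduled job that is due at `now`
    /// (ties go to the first inserted).
    fn claim_due(&mut self, now: u64) -> Option<Job>;
}

/// In-memory adapter; a database adapter would implement the same trait.
#[derive(Default)]
struct MemStore {
    jobs: Vec<Job>,
}

impl Store for MemStore {
    fn insert(&mut self, job: Job) {
        self.jobs.push(job);
    }

    fn claim_due(&mut self, now: u64) -> Option<Job> {
        let idx = self
            .jobs
            .iter()
            .enumerate()
            .filter(|(_, j)| j.scheduled_at <= now)
            .min_by_key(|(_, j)| j.scheduled_at)
            .map(|(i, _)| i)?;
        Some(self.jobs.remove(idx))
    }
}

/// "Engine" logic is written against the ports, not a concrete adapter.
fn process_once<S: Store, C: Clock>(store: &mut S, clock: &C) -> Option<u32> {
    store.claim_due(clock.now()).map(|job| job.id)
}

fn main() {
    let clock = TestClock(Cell::new(0));
    let mut store = MemStore::default();
    store.insert(Job { id: 1, scheduled_at: 30 }); // due 30 s from "now"

    assert_eq!(process_once(&mut store, &clock), None); // not due yet
    clock.advance(30); // the test moves time explicitly
    assert_eq!(process_once(&mut store, &clock), Some(1));
}
```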
```rust
use std::sync::Arc;
use queueflow_core::*;
use queueflow_core::task::builtin;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), EngineError> {
    // In-memory store plus the real clock: no database needed.
    let clock = Arc::new(SystemClock);
    let store = Arc::new(InMemoryJobStore::new(clock.clone()));

    // Register handlers by task name.
    let engine = Engine::builder(store, clock)
        .register("echo", builtin::echo())
        .register_fn("greet", |p: Map| async move {
            let name = p.get("name").and_then(|v| v.as_str()).unwrap_or("world");
            Ok(Map::from_iter([("greeting".into(), json!(format!("hello {name}")))]))
        })
        .build();

    let id = engine
        .enqueue("greet", Map::from_iter([("name".into(), json!("ada"))]), Default::default())
        .await?;
    engine.process_once("default").await?; // drive one job (tests/demos)
    println!("{:?}", engine.get_job(&id).await?.result);
    Ok(())
}
```

Run the bundled examples:
```shell
cargo run --example in_memory_jobs -p queueflow-core
cargo run --example workflow -p queueflow-core
```

```shell
# any plain Postgres 13+ works, e.g.:
docker run -d --name pg -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres:16-alpine
export DATABASE_URL=postgres://postgres:postgres@localhost:5432/postgres
cargo run -p queueflow-server -- serve --mode all --workers 10 --api-port 8000
```

The server applies migrations on startup, exposes the REST API on `:8000`, Prometheus metrics on `:9090`,
and interactive docs at http://localhost:8000/docs.
```shell
make docker   # builds ghcr.io/queueflow/queueflow:dev
docker run -p 8000:8000 -p 9090:9090 \
  -e DATABASE_URL=postgres://… \
  ghcr.io/queueflow/queueflow:dev serve
```

Build a DAG with the typed builder, or POST the same JSON to `/api/v1/workflows`:
```rust
use queueflow_core::workflow::{WorkflowBuilder, StepBuilder};
use queueflow_core::OnFailure;

// Runs inside an async fn returning a Result, with `engine` built as in the quick start.
let req = WorkflowBuilder::new("order_123")
    .step(StepBuilder::new("validate").task("validate_order"))
    .step(StepBuilder::new("pay").task("process_payment").after("validate"))
    .step(StepBuilder::new("ship").task("create_shipment").after("pay")
        .on_failure(OnFailure::Continue))
    .build()?; // validates the DAG (cycles -> Err)
let workflow_id = engine.create_workflow(req, None).await?;
```

- Dependency gating — a step is enqueued only once all its `depends_on` steps complete.
- Context propagation — each step's result is merged into the workflow context and injected into downstream payloads under `_context`.
- Failure policies — per step: `halt` (fail the workflow, cancel the rest), `skip` (skip it and its dependents, continue), or `continue` (leave it failed, keep going; the workflow ends `partially_failed`).
- Cycle detection — validated at creation; a workflow containing a cycle is rejected with HTTP `400`.
- Diagram — `GET /api/v1/workflows/{id}/diagram` returns a Mermaid `graph TD`:

```mermaid
graph TD
    validate["validate"]
    pay["pay"]
    ship["ship"]
    validate --> pay
    pay --> ship
```
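The dependency-gating and cycle-detection rules map onto a standard topological sort. The sketch below uses Kahn's algorithm over `(name, depends_on)` pairs and renders a `graph TD` of the same shape as the diagram endpoint's example. The `Step` struct, the `topo_order` and `to_mermaid` functions, and the error strings are hypothetical; this is not QueueFlow's workflow module, only an instance of the technique.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical step definition: a name plus the names it depends on.
struct Step {
    name: &'static str,
    depends_on: Vec<&'static str>,
}

/// Kahn's algorithm: order steps so each comes after all of its dependencies.
/// Errors on an unknown dependency or a cycle.
fn topo_order(steps: &[Step]) -> Result<Vec<&'static str>, String> {
    let mut indegree: HashMap<&'static str, usize> = steps.iter().map(|s| (s.name, 0)).collect();
    let mut dependents: HashMap<&'static str, Vec<&'static str>> = HashMap::new();
    for s in steps {
        for &dep in &s.depends_on {
            if !indegree.contains_key(dep) {
                return Err(format!("step {:?} depends on unknown step {:?}", s.name, dep));
            }
            dependents.entry(dep).or_default().push(s.name);
            *indegree.get_mut(s.name).unwrap() += 1;
        }
    }
    // Gating: a step becomes ready only when its last dependency finishes.
    let mut ready: VecDeque<&'static str> =
        steps.iter().filter(|s| indegree[s.name] == 0).map(|s| s.name).collect();
    let mut order = Vec::with_capacity(steps.len());
    while let Some(name) = ready.pop_front() {
        order.push(name);
        for &next in dependents.get(name).map(Vec::as_slice).unwrap_or(&[]) {
            let d = indegree.get_mut(next).unwrap();
            *d -= 1;
            if *d == 0 {
                ready.push_back(next);
            }
        }
    }
    // A step that never reaches in-degree 0 sits on (or behind) a cycle.
    if order.len() == steps.len() {
        Ok(order)
    } else {
        Err("workflow contains a cycle".to_string())
    }
}

/// Render a Mermaid `graph TD`: one node per step, one edge per dependency.
fn to_mermaid(steps: &[Step]) -> String {
    let mut out = String::from("graph TD\n");
    for s in steps {
        out.push_str(&format!("    {}[\"{}\"]\n", s.name, s.name));
    }
    for s in steps {
        for dep in &s.depends_on {
            out.push_str(&format!("    {} --> {}\n", dep, s.name));
        }
    }
    out
}

fn main() {
    let steps = vec![
        Step { name: "validate", depends_on: vec![] },
        Step { name: "pay", depends_on: vec!["validate"] },
        Step { name: "ship", depends_on: vec!["pay"] },
    ];
    println!("{:?}", topo_order(&steps)); // Ok(["validate", "pay", "ship"])
    print!("{}", to_mermaid(&steps));

    let cyclic = vec![
        Step { name: "a", depends_on: vec!["b"] },
        Step { name: "b", depends_on: vec!["a"] },
    ];
    assert!(topo_order(&cyclic).is_err()); // rejected at creation time
}
```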
Every /api/v1 route needs a bearer token. (Token validation is currently a placeholder that maps any
non-empty token to a tenant — see Roadmap.)
```shell
# Enqueue a job
curl -s -X POST http://localhost:8000/api/v1/jobs \
  -H 'Authorization: Bearer dev' -H 'Content-Type: application/json' \
  -d '{"task_name":"echo","payload":{"hello":"world"},"config":{"priority":5,"max_retries":3,"timeout":30}}'
# => {"job_id":"…"}

# Fetch it
curl -s http://localhost:8000/api/v1/jobs/<id> -H 'Authorization: Bearer dev'

# List pending jobs
curl -s 'http://localhost:8000/api/v1/jobs?status=pending&limit=20' -H 'Authorization: Bearer dev'

# Create a workflow
curl -s -X POST http://localhost:8000/api/v1/workflows \
  -H 'Authorization: Bearer dev' -H 'Content-Type: application/json' \
  -d '{"name":"etl","steps":[
        {"name":"extract","task_name":"echo"},
        {"name":"transform","task_name":"echo","depends_on":["extract"]},
        {"name":"load","task_name":"echo","depends_on":["transform"]}
      ]}'

# Health / metrics
curl -s http://localhost:8000/health
curl -s http://localhost:9090/metrics
```

| Method & path | Description |
|---|---|
| `POST /api/v1/jobs` | Enqueue a job (send `Idempotency-Key` to make retries safe) |
| `POST /api/v1/jobs/batch` | Enqueue up to 1000 jobs in one round trip |
| `GET /api/v1/jobs` | List jobs (filter by status/queue, paginate; `include_total=true` for exact counts) |
| `GET /api/v1/jobs/{id}` | Fetch a job |
| `GET /api/v1/jobs/{id}/events` | Server-Sent Events stream of status changes until terminal |
| `POST /api/v1/jobs/{id}/cancel` | Cancel a job (`409` if already finished) |
| `POST /api/v1/queues/{queue}/lease` | Worker protocol: lease jobs (long-poll supported) |
| `POST /api/v1/jobs/{id}/heartbeat` · `/complete` · `/fail` | Worker protocol: extend lease · report outcome |
| `POST /api/v1/workflows` | Create a workflow |
| `GET /api/v1/workflows` · `/{id}` · `/{id}/cancel` · `/{id}/diagram` | List / fetch / cancel / diagram |
| `GET /api/v1/tasks` · `/stats` | Registered handlers · engine counters (process-local) |
| `GET /health` · `/ready` · `/docs` · `/openapi.json` | Probes · Swagger UI · spec |
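The `Idempotency-Key` behaviour on `POST /api/v1/jobs` amounts to "same key, same job". Below is a minimal `std`-only sketch that assumes keys are scoped per tenant (consistent with the tenant isolation listed in the features, but an assumption about the real schema). `Enqueuer`, `enqueue`, and the numeric job ids are hypothetical; a database-backed store would typically enforce the rule with a unique constraint in the same transaction as the insert. This README does not say whether a replayed key returns the original job or a distinct status, nor how a reused key with a different payload is treated; the sketch simply returns the original id.

```rust
use std::collections::HashMap;

/// Hypothetical idempotent enqueue: a retried request with the same
/// (tenant, key) returns the job created by the first request.
#[derive(Default)]
struct Enqueuer {
    next_id: u64,
    jobs: HashMap<u64, String>,             // job id -> task name
    by_key: HashMap<(String, String), u64>, // (tenant, idempotency key) -> job id
}

impl Enqueuer {
    /// Returns (job_id, created); `created` is false when the key was already used.
    fn enqueue(&mut self, tenant: &str, key: Option<&str>, task: &str) -> (u64, bool) {
        if let Some(k) = key {
            if let Some(&id) = self.by_key.get(&(tenant.to_string(), k.to_string())) {
                return (id, false); // replay: no duplicate job
            }
        }
        self.next_id += 1;
        let id = self.next_id;
        self.jobs.insert(id, task.to_string());
        if let Some(k) = key {
            self.by_key.insert((tenant.to_string(), k.to_string()), id);
        }
        (id, true)
    }
}

fn main() {
    let mut q = Enqueuer::default();
    let first = q.enqueue("acme", Some("order-42"), "echo");
    let retry = q.enqueue("acme", Some("order-42"), "echo"); // client retried after a timeout
    assert_eq!(first.0, retry.0);
    assert!(first.1 && !retry.1);
    println!("jobs stored: {}", q.jobs.len()); // jobs stored: 1
}
```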
Handlers do not have to be compiled into the server. A worker in any language can drain a queue over HTTP with at-least-once semantics, durable retries, and workflow advancement handled server-side:
```shell
# 1. Lease (long-polls up to wait_secs when the queue is empty)
curl -s -X POST http://localhost:8000/api/v1/queues/default/lease \
  -H 'Authorization: Bearer dev' -H 'Content-Type: application/json' \
  -d '{"max_jobs":1,"lease_secs":60,"wait_secs":20}'
# => {"jobs":[{"job":{...},"lease_token":"550e8400-…"}]}

# 2. (while working) heartbeat to keep the lease; the response carries the
#    job's live status, so a mid-run cancellation is observed here
curl -s -X POST http://localhost:8000/api/v1/jobs/<id>/heartbeat \
  -H 'Authorization: Bearer dev' -H 'Content-Type: application/json' \
  -d '{"lease_token":"550e8400-…","extend_secs":60}'
# => {"status":"running"}

# 3. Report the outcome (or /fail with {"error":"...","retryable":true})
curl -s -X POST http://localhost:8000/api/v1/jobs/<id>/complete \
  -H 'Authorization: Bearer dev' -H 'Content-Type: application/json' \
  -d '{"lease_token":"550e8400-…","result":{"ok":true}}'
```

In Rust, `queueflow-client` wraps this in a worker runtime with automatic heartbeating:
```rust
use queueflow_client::{Client, Map};
use queueflow_client::worker::{Worker, WorkerOptions};

let client = Client::new("http://localhost:8000", "dev");
Worker::new(client, "default", WorkerOptions::default())
    .register("resize-image", |job| async move {
        // ... do the work ...
        Ok(Map::new())
    })
    .run()
    .await;
```

The `queueflow` binary doubles as a client for a running server (`--server-url` /
`QUEUEFLOW_SERVER_URL`, `--token` / `QUEUEFLOW_TOKEN`):
```shell
queueflow job create --task echo --payload '{"hello":"world"}' --wait
queueflow job list --status pending --limit 20
queueflow job watch <id>
queueflow workflow create --file etl.json
queueflow workflow diagram <id>
queueflow tasks
queueflow stats
```

| Flag | Env | Default | Meaning |
|---|---|---|---|
| `--mode` | `QUEUEFLOW_MODE` | `all` | `api`, `worker`, or `all` |
| `--database-url` | `DATABASE_URL` | — | PostgreSQL connection string |
| `--api-port` | `QUEUEFLOW_API_PORT` | `8000` | REST API port |
| `--workers` | `QUEUEFLOW_WORKERS` | `10` | Workers per queue |
| `--metrics-port` | `QUEUEFLOW_METRICS_PORT` | `9090` | Prometheus port |
| `--default-queue` | `QUEUEFLOW_DEFAULT_QUEUE` | `default` | Default queue name |
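Each setting in the table pairs a flag with an environment variable and a default. A common resolution order, and the one clap applies to arguments declared with `env`, is flag first, then environment variable, then default; whether `queueflow` resolves in exactly that order is an assumption here. The hypothetical `Mode` enum and `resolve` helper below sketch that lookup, with parsing that rejects anything other than `api`, `worker`, or `all`.

```rust
use std::str::FromStr;

/// Hypothetical mirror of the `--mode` values from the table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Api,
    Worker,
    All,
}

impl FromStr for Mode {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "api" => Ok(Mode::Api),
            "worker" => Ok(Mode::Worker),
            "all" => Ok(Mode::All),
            other => Err(format!("invalid mode {other:?}: expected api, worker, or all")),
        }
    }
}

/// Assumed precedence: explicit flag > environment variable > default.
fn resolve<T: FromStr>(flag: Option<&str>, env: Option<&str>, default: T) -> Result<T, T::Err> {
    match flag.or(env) {
        Some(raw) => raw.parse(),
        None => Ok(default),
    }
}

fn main() {
    // Read the real environment for the env-var layer; no flag given here.
    let env_mode = std::env::var("QUEUEFLOW_MODE").ok();
    let mode = resolve(None, env_mode.as_deref(), Mode::All);
    println!("{mode:?}");

    let port: u16 = resolve(Some("8080"), Some("9000"), 8000).unwrap();
    assert_eq!(port, 8080); // the flag wins over the env var
}
```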
```shell
make test        # unit + integration, no services required
make test-pg     # opt-in Postgres tests (needs TEST_DATABASE_URL; any plain Postgres 13+)
make clippy      # lint (warnings = errors)
make fmt-check   # formatting
```

Because the engine is generic over the ports, the durable-retry, backoff, timeout, dead-letter,
idempotency, and full workflow-orchestration behaviours are all verified deterministically with a
`TestClock` and in-memory adapters (no Docker, no Postgres). The Postgres adapter is then checked for
parity by the opt-in integration suite.
The OpenAPI spec is generated from the Rust handlers and types (`make spec`), so it always matches
the server, and it is attached to every GitHub release. The client strategy per language:

- Rust — the native `queueflow-client` crate in this workspace. It reuses the engine's own domain types (so it cannot drift), covers every spec operation (the producer surface, the remote worker protocol, the SSE job-event stream, and the health/readiness probes), and is integration-tested against the real router on every CI run.
- TypeScript/Node.js — the hand-written SDK, for a more ergonomic developer experience: `qf.jobs.create({ task, payload })`, an `awaitFor()` poller, a workflow builder DSL, and typed errors. `make check-ts-sdk` guards it against spec drift in CI.
- Everything else — generate on demand from the released spec with your own toolchain:

  ```shell
  openapi-generator-cli generate -i openapi.json -g python -o ./queueflow-python
  ```

  `make sdks` (python, go) is still available to produce the maintained snapshots in the sibling repos, but those snapshots are generated at release time rather than published continuously.
```shell
make spec            # writes spec/openapi.{json,yaml} from the code
make validate-spec   # validate via openapi-generator (Docker)
make check-ts-sdk    # assert the hand-written TS SDK covers every spec operation
```

- API reference (rustdoc): `cargo doc --open -p queueflow-core`
- OpenAPI spec: `spec/openapi.yaml` / `spec/openapi.json`, or live at `/openapi.json` and `/docs`
- Runnable examples: `crates/queueflow-core/examples/`
- Database schema: `migrations/0001_init.sql`
- Multi-repo overview: `repository-structure.md`
Contributions welcome — these are the planned next steps, roughly in priority order:
- Real authentication — replace the placeholder token check with JWT signature/claims validation and an API-key store (the seam is `auth::validate_token`). Worker-protocol endpoints should get their own credential class (they are deployment infrastructure, not tenant-scoped).
- Conditional steps — evaluate a per-step predicate (the `condition` column is reserved but unread today).
- Cron jobs — recurring enqueues on a schedule (one-shot `run_at` scheduling already ships).
- DLQ admin API — list, inspect, and replay dead-lettered jobs.
- Sub-workflows & fan-out — a step that spawns a child workflow or a dynamic batch.
- OpenTelemetry — distributed tracing export alongside the Prometheus metrics.
- Per-tenant rate limiting & quotas.
- Publish — crates.io release and a Helm chart.
```shell
git clone https://github.com/sjriddle/queueflow-core
cd queueflow-core
make test && make clippy && make fmt-check
```

PRs should keep `make test`, `make clippy`, and `make validate-spec` green. New behaviour belongs in a test
against the in-memory adapters; new endpoints and types are reflected in the OpenAPI spec automatically via their
`#[utoipa::path]` / `ToSchema` annotations.
- Rust 1.96+
- PostgreSQL 13+ (plain — no extensions; RDS, Cloud SQL, Azure all work)
- Docker (only for `make validate-spec` / `make sdks` / `make docker`)