Merged
1 change: 1 addition & 0 deletions docs/index.md
@@ -40,6 +40,7 @@ process, no Kafka.
| Relay outbox rows to Kafka / RabbitMQ / NATS / Redis | [Relay to Kafka / RabbitMQ / NATS](usage/relay.md) |
| Understand the architecture before adopting | [How it works](introduction/how-it-works.md) |
| Compare against CDC / Kafka transactions / a hand-rolled outbox | [Comparison](concepts/comparison.md) |
| Deploy to production safely | [Production checklist](operations/checklist.md) |
| Install and write the first publisher / subscriber | [Installation](introduction/installation.md) → [Basic usage](usage/basic.md) |

## Documentation
238 changes: 238 additions & 0 deletions docs/operations/alembic.md
@@ -0,0 +1,238 @@
# Alembic migrations

The package never creates or migrates your schema — that's
[Alembic](https://alembic.sqlalchemy.org/)'s job. This page shows what
Alembic produces against `make_outbox_table()` and
`make_dlq_table()`, and gives recipes for drift detection and DLQ
retention via partition rotation.

## Initial migration

Add `make_outbox_table(metadata, table_name="outbox")` to whatever
`MetaData` your Alembic `env.py` exposes as `target_metadata`, then
run `alembic revision --autogenerate -m "outbox"`. The captured
`upgrade()` body (SQLAlchemy 2.0.50, Alembic 1.18.4, Postgres 17,
against an empty schema):

```python
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('outbox',
    sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False),
    sa.Column('queue', sa.String(length=255), nullable=False),
    sa.Column('payload', sa.LargeBinary(), nullable=False),
    sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    sa.Column('attempts_count', sa.BigInteger(), server_default='0', nullable=False),
    sa.Column('deliveries_count', sa.BigInteger(), server_default='0', nullable=False),
    sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
    sa.Column('next_attempt_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
    sa.Column('first_attempt_at', sa.DateTime(timezone=True), nullable=True),
    sa.Column('last_attempt_at', sa.DateTime(timezone=True), nullable=True),
    sa.Column('acquired_at', sa.DateTime(timezone=True), nullable=True),
    sa.Column('acquired_token', sa.Uuid(), nullable=True),
    sa.Column('timer_id', sa.String(length=255), nullable=True),
    sa.PrimaryKeyConstraint('id')
)
op.create_index('outbox_lease_idx', 'outbox', ['queue', 'acquired_at'], unique=False,
    postgresql_where=sa.text('acquired_token IS NOT NULL'))
op.create_index('outbox_pending_idx', 'outbox', ['queue', 'next_attempt_at'], unique=False,
    postgresql_where=sa.text('acquired_token IS NULL'))
op.create_index('outbox_timer_id_uq', 'outbox', ['queue', 'timer_id'], unique=True,
    postgresql_where=sa.text('timer_id IS NOT NULL'))
# ### end Alembic commands ###
```

The three indexes carry **load-bearing partial predicates**:

- `outbox_pending_idx` — `(queue, next_attempt_at) WHERE acquired_token IS NULL`.
Branch A of the fetch CTE (unleased rows) is served by this index. The
WHERE clause is written so Postgres' planner recognizes the implied
predicate and uses the partial index; drop the predicate and the planner
falls back to a seq-scan as the table grows.
- `outbox_lease_idx` — `(queue, acquired_at) WHERE acquired_token IS NOT NULL`.
Branch B of the fetch CTE (expired-lease reclaim) is served by this
index. Same story: predicate is load-bearing for fetch performance.
- `outbox_timer_id_uq` — unique `(queue, timer_id) WHERE timer_id IS NOT NULL`.
Backs the `timer_id` dedup contract via
`pg_insert(...).on_conflict_do_nothing(...)`. Without the partial
predicate, the unique constraint applies to all rows and breaks
non-timer publishes.
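
To see what the partial unique index buys, here is a minimal sketch of the `timer_id` dedup contract. It uses the standard-library `sqlite3` module as a stand-in for Postgres so that it runs anywhere. The cut-down table and the `publish` and `count_rows` helpers are illustrative assumptions, not the package's code, which per the text above goes through `pg_insert(...).on_conflict_do_nothing(...)`.

```python
import sqlite3

# In-memory SQLite as a stand-in for Postgres (assumption for illustration).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox ("
    " id INTEGER PRIMARY KEY,"
    " queue TEXT NOT NULL,"
    " timer_id TEXT)"
)
# Same shape as outbox_timer_id_uq: unique only where timer_id is set.
conn.execute(
    "CREATE UNIQUE INDEX outbox_timer_id_uq"
    " ON outbox (queue, timer_id) WHERE timer_id IS NOT NULL"
)


def publish(queue, timer_id=None):
    """Hypothetical helper: insert a row, ignoring duplicate timers."""
    conn.execute(
        "INSERT INTO outbox (queue, timer_id) VALUES (?, ?)"
        " ON CONFLICT (queue, timer_id) WHERE timer_id IS NOT NULL DO NOTHING",
        (queue, timer_id),
    )


def count_rows(where="1 = 1"):
    """Hypothetical helper: count rows matching a fixed predicate."""
    return conn.execute(f"SELECT count(*) FROM outbox WHERE {where}").fetchone()[0]


publish("emails", timer_id="reminder-42")
publish("emails", timer_id="reminder-42")  # same (queue, timer_id): ignored
publish("emails")                          # no timer_id: always inserted
publish("emails")                          # no timer_id: always inserted
```

The conflict target repeats the index predicate, which is how the insert is matched to the partial index rather than to a table-wide constraint.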

Columns the operator can mostly ignore: `attempts_count` and
`deliveries_count` (broker book-keeping for "handler invocations" vs
"claims including expired-lease re-claims"), `first_attempt_at` and
`last_attempt_at` (debugging aids on retry-heavy rows). All four are
maintained by the broker; no application code touches them.

The `# please adjust!` comment from Alembic is misleading here —
**don't adjust**. The column types, predicates, and indexes are
exactly what the broker depends on. The
[`validate_schema()`](../usage/schema-validation.md) check will refuse
to start a service whose live DB drifts from this declaration.

## Adding the DLQ after the fact

To opt into [Dead-letter queue](../usage/dlq.md) audit, add
`make_dlq_table(metadata, table_name="outbox_dlq")` to your
`MetaData` and run `alembic revision --autogenerate -m "outbox-dlq"`.
The captured `upgrade()` body:

```python
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('outbox_dlq',
    sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False),
    sa.Column('original_id', sa.BigInteger(), nullable=False),
    sa.Column('queue', sa.String(length=255), nullable=False),
    sa.Column('payload', sa.LargeBinary(), nullable=False),
    sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    sa.Column('deliveries_count', sa.BigInteger(), nullable=False),
    sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
    sa.Column('failed_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
    sa.Column('failure_reason', sa.String(length=64), nullable=False),
    sa.Column('last_exception', sa.String(), nullable=True),
    sa.PrimaryKeyConstraint('id')
)
op.create_index('outbox_dlq_queue_failed_idx', 'outbox_dlq', ['queue', 'failed_at'], unique=False)
# ### end Alembic commands ###
```

This is **purely additive**: no `op.add_column` or `op.alter_column`
against the outbox table itself and no constraint flip. The runtime change
that activates the DLQ is the broker's
[atomicity CTE](../usage/dlq.md#atomicity) (DELETE … RETURNING → INSERT
INTO `<dlq>`), driven by `OutboxBroker(..., dlq_table=…)`, not by
schema state.

The non-unique `outbox_dlq_queue_failed_idx` on `(queue, failed_at)`
supports "show me recent failures for queue X" queries and doubles as
the pruning index when [§ DLQ retention via partition
drop](#dlq-retention-via-partition-drop) converts the table to a
partitioned one.

## Drift detection in CI

Run [`validate_schema()`](../usage/schema-validation.md) after
`alembic upgrade head` in your CI pipeline. A small standalone
script:

```python
import asyncio

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine

from faststream_outbox import OutboxBroker, make_outbox_table


async def main() -> None:
    metadata = MetaData()
    outbox_table = make_outbox_table(metadata, table_name="outbox")
    engine = create_async_engine("postgresql+asyncpg://...")
    broker = OutboxBroker(engine, outbox_table=outbox_table)
    await broker.validate_schema()


asyncio.run(main())
```

The script exits non-zero on drift, so CI fails before the deploy step.

This check is **opt-in for `/health`** and not always-on at
`broker.start()`. The reason: a running migration plus an always-on
validator would race. Operators must be able to roll forward a new
schema version without spinning every pod into a crash loop. The drift
check belongs *between* `alembic upgrade head` and the deploy step,
not inside the running service.

If you also pass `dlq_table=make_dlq_table(metadata)` when
constructing the broker, `validate_schema()` checks both tables in
one call and surfaces drift on either one.

## DLQ retention via partition drop { #dlq-retention-via-partition-drop }

Plain `DELETE FROM outbox_dlq WHERE failed_at < now() - interval '90
days'` works fine for low-volume DLQs (< ~1 GB / month) and needs no
schema change. For higher volume, converting the DLQ to a
range-partitioned table by `failed_at` lets you **drop entire
partitions** instead of deleting row by row — orders-of-magnitude
faster, no vacuum debt.

### One-time migration to partitioned DLQ

```python
# ### Convert outbox_dlq to range-partitioned by failed_at ###

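# Rename the existing table out of the way. Renaming a table does not
# rename its indexes, so move the old index aside as well; otherwise the
# CREATE INDEX below collides with the name it still holds.
op.rename_table('outbox_dlq', 'outbox_dlq_old')
op.execute("""
    ALTER INDEX outbox_dlq_queue_failed_idx
        RENAME TO outbox_dlq_old_queue_failed_idx;
""")

# Create the partitioned parent. The PRIMARY KEY must include the
# partition key, so we extend it to (id, failed_at).
op.execute("""
    CREATE TABLE outbox_dlq (
        id               BIGSERIAL    NOT NULL,
        original_id      BIGINT       NOT NULL,
        queue            VARCHAR(255) NOT NULL,
        payload          BYTEA        NOT NULL,
        headers          JSONB,
        deliveries_count BIGINT       NOT NULL,
        created_at       TIMESTAMPTZ  NOT NULL,
        failed_at        TIMESTAMPTZ  NOT NULL DEFAULT now(),
        failure_reason   VARCHAR(64)  NOT NULL,
        last_exception   TEXT,
        PRIMARY KEY (id, failed_at)
    ) PARTITION BY RANGE (failed_at);
""")

op.execute("""
    CREATE INDEX outbox_dlq_queue_failed_idx
        ON outbox_dlq (queue, failed_at);
""")

# Create initial partitions covering recent + current month. One
# statement per op.execute() call, since some drivers (asyncpg among
# them) reject multi-statement strings.
op.execute("""
    CREATE TABLE outbox_dlq_2026_05 PARTITION OF outbox_dlq
        FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
""")
op.execute("""
    CREATE TABLE outbox_dlq_2026_06 PARTITION OF outbox_dlq
        FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');
""")

# Move surviving rows into the partitioned table; rows older than the
# retention window stay in outbox_dlq_old (or get dropped explicitly).
op.execute("""
    INSERT INTO outbox_dlq
    SELECT * FROM outbox_dlq_old
    WHERE failed_at >= '2026-05-01';
""")

# The copied rows keep their ids, so advance the new table's sequence
# past them to keep freshly generated ids from repeating old ones.
op.execute("""
    SELECT setval(
        pg_get_serial_sequence('outbox_dlq', 'id'),
        COALESCE((SELECT max(id) FROM outbox_dlq), 0) + 1,
        false
    );
""")

op.drop_table('outbox_dlq_old')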
```

`validate_schema()` continues to work against the partitioned table —
Alembic's autogenerate ignores partition boundaries when comparing
column shape.

### Monthly cron: create next, drop oldest

Run from a Postgres cron (or pgAgent, or a sidecar job) once a month.
Adjust the retention window to taste:

```sql
DO $$
DECLARE
    next_month     DATE := date_trunc('month', now() + interval '1 month');
    next_month_end DATE := next_month + interval '1 month';
    drop_month     DATE := date_trunc('month', now() - interval '12 months');
    next_name      TEXT := 'outbox_dlq_' || to_char(next_month, 'YYYY_MM');
    drop_name      TEXT := 'outbox_dlq_' || to_char(drop_month, 'YYYY_MM');
BEGIN
    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS %I PARTITION OF outbox_dlq
             FOR VALUES FROM (%L) TO (%L);',
        next_name, next_month, next_month_end
    );
    EXECUTE format('DROP TABLE IF EXISTS %I;', drop_name);
END $$;
```

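The pattern is "always have next month's partition before any row in
it could land" + "drop the partition that has just aged out of the
retention window." The job drops only the single partition exactly 12
months back, so a skipped run leaves an older partition behind until it
is dropped by hand. Both operations are O(1) regardless of DLQ row count.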
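
If the rotation runs from a sidecar job rather than from inside Postgres, the same month arithmetic can be done in the job. The sketch below only builds the two statements and leaves execution to whatever driver the job uses. `month_start` and `rotation_statements` are hypothetical helpers for illustration, and the partition naming is assumed to follow the `outbox_dlq_YYYY_MM` convention used above.

```python
from datetime import date


def month_start(day: date, offset: int) -> date:
    """First day of the month `offset` months away from `day`'s month."""
    index = day.year * 12 + (day.month - 1) + offset
    return date(index // 12, index % 12 + 1, 1)


def rotation_statements(
    today: date, retention_months: int = 12, parent: str = "outbox_dlq"
) -> list[str]:
    """Build the 'create next month, drop aged-out month' statements.

    `parent` is interpolated into the SQL as-is, so it must be a trusted,
    hard-coded identifier, never user input.
    """
    next_start = month_start(today, 1)
    next_end = month_start(today, 2)
    stale = month_start(today, -retention_months)
    return [
        f"CREATE TABLE IF NOT EXISTS {parent}_{next_start:%Y_%m} "
        f"PARTITION OF {parent} "
        f"FOR VALUES FROM ('{next_start.isoformat()}') "
        f"TO ('{next_end.isoformat()}');",
        f"DROP TABLE IF EXISTS {parent}_{stale:%Y_%m};",
    ]


statements = rotation_statements(date(2026, 6, 15))
```

Working on a month index (`year * 12 + month`) sidesteps end-of-month edge cases and year rollover, which is the part that is easy to get wrong with plain day arithmetic.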
83 changes: 83 additions & 0 deletions docs/operations/checklist.md
@@ -0,0 +1,83 @@
# Production checklist

Scannable scaffold of pre-launch checks. Each item is one to two lines;
the link points at the existing reference page that owns the full
story.

## Sizing

- [ ] **Engine pool ≥ `Σ subs × (max_workers + 1)`** — every
subscriber holds `max_workers + 1` SQLAlchemy pool connections (one
writer per worker + one fetch) plus one raw asyncpg connection for
`LISTEN`. Sub-budget formula in [Subscriber § Connection
budget](../usage/subscriber.md#connection-budget).
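- [ ] **Postgres `max_connections` ≥ `replicas × Σ subs × (max_workers + 2)`**
(the pooled connections plus the one raw `LISTEN` connection each
subscriber opens outside the pool). The budget is per-process, and
rolling deploys briefly run old and new pods side by side, so count
the surge replicas too. Failure mode: new connections are refused with
`FATAL: sorry, too many clients already`.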
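
As a worked example of the sizing arithmetic, the sketch below adds up the budget for one process and for the whole deployment. `connection_budget` is a hypothetical helper, not part of the package. It also counts the one raw `LISTEN` connection per subscriber toward the server-side total, since that connection is opened outside the SQLAlchemy pool.

```python
def connection_budget(max_workers_per_sub: list[int], replicas: int) -> dict[str, int]:
    """Estimate connection needs from each subscriber's max_workers."""
    # One writer per worker plus one fetch connection, per subscriber.
    pool_size = sum(workers + 1 for workers in max_workers_per_sub)
    # One raw asyncpg LISTEN connection per subscriber, outside the pool.
    listen = len(max_workers_per_sub)
    per_process = pool_size + listen
    return {
        "pool_size": pool_size,
        "per_process": per_process,
        "max_connections": replicas * per_process,
    }


# Two subscribers (max_workers 4 and 8) running on 3 replicas.
budget = connection_budget([4, 8], replicas=3)
```

For a rolling deploy, pass the peak replica count (steady state plus surge) rather than the steady-state count.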

## Subscribers

- [ ] **`lease_ttl_seconds` > handler P99 with margin** — otherwise
healthy in-flight handlers race their own lease expiry. The lease
cutoff is server-side `make_interval(...)`, immune to clock skew.
Tuning: [Subscriber § Slow handlers — dedicated
queue](../usage/subscriber.md#slow-handlers-dedicated-queue).
- [ ] **Slow handlers segregated** onto their own subscriber with a
longer `lease_ttl_seconds`. Don't raise it globally: that delays
reclaim of *actually* stuck rows everywhere.
- [ ] **`max_deliveries` set** (or knowingly unbounded). Default is
unbounded; pair with a non-`NoRetry()` retry strategy or
wedge-prone handlers can replay forever.
- [ ] **Retry strategy chosen.** Default
`ExponentialRetry(initial=1, multiplier=2, max=300, attempts=10,
jitter=0.2)` is fine for most. Opt into `NoRetry()` explicitly for
an audit feed.
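
To get a feel for how long a row can stay in retry under the default strategy, the sketch below lists the un-jittered delays. It assumes the delay before retry `n` is `min(initial × multiplier^n, max)` and that `attempts=10` means ten such delays; the text above does not spell out either detail, so treat the numbers as an estimate and check them against the retry strategy reference.

```python
def base_delays(
    initial: float = 1.0,
    multiplier: float = 2.0,
    max_delay: float = 300.0,
    attempts: int = 10,
) -> list[float]:
    """Un-jittered backoff delays in seconds (assumed formula, see lead-in)."""
    return [min(initial * multiplier**n, max_delay) for n in range(attempts)]


delays = base_delays()
total_seconds = sum(delays)  # worst-case wait before jitter is applied
```

Under these assumptions the delays double from 1 s up to 256 s, the last one is capped at 300 s, and the total comes to a little under 14 minutes before jitter.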

## DLQ

- [ ] **`dlq_table=` configured** — opt-in but recommended for any
service where terminal failures need forensic recovery. See
[Dead-letter queue](../usage/dlq.md).
- [ ] **Alert on `nacked_terminal` rate vs `dlq_written` divergence**
— persistent divergence means either DLQ schema drift (CTE rolls
back) or `lease_ttl_seconds` too low. See [DLQ § Metric:
dlq_written](../usage/dlq.md#metric-dlq_written).
- [ ] **DLQ retention plan.** Partition by `failed_at` + cron-drop old
partitions, or a simple `DELETE … WHERE failed_at < now() - interval '…'`
cron for low volume. Walk-through: [Alembic migrations § DLQ retention via
partition drop](./alembic.md#dlq-retention-via-partition-drop).

## Drain & lifecycle

- [ ] **`graceful_timeout` ≥ handler P99 + margin** — otherwise
`OutboxSubscriber.stop()` cancels in-flight work and rows are
reclaimed mid-handler.
- [ ] **Kubernetes `terminationGracePeriodSeconds` ≥ broker
`graceful_timeout`** with margin for the parallel-subscriber drain.
The broker gathers subscriber drains in parallel, but k8s
`SIGKILL`s after the grace period regardless.

## Schema

- [ ] **`/health` calls `validate_schema()`** — opt-in; requires the
`[validate]` extra. Do **not** call at `broker.start()` — that
would crash-loop on a pending migration. See [Schema validation §
Where to call it](../usage/schema-validation.md#where-to-call-it).
- [ ] **Outbox `table_name` ≤ ~56 chars** — NOTIFY channel name is
`outbox_<table_name>`. Postgres' 63-char identifier limit silently
truncates longer names and `LISTEN/NOTIFY` short-circuit degrades
to plain polling.
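
The table-name limit is easy to check in a unit test or at import time. The sketch below is a hypothetical guard, not something the package ships. It takes the `outbox_` channel prefix from the item above and measures the name in bytes, because Postgres' 63-character identifier limit is really a 63-byte limit.

```python
PG_MAX_IDENTIFIER_BYTES = 63  # NAMEDATALEN - 1 on a default Postgres build
CHANNEL_PREFIX = "outbox_"    # prefix taken from the checklist item above


def notify_channel_fits(table_name: str) -> bool:
    """True if the NOTIFY channel name would not be truncated."""
    channel = CHANNEL_PREFIX + table_name
    return len(channel.encode("utf-8")) <= PG_MAX_IDENTIFIER_BYTES


ok = notify_channel_fits("outbox")
too_long = notify_channel_fits("x" * 57)
```

With the 7-byte prefix, that leaves 56 bytes for the table name itself, and fewer characters than that if the name contains non-ASCII text.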

## Observability

- [ ] **`metrics_recorder` set, native middleware registered, or
both** — the recommended setup is both. See [Observability §
Layering](../usage/observability.md#layering-middleware-seam-vs-recorder-seam).
- [ ] **Alert on `lease_lost` rate** — non-zero means
`lease_ttl_seconds < handler P99` for at least one subscriber. See
[Troubleshooting § `event=lease_lost`](./troubleshooting.md#event-lease_lost-recurring-in-logs).
- [ ] **`LISTEN/NOTIFY` fallback warning checked at startup** — if
the asyncpg connection fails (driver missing, permission error),
the subscriber logs once and falls back to polling. If nobody checks
for that warning, the service silently runs with up to
`max_fetch_interval` of idle latency.