Inspired by Envio's HyperIndex: a dev tool that uses the extremely fast HyperSync custom protocol instead of JSON-RPC,
and that uses codegen to turn a simple handler file (written in ReScript/JS) into a Rust indexer.
I'm implementing only the core logic here, no codegen.
Pulls raw event data from HyperSync (ingestion module).
Uses Postgres' UNNEST to bulk-insert data quickly (storage module).
Then switches to an RPC WebSocket connection for real-time blocks (realtime module).
Inserts into the events table, decodes raw data into typed tables (transfers/swaps), and watches for reorgs.
Uses (tx_hash, log_index) as the primary key.
Exposes a simple axum REST API with health, events, status, and last-block endpoints (keyset pagination for events).
Skips DB optimizations such as Postgres partitioning.
Envio uses a history table with a change_type column, so it can roll back reorgs step by step; here there is only a single table, with no history.
Deep reorgs (deeper than the ReorgTracker buffer) are not supported.
No is_finalized logic.
Only a Transfer decoder exists so far; more decoders still need to be implemented.
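The UNNEST approach from the storage module sends one array per column instead of one parameter per value, so a batch of any size becomes a single statement with a fixed number of bind parameters (a multi-row `VALUES` list needs one parameter per value, and Postgres caps a statement at 65535 parameters). A minimal std-only sketch, assuming a hypothetical four-column `EventRow` (the real `events` schema may differ); in the project the column vectors would be bound through the Postgres driver. The `ON CONFLICT` clause is an assumption, not something the notes state for `events`:

```rust
/// Hypothetical row shape for the `events` table (the real schema may have more columns).
#[derive(Debug, Clone, PartialEq)]
pub struct EventRow {
    pub tx_hash: String,
    pub log_index: i32,
    pub block_number: i64,
    pub address: String,
}

/// Column-oriented form of a batch: one Vec per column, all the same length.
#[derive(Debug, Default, PartialEq)]
pub struct EventColumns {
    pub tx_hash: Vec<String>,
    pub log_index: Vec<i32>,
    pub block_number: Vec<i64>,
    pub address: Vec<String>,
}

/// Transpose rows into columns so each column can be bound as a single array parameter.
pub fn to_columns(rows: &[EventRow]) -> EventColumns {
    let mut cols = EventColumns::default();
    for r in rows {
        cols.tx_hash.push(r.tx_hash.clone());
        cols.log_index.push(r.log_index);
        cols.block_number.push(r.block_number);
        cols.address.push(r.address.clone());
    }
    cols
}

/// One statement for the whole batch: four array parameters regardless of row count.
/// ON CONFLICT on the (tx_hash, log_index) key is assumed here so replays are idempotent.
pub const INSERT_EVENTS_SQL: &str = "\
INSERT INTO events (tx_hash, log_index, block_number, address) \
SELECT * FROM UNNEST($1::text[], $2::int4[], $3::int8[], $4::text[]) \
ON CONFLICT (tx_hash, log_index) DO NOTHING";
```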
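Decoding a raw log into the typed `transfers` table, keyed by (tx_hash, log_index), can be sketched as below. It assumes the standard ERC-20 `Transfer(address indexed from, address indexed to, uint256 value)` layout; `RawLog` and `Transfer` are hypothetical shapes, not the project's actual types. ERC-721 `Transfer` shares the same topic0 but indexes the token id as a fourth topic, so the topic count is what tells them apart:

```rust
/// keccak256("Transfer(address,address,uint256)"), the ERC-20 Transfer topic0.
pub const TRANSFER_TOPIC0: [u8; 32] = [
    0xdd, 0xf2, 0x52, 0xad, 0x1b, 0xe2, 0xc8, 0x9b, 0x69, 0xc2, 0xb0, 0x68, 0xfc, 0x37, 0x8d, 0xaa,
    0x95, 0x2b, 0xa7, 0xf1, 0x63, 0xc4, 0xa1, 0x16, 0x28, 0xf5, 0x5a, 0x4d, 0xf5, 0x23, 0xb3, 0xef,
];

/// Hypothetical raw log as it might come out of ingestion.
#[derive(Debug, Clone, PartialEq)]
pub struct RawLog {
    pub tx_hash: [u8; 32],
    pub log_index: u32,
    pub topics: Vec<[u8; 32]>,
    pub data: Vec<u8>,
}

/// Hypothetical typed row for the `transfers` table.
#[derive(Debug, Clone, PartialEq)]
pub struct Transfer {
    pub tx_hash: [u8; 32],
    pub log_index: u32,
    pub from: [u8; 20],
    pub to: [u8; 20],
    pub value: [u8; 32], // uint256, big-endian
}

impl Transfer {
    /// (tx_hash, log_index) uniquely identifies a log, so it doubles as the primary key.
    pub fn key(&self) -> ([u8; 32], u32) {
        (self.tx_hash, self.log_index)
    }

    /// Value as u128 when it fits (high 16 bytes are zero), otherwise None.
    pub fn value_u128(&self) -> Option<u128> {
        if self.value[..16].iter().any(|&b| b != 0) {
            return None;
        }
        let mut low = [0u8; 16];
        low.copy_from_slice(&self.value[16..]);
        Some(u128::from_be_bytes(low))
    }
}

/// Indexed address topics are left-padded to 32 bytes; the address is the last 20.
fn address_from_topic(topic: &[u8; 32]) -> [u8; 20] {
    let mut addr = [0u8; 20];
    addr.copy_from_slice(&topic[12..]);
    addr
}

/// Returns None for anything that is not an ERC-20 Transfer (wrong topic0, ERC-721 shape, bad data).
pub fn decode_transfer(log: &RawLog) -> Option<Transfer> {
    if log.topics.len() != 3 || log.topics[0] != TRANSFER_TOPIC0 || log.data.len() != 32 {
        return None;
    }
    let mut value = [0u8; 32];
    value.copy_from_slice(&log.data);
    Some(Transfer {
        tx_hash: log.tx_hash,
        log_index: log.log_index,
        from: address_from_topic(&log.topics[1]),
        to: address_from_topic(&log.topics[2]),
        value,
    })
}
```

Adding a swap decoder would follow the same pattern: match topic0, check the topic count and data length, and slice out the fields.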
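Keyset pagination for `/events` replaces `OFFSET` with a "rows after this cursor" predicate, so each page is an index seek instead of a scan over skipped rows. The sketch assumes the cursor is (block_number, log_index), which follows chain order and is unique because EVM log indexes are block-scoped, and assumes a composite index on those columns; the SQL text and `next_page` helper are hypothetical. `next_page` is an in-memory equivalent of the query, useful for seeing the cursor hand-off:

```rust
/// Cursor = position of the last row on the previous page: (block_number, log_index).
pub type Cursor = (u64, u32);

/// Row-value comparison lets Postgres seek on an index over (block_number, log_index).
/// The first page would omit the WHERE clause.
pub const EVENTS_PAGE_SQL: &str = "\
SELECT tx_hash, log_index, block_number FROM events \
WHERE (block_number, log_index) > ($1, $2) \
ORDER BY block_number, log_index \
LIMIT $3";

#[derive(Debug, Clone, PartialEq)]
pub struct EventRef {
    pub block_number: u64,
    pub log_index: u32,
}

impl EventRef {
    fn cursor(&self) -> Cursor {
        (self.block_number, self.log_index)
    }
}

/// In-memory equivalent of EVENTS_PAGE_SQL over rows sorted by cursor.
/// Returns the page plus the cursor for the next request (None once exhausted).
pub fn next_page(sorted: &[EventRef], after: Option<Cursor>, limit: usize) -> (Vec<EventRef>, Option<Cursor>) {
    // Binary search for the first row strictly after the cursor (the "index seek").
    let start = match after {
        Some(c) => sorted.partition_point(|e| e.cursor() <= c),
        None => 0,
    };
    let page: Vec<EventRef> = sorted[start..].iter().take(limit).cloned().collect();
    let more = start + page.len() < sorted.len();
    let next = if more { page.last().map(|e| e.cursor()) } else { None };
    (page, next)
}
```

Unlike `OFFSET`, a cursor stays stable when new blocks are appended while a client is paging.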
---
config:
  look: neo
  theme: mc
---
graph TD
subgraph "Data Sources"
HS["HyperSync<br/>(Historical, one-time)"]
WS["RPC WebSocket<br/>(Real-time, persistent)"]
end
subgraph "Indexer Core"
ING["Ingestion Layer<br/>(hypersync.rs / realtime.rs)"]
MPSC["mpsc::channel<br/>BlockBatch buffer"]
BCAST["broadcast::channel<br/>ControlSignal"]
WL["Storage Writer<br/>writer_loop"]
RT["ReorgTracker"]
end
subgraph "Database"
PG[("PostgreSQL<br/>indexed_blocks<br/>events<br/>transfers")]
end
subgraph "API Layer"
API["REST API (axum)<br/>/status /events /blocks"]
end
%% Data flow
HS -->|"BlockBatch<br/>(fast historical)"| ING
WS -->|"Block headers + events<br/>(real-time)"| ING
ING -->|"BlockBatch"| MPSC
ING -->|"ControlSignal::Reorg"| BCAST
%% Reorg handling
WS -->|"parent_hash check"| RT
RT -->|"Reorg detected"| BCAST
%% Writing flow
MPSC -->|"BlockBatch"| WL
BCAST -->|"ControlSignal<br/>(Reorg / Shutdown)"| WL
WL -->|"DB Transaction<br/>(INSERT / UPSERT / DELETE on reorg)"| PG
%% Startup & API
PG -->|"last indexed block<br/>(on startup)"| ING
PG -->|"query results"| API
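The startup edge from PostgreSQL back to ingestion ("last indexed block") decides where HyperSync resumes. Because each batch is written inside one DB transaction, a block is either fully committed or absent, so the block after the maximum is a safe restart point. A small sketch; `resume_block` and `configured_start` are hypothetical names, not part of the notes:

```rust
/// Hypothetical helper: first block to request from HyperSync on startup.
/// `last_indexed` is the result of `SELECT MAX(block_number)` (NULL on an empty DB maps to None).
pub fn resume_block(last_indexed: Option<u64>, configured_start: u64) -> u64 {
    match last_indexed {
        // Batches commit atomically, so block `n` is complete; continue right after it.
        // `max` keeps a start block that was moved forward in config after an earlier run.
        Some(n) => (n + 1).max(configured_start),
        // Fresh database: begin at the configured start block.
        None => configured_start,
    }
}
```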
---
config:
  look: neo
  theme: mc
---
graph TD
%% ==================== 1. STARTUP ====================
subgraph "1. Startup (main())"
direction TB
M["main()"]
PG[("PostgreSQL")]
M -- "1. connect + run migrations" --> PG
PG -- "2. SELECT MAX(block_number)" --> M
M -- "3. spawn writer_loop" --> WL["writer_loop<br/>tokio::spawn"]
M -- "4. spawn API server" --> API["api::serve<br/>tokio::spawn"]
M -- "5. start historical sync" --> HS["hypersync::run()"]
HS -- "6. historical done → start realtime" --> RT["realtime::run()"]
end
%% ==================== 2. HAPPY PATH ====================
subgraph "2. Happy Path - Normal Block Ingestion"
direction TB
SRC["Source (HyperSync / Realtime)"]
CHAN["mpsc::channel#lt;BlockBatch#gt;"]
PB["process_batch()"]
TX["DB Transaction"]
SRC -- "BlockBatch<br/>{blocks, events}" --> CHAN
CHAN -- "recv()" --> PB
PB -- "BEGIN TRANSACTION" --> TX
TX -- "bulk_insert_blocks (UNNEST)" --> IB[("indexed_blocks")]
TX -- "bulk_insert_events (UNNEST)" --> EV[("events")]
TX -- "decode & bulk_insert_transfers<br/>ON CONFLICT DO NOTHING" --> TR[("transfers")]
TX -- "COMMIT" --> DONE["✓ Batch processed"]
end
%% ==================== 3. REORG PATH ====================
subgraph "3. Reorg Path"
direction TB
REORG["Realtime detects<br/>parent_hash mismatch"]
BCAST["broadcast::channel#lt;ControlSignal#gt;"]
HR["handle_reorg()"]
RTX["DB Transaction"]
REORG -- "ControlSignal::Reorg<br/>{revert_to: u64}" --> BCAST
BCAST -- "recv()" --> HR
HR -- "BEGIN TRANSACTION" --> RTX
RTX -- "DELETE FROM indexed_blocks WHERE number > N" --> IB2[("indexed_blocks")]
RTX -- "DELETE FROM events WHERE block_number > N" --> EV2[("events")]
RTX -- "DELETE FROM transfers<br/>WHERE event_id IN (deleted events)" --> TR2[("transfers")]
RTX -- "COMMIT" --> RDONE["✓ Reorg completed<br/>ReorgTracker.revert_to(N)"]
end
%% ==================== 4. DEEP REORG ====================
subgraph "4. Deep Reorg Path (Recovery Mode)"
direction TB
D_REORG["Realtime detects<br/>DeepReorgSuspected"]
D_RTX[("PostgreSQL")]
D_BCAST["broadcast::channel#lt;ControlSignal#gt;"]
D_REORG -- "1. Loop backwards from current block" --> D_REORG
D_REORG -- "2. SELECT hash FROM indexed_blocks<br/>WHERE number = N" --> D_RTX
D_RTX -- "3. Found common ancestor?" --> D_REORG
D_REORG -- "4. sends ControlSignal::Reorg" --> D_BCAST
end
CHAN -.->|"batches drained by"| WL
BCAST -.->|"sends reorg signal"| HR
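The parent-hash check and the backwards ancestor search in the reorg paths can be sketched with a bounded buffer of recent (number, hash) pairs. This is an illustration, not the project's `ReorgTracker`: `canonical_hash` is a hypothetical lookup of a block's hash on the node's current chain (in practice something like an `eth_getBlockByNumber` call), and the `Check` variants are assumed names. A mismatch with no buffered ancestor maps to the "deep reorg" case that the notes say is unsupported:

```rust
use std::collections::VecDeque;

pub type Hash = [u8; 32];

#[derive(Debug, PartialEq)]
pub enum Check {
    /// The block builds on the tip and was recorded.
    Extends,
    /// Our tip is still canonical but the block did not extend it directly: fetch after `last_known`.
    Gap { last_known: u64 },
    /// Blocks above `revert_to` are orphaned: delete them, then re-fetch from `revert_to + 1`.
    Reorg { revert_to: u64 },
    /// No buffered block is on the canonical chain: the reorg is deeper than the buffer.
    DeepReorgSuspected,
}

/// Bounded buffer of recent blocks, oldest at the front, tip at the back.
pub struct ReorgTracker {
    capacity: usize,
    recent: VecDeque<(u64, Hash)>,
}

impl ReorgTracker {
    pub fn new(capacity: usize) -> Self {
        ReorgTracker { capacity, recent: VecDeque::with_capacity(capacity) }
    }

    fn push(&mut self, number: u64, hash: Hash) {
        if self.recent.len() == self.capacity {
            self.recent.pop_front(); // forget the oldest block; reorgs past it become "deep"
        }
        self.recent.push_back((number, hash));
    }

    /// Drop every buffered block above `n`.
    pub fn revert_to(&mut self, n: u64) {
        while self.recent.back().map_or(false, |&(b, _)| b > n) {
            self.recent.pop_back();
        }
    }

    pub fn on_block<F>(&mut self, number: u64, hash: Hash, parent_hash: Hash, canonical_hash: F) -> Check
    where
        F: Fn(u64) -> Option<Hash>,
    {
        let (tip_n, tip_h) = match self.recent.back().cloned() {
            None => {
                self.push(number, hash);
                return Check::Extends;
            }
            Some(tip) => tip,
        };
        if number == tip_n + 1 && parent_hash == tip_h {
            self.push(number, hash);
            return Check::Extends;
        }
        // Parent mismatch: walk newest-first for the highest buffered block still canonical.
        let ancestor = self
            .recent
            .iter()
            .rev()
            .find(|&&(n, h)| canonical_hash(n) == Some(h))
            .map(|&(n, _)| n);
        match ancestor {
            Some(n) if n == tip_n => Check::Gap { last_known: n },
            Some(n) => {
                self.revert_to(n);
                Check::Reorg { revert_to: n }
            }
            None => Check::DeepReorgSuspected,
        }
    }
}
```

After a `Reorg`, the caller re-ingests blocks from `revert_to + 1`, and each of them extends the trimmed buffer again.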
---
config:
  look: neo
  theme: mc
---
graph TD
WL["writer_loop()"]
SEL["tokio::select!"]
ACC["accumulate batches<br/>(Vec#lt;BlockBatch#gt;)"]
FLUSH["flush accumulated batches"]
PB["process_batch()"]
HR["handle_reorg()"]
WL --> SEL
SEL -- "mpsc recv → BlockBatch" --> ACC
SEL -- "broadcast recv → Reorg" --> HR
SEL -- "timeout (500ms)" --> FLUSH
ACC -- "size >= batch_size" --> FLUSH
FLUSH --> PB
HR --> PB
PB --> WL
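The accumulate-then-flush rule above (flush once `batch_size` batches are buffered, or when the timeout fires) can be sketched with std alone. std has no `select!`, so this version swaps the mpsc + broadcast pair for a single channel carrying a hypothetical `WriterMsg` enum; a side effect is that a reorg can never overtake batches sent before it. `BlockBatch` is trimmed to a block range, and `process_batch` / `handle_reorg` are closures standing in for the real DB functions:

```rust
use std::mem;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical, trimmed-down batch: the real one carries blocks and events.
#[derive(Debug, Clone, PartialEq)]
pub struct BlockBatch {
    pub from_block: u64,
    pub to_block: u64,
}

/// Sketch only: data and control share ONE ordered channel here.
#[derive(Debug, Clone, PartialEq)]
pub enum WriterMsg {
    Batch(BlockBatch),
    Reorg { revert_to: u64 },
    Shutdown,
}

pub fn writer_loop<P, R>(
    rx: Receiver<WriterMsg>,
    batch_size: usize,
    flush_every: Duration,
    mut process_batch: P,
    mut handle_reorg: R,
) where
    P: FnMut(Vec<BlockBatch>),
    R: FnMut(u64),
{
    let mut pending: Vec<BlockBatch> = Vec::new();
    let mut last_flush = Instant::now();
    loop {
        // Wait only until the next timed flush is due.
        let wait = flush_every.saturating_sub(last_flush.elapsed());
        match rx.recv_timeout(wait) {
            Ok(WriterMsg::Batch(b)) => {
                pending.push(b);
                if pending.len() >= batch_size {
                    process_batch(mem::take(&mut pending));
                    last_flush = Instant::now();
                }
            }
            Ok(WriterMsg::Reorg { revert_to }) => {
                // Batches lying entirely above the fork point are orphaned: never write them.
                pending.retain(|b| b.from_block <= revert_to);
                if !pending.is_empty() {
                    process_batch(mem::take(&mut pending));
                }
                // The delete also removes the tail of any batch that straddled the fork.
                handle_reorg(revert_to);
                last_flush = Instant::now();
            }
            Err(RecvTimeoutError::Timeout) => {
                if !pending.is_empty() {
                    process_batch(mem::take(&mut pending));
                }
                last_flush = Instant::now();
            }
            Ok(WriterMsg::Shutdown) | Err(RecvTimeoutError::Disconnected) => {
                if !pending.is_empty() {
                    process_batch(pending);
                }
                return;
            }
        }
    }
}
```

With `batch_size = 2`, two batches trigger a flush, a reorg drops a still-buffered orphaned batch before calling `handle_reorg`, and whatever remains is flushed on shutdown.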