Selithrarion/tiny-hyperindex

tiny indexer

inspired by Envio's hyperindex,
a dev tool that uses the extremely fast hypersync custom protocol instead of json-rpc
and uses codegen to turn a simple handler file (written in ReScript/JS) into a rust indexer.
i'm only doing the core logic here, no codegen

pulls raw event data from hypersync (ingestion module)
uses postgres' UNNEST to bulk-insert data into the db (storage module)
then switches to websockets for a realtime connection (realtime module)
inserts into the events table, decodes raw data into typed tables (transfers/swaps) and watches for reorgs
uses (tx_hash, log_index) as the primary key
exposes a simple axum rest api with health, events, status and last-block endpoints (keyset pagination for events)
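
a minimal sketch of the UNNEST idea from the storage module: instead of one INSERT per row, the batch is turned into one array per column and sent as a single statement. the `EventRow` fields, the `events` column names and the `to_columns` helper are assumptions made for illustration, not the repo's actual schema or code, and the sql would be bound through whatever postgres client is in use.

```rust
/// One raw log row. The field set is an assumption, not the repo's schema.
#[derive(Debug, Clone, PartialEq)]
pub struct EventRow {
    pub tx_hash: String,
    pub log_index: i32,
    pub block_number: i64,
    pub data: Vec<u8>,
}

/// Column-oriented view of a batch: one Vec per table column.
#[derive(Debug, Default, PartialEq)]
pub struct EventColumns {
    pub tx_hashes: Vec<String>,
    pub log_indexes: Vec<i32>,
    pub block_numbers: Vec<i64>,
    pub data: Vec<Vec<u8>>,
}

/// A single statement inserts the whole batch: each parameter is an array,
/// and UNNEST zips the arrays back into rows on the postgres side.
/// The conflict target mirrors the (tx_hash, log_index) primary key.
pub const INSERT_EVENTS_SQL: &str = "\
INSERT INTO events (tx_hash, log_index, block_number, data)
SELECT * FROM UNNEST($1::text[], $2::int4[], $3::int8[], $4::bytea[])
ON CONFLICT (tx_hash, log_index) DO NOTHING";

/// Transposes rows into per-column vectors, ready to bind as $1..$4.
pub fn to_columns(rows: &[EventRow]) -> EventColumns {
    let mut cols = EventColumns::default();
    for row in rows {
        cols.tx_hashes.push(row.tx_hash.clone());
        cols.log_indexes.push(row.log_index);
        cols.block_numbers.push(row.block_number);
        cols.data.push(row.data.clone());
    }
    cols
}
```

the number of bound parameters stays at four no matter how many rows are in the batch, which is the main reason this is quicker than building a long multi-row VALUES list.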

skips db optimizations like postgres partitions
envio uses a history table with change_type, so it can roll back reorgs step by step; i only have a single table here
deep reorgs are not supported (when the reorg is deeper than our reorg tracker buffer)
no is_finalized logic
only a transfer decoder so far, more still to implement
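
for reference, a rough sketch of what a transfer decoder has to do, assuming standard ERC-20 `Transfer(address,address,uint256)` logs passed in as hex strings. `decode_transfer`, `Transfer` and `address_from_topic` are hypothetical names, not the repo's code. the amount is kept as hex so the sketch needs no big-integer crate.

```rust
/// keccak256("Transfer(address,address,uint256)"), the Transfer event signature topic.
pub const TRANSFER_TOPIC0: &str =
    "ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";

#[derive(Debug, PartialEq)]
pub struct Transfer {
    pub from: String,      // 0x-prefixed 20-byte address
    pub to: String,        // 0x-prefixed 20-byte address
    pub value_hex: String, // 0x-prefixed 32-byte big-endian amount
}

fn strip0x(s: &str) -> &str {
    s.strip_prefix("0x").unwrap_or(s)
}

fn is_hex_word(s: &str) -> bool {
    s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit())
}

/// An indexed address sits in the low 20 bytes of a 32-byte topic.
fn address_from_topic(topic: &str) -> Option<String> {
    let t = strip0x(topic);
    if !is_hex_word(t) {
        return None;
    }
    // Skip the 12 bytes (24 hex chars) of left padding.
    Some(format!("0x{}", t[24..].to_ascii_lowercase()))
}

/// Returns None for any log that is not shaped like an ERC-20 Transfer.
/// ERC-721 shares topic0 but indexes the token id, so it has 4 topics.
pub fn decode_transfer(topics: &[&str], data: &str) -> Option<Transfer> {
    if topics.len() != 3 || !strip0x(topics[0]).eq_ignore_ascii_case(TRANSFER_TOPIC0) {
        return None;
    }
    let from = address_from_topic(topics[1])?;
    let to = address_from_topic(topics[2])?;
    let value = strip0x(data);
    if !is_hex_word(value) {
        return None;
    }
    Some(Transfer {
        from,
        to,
        value_hex: format!("0x{}", value.to_ascii_lowercase()),
    })
}
```

a swaps decoder would follow the same pattern with a different topic0 and data layout.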

high level

---
config:
  look: neo
  theme: mc
---
graph TD
    subgraph "Data Sources"
        HS["HyperSync<br/>(Historical, one-time)"]
        WS["RPC WebSocket<br/>(Real-time, persistent)"]
    end

    subgraph "Indexer Core"
        ING["Ingestion Layer<br/>(hypersync.rs / realtime.rs)"]
        MPSC["mpsc::channel<br/>BlockBatch buffer"]
        BCAST["broadcast::channel<br/>ControlSignal"]
        WL["Storage Writer<br/>writer_loop"]
        RT["ReorgTracker"]
    end

    subgraph "Database"
        PG[("PostgreSQL<br/>indexed_blocks<br/>events<br/>transfers")]
    end

    subgraph "API Layer"
        API["REST API (axum)<br/>/status /events /blocks"]
    end

    %% Data flow
    HS -->|"BlockBatch<br/>(fast historical)"| ING
    WS -->|"Block headers + events<br/>(real-time)"| ING

    ING -->|"BlockBatch"| MPSC
    ING -->|"ControlSignal::Reorg"| BCAST

    %% Reorg handling
    WS -->|"parent_hash check"| RT
    RT -->|"Reorg detected"| BCAST

    %% Writing flow
    MPSC -->|"BlockBatch"| WL
    BCAST -->|"ControlSignal<br/>(Reorg / Shutdown)"| WL

    WL -->|"DB Transaction<br/>(INSERT / UPSERT / DELETE on reorg)"| PG

    %% Startup & API
    PG -->|"last indexed block<br/>(on startup)"| ING
    PG -->|"query results"| API
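
the `parent_hash check` edge into `ReorgTracker` can be sketched as a small bounded buffer of recent `(number, hash)` pairs. this is a simplified illustration under assumptions: the `Header` and `Check` types and the method bodies are hypothetical, and a gap in incoming headers is treated the same as a deep reorg.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq)]
pub struct Header {
    pub number: u64,
    pub hash: String,
    pub parent_hash: String,
}

#[derive(Debug, PartialEq)]
pub enum Check {
    /// New header builds on the current tip.
    Extends,
    /// Parent is an older buffered block: drop everything above it.
    Reorg { revert_to: u64 },
    /// Parent is not in the buffer, so the fork point is unknown.
    DeepReorgSuspected,
}

pub struct ReorgTracker {
    recent: VecDeque<(u64, String)>,
    capacity: usize,
}

impl ReorgTracker {
    pub fn new(capacity: usize) -> Self {
        ReorgTracker { recent: VecDeque::new(), capacity }
    }

    pub fn check(&self, h: &Header) -> Check {
        let (_, tip_hash) = match self.recent.back() {
            Some(tip) => tip,
            None => return Check::Extends, // nothing tracked yet
        };
        if h.parent_hash == *tip_hash {
            return Check::Extends;
        }
        // Search newest-to-oldest for the block the new header points at.
        match self.recent.iter().rev().find(|(_, hash)| *hash == h.parent_hash) {
            Some((n, _)) => Check::Reorg { revert_to: *n },
            None => Check::DeepReorgSuspected,
        }
    }

    /// Records an accepted header, evicting the oldest one when full.
    pub fn push(&mut self, h: &Header) {
        self.recent.push_back((h.number, h.hash.clone()));
        if self.recent.len() > self.capacity {
            self.recent.pop_front();
        }
    }

    /// Forgets every tracked block above `n` after a reorg.
    pub fn revert_to(&mut self, n: u64) {
        while matches!(self.recent.back(), Some((num, _)) if *num > n) {
            self.recent.pop_back();
        }
    }
}
```

the `capacity` is what bounds how deep a reorg can be resolved from memory alone; anything deeper has to fall back to the database.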

low level

---
config:
  look: neo
  theme: mc
---
graph TD
    %% ==================== 1. STARTUP ====================
    subgraph "1. Startup (main())"
        direction TB
        M["main()"]
        PG[("PostgreSQL")]
        
        M -- "1. connect + run migrations" --> PG
        PG -- "2. SELECT MAX(block_number)" --> M
        M -- "3. spawn writer_loop" --> WL["writer_loop<br/>tokio::spawn"]
        M -- "4. spawn API server" --> API["api::serve<br/>tokio::spawn"]
        M -- "5. start historical sync" --> HS["hypersync::run()"]
        HS -- "6. historical done → start realtime" --> RT["realtime::run()"]
    end

    %% ==================== 2. HAPPY PATH ====================
    subgraph "2. Happy Path - Normal Block Ingestion"
        direction TB
        SRC["Source (HyperSync / Realtime)"]
        CHAN["mpsc::channel<BlockBatch>"]
        PB["process_batch()"]
        TX["DB Transaction"]
        
        SRC -- "BlockBatch<br/>{blocks, events}" --> CHAN
        CHAN -- "recv()" --> PB
        PB -- "BEGIN TRANSACTION" --> TX
        
        TX -- "bulk_insert_blocks (UNNEST)" --> IB[("indexed_blocks")]
        TX -- "bulk_insert_events (UNNEST)" --> EV[("events")]
        TX -- "decode & bulk_insert_transfers<br/>ON CONFLICT DO NOTHING" --> TR[("transfers")]
        TX -- "COMMIT" --> DONE["✓ Batch processed"]
    end

    %% ==================== 3. REORG PATH ====================
    subgraph "3. Reorg Path"
        direction TB
        REORG["Realtime detects<br/>parent_hash mismatch"]
        BCAST["broadcast::channel<ControlSignal>"]
        HR["handle_reorg()"]
        RTX["DB Transaction"]
        
        REORG -- "ControlSignal::Reorg<br/>{revert_to: u64}" --> BCAST
        BCAST -- "recv()" --> HR
        HR -- "BEGIN TRANSACTION" --> RTX
        
        RTX -- "DELETE FROM indexed_blocks WHERE number > N" --> IB2[("indexed_blocks")]
        RTX -- "DELETE FROM events WHERE block_number > N" --> EV2[("events")]
        RTX -- "DELETE FROM transfers<br/>WHERE event_id IN (deleted events)" --> TR2[("transfers")]
        
        RTX -- "COMMIT" --> RDONE["✓ Reorg completed<br/>ReorgTracker.revert_to(N)"]
    end
    
    %% ==================== 4. DEEP REORG ====================
     subgraph "4. Deep Reorg Path (Recovery Mode)"
         direction TB
         D_REORG["Realtime detects<br/>DeepReorgSuspected"]
         D_RTX[("PostgreSQL")]
         D_BCAST["broadcast::channel<ControlSignal>"]
 
         D_REORG -- "1. Loop backwards from current block" --> D_REORG
         D_REORG -- "2. SELECT hash FROM indexed_blocks<br/>WHERE number = N" --> D_RTX
         D_RTX -- "3. Found common ancestor?" --> D_REORG
         D_REORG -- "4. sends ControlSignal::Reorg" --> D_BCAST
     end

    CHAN -.->|"batches consumed by writer_loop"| WL
    BCAST -.->|"sends reorg signal"| HR
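
the backwards loop in the deep reorg path could look roughly like the sketch below. the `BTreeMap` stands in for `SELECT hash FROM indexed_blocks WHERE number = N`, and the `canonical_hash_at` closure stands in for asking the chain for the block hash at a height. both, along with `find_common_ancestor` and `max_depth`, are assumptions for illustration.

```rust
use std::collections::BTreeMap;

/// Walks back from `from`, comparing our stored hash with the canonical
/// hash at each height, and returns the first height where they agree.
/// Gives up after `max_depth` steps and returns None.
pub fn find_common_ancestor(
    stored: &BTreeMap<u64, String>,
    canonical_hash_at: impl Fn(u64) -> Option<String>,
    from: u64,
    max_depth: u64,
) -> Option<u64> {
    let lowest = from.saturating_sub(max_depth);
    let mut n = from;
    loop {
        if let (Some(ours), Some(theirs)) = (stored.get(&n), canonical_hash_at(n)) {
            if *ours == theirs {
                return Some(n); // common ancestor: revert to this block
            }
        }
        if n == lowest {
            return None; // searched the whole window without a match
        }
        n -= 1;
    }
}
```

the returned height is what would go into `ControlSignal::Reorg { revert_to }`; a `None` means the fork is older than the window and needs a different recovery strategy.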

writer loop

---
config:
  look: neo
  theme: mc
---
graph TD
    WL["writer_loop()"]
    
    SEL["tokio::select!"]
    ACC["accumulate batches<br/>(Vec<BlockBatch>)"]
    FLUSH["flush accumulated batches"]
    PB["process_batch()"]
    HR["handle_reorg()"]

    WL --> SEL

    SEL -- "mpsc recv → BlockBatch" --> ACC
    SEL -- "broadcast recv → Reorg" --> HR
    SEL -- "timeout (500ms)" --> FLUSH

    ACC -- "size >= batch_size" --> FLUSH
    FLUSH --> PB
    HR --> PB
    PB --> WL
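
the accumulate/flush part of the diagram, pulled out as a plain struct so it can be shown without an async runtime. this swaps `tokio::select!` for explicit `push` and `tick` calls, so it only illustrates the size and timeout rules. `Accumulator`, the `BlockBatch` fields and the method names are hypothetical.

```rust
use std::time::{Duration, Instant};

/// Stand-in for the real batch type. The fields are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct BlockBatch {
    pub first_block: u64,
    pub last_block: u64,
}

pub struct Accumulator {
    pending: Vec<BlockBatch>,
    batch_size: usize,
    max_age: Duration,
    oldest: Option<Instant>, // arrival time of the first pending batch
}

impl Accumulator {
    pub fn new(batch_size: usize, max_age: Duration) -> Self {
        Accumulator { pending: Vec::new(), batch_size, max_age, oldest: None }
    }

    /// Called on "mpsc recv": returns the batches to write once
    /// `batch_size` of them have piled up.
    pub fn push(&mut self, batch: BlockBatch, now: Instant) -> Option<Vec<BlockBatch>> {
        if self.pending.is_empty() {
            self.oldest = Some(now);
        }
        self.pending.push(batch);
        if self.pending.len() >= self.batch_size {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Called on the timeout branch: flushes if the oldest pending
    /// batch has waited at least `max_age`.
    pub fn tick(&mut self, now: Instant) -> Option<Vec<BlockBatch>> {
        match self.oldest {
            Some(t) if now.duration_since(t) >= self.max_age => Some(self.flush()),
            _ => None,
        }
    }

    /// Hands over everything pending and resets the timer.
    pub fn flush(&mut self) -> Vec<BlockBatch> {
        self.oldest = None;
        std::mem::take(&mut self.pending)
    }
}
```

in the real loop the returned `Vec<BlockBatch>` would go to `process_batch()`. what to do with pending batches when a reorg signal arrives is left to the caller here.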

About

eth event indexer from scratch with hypersync
