Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 6 additions & 14 deletions crates/blockchain/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::{
use ethlambda_crypto::aggregate_proofs;
use ethlambda_state_transition::{
attestation_data_matches_chain, justified_slots_ops, process_block, process_slots,
slot_is_justifiable_after,
};
use ethlambda_types::{
ShortRoot,
Expand Down Expand Up @@ -338,11 +337,6 @@ fn entry_passes_filters(
{
return Err("target_already_justified");
}
if !is_genesis_self_vote
&& !slot_is_justifiable_after(att_data.target.slot, projected_finalized_slot)
{
return Err("target_not_justifiable");
}
Ok(())
}

Expand Down Expand Up @@ -384,16 +378,14 @@ fn score_entry(
let total = prior_count + new_voters.len();
let crosses_2_3 = 3 * total >= 2 * validator_count;

// 3SF-mini finalization requires the source to lie past the finalized
// boundary (a source at or behind it is already final and must not
// re-finalize) and no slot strictly between source.slot and target.slot to
// still be justifiable (so source and target are consecutive justified
// checkpoints in the projected post-state). Mirrors `try_finalize` in the
// state transition.
// The simple BFT finality condition finalizes the source when it lies past
// the finalized boundary (a source at or behind it is already final and must
// not re-finalize) and the target is its immediate successor, so the two are
// consecutive justified checkpoints in the projected post-state. Mirrors
// `try_finalize` in the state transition.
let finalizes = crosses_2_3
&& att_data.source.slot > projected_finalized_slot
&& (att_data.source.slot + 1..att_data.target.slot)
.all(|s| !slot_is_justifiable_after(s, projected_finalized_slot));
&& att_data.source.slot + 1 == att_data.target.slot;

let tier = if is_genesis_self_vote(att_data) || !crosses_2_3 {
Tier::Build
Expand Down
17 changes: 13 additions & 4 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant, SystemTime};

use ethlambda_network_api::{BlockChainToP2PRef, InitP2P};
use ethlambda_state_transition::is_proposer;
use ethlambda_state_transition::{is_heartbeat_committee_member, is_proposer};
use ethlambda_storage::{ALL_TABLES, Store};
use ethlambda_types::{
ShortRoot,
Expand Down Expand Up @@ -430,9 +430,18 @@ impl BlockChainServer {

// Publish to gossip network
if let Some(ref p2p) = self.p2p {
let _ = p2p.publish_attestation(signed_attestation).inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
);
let _ = p2p
.publish_attestation(signed_attestation.clone())
.inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
);
let head_state = self.store.head_state();
let num_validators = head_state.validators.len() as u64;
if is_heartbeat_committee_member(validator_id, slot, num_validators) {
let _ = p2p.publish_heartbeat_attestation(signed_attestation).inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
);
}
info!(%slot, %validator_id, "Published attestation");
}
}
Expand Down
41 changes: 17 additions & 24 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;

use ethlambda_state_transition::{is_proposer, slot_is_justifiable_after};
use ethlambda_state_transition::{is_heartbeat_committee_member, is_proposer};
use ethlambda_storage::{ForkCheckpoints, Store};
use ethlambda_types::{
ShortRoot,
Expand Down Expand Up @@ -39,10 +39,10 @@ fn accept_new_attestations(store: &mut Store, log_tree: bool) {
/// fork choice tree to the terminal.
pub fn update_head(store: &mut Store, log_tree: bool) {
let blocks = store.get_live_chain();
let attestations = store.extract_latest_known_attestations();
let attestations = store.get_last_slot_votes();
let old_head = store.head();
let (new_head, weights) = ethlambda_fork_choice::compute_lmd_ghost_head(
store.latest_justified().root,
store.safe_target(),
&blocks,
&attestations,
0,
Expand Down Expand Up @@ -106,13 +106,11 @@ pub fn update_head(store: &mut Store, log_tree: bool) {
/// evidence even when live participation has collapsed: exactly the failure
/// mode safe target is supposed to prevent. See leanSpec PR #680.
fn update_safe_target(store: &mut Store) {
let head_state = store.get_state(&store.head()).expect("head state exists");
let num_validators = head_state.validators.len() as u64;

let min_target_score = (num_validators * 2).div_ceil(3);

let blocks = store.get_live_chain();
let attestations = store.extract_latest_new_attestations();
let attestations = store.get_last_period_votes();
// Use a 2/3 threshold of the number of voting validators
let min_target_score = (attestations.len() as u64 * 2).div_ceil(3);

let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head(
store.latest_justified().root,
&blocks,
Expand Down Expand Up @@ -356,6 +354,12 @@ pub fn on_gossip_attestation(
}
metrics::inc_pq_sig_attestation_signatures_valid();

let num_validators = target_state.validators.len() as u64;
// If the validator is in the heartbeat committee, persist the vote for fork choice usage.
if is_heartbeat_committee_member(validator_id, attestation.data.slot, num_validators) {
store.insert_heartbeat_vote(validator_id, attestation.data.clone());
}

// Only aggregators persist the signature for later aggregation at
// interval 2. Non-aggregators drop the validated attestation — they
// still participate in the mesh so peers see the message propagate.
Expand Down Expand Up @@ -626,7 +630,10 @@ pub fn get_attestation_target(store: &Store) -> Checkpoint {
pub fn get_attestation_target_with_checkpoints(
store: &Store,
justified: Checkpoint,
finalized: Checkpoint,
// Unused under the simple BFT finality condition (every slot is justifiable,
// so the target no longer needs a justifiability walk-back). Kept on the
// signature pending the finality redesign.
_finalized: Checkpoint,
) -> Checkpoint {
// Start from current head
let mut target_block_root = store.head();
Expand Down Expand Up @@ -654,20 +661,6 @@ pub fn get_attestation_target_with_checkpoints(
}
}

let finalized_slot = finalized.slot;

// Ensure target is in justifiable slot range
//
// Walk back until we find a slot that satisfies justifiability rules
// relative to the latest finalized checkpoint.
while target_header.slot > finalized_slot
&& !slot_is_justifiable_after(target_header.slot, finalized_slot)
{
target_block_root = target_header.parent_root;
target_header = store
.get_block_header(&target_block_root)
.expect("parent block exists");
}
// Guard: clamp target to justified (not in the spec).
//
// The spec's walk-back has no lower bound, so it can produce attestations
Expand Down
67 changes: 18 additions & 49 deletions crates/blockchain/state_transition/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use tracing::{info, warn};
pub mod justified_slots_ops;
pub mod metrics;

pub const HEARTBEAT_COMMITTEE_SIZE: usize = 4;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("target slot {target_slot} is in the past (current is {current_slot})")]
Expand Down Expand Up @@ -231,6 +233,21 @@ pub fn is_proposer(validator_index: u64, slot: u64, num_validators: u64) -> bool
current_proposer(slot, num_validators) == Some(validator_index)
}

/// Check if a validator is part of the heartbeat committee for a given slot.
///
/// The heartbeat committee is formed by the proposer and the next N validators.
pub fn is_heartbeat_committee_member(validator_index: u64, slot: u64, num_validators: u64) -> bool {
let Some(proposer) = current_proposer(slot, num_validators) else {
return false;
};
for i in 0..HEARTBEAT_COMMITTEE_SIZE as u64 {
if validator_index == (proposer + i) % num_validators {
return true;
}
}
false
}

/// Apply attestations and update justification/finalization
/// according to the Lean Consensus 3SF-mini rules.
fn process_attestations(
Expand Down Expand Up @@ -396,11 +413,6 @@ fn is_valid_vote(state: &State, data: &AttestationData) -> bool {
return false;
}

// Ensure the target falls on a slot that can be justified after the finalized one.
if !slot_is_justifiable_after(target.slot, state.latest_finalized.slot) {
return false;
}

true
}

Expand All @@ -426,9 +438,7 @@ fn try_finalize(
}

// Consider whether finalization can advance.
if ((source.slot + 1)..target.slot)
.any(|slot| slot_is_justifiable_after(slot, state.latest_finalized.slot))
{
if source.slot + 1 != target.slot {
metrics::inc_finalizations("error");
return;
}
Expand Down Expand Up @@ -537,47 +547,6 @@ pub fn attestation_data_matches_chain(
&& historical_block_hashes[head_slot] == data.head.root
}

/// Checks if the slot is a valid candidate for justification after a given finalized slot.
///
/// According to the 3SF-mini specification, a slot is justifiable if its
/// distance (`delta`) from the last finalized slot is:
/// 1. Less than or equal to 5.
/// 2. A perfect square (e.g., 9, 16, 25...).
/// 3. A pronic number (of the form x^2 + x, e.g., 6, 12, 20...).
///
/// See https://github.com/ethereum/research/blob/c003fe1c1a785797e7b53e3cbf9569b989be6e93/3sf-mini/consensus.py#L52-L54
/// for the 3SF-mini reference.
///
/// For why we have unjustifiable slots, consider that in high-latency
/// scenarios, validators may vote for many different slots, making none of them
/// reach the supermajority threshold. By having unjustifiable slots, we can
/// funnel votes towards only some slots, increasing finalization chances.
pub fn slot_is_justifiable_after(slot: u64, finalized_slot: u64) -> bool {
let Some(delta) = slot.checked_sub(finalized_slot) else {
// Candidate slot must not be before finalized slot
return false;
};
// Rule 1: The first 5 slots after finalization are always justifiable.
//
// Examples: delta = 0, 1, 2, 3, 4, 5
delta <= 5
// Rule 2: Slots at perfect square distances are justifiable.
//
// Examples: delta = 1, 4, 9, 16, 25, 36, 49, 64, ...
// Check: integer square root squared equals delta
|| delta.isqrt().pow(2) == delta
// Rule 3: Slots at pronic number distances are justifiable.
//
// Pronic numbers have the form n(n+1): 2, 6, 12, 20, 30, 42, 56, ...
// Mathematical insight: For pronic delta = n(n+1), we have:
// 4*delta + 1 = 4n(n+1) + 1 = (2n+1)^2
// Check: 4*delta+1 is an odd perfect square
|| delta
.checked_mul(4)
.and_then(|v| v.checked_add(1))
.is_some_and(|val| val.isqrt().pow(2) == val && val % 2 == 1)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 4 additions & 0 deletions crates/net/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ use spawned_concurrency::protocol;
pub trait BlockChainToP2P: Send + Sync {
fn publish_block(&self, block: SignedBlock) -> Result<(), ActorError>;
fn publish_attestation(&self, attestation: SignedAttestation) -> Result<(), ActorError>;
fn publish_heartbeat_attestation(
&self,
attestation: SignedAttestation,
) -> Result<(), ActorError>;
fn publish_aggregated_attestation(
&self,
attestation: SignedAggregatedAttestation,
Expand Down
28 changes: 26 additions & 2 deletions crates/net/p2p/src/gossipsub/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::{
attestation_subnet_topic,
},
};
use crate::{P2PServer, metrics};
use crate::{P2PServer, gossipsub::messages::HEARTBEAT_TOPIC_KIND, metrics};

pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
let Event::Message {
Expand Down Expand Up @@ -95,7 +95,10 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
);
}
}
Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => {
Some(kind)
if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX)
|| kind == HEARTBEAT_TOPIC_KIND =>
{
info!(kind = "attestation", peer_count, "P2P message received");
let compressed_len = message.data.len();
let Ok(uncompressed_data) = decompress_message(&message.data)
Expand Down Expand Up @@ -196,6 +199,27 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) {
);
}

pub async fn publish_heartbeat_attestation(server: &mut P2PServer, attestation: SignedAttestation) {
let slot = attestation.data.slot;
let validator = attestation.validator_id;

// Encode to SSZ
let ssz_bytes = attestation.to_ssz();

// Compress with raw snappy
let compressed = compress_message(&ssz_bytes);

// Publish to gossipsub
server
.swarm_handle
.publish(server.heartbeat_topic.clone(), compressed);
info!(
%slot,
validator,
"Published heartbeat attestation to gossipsub"
);
}

pub async fn publish_aggregated_attestation(
server: &mut P2PServer,
attestation: SignedAggregatedAttestation,
Expand Down
11 changes: 11 additions & 0 deletions crates/net/p2p/src/gossipsub/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub const FORK_DIGEST: &str = "12345678";

/// Topic kind for block gossip
pub const BLOCK_TOPIC_KIND: &str = "block";

/// Topic kind for heartbeat gossip
pub const HEARTBEAT_TOPIC_KIND: &str = "heartbeat";

/// Topic kind prefix for per-committee attestation subnets.
///
/// Full topic format: `/leanconsensus/{FORK_DIGEST}/attestation_{subnet_id}/ssz_snappy`
Expand Down Expand Up @@ -38,3 +42,10 @@ pub fn attestation_subnet_topic(subnet_id: u64) -> libp2p::gossipsub::IdentTopic
"/leanconsensus/{FORK_DIGEST}/{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}/ssz_snappy"
))
}

/// Build a heartbeat gossipsub topic.
pub fn heartbeat_topic() -> libp2p::gossipsub::IdentTopic {
libp2p::gossipsub::IdentTopic::new(format!(
"/leanconsensus/{FORK_DIGEST}/{HEARTBEAT_TOPIC_KIND}/ssz_snappy"
))
}
3 changes: 2 additions & 1 deletion crates/net/p2p/src/gossipsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ mod messages;
pub use encoding::decompress_message;
pub use handler::{
handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block,
publish_heartbeat_attestation,
};
pub use messages::{aggregation_topic, attestation_subnet_topic, block_topic};
pub use messages::{aggregation_topic, attestation_subnet_topic, block_topic, heartbeat_topic};
Loading
Loading