chore(proposer): collect proofs from prover service #3112

Open
leopoldjoy wants to merge 4 commits into chore/proposer-proof-request-dispatcher from chore/proposer-proof-collector

@leopoldjoy
Contributor

What

Implements P5 as a stacked PR on top of #3107.

  • Wires the proposer pipeline to dispatch TEE proof requests through the prover service with prove_block_range.
  • Tracks accepted prover-service sessions in PipelineState::pending_proofs.
  • Polls get_proof from the coordinator tick and converts successful prover-service ProofResult::Tee payloads back into the primitive ProofResult::Tee type.
  • Preserves sequential L1 submission through the existing proved queue and try_submit path.
  • Keeps the old ProverClient field/logic in place for now so P6 can remove the transitional sync proving path as a focused cleanup.

Test plan

  • cargo +nightly fmt --all --check
  • cargo test -p base-proposer
  • cargo clippy -p base-proposer --all-targets -- -D warnings

Comment thread crates/proof/proposer/src/pipeline.rs Outdated
Comment on lines +567 to +577
async fn collect_proofs(&self, state: &mut PipelineState) {
    let targets = state.pending_proofs.keys().copied().collect::<Vec<_>>();

    for target in targets {
        let Some(session) = state.pending_proofs.get(&target).cloned() else {
            continue;
        };

        let response = match self
            .proof_requester
            .get_proof(GetProofRequest { session_id: session.session_id.clone() })

Each pending proof is polled sequentially — get_proof is .awaited one at a time in a loop. With max_parallel_proofs sessions pending, this serializes all poll RPCs and tick latency grows linearly with the number of pending sessions.

Consider using futures::stream::iter(...).map(|s| async { ... }).buffer_unordered(n) or JoinSet to poll all pending sessions concurrently, matching the concurrency pattern already used for dispatch tasks.
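
The effect of bounding concurrency can be sketched without an async runtime. The example below is a stand-in, not the suggested buffer_unordered code: it swaps in std::thread::scope and a blocking closure so that it runs with the standard library alone. PollStatus, poll_bounded, and the poll closure are hypothetical names that do not appear in the PR.

```rust
use std::thread;

/// Hypothetical poll outcome; the real status type comes from the prover service.
#[derive(Debug, Clone, PartialEq, Eq)]
enum PollStatus {
    Pending,
    Done,
}

/// Polls every session with at most `limit` calls in flight at once.
/// `poll` is an assumed blocking stand-in for an awaited `get_proof` call.
fn poll_bounded<F>(session_ids: &[String], limit: usize, poll: F) -> Vec<(String, PollStatus)>
where
    F: Fn(&str) -> PollStatus + Sync,
{
    let mut results = Vec::with_capacity(session_ids.len());
    // Sessions inside one chunk are polled in parallel; chunks run one after another.
    for chunk in session_ids.chunks(limit.max(1)) {
        let chunk_results: Vec<(String, PollStatus)> = thread::scope(|scope| {
            let handles: Vec<_> = chunk
                .iter()
                .map(|id| {
                    let poll = &poll;
                    scope.spawn(move || (id.clone(), poll(id)))
                })
                .collect();
            // Joining in spawn order keeps results aligned with the input order.
            handles
                .into_iter()
                .map(|handle| handle.join().expect("poll thread panicked"))
                .collect()
        });
        results.extend(chunk_results);
    }
    results
}
```

With limit equal to the number of pending sessions, a tick waits roughly for the slowest poll rather than for the sum of all polls. Chunking waits for a whole batch before starting the next one, whereas buffer_unordered(n) starts a new poll as soon as any in-flight poll finishes.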

Comment on lines +593 to +601
ProofStatus::Queued | ProofStatus::Running => {
    debug!(
        target_block = target,
        session_id = %session.session_id,
        status = ?response.status,
        elapsed = ?session.elapsed(),
        "Proof request still pending"
    );
}

There's no timeout enforcement for pending proof sessions. If the prover service keeps returning Queued/Running indefinitely, the session remains in pending_proofs (and its target in inflight) forever, permanently consuming a proof slot. The started_at/elapsed() fields are recorded but never compared against a deadline.

Consider timing out sessions after a configurable max duration (e.g., reusing the existing prover_timeout config) and routing them through handle_proof_failure so they can be retried or dropped.
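
One way to enforce such a deadline is to sweep the pending sessions before polling and pull out those that have outlived the limit. The sketch below is a minimal illustration under assumptions: PendingSession, take_expired, and the max_age parameter are hypothetical, and the real session type and prover_timeout wiring may differ.

```rust
use std::collections::BTreeMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the PR's pending-session record.
#[derive(Debug, Clone)]
struct PendingSession {
    session_id: String,
    started_at: Instant,
}

impl PendingSession {
    /// Age of the session at `now`; zero if `now` is earlier than `started_at`.
    fn elapsed_at(&self, now: Instant) -> Duration {
        now.saturating_duration_since(self.started_at)
    }
}

/// Removes every session whose age is at least `max_age` and returns the
/// removed (target, session) pairs so the caller can treat them as failures.
fn take_expired(
    pending: &mut BTreeMap<u64, PendingSession>,
    now: Instant,
    max_age: Duration,
) -> Vec<(u64, PendingSession)> {
    // Collect the keys first so the map is not mutated while it is borrowed.
    let expired_targets: Vec<u64> = pending
        .iter()
        .filter(|(_, session)| session.elapsed_at(now) >= max_age)
        .map(|(target, _)| *target)
        .collect();

    expired_targets
        .into_iter()
        .filter_map(|target| pending.remove(&target).map(|session| (target, session)))
        .collect()
}
```

The returned pairs could then be routed through handle_proof_failure so each target is retried or dropped, which also frees its slot in inflight. Passing now explicitly keeps the function deterministic under test.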

Comment thread crates/proof/proposer/src/pipeline.rs Outdated
{
    Metrics::safe_head().set(safe_head as f64);
    state.prune_stale(recovered.l2_block_number);
    self.collect_proofs(state).await;

collect_proofs is only called inside the if let Some(...) guard that requires try_recover_and_plan to succeed. If the L1 or rollup RPC is transiently down, try_recover_and_plan returns None and completed proofs are never collected — even though the prover service is reachable and may have finished work. This delays proof collection (and downstream submission) until the RPC recovers.

Consider hoisting collect_proofs out of the recovery guard so proof results are collected on every tick regardless of recovery state.

…patcher' into chore/proposer-proof-collector

# Conflicts:
#	crates/proof/proposer/src/pipeline.rs
Comment on lines +636 to +648
state.inflight.remove(&target);
state.pending_proofs.remove(&target);
state.retry_counts.remove(&target);
state.proved.insert(target, proof_result);
state.record_gauges();
info!(
    target_block = target,
    session_id = %session.session_id,
    from_block = session.plan.start_block,
    retry_count = session.retry_count,
    elapsed = ?session.elapsed(),
    "Proof completed successfully"
);

The proof_duration_seconds histogram (declared in metrics.rs:55) is no longer recorded anywhere. The old code had a timed!(Metrics::proof_duration_seconds()) guard around the prove task, but the dispatch refactor removed it. The session.elapsed() value logged here is the right data source — it should also be recorded to the histogram so the metric continues to work:

Suggested change
-     state.inflight.remove(&target);
-     state.pending_proofs.remove(&target);
-     state.retry_counts.remove(&target);
-     state.proved.insert(target, proof_result);
-     state.record_gauges();
-     info!(
-         target_block = target,
-         session_id = %session.session_id,
-         from_block = session.plan.start_block,
-         retry_count = session.retry_count,
-         elapsed = ?session.elapsed(),
-         "Proof completed successfully"
-     );
+ Ok(proof_result) => {
+     let elapsed = session.elapsed();
+     state.inflight.remove(&target);
+     state.pending_proofs.remove(&target);
+     state.retry_counts.remove(&target);
+     state.proved.insert(target, proof_result);
+     state.record_gauges();
+     Metrics::proof_duration_seconds().record(elapsed.as_secs_f64());
+     info!(
+         target_block = target,
+         session_id = %session.session_id,
+         from_block = session.plan.start_block,
+         retry_count = session.retry_count,
+         elapsed = ?elapsed,
+         "Proof completed successfully"
+     );

Comment on lines +619 to +643
fn collectable_targets(
    &self,
    recovered: &RecoveredState,
    safe_head: u64,
    state: &PipelineState,
) -> Vec<u64> {
    let mut cursor =
        match recovered.l2_block_number.checked_add(self.config.driver.block_interval) {
            Some(cursor) => cursor,
            None => return Vec::new(),
        };
    let mut targets = Vec::new();

    while cursor <= safe_head {
        if !state.proved.contains_key(&cursor) && state.submitting != Some(cursor) {
            targets.push(cursor);
        }
        cursor = match cursor.checked_add(self.config.driver.block_interval) {
            Some(cursor) => cursor,
            None => break,
        };
    }

    targets
}

collectable_targets includes every block between recovered + interval and safe_head that isn't already in proved or submitting. This means it also includes blocks that haven't been dispatched at all (not in inflight). For those, get_proof will fail with "session not found" and be silently skipped via the debug! + continue branch.

This causes O(safe_head_gap / interval) wasted RPCs per tick. Consider filtering to only poll targets that are in state.inflight:

Suggested change
- fn collectable_targets(
-     &self,
-     recovered: &RecoveredState,
-     safe_head: u64,
-     state: &PipelineState,
- ) -> Vec<u64> {
-     let mut cursor =
-         match recovered.l2_block_number.checked_add(self.config.driver.block_interval) {
-             Some(cursor) => cursor,
-             None => return Vec::new(),
-         };
-     let mut targets = Vec::new();
-     while cursor <= safe_head {
-         if !state.proved.contains_key(&cursor) && state.submitting != Some(cursor) {
-             targets.push(cursor);
-         }
-         cursor = match cursor.checked_add(self.config.driver.block_interval) {
-             Some(cursor) => cursor,
-             None => break,
-         };
-     }
-     targets
- }
+ while cursor <= safe_head {
+     if state.inflight.contains(&cursor)
+         && !state.proved.contains_key(&cursor)
+         && state.submitting != Some(cursor)
+     {
+         targets.push(cursor);
+     }
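
A standalone version of the filtered loop, for illustration only. It assumes inflight is a set of target block numbers and proved is a map keyed by target; the free-function signature, the Vec<u8> proof placeholder, and the zero-interval guard are additions of this sketch and are not taken from the PR.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Returns the targets worth polling: blocks on the interval grid between
/// `recovered_block + block_interval` and `safe_head` that have an inflight
/// session and are neither already proved nor currently being submitted.
fn collectable_targets(
    recovered_block: u64,
    block_interval: u64,
    safe_head: u64,
    inflight: &BTreeSet<u64>,
    proved: &BTreeMap<u64, Vec<u8>>, // proof bytes are a placeholder type
    submitting: Option<u64>,
) -> Vec<u64> {
    let mut targets = Vec::new();

    // Assumption of this sketch: a zero interval would never advance the cursor.
    if block_interval == 0 {
        return targets;
    }

    let mut cursor = match recovered_block.checked_add(block_interval) {
        Some(cursor) => cursor,
        None => return targets,
    };

    while cursor <= safe_head {
        // Only blocks with a dispatched session can have a proof to collect.
        if inflight.contains(&cursor)
            && !proved.contains_key(&cursor)
            && submitting != Some(cursor)
        {
            targets.push(cursor);
        }
        cursor = match cursor.checked_add(block_interval) {
            Some(next) => next,
            None => break,
        };
    }

    targets
}
```

With recovered_block = 100, block_interval = 10, and safe_head = 150, only grid blocks that are present in inflight are returned, so undispatched blocks never reach get_proof.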

Comment on lines +537 to +544
async fn collect_proofs(
    &self,
    recovered: &RecoveredState,
    safe_head: u64,
    state: &mut PipelineState,
) {
    let targets = self.collectable_targets(recovered, safe_head, state);
    let roots = self.fetch_canonical_root_results_with(targets.clone(), false).await;

Each get_proof call is .awaited sequentially in this loop. With max_parallel_proofs sessions pending, tick latency grows linearly. Consider using futures::stream::iter(...).map(|t| async { ... }).buffer_unordered(n) or a JoinSet to poll all pending sessions concurrently, matching the concurrency pattern already used for fetch_canonical_root_results_with (which uses .buffered(self.config.recovery_scan_concurrency)).

@github-actions
Contributor

github-actions Bot commented Jun 2, 2026

Review Summary

The core architecture is sound: dispatch proof requests via ProofRequesterDispatcher, track them in inflight, poll completion via collect_proofs, and feed results into the existing proved → try_submit pipeline. The ProofDispatchOutcome enum, handle_dispatch_result, and handle_proof_failure extraction are clean.

Findings

1. collectable_targets polls blocks that were never dispatched (pipeline.rs:619-643)
The function returns every block between recovered + interval and safe_head that isn't in proved or submitting — including blocks with no active session. This results in wasted get_proof RPCs that will always fail with "session not found". Filtering to only poll blocks in state.inflight would eliminate these.

2. Sequential proof polling in collect_proofs (pipeline.rs:537-616)
Each get_proof call is awaited one-at-a-time in a loop. With max_parallel_proofs inflight sessions this serializes all poll RPCs per tick. Consider using buffer_unordered() to match the concurrency pattern already used by fetch_canonical_root_results_with.

3. proof_duration_seconds metric is no longer recorded (noted by prior reviewer)
The timed!(Metrics::proof_duration_seconds()) guard was removed with the old prove path but no replacement was added in the collect path. If P6 is the planned cleanup, tracking this as a known gap would be helpful.

Non-blocking observations

  • prove_tasks JoinSet and prover field are now dead code (never spawned into). PR description notes this is intentional for P6.
  • collect_proofs is only called inside the try_recover_and_plan success guard, so proof collection stalls when L1/rollup RPCs are transiently down even if the prover service has results ready. Worth considering hoisting it out of the guard.

@github-actions
Contributor

github-actions Bot commented Jun 2, 2026

❌ base-std fork tests: 280 failed, 12 passed

base/base diverges from the base-std spec.

Failing tests

[FAIL: AlreadyActivated(0xecfa0def2c10020caaf65e6155aa69c84b24892aaef76eeac52e0e2b3a0b8601)] setUp() (gas: 0)
