chore(proposer): collect proofs from prover service #3112
    async fn collect_proofs(&self, state: &mut PipelineState) {
        let targets = state.pending_proofs.keys().copied().collect::<Vec<_>>();

        for target in targets {
            let Some(session) = state.pending_proofs.get(&target).cloned() else {
                continue;
            };

            let response = match self
                .proof_requester
                .get_proof(GetProofRequest { session_id: session.session_id.clone() })
Each pending proof is polled sequentially — get_proof is .awaited one at a time in a loop. With max_parallel_proofs sessions pending, this serializes all poll RPCs and tick latency grows linearly with the number of pending sessions.
Consider using futures::stream::iter(...).map(|s| async { ... }).buffer_unordered(n) or JoinSet to poll all pending sessions concurrently, matching the concurrency pattern already used for dispatch tasks.
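To make the latency argument concrete, here is a minimal, dependency-free sketch of bounded concurrent polling. It is not the PR's code: it uses `std::thread::scope` as a stand-in for `buffer_unordered(n)` or `JoinSet`, and `poll_all`, the local `ProofStatus` enum, and the `get_proof` closure are hypothetical names chosen for illustration.

```rust
use std::thread;

/// Hypothetical, simplified status type (not the prover service's real enum).
#[derive(Debug, Clone, PartialEq)]
enum ProofStatus {
    Queued,
    Running,
    Succeeded,
}

/// Polls every session with at most `limit` requests running at the same time.
/// `get_proof` stands in for the real RPC call and is an assumption.
fn poll_all<F>(session_ids: &[String], limit: usize, get_proof: F) -> Vec<(String, ProofStatus)>
where
    F: Fn(&str) -> ProofStatus + Sync,
{
    let get_proof = &get_proof;
    let mut results = Vec::with_capacity(session_ids.len());

    // Each batch of up to `limit` sessions is polled in parallel; batches run
    // one after another, so concurrency never exceeds `limit`.
    for batch in session_ids.chunks(limit.max(1)) {
        thread::scope(|scope| {
            let handles: Vec<_> = batch
                .iter()
                .map(|id| scope.spawn(move || (id.clone(), get_proof(id.as_str()))))
                .collect();

            // Joining in spawn order keeps results in input order.
            for handle in handles {
                results.push(handle.join().expect("poll thread panicked"));
            }
        });
    }

    results
}
```

With this shape, a tick costs roughly one slow poll per batch instead of one per session. Unlike `buffer_unordered`, each batch waits for its slowest member before the next one starts, so an async stream would likely do better in the real coordinator.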
    ProofStatus::Queued | ProofStatus::Running => {
        debug!(
            target_block = target,
            session_id = %session.session_id,
            status = ?response.status,
            elapsed = ?session.elapsed(),
            "Proof request still pending"
        );
    }
There's no timeout enforcement for pending proof sessions. If the prover service keeps returning Queued/Running indefinitely, the session remains in pending_proofs (and its target in inflight) forever, permanently consuming a proof slot. The started_at/elapsed() fields are recorded but never compared against a deadline.
Consider timing out sessions after a configurable max duration (e.g., reusing the existing prover_timeout config) and routing them through handle_proof_failure so they can be retried or dropped.
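A minimal sketch of the deadline check, under stated assumptions: `PendingSession` here is a simplified stand-in for the PR's session type, `take_timed_out` is a hypothetical helper, and `max_pending` would come from whichever config value is chosen (for example `prover_timeout`).

```rust
use std::collections::BTreeMap;
use std::time::{Duration, Instant};

/// Simplified stand-in for a pending proof session (assumed shape).
#[derive(Debug, Clone)]
struct PendingSession {
    session_id: String,
    started_at: Instant,
}

impl PendingSession {
    /// Time spent pending as of `now`; saturates to zero if `now` is earlier.
    fn elapsed_at(&self, now: Instant) -> Duration {
        now.saturating_duration_since(self.started_at)
    }
}

/// Removes and returns every session that has been pending longer than
/// `max_pending`, so the caller can route each one through its failure path.
fn take_timed_out(
    pending: &mut BTreeMap<u64, PendingSession>,
    now: Instant,
    max_pending: Duration,
) -> Vec<(u64, PendingSession)> {
    // Collect the expired keys first so the map is not mutated while iterating.
    let expired: Vec<u64> = pending
        .iter()
        .filter(|(_, session)| session.elapsed_at(now) > max_pending)
        .map(|(target, _)| *target)
        .collect();

    expired
        .into_iter()
        .filter_map(|target| pending.remove(&target).map(|session| (target, session)))
        .collect()
}
```

In the coordinator, each returned pair would presumably be handed to `handle_proof_failure` so the target is either retried or dropped and its slot is released.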
    {
        Metrics::safe_head().set(safe_head as f64);
        state.prune_stale(recovered.l2_block_number);
        self.collect_proofs(state).await;
collect_proofs is only called inside the if let Some(...) guard that requires try_recover_and_plan to succeed. If the L1 or rollup RPC is transiently down, try_recover_and_plan returns None and completed proofs are never collected — even though the prover service is reachable and may have finished work. This delays proof collection (and downstream submission) until the RPC recovers.
Consider hoisting collect_proofs out of the recovery guard so proof results are collected on every tick regardless of recovery state.
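The control-flow change can be sketched as follows. This is a deliberately simplified, synchronous illustration: `tick` and its closure parameters are hypothetical, with `recovered` standing in for the result of `try_recover_and_plan`.

```rust
/// Simplified tick: collection runs on every call, while planning and
/// dispatch still require a successful recovery.
fn tick(
    recovered: Option<u64>,
    mut collect_proofs: impl FnMut(),
    mut plan_and_dispatch: impl FnMut(u64),
) {
    // Hoisted out of the guard: runs even when recovery returned `None`.
    collect_proofs();

    if let Some(l2_block_number) = recovered {
        plan_and_dispatch(l2_block_number);
    }
}
```

One thing to check before hoisting in the real code: collection would then run without a fresh `prune_stale`, so stale sessions may be polled once more than they are today.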
…patcher' into chore/proposer-proof-collector # Conflicts: # crates/proof/proposer/src/pipeline.rs
    state.inflight.remove(&target);
    state.pending_proofs.remove(&target);
    state.retry_counts.remove(&target);
    state.proved.insert(target, proof_result);
    state.record_gauges();
    info!(
        target_block = target,
        session_id = %session.session_id,
        from_block = session.plan.start_block,
        retry_count = session.retry_count,
        elapsed = ?session.elapsed(),
        "Proof completed successfully"
    );
The proof_duration_seconds histogram (declared in metrics.rs:55) is no longer recorded anywhere. The old code had a timed!(Metrics::proof_duration_seconds()) guard around the prove task, but the dispatch refactor removed it. The session.elapsed() value logged here is the right data source — it should also be recorded to the histogram so the metric continues to work:
    Ok(proof_result) => {
        let elapsed = session.elapsed();
        state.inflight.remove(&target);
        state.pending_proofs.remove(&target);
        state.retry_counts.remove(&target);
        state.proved.insert(target, proof_result);
        state.record_gauges();
        Metrics::proof_duration_seconds().record(elapsed.as_secs_f64());
        info!(
            target_block = target,
            session_id = %session.session_id,
            from_block = session.plan.start_block,
            retry_count = session.retry_count,
            elapsed = ?elapsed,
            "Proof completed successfully"
        );
…patcher' into chore/proposer-proof-collector
    fn collectable_targets(
        &self,
        recovered: &RecoveredState,
        safe_head: u64,
        state: &PipelineState,
    ) -> Vec<u64> {
        let mut cursor =
            match recovered.l2_block_number.checked_add(self.config.driver.block_interval) {
                Some(cursor) => cursor,
                None => return Vec::new(),
            };
        let mut targets = Vec::new();

        while cursor <= safe_head {
            if !state.proved.contains_key(&cursor) && state.submitting != Some(cursor) {
                targets.push(cursor);
            }
            cursor = match cursor.checked_add(self.config.driver.block_interval) {
                Some(cursor) => cursor,
                None => break,
            };
        }

        targets
    }
collectable_targets includes every block between recovered + interval and safe_head that isn't already in proved or submitting. This means it also includes blocks that haven't been dispatched at all (not in inflight). For those, get_proof will fail with "session not found" and be silently skipped via the debug! + continue branch.
This causes O(safe_head_gap / interval) wasted RPCs per tick. Consider filtering to only poll targets that are in state.inflight:
    while cursor <= safe_head {
        if state.inflight.contains(&cursor)
            && !state.proved.contains_key(&cursor)
            && state.submitting != Some(cursor)
        {
            targets.push(cursor);
        }
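For reference, a self-contained version of the filtered walk might look like the sketch below. The `PipelineState` fields are reduced to what the filter needs and their types are assumptions (for example, `inflight` as a `HashSet<u64>`). The free-function signature and the zero-interval guard are also not from the PR.

```rust
use std::collections::{BTreeMap, HashSet};

/// Reduced stand-in for the pipeline state; field types are assumed.
struct PipelineState {
    inflight: HashSet<u64>,
    proved: BTreeMap<u64, Vec<u8>>,
    submitting: Option<u64>,
}

/// Returns only targets that were actually dispatched and still need a result.
fn collectable_targets(
    recovered_block: u64,
    block_interval: u64,
    safe_head: u64,
    state: &PipelineState,
) -> Vec<u64> {
    let mut targets = Vec::new();

    // A zero interval would never advance the cursor, so bail out early.
    if block_interval == 0 {
        return targets;
    }
    let Some(mut cursor) = recovered_block.checked_add(block_interval) else {
        return targets;
    };

    while cursor <= safe_head {
        if state.inflight.contains(&cursor)
            && !state.proved.contains_key(&cursor)
            && state.submitting != Some(cursor)
        {
            targets.push(cursor);
        }
        cursor = match cursor.checked_add(block_interval) {
            Some(next) => next,
            None => break,
        };
    }

    targets
}
```

With `inflight = {110, 120, 130}`, `120` already proved, and `130` submitting, a walk from block 100 to safe head 150 at interval 10 polls only `110`.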
    async fn collect_proofs(
        &self,
        recovered: &RecoveredState,
        safe_head: u64,
        state: &mut PipelineState,
    ) {
        let targets = self.collectable_targets(recovered, safe_head, state);
        let roots = self.fetch_canonical_root_results_with(targets.clone(), false).await;
Each get_proof call is .awaited sequentially in this loop. With max_parallel_proofs sessions pending, tick latency grows linearly. Consider using futures::stream::iter(...).map(|t| async { ... }).buffer_unordered(n) or a JoinSet to poll all pending sessions concurrently, matching the concurrency pattern already used for fetch_canonical_root_results_with (which uses .buffered(self.config.recovery_scan_concurrency)).
Review Summary
The core architecture is sound: dispatch proof requests via
Findings
1.
2. Sequential proof polling in
3.
Non-blocking observations
❌ base-std fork tests: 280 failed, 12 passed
base/base diverges from the base-std spec.
Failing tests
[FAIL: AlreadyActivated(0xecfa0def2c10020caaf65e6155aa69c84b24892aaef76eeac52e0e2b3a0b8601)] setUp() (gas: 0)
What
Implements P5 as a stacked PR on top of #3107.
prove_block_range.
PipelineState::pending_proofs.
get_proof from the coordinator tick and converts successful ProofResult::Tee payloads back into primitive ProofResult::Tee.
proved queue and try_submit path.
ProverClient field/logic in place for now so P6 can remove the transitional sync proving path as a focused cleanup.
Test plan
cargo +nightly fmt --all --check
cargo test -p base-proposer
cargo clippy -p base-proposer --all-targets -- -D warnings