Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions crates/host-rpc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ pub enum RpcHostError {
#[error("missing block {0}")]
MissingBlock(BlockNumberOrTag),

/// Walk exhaustion recovery requires a cached finalized block number,
/// but none has been fetched yet.
#[error("no cached finalized block number for exhaustion recovery")]
NoFinalizedBlock,
/// The first block of a backfill batch does not chain to the last
/// emitted block (parent-hash mismatch). A reorg occurred during the
/// gap between exhaustion and backfill.
#[error("backfill continuity break: parent hash mismatch after exhaustion recovery")]
BackfillContinuityBreak,
}
42 changes: 36 additions & 6 deletions crates/host-rpc/src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ pub struct RpcHostNotifier<P> {
/// Maximum number of concurrent RPC block fetches.
max_rpc_concurrency: usize,

/// The last block emitted in a notification, tracked as (number, hash).
/// Used as the authoritative delivery position for exhaustion recovery.
last_emitted: Option<(u64, B256)>,

/// Seconds per slot, used for epoch calculation.
slot_seconds: u64,

Expand All @@ -98,6 +102,7 @@ impl<P> core::fmt::Debug for RpcHostNotifier<P> {
.field("buffer_capacity", &self.buffer_capacity)
.field("max_rpc_concurrency", &self.max_rpc_concurrency)
.field("backfill_from", &self.backfill_from)
.field("last_emitted", &self.last_emitted)
.finish_non_exhaustive()
}
}
Expand Down Expand Up @@ -127,6 +132,7 @@ where
backfill_from: None,
backfill_batch_size,
max_rpc_concurrency,
last_emitted: None,
slot_seconds,
genesis_timestamp,
}
Expand Down Expand Up @@ -436,13 +442,14 @@ where
return Ok(None);
}
WalkResult::Exhausted => {
let finalized = self.cached_finalized.ok_or(RpcHostError::NoFinalizedBlock)?;
let resume_from = self.last_emitted.map(|(n, _)| n + 1);
warn!(
buffer_capacity = self.buffer_capacity,
finalized, "walk exhausted buffer, resetting to backfill from finalized"
?resume_from,
"walk exhausted buffer, resetting to backfill from last emitted"
);
self.chain_view.clear();
self.backfill_from = Some(finalized);
self.backfill_from = resume_from;
self.last_tag_epoch = None;
crate::metrics::record_handle_new_head_duration(start.elapsed());
return Ok(None);
Expand All @@ -451,6 +458,7 @@ where
let count = new_chain.len();
let blocks = self.fetch_blocks_by_hash(&new_chain).await?;
self.extend_view(&new_chain);
self.last_emitted = new_chain.last().copied();
info!(blocks = count, "chain advanced");
crate::metrics::inc_blocks_fetched(count as u64, "frontfill");
HostNotificationKind::ChainCommitted { new: Arc::new(RpcChainSegment::new(blocks)) }
Expand All @@ -460,6 +468,7 @@ where
let depth = old_tip.saturating_sub(fork_number) + 1;
let blocks = self.fetch_blocks_by_hash(&new_chain).await?;
self.reorg_view(fork_number, &new_chain);
self.last_emitted = new_chain.last().copied();
info!(depth, fork_number, new_blocks = count, "chain reorged");
crate::metrics::inc_blocks_fetched(count as u64, "frontfill");
crate::metrics::inc_reorgs(depth);
Expand Down Expand Up @@ -490,8 +499,8 @@ where

/// Process a backfill batch if pending.
///
/// Backfills by number up to `(latest - buffer_capacity)` to leave room
/// for hash-based frontfill of recent blocks.
/// Backfills by number up to `(latest - buffer_capacity / 2)` to leave
/// half the buffer depth for hash-based frontfill of recent blocks.
#[tracing::instrument(
level = "info",
skip_all,
Expand Down Expand Up @@ -519,7 +528,7 @@ where
}
};

let backfill_ceiling = tip.saturating_sub(self.buffer_capacity as u64);
let backfill_ceiling = tip.saturating_sub(self.buffer_capacity as u64 / 2);
if from > backfill_ceiling {
self.backfill_from = None;
info!("backfill complete, switching to frontfill");
Expand All @@ -538,10 +547,30 @@ where
}
};

// Verify parent-hash continuity with last emitted block.
if let Some((_, expected_parent)) = self.last_emitted
&& let Some(first) = blocks.first()
&& first.parent_hash() != expected_parent
{
warn!(
expected = %expected_parent,
actual = %first.parent_hash(),
"parent hash mismatch after exhaustion recovery"
);
self.chain_view.clear();
crate::metrics::record_backfill_batch(start.elapsed());
return Some(Err(RpcHostError::BackfillContinuityBreak));
}

let view_entries: Vec<(u64, B256)> =
blocks.iter().map(|b| (b.number(), b.hash())).collect();
self.extend_view(&view_entries);

// Update delivery high-water mark.
if let Some(&(n, h)) = view_entries.last() {
self.last_emitted = Some((n, h));
}

let backfill_done = to >= backfill_ceiling;
if backfill_done {
self.backfill_from = None;
Expand Down Expand Up @@ -617,6 +646,7 @@ where

fn set_head(&mut self, block_number: u64) {
self.backfill_from = Some(block_number);
self.last_emitted = None;
}

fn set_backfill_thresholds(&mut self, max_blocks: Option<u64>) {
Expand Down