Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

271 changes: 271 additions & 0 deletions crates/core/datasets-raw/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,274 @@ where
self.0.provider_name()
}
}

/// A [`BlockStreamer`] wrapper that tries each provider once on recoverable errors.
///
/// Wraps an ordered list of providers. On a recoverable error, the system immediately
/// tries the next provider. After all providers have been tried once without success,
/// the last recoverable error is yielded (for `block_stream`) or returned (for
/// `latest_block`).
///
/// Fatal errors always immediately abort the stream regardless of remaining providers.
#[derive(Clone)]
pub struct BlockStreamerWithFallback<T: BlockStreamer>(Vec<T>);

/// Error returned when attempting to create a [`BlockStreamerWithFallback`] with no providers.
///
/// This occurs when `BlockStreamerWithFallback::new` is called with an empty provider list.
/// Callers should ensure at least one provider is available before constructing the wrapper.
#[derive(Debug, thiserror::Error)]
#[error("providers list must not be empty")]
pub struct EmptyProvidersError;

impl<T: BlockStreamer> BlockStreamerWithFallback<T> {
/// Creates a fallback wrapper from an ordered list of providers.
///
/// The first provider in the list is the primary. On a recoverable error, the
/// system immediately tries the next provider. After all providers have been tried
/// once without success, the last recoverable error is yielded/returned.
///
/// Returns an error if `providers` is empty.
pub fn new(providers: Vec<T>) -> Result<Self, EmptyProvidersError> {
if providers.is_empty() {
return Err(EmptyProvidersError);
}
Ok(Self(providers))
}
}

impl<T> BlockStreamer for BlockStreamerWithFallback<T>
where
T: BlockStreamer + Send + Sync,
{
async fn block_stream(
self,
start: BlockNum,
end: BlockNum,
) -> impl Stream<Item = Result<Rows, BlockStreamError>> + Send {
let providers = self.0;
let mut next_block = start;

async_stream::stream! {
// Try each provider in order. On a recoverable error, immediately
// try the next provider. If all providers fail, yield the last error.
let mut last_error = None;
for provider in &providers {
let inner_stream = provider.clone().block_stream(next_block, end).await;
futures::pin_mut!(inner_stream);

// Drain this provider's stream until it ends or errors.
let mut errored = false;
while let Some(row_result) = inner_stream.next().await {
match row_result.as_ref() {
Ok(rows) => {
next_block = rows.block_num() + 1;
yield row_result;
}
Err(BlockStreamError::Fatal(err)) => {
tracing::error!(
block = %next_block,
provider_name = provider.provider_name(),
error = %err,
error_source = monitoring::logging::error_source(err.as_ref()),
"fatal block stream error"
);
yield row_result;
return;
}
Err(BlockStreamError::Recoverable(err)) => {
tracing::warn!(
block = %next_block,
provider_name = provider.provider_name(),
error = %err,
error_source = monitoring::logging::error_source(err.as_ref()),
"block streaming failed on provider"
);
errored = true;
last_error = Some(row_result);
break;
}
}
}

if !errored {
Comment on lines +310 to +343
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug/Logic concern: When provider A succeeds for blocks 0–5, then provider B fails with a recoverable error on block 6, the fallback continues to provider C starting from block 6. But if provider C's stream also ends normally (produces no items because it too has no data past block 5), the loop falls through to the if !errored { return; } path and the stream ends — silently dropping the recoverable error from provider B.

The user never sees that block 6 failed on B, and C simply produced nothing. The last_error from B is never yielded. This could cause the caller to believe all blocks in [start, end] were successfully streamed when in fact the range was incomplete.

Consider: after a provider ends normally with !errored but next_block <= end, should that be treated as a partial success requiring fallback to the next provider rather than a normal completion?

// Stream ended normally (no more blocks). Done.
return;
}
}

// All providers failed for this block. Yield the last error.
if let Some(err) = last_error {
tracing::error!(
block = %next_block,
"all providers exhausted"
);
Comment on lines +349 to +354
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging (logging-errors.md §1, §2): This tracing::error! is missing the error and error_source fields. The last_error variable is in scope and holds the actual BlockStreamError — its details should be included for observability. Without them, operators see "all providers exhausted" with only a block number and no error context.

Suggested change
// All providers failed for this block. Yield the last error.
if let Some(err) = last_error {
tracing::error!(
block = %next_block,
"all providers exhausted"
);
if let Some(Err(ref err)) = last_error {
tracing::error!(
block = %next_block,
error = %err,
error_source = monitoring::logging::error_source(err.as_ref()),
"all providers exhausted"
);
}
if let Some(err) = last_error {
yield err;
}

yield err;
}
}
}

async fn latest_block(
&mut self,
finalized: bool,
) -> Result<Option<BlockNum>, LatestBlockError> {
let mut last_err = None;
for provider in &self.0 {
match provider.clone().latest_block(finalized).await {
Ok(block) => return Ok(block),
Err(err) => {
tracing::warn!(
provider_name = provider.provider_name(),
error = %err,
error_source = monitoring::logging::error_source(err.as_ref()),
"failed to get latest block from provider"
);
last_err = Some(err);
}
}
}
// SAFETY: The constructor guarantees the provider list is non-empty, so the
// loop above always executes at least once and `last_err` is always `Some`.
Err(last_err.expect("providers list is non-empty"))
Comment on lines +379 to +381
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Panic branch (errors-handling.md §1, rust-documentation.md): .expect() and direct indexing self.0[0] (in bucket_size/provider_name below) are panic-inducing. The // SAFETY: line comments explain the invariant, but per project guidelines these trait impl methods should have # Panics doc sections.

Alternatively, consider refactoring to eliminate the panic entirely. For example, storing first + rest in the struct would make the non-empty guarantee visible in the type system:

pub struct BlockStreamerWithFallback<T: BlockStreamer> {
    primary: T,
    fallbacks: Vec<T>,
}

This would remove the need for runtime .expect() and [0] indexing entirely.

}

fn bucket_size(&self) -> Option<NonZeroU64> {
// SAFETY: The constructor guarantees the provider list is non-empty.
self.0[0].bucket_size()
}

fn provider_name(&self) -> &str {
// SAFETY: The constructor guarantees the provider list is non-empty.
self.0[0].provider_name()
Comment on lines +384 to +391
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: provider_name() and bucket_size() always delegate to the first (primary) provider. This makes sense for bucket_size() (since all providers for the same dataset kind likely share the same bucket configuration), but is provider_name() semantically correct after a fallback has occurred?

For example, if provider A fails and B succeeds, metrics/logs using provider_name() will still report provider A's name. Consider whether this should reflect the currently-active provider, or if logging the primary is intentional (in which case a doc comment explaining this would help).

}
}

#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};

use futures::StreamExt as _;

use super::*;

/// A minimal [`BlockStreamer`] mock for testing fallback behavior.
#[derive(Clone)]
struct MockStreamer {
name: &'static str,
latest: Result<Option<BlockNum>, &'static str>,
/// If set, `block_stream` yields this single error then ends. The `Arc<Mutex<Option<_>>>`
/// allows the mock to be `Clone` while ensuring only one clone consumes the error.
block_stream_error: Option<Arc<Mutex<Option<BlockStreamError>>>>,
}

impl MockStreamer {
fn succeeds(name: &'static str, block: BlockNum) -> Self {
Self {
name,
latest: Ok(Some(block)),
block_stream_error: None,
}
}

fn fails_recoverable(name: &'static str, msg: &'static str) -> Self {
Self {
name,
latest: Err(msg),
block_stream_error: Some(Arc::new(Mutex::new(Some(
BlockStreamError::Recoverable(msg.into()),
)))),
}
}

fn fails_fatal(name: &'static str, msg: &'static str) -> Self {
Self {
name,
latest: Err(msg),
block_stream_error: Some(Arc::new(Mutex::new(Some(BlockStreamError::Fatal(
msg.into(),
))))),
}
}
}

impl BlockStreamer for MockStreamer {
async fn block_stream(
self,
_start: BlockNum,
_end: BlockNum,
) -> impl Stream<Item = Result<Rows, BlockStreamError>> + Send {
let item = self
.block_stream_error
.and_then(|slot| slot.lock().ok()?.take());
futures::stream::iter(item.map(Err).into_iter())
}

async fn latest_block(
&mut self,
_finalized: bool,
) -> Result<Option<BlockNum>, LatestBlockError> {
self.latest
.map_err(|msg| -> LatestBlockError { msg.into() })
}

fn bucket_size(&self) -> Option<NonZeroU64> {
None
}

fn provider_name(&self) -> &str {
self.name
}
}

#[tokio::test]
async fn fallback_tries_next_provider_on_failure() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing (test-functions.md): Per project guidelines, tests must:

  1. Follow the //* Given, //* When, //* Then comment structure
  2. Use the <function_name>_<scenario>_<expected_outcome> naming convention
  3. Test exactly one function per test

This test exercises both latest_block and block_stream in a single test. Consider splitting into separate tests, e.g.:

  • latest_block_all_providers_fail_except_last_returns_last_successful
  • block_stream_recoverable_errors_falls_through_to_working_provider

let make_providers = || {
vec![
MockStreamer::fails_recoverable("provider_a", "timeout"),
MockStreamer::fails_recoverable("provider_b", "rate limited"),
MockStreamer::succeeds("provider_c", 100),
]
};

// latest_block: should fall through to provider_c.
let mut client =
BlockStreamerWithFallback::new(make_providers()).expect("providers list is non-empty");
let result = client.latest_block(false).await;
assert_eq!(
result.expect("should succeed via third provider"),
Some(100)
);

// block_stream: providers a and b yield recoverable errors, then the
// last recoverable error is yielded when all providers are exhausted
// (provider_c's block_stream returns an empty stream, so it "succeeds"
// with no items -- but after the two recoverable errors, the fallback
// loop reaches c which produces no error, and the stream ends normally).
// We verify no fatal abort occurs and the stream ends.
let client =
BlockStreamerWithFallback::new(make_providers()).expect("providers list is non-empty");
let results: Vec<_> = client.block_stream(0, 10).await.collect().await;
assert!(
results.is_empty(),
"stream should end normally after falling through to a working provider"
);
}

#[tokio::test]
async fn fatal_error_aborts_without_fallback() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing (test-functions.md): Same as the above test — missing //* Given, //* When, //* Then structure, and the name should follow <function_name>_<scenario>_<expected_outcome> format. E.g.: block_stream_fatal_error_aborts_without_trying_next_provider.

// block_stream: fatal error from provider_a should abort immediately
// without trying provider_b.
let providers = vec![
MockStreamer::fails_fatal("provider_a", "invalid data"),
MockStreamer::succeeds("provider_b", 100),
];
let client =
BlockStreamerWithFallback::new(providers).expect("providers list is non-empty");
let results: Vec<_> = client.block_stream(0, 10).await.collect().await;
assert_eq!(results.len(), 1, "should yield exactly one item");
assert!(
matches!(&results[0], Err(BlockStreamError::Fatal(_))),
"should be a fatal error"
);
}
}
1 change: 0 additions & 1 deletion crates/core/providers-registry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ futures.workspace = true
monitoring = { path = "../monitoring" }
object_store.workspace = true
parking_lot = "0.12.4"
rand.workspace = true
thiserror.workspace = true
tokio.workspace = true
toml = { workspace = true, features = ["preserve_order"] }
Expand Down
9 changes: 9 additions & 0 deletions crates/core/providers-registry/src/client/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,14 @@ pub enum CreateClientError {
#[error("no provider found for kind '{kind}' and network '{network}'")]
ProviderNotFound { kind: String, network: NetworkId },

/// Providers were found but client creation failed for all of them.
///
/// This occurs when providers matching the kind-network pair exist and their
/// environment variable substitution succeeds, but every client initialization
/// fails (e.g., all endpoints are unreachable).
#[error("all provider client creations failed for kind '{kind}' and network '{network}'")]
AllClientCreationsFailed { kind: String, network: NetworkId },
Comment on lines +187 to +193
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error chain (errors-reporting.md §4, §6): AllClientCreationsFailed discards all underlying errors — it contains only kind and network strings. When operators encounter this error, they have no programmatic access to why client creation failed (only the warn-level logs remain).

Consider capturing at least the last error as a #[source] field, or collecting all errors into a Vec. For example:

#[error("all provider client creations failed for kind '{kind}' and network '{network}'")]
AllClientCreationsFailed {
    kind: String,
    network: NetworkId,
    #[source]
    last_error: Box<CreateClientError>,
},


/// Unsupported provider kind.
///
/// This occurs when the provider kind does not match any supported raw dataset type.
Expand All @@ -197,6 +205,7 @@ impl crate::retryable::RetryableErrorExt for CreateClientError {
Self::ConfigParse { .. } => false,
Self::ProviderClient { source, .. } => source.is_retryable(),
Self::ProviderNotFound { .. } => false,
Self::AllClientCreationsFailed { .. } => true,
Self::UnsupportedKind { .. } => false,
}
}
Expand Down
Loading
Loading