feat(datasets-raw): add provider fallback for block streaming#1995

Open
Theodus wants to merge 1 commit into main from theodus/rpc-fallback

Member

@Theodus Theodus commented Mar 19, 2026

Replace random provider selection with deterministic ordering and add BlockStreamerWithFallback to try each provider once on recoverable errors.

  • Add BlockStreamerWithFallback that iterates providers in order, falling back to the next on recoverable errors and aborting on fatal
  • Add find_providers and create_block_stream_clients to ProvidersRegistry returning all matching providers in name order
  • Add AllClientCreationsFailed error variant to distinguish from ProviderNotFound when providers exist but all fail to connect
  • Replace random shuffle with deterministic lexicographic selection
  • Remove rand dependency from providers-registry


@claude claude bot left a comment

Code Review Summary

Key Concerns

1. Missing retry behavior (potential regression)
The previous code wrapped the client with .with_retry() for progressive backoff on recoverable errors. The new BlockStreamerWithFallback tries each provider once then stops. A transient error affecting all providers simultaneously (e.g., brief network blip) will now fail the job instead of retrying. Clarify if this is intentional.

2. Subtle stream termination logic
When one provider partially succeeds and the next fails recoverably, the fallback silently ends the stream, without yielding the error, if a subsequent provider's stream then ends normally but empty. The caller can receive an incomplete block range with no error signal.

3. Error chain breakage
AllClientCreationsFailed discards all underlying errors — operators lose programmatic access to failure causes. Consider preserving at least the last error as a #[source] field.

Other Findings

  • Panic branches: expect() and [0] indexing in trait impls lack # Panics doc sections. Consider a (primary, fallbacks) struct to eliminate panics via types.
  • Logging: "all providers exhausted" error log is missing error/error_source fields despite having the error in scope.
  • Code duplication: find_provider and find_providers share ~90% identical logic; find_provider could delegate to find_providers.
  • Unreachable error path: BlockStreamerWithFallback::new error in job_impl.rs fabricates an AllClientCreationsFailed but should be unreachable given upstream guarantees.
  • Test structure: Tests should follow the project's mandatory Given/When/Then structure, naming convention, and single-function-per-test rule.

Comment on lines +310 to +343
let mut errored = false;
while let Some(row_result) = inner_stream.next().await {
    match row_result.as_ref() {
        Ok(rows) => {
            next_block = rows.block_num() + 1;
            yield row_result;
        }
        Err(BlockStreamError::Fatal(err)) => {
            tracing::error!(
                block = %next_block,
                provider_name = provider.provider_name(),
                error = %err,
                error_source = monitoring::logging::error_source(err.as_ref()),
                "fatal block stream error"
            );
            yield row_result;
            return;
        }
        Err(BlockStreamError::Recoverable(err)) => {
            tracing::warn!(
                block = %next_block,
                provider_name = provider.provider_name(),
                error = %err,
                error_source = monitoring::logging::error_source(err.as_ref()),
                "block streaming failed on provider"
            );
            errored = true;
            last_error = Some(row_result);
            break;
        }
    }
}

if !errored {

Bug/Logic concern: When provider A succeeds for blocks 0–5, then provider B fails with a recoverable error on block 6, the fallback continues to provider C starting from block 6. But if provider C's stream also ends normally (produces no items because it too has no data past block 5), the loop falls through to the if !errored { return; } path and the stream ends — silently dropping the recoverable error from provider B.

The user never sees that block 6 failed on B, and C simply produced nothing. The last_error from B is never yielded. This could cause the caller to believe all blocks in [start, end] were successfully streamed when in fact the range was incomplete.

Consider: after a provider ends normally with !errored but next_block <= end, should that be treated as a partial success requiring fallback to the next provider rather than a normal completion?
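One way to make the incomplete-range case explicit is sketched below. This is a simplified, synchronous model, not the PR's async stream: `StreamError`, its `Incomplete` variant, the `Provider` function-pointer type, and `stream_with_fallback` are all hypothetical names introduced for illustration. A provider that ends normally before `end` is treated like a failed provider, and the last recoverable error is surfaced once every provider has been tried.

```rust
/// Hypothetical stand-in for the real `BlockStreamError`.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamError {
    Recoverable(String),
    Fatal(String),
    /// Hypothetical variant: every provider ended before reaching `end`.
    Incomplete { next_block: u64, end: u64 },
}

/// A provider is modelled as a function from an inclusive block range to the
/// items it would stream for that range (an assumption made for this sketch).
pub type Provider = fn(u64, u64) -> Vec<Result<u64, StreamError>>;

/// Streams `start..=end`, moving to the next provider on a recoverable error
/// or when a provider ends normally before the range is covered.
pub fn stream_with_fallback(
    providers: &[Provider],
    start: u64,
    end: u64,
) -> Result<Vec<u64>, StreamError> {
    let mut blocks = Vec::new();
    let mut next_block = start;
    let mut last_error = None;

    for &provider in providers {
        for item in provider(next_block, end) {
            match item {
                Ok(block) => {
                    next_block = block + 1;
                    blocks.push(block);
                }
                Err(StreamError::Recoverable(msg)) => {
                    // Remember the error and fall back to the next provider.
                    last_error = Some(StreamError::Recoverable(msg));
                    break;
                }
                // Any other error aborts immediately, with no fallback.
                Err(err) => return Err(err),
            }
        }
        // Only a fully covered range counts as success.
        if next_block > end {
            return Ok(blocks);
        }
    }

    // Every provider was tried and the range is still incomplete.
    Err(last_error.unwrap_or(StreamError::Incomplete { next_block, end }))
}
```

In the A/B/C scenario above, this model returns B's recoverable error instead of ending quietly. A real stream would already have yielded blocks 0 to 5 before the error, whereas this sketch drops them for brevity.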

Comment on lines +349 to +354
// All providers failed for this block. Yield the last error.
if let Some(err) = last_error {
    tracing::error!(
        block = %next_block,
        "all providers exhausted"
    );

Logging (logging-errors.md §1, §2): This tracing::error! is missing the error and error_source fields. The last_error variable is in scope and holds the actual BlockStreamError — its details should be included for observability. Without them, operators see "all providers exhausted" with only a block number and no error context.

Suggested change
- // All providers failed for this block. Yield the last error.
- if let Some(err) = last_error {
-     tracing::error!(
-         block = %next_block,
-         "all providers exhausted"
-     );
+ if let Some(Err(ref err)) = last_error {
+     tracing::error!(
+         block = %next_block,
+         error = %err,
+         error_source = monitoring::logging::error_source(err.as_ref()),
+         "all providers exhausted"
+     );
+ }
+ if let Some(err) = last_error {
+     yield err;
+ }

Comment on lines +379 to +381
// SAFETY: The constructor guarantees the provider list is non-empty, so the
// loop above always executes at least once and `last_err` is always `Some`.
Err(last_err.expect("providers list is non-empty"))

Panic branch (errors-handling.md §1, rust-documentation.md): .expect() and direct indexing self.0[0] (in bucket_size/provider_name below) are panic-inducing. The // SAFETY: line comments explain the invariant, but per project guidelines these trait impl methods should have # Panics doc sections.

Alternatively, consider refactoring to eliminate the panic entirely. For example, storing first + rest in the struct would make the non-empty guarantee visible in the type system:

pub struct BlockStreamerWithFallback<T: BlockStreamer> {
    primary: T,
    fallbacks: Vec<T>,
}

This would remove the need for runtime .expect() and [0] indexing entirely.
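A minimal sketch of how that shape could play out, assuming a trimmed-down, synchronous `BlockStreamer` trait (the real trait is async and has more methods) and a hypothetical `StaticProvider` stub used only to exercise the wrapper:

```rust
/// Hypothetical, trimmed-down stand-in for the real `BlockStreamer` trait.
pub trait BlockStreamer {
    fn provider_name(&self) -> &str;
    fn latest_block(&self) -> Result<u64, String>;
}

/// Hypothetical stub provider that always returns the same result.
pub struct StaticProvider {
    pub name: &'static str,
    pub latest: Result<u64, String>,
}

impl BlockStreamer for StaticProvider {
    fn provider_name(&self) -> &str {
        self.name
    }

    fn latest_block(&self) -> Result<u64, String> {
        self.latest.clone()
    }
}

pub struct BlockStreamerWithFallback<T: BlockStreamer> {
    primary: T,
    fallbacks: Vec<T>,
}

impl<T: BlockStreamer> BlockStreamerWithFallback<T> {
    /// Infallible: the signature itself requires at least one provider.
    pub fn new(primary: T, fallbacks: Vec<T>) -> Self {
        Self { primary, fallbacks }
    }

    /// No indexing needed: the primary always exists.
    pub fn provider_name(&self) -> &str {
        self.primary.provider_name()
    }

    /// Tries the primary, then each fallback in order. `last_err` starts as
    /// the primary's error, so there is no `Option` to `expect()` on.
    pub fn latest_block(&self) -> Result<u64, String> {
        let mut last_err = match self.primary.latest_block() {
            Ok(block) => return Ok(block),
            Err(err) => err,
        };
        for provider in &self.fallbacks {
            match provider.latest_block() {
                Ok(block) => return Ok(block),
                Err(err) => last_err = err,
            }
        }
        Err(last_err)
    }
}
```

With this layout the constructor no longer needs an `EmptyProvidersError`, so callers would have to split their `Vec` into a first element and the rest before constructing the wrapper.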

Comment on lines +384 to +391
fn bucket_size(&self) -> Option<NonZeroU64> {
    // SAFETY: The constructor guarantees the provider list is non-empty.
    self.0[0].bucket_size()
}

fn provider_name(&self) -> &str {
    // SAFETY: The constructor guarantees the provider list is non-empty.
    self.0[0].provider_name()

Question: provider_name() and bucket_size() always delegate to the first (primary) provider. This makes sense for bucket_size() (since all providers for the same dataset kind likely share the same bucket configuration), but is provider_name() semantically correct after a fallback has occurred?

For example, if provider A fails and B succeeds, metrics/logs using provider_name() will still report provider A's name. Consider whether this should reflect the currently-active provider, or if logging the primary is intentional (in which case a doc comment explaining this would help).
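If reporting the currently active provider turns out to be the desired behavior, one possible approach is to record which provider served the last successful call. The sketch below is an assumption-heavy illustration: `ActiveProviderNames`, `mark_active`, and the index convention are hypothetical and not part of the PR. An atomic is used because `provider_name()` takes `&self`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical helper that remembers which provider served the last call.
pub struct ActiveProviderNames {
    primary: String,
    fallbacks: Vec<String>,
    /// 0 means the primary; `i > 0` means `fallbacks[i - 1]`.
    active: AtomicUsize,
}

impl ActiveProviderNames {
    pub fn new(primary: &str, fallbacks: &[&str]) -> Self {
        Self {
            primary: primary.to_string(),
            fallbacks: fallbacks.iter().map(|name| name.to_string()).collect(),
            active: AtomicUsize::new(0),
        }
    }

    /// Records that the provider at `index` is now serving requests.
    pub fn mark_active(&self, index: usize) {
        self.active.store(index, Ordering::Relaxed);
    }

    /// Name of the provider currently serving requests. An out-of-range
    /// index falls back to the primary's name instead of panicking.
    pub fn provider_name(&self) -> &str {
        match self.active.load(Ordering::Relaxed) {
            0 => self.primary.as_str(),
            i => self
                .fallbacks
                .get(i - 1)
                .map(String::as_str)
                .unwrap_or(self.primary.as_str()),
        }
    }

    /// Name of the primary provider, regardless of fallback state.
    pub fn primary_name(&self) -> &str {
        self.primary.as_str()
    }
}
```

Keeping both accessors would let metrics label the configured primary and the provider actually in use separately. Whether that distinction is worth the extra state is a design call for the author.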

}

#[tokio::test]
async fn fallback_tries_next_provider_on_failure() {

Testing (test-functions.md): Per project guidelines, tests must:

  1. Follow the //* Given, //* When, //* Then comment structure
  2. Use the <function_name>_<scenario>_<expected_outcome> naming convention
  3. Test exactly one function per test

This test exercises both latest_block and block_stream in a single test. Consider splitting into separate tests, e.g.:

  • latest_block_all_providers_fail_except_last_returns_last_successful
  • block_stream_recoverable_errors_falls_through_to_working_provider

}

#[tokio::test]
async fn fatal_error_aborts_without_fallback() {

Testing (test-functions.md): Same as the above test — missing //* Given, //* When, //* Then structure, and the name should follow <function_name>_<scenario>_<expected_outcome> format. E.g.: block_stream_fatal_error_aborts_without_trying_next_provider.

Comment on lines +187 to +193
    /// Providers were found but client creation failed for all of them.
    ///
    /// This occurs when providers matching the kind-network pair exist and their
    /// environment variable substitution succeeds, but every client initialization
    /// fails (e.g., all endpoints are unreachable).
    #[error("all provider client creations failed for kind '{kind}' and network '{network}'")]
    AllClientCreationsFailed { kind: String, network: NetworkId },

Error chain (errors-reporting.md §4, §6): AllClientCreationsFailed discards all underlying errors — it contains only kind and network strings. When operators encounter this error, they have no programmatic access to why client creation failed (only the warn-level logs remain).

Consider capturing at least the last error as a #[source] field, or collecting all errors into a Vec. For example:

#[error("all provider client creations failed for kind '{kind}' and network '{network}'")]
AllClientCreationsFailed {
    kind: String,
    network: NetworkId,
    #[source]
    last_error: Box<CreateClientError>,
},
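To illustrate what preserving the source buys, here is a dependency-free sketch that implements `std::error::Error` by hand rather than through `thiserror` (with `thiserror`, the `#[source]` attribute generates an equivalent `source()` method). `ConnectError`, the `String` network field, and the `error_chain` helper are hypothetical stand-ins for the real types.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for a single provider's connection failure.
#[derive(Debug)]
pub struct ConnectError(pub String);

impl fmt::Display for ConnectError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Error for ConnectError {}

/// Struct form of the variant, with `network` simplified to a `String`.
#[derive(Debug)]
pub struct AllClientCreationsFailed {
    pub kind: String,
    pub network: String,
    pub last_error: Box<ConnectError>,
}

impl fmt::Display for AllClientCreationsFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "all provider client creations failed for kind '{}' and network '{}'",
            self.kind, self.network
        )
    }
}

impl Error for AllClientCreationsFailed {
    /// Exposes the last underlying failure to callers and log helpers.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&*self.last_error)
    }
}

/// Walks the `source()` chain and joins the messages, outermost first.
pub fn error_chain(err: &(dyn Error + 'static)) -> String {
    let mut parts = vec![err.to_string()];
    let mut current = err.source();
    while let Some(cause) = current {
        parts.push(cause.to_string());
        current = cause.source();
    }
    parts.join(": ")
}
```

With the source attached, a helper like `error_chain` can report the underlying cause next to the summary message, instead of relying on the warn-level logs alone.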

Comment on lines 148 to +188
@@ -150,38 +152,71 @@ where
     ) -> Option<(ProviderName, ProviderResolvedConfigRaw)> {
         let kind = kind.as_ref();

-        // Collect matching providers with their names
-        let mut matching_providers = self
-            .get_all()
-            .await
-            .iter()
-            .filter_map(|(name, config)| {
-                // Extract kind and network from config
-                let header = config.try_into_config::<ConfigHeaderWithNetwork>().ok()?;
-                if header.kind == kind && header.network == network {
-                    Some((name.clone(), config.clone()))
-                } else {
-                    None
+        // The BTreeMap iterator yields entries in lexicographic order by ProviderName.
+        for (name, config) in self.get_all().await.iter() {
+            let header = match config.try_into_config::<ConfigHeaderWithNetwork>() {
+                Ok(h) => h,
+                Err(_) => continue,
+            };
+            if header.kind != kind || header.network != network {
+                continue;
+            }
+            match config.with_env_substitution() {
+                Ok(resolved) => {
+                    tracing::debug!(
+                        provider_name = %name,
+                        provider_kind = %kind,
+                        provider_network = %network,
+                        "successfully selected provider with environment substitution"
+                    );
+                    return Some((name.clone(), resolved));
                 }
-            })
-            .collect::<Vec<_>>();

-        if matching_providers.is_empty() {
-            return None;
+                Err(err) => {
+                    tracing::warn!(
+                        provider_name = %name,
+                        provider_kind = %kind,
+                        provider_network = %network,
+                        error = %err,
+                        error_source = logging::error_source(&err),
+                        "environment variable substitution failed for provider, trying next"
+                    );
+                }
+            }
         }

-        matching_providers.shuffle(&mut rand::rng());
+        None
+    }

Code duplication: find_providers and find_provider share nearly identical logic (iterate BTreeMap, parse header, filter kind/network, try env substitution). The only difference is find_provider returns the first success while find_providers collects all.

Consider having find_provider delegate to find_providers to reduce duplication:

pub async fn find_provider(
    &self,
    kind: impl AsRef<str>,
    network: &NetworkId,
) -> Option<(ProviderName, ProviderResolvedConfigRaw)> {
    self.find_providers(kind, network).await.into_iter().next()
}

This would keep both APIs while eliminating the duplicated iteration/filtering/substitution logic.

Comment on lines +258 to +265
let mut client = BlockStreamerWithFallback::new(clients).map_err(|_err| {
    Error::CreateBlockStreamClient(
        amp_providers_registry::CreateClientError::AllClientCreationsFailed {
            kind: kind_str,
            network: network.clone(),
        },
    )
})?;

Error fabrication concern: The EmptyProvidersError from BlockStreamerWithFallback::new is discarded via |_err| and replaced with a fabricated AllClientCreationsFailed. But this code path should be unreachable — create_block_stream_clients already guarantees a non-empty Vec on success (it returns Err(AllClientCreationsFailed) when all creations fail, and Err(ProviderNotFound) when none exist).

If this is truly defensive, consider either:

  1. Adding a comment explaining why the guard is needed despite the upstream guarantee
  2. Using .expect("create_block_stream_clients guarantees non-empty") since this is a logic invariant, not a runtime condition
  3. Removing the BlockStreamerWithFallback::new error handling if you can rely on the upstream guarantee

    .iter()
    .map(|c| c.provider_name().to_string())
    .collect();
let mut client = BlockStreamerWithFallback::new(clients).map_err(|_err| {

Missing with_retry(): The previous code wrapped the client with .with_retry(), which provided automatic retry with progressive backoff on recoverable errors. The new BlockStreamerWithFallback tries each provider once then gives up.

This means a transient error (e.g., a brief network blip affecting all providers simultaneously) that previously would have been retried will now fail the job. Was the removal of retry intentional? If so, it might be worth noting in the PR description. If not, consider composing the two: wrap each individual provider with retry, or wrap the fallback wrapper itself with retry:

// Option A: retry each provider individually
let clients: Vec<_> = clients.into_iter().map(|c| c.with_retry()).collect();
let mut client = BlockStreamerWithFallback::new(clients)...

// Option B: retry the whole fallback chain  
// (would need BlockStreamerWithFallback to implement the retry wrapper)
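For Option B, the rough shape might look like the synchronous sketch below. It does not reflect how the existing `.with_retry()` is implemented: `with_retry`, `fallback_once`, and the boxed-closure `Provider` type are hypothetical names, and the backoff here is a plain doubling delay.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical provider model: a callable that returns the latest block.
pub type Provider = Box<dyn FnMut() -> Result<u64, String>>;

/// One pass over the providers in order: the first success wins, otherwise
/// the last error is returned.
pub fn fallback_once(providers: &mut [Provider]) -> Result<u64, String> {
    let mut last_err = String::from("no providers configured");
    for provider in providers.iter_mut() {
        match (*provider)() {
            Ok(block) => return Ok(block),
            Err(err) => last_err = err,
        }
    }
    Err(last_err)
}

/// Runs `attempt` at least once and at most `max_attempts` times, doubling
/// the delay after each failed attempt.
pub fn with_retry<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut attempt: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = initial_delay;
    let mut tries = 0;
    loop {
        tries += 1;
        match attempt() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the most recent error.
            Err(err) if tries >= max_attempts => return Err(err),
            Err(_) => {
                thread::sleep(delay);
                delay = delay.saturating_mul(2);
            }
        }
    }
}
```

Wrapping the whole chain, as in `with_retry(3, Duration::from_millis(100), || fallback_once(&mut providers))`, means a blip that hits every provider at once is retried after a pause, while a single bad provider is still skipped within each pass. Retrying each provider individually (Option A) would instead delay the fallback until that provider's retries are exhausted.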

Contributor

@shiyasmohd shiyasmohd left a comment

LGTM. Added some suggestions as comments.

My only concern is, this approach forces us to have a fallback provider.
E.g., if only one provider is configured and it fails, the job fails instantly without retry (correct me if I'm wrong). The job will then be retried only when the scheduler next retries it. My suggestion would be to add 3 or 5 retries with fallback. Leaving this decision to you. This could be added in a follow-up PR or in this PR itself.

/// is tried.
///
/// Returns the provider name and resolved configuration.
pub async fn find_provider(

this fn could return self.find_providers(kind, network).await.into_iter().next()

Comment on lines 125 to 127
pub async fn create_block_stream_client(
    &self,
    kind: impl AsRef<str>,

this fn is dead code now


3 participants