feat(datasets-raw): add provider fallback for block streaming#1995
Conversation
Replace random provider selection with deterministic ordering and add `BlockStreamerWithFallback` to try each provider once on recoverable errors.

- Add `BlockStreamerWithFallback` that iterates providers in order, falling back to the next on recoverable errors and aborting on fatal
- Add `find_providers` and `create_block_stream_clients` to `ProvidersRegistry`, returning all matching providers in name order
- Add `AllClientCreationsFailed` error variant to distinguish from `ProviderNotFound` when providers exist but all fail to connect
- Replace random shuffle with deterministic lexicographic selection
- Remove `rand` dependency from `providers-registry`
Code Review Summary
Key Concerns
1. Missing retry behavior (potential regression)
The previous code wrapped the client with .with_retry() for progressive backoff on recoverable errors. The new BlockStreamerWithFallback tries each provider once then stops. A transient error affecting all providers simultaneously (e.g., brief network blip) will now fail the job instead of retrying. Clarify if this is intentional.
2. Subtle stream termination logic
When a provider partially succeeds then the next fails recoverably, the fallback may silently end the stream without yielding the error if a subsequent provider's stream ends normally but empty. This could result in incomplete block ranges without any error signal to the caller.
3. Error chain breakage
AllClientCreationsFailed discards all underlying errors — operators lose programmatic access to failure causes. Consider preserving at least the last error as a #[source] field.
Other Findings
- **Panic branches:** `expect()` and `[0]` indexing in trait impls lack `# Panics` doc sections. Consider a `(primary, fallbacks)` struct to eliminate panics via types.
- **Logging:** the "all providers exhausted" error log is missing `error`/`error_source` fields despite having the error in scope.
- **Code duplication:** `find_provider` and `find_providers` share ~90% identical logic; `find_provider` could delegate to `find_providers`.
- **Unreachable error path:** the `BlockStreamerWithFallback::new` error in `job_impl.rs` fabricates an `AllClientCreationsFailed` but should be unreachable given upstream guarantees.
- **Test structure:** tests should follow the project's mandatory Given/When/Then structure, naming convention, and single-function-per-test rule.
let mut errored = false;
while let Some(row_result) = inner_stream.next().await {
    match row_result.as_ref() {
        Ok(rows) => {
            next_block = rows.block_num() + 1;
            yield row_result;
        }
        Err(BlockStreamError::Fatal(err)) => {
            tracing::error!(
                block = %next_block,
                provider_name = provider.provider_name(),
                error = %err,
                error_source = monitoring::logging::error_source(err.as_ref()),
                "fatal block stream error"
            );
            yield row_result;
            return;
        }
        Err(BlockStreamError::Recoverable(err)) => {
            tracing::warn!(
                block = %next_block,
                provider_name = provider.provider_name(),
                error = %err,
                error_source = monitoring::logging::error_source(err.as_ref()),
                "block streaming failed on provider"
            );
            errored = true;
            last_error = Some(row_result);
            break;
        }
    }
}

if !errored {
Bug/Logic concern: When provider A succeeds for blocks 0–5, then provider B fails with a recoverable error on block 6, the fallback continues to provider C starting from block 6. But if provider C's stream also ends normally (produces no items because it too has no data past block 5), the loop falls through to the if !errored { return; } path and the stream ends — silently dropping the recoverable error from provider B.
The user never sees that block 6 failed on B, and C simply produced nothing. The last_error from B is never yielded. This could cause the caller to believe all blocks in [start, end] were successfully streamed when in fact the range was incomplete.
Consider: after a provider ends normally with !errored but next_block <= end, should that be treated as a partial success requiring fallback to the next provider rather than a normal completion?
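To make the concern concrete, here is a minimal hypothetical sketch (not the PR's code; `StreamEnd` and `classify_stream_end` are invented names) of treating a normal stream termination short of the requested range as a partial success that still requires fallback:

```rust
/// How a single provider's stream terminated (hypothetical helper).
#[derive(Debug, PartialEq)]
enum StreamEnd {
    /// The provider covered the whole requested range.
    Complete,
    /// The stream ended normally but before `end`; fall back to the next provider.
    Partial,
}

/// Classify a normal (non-errored) stream termination. `next_block` is the
/// first block not yet yielded; `end` is the inclusive end of the range.
fn classify_stream_end(next_block: u64, end: u64) -> StreamEnd {
    if next_block > end {
        StreamEnd::Complete
    } else {
        StreamEnd::Partial
    }
}
```

With such a check, provider C ending normally at block 5 while `end` is 6 would be classified as `Partial`, so the pending recoverable error from provider B would still surface instead of being silently dropped.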
// All providers failed for this block. Yield the last error.
if let Some(err) = last_error {
    tracing::error!(
        block = %next_block,
        "all providers exhausted"
    );
Logging (logging-errors.md §1, §2): This tracing::error! is missing the error and error_source fields. The last_error variable is in scope and holds the actual BlockStreamError — its details should be included for observability. Without them, operators see "all providers exhausted" with only a block number and no error context.
Suggested change:

// All providers failed for this block. Yield the last error.
if let Some(Err(ref err)) = last_error {
    tracing::error!(
        block = %next_block,
        error = %err,
        error_source = monitoring::logging::error_source(err.as_ref()),
        "all providers exhausted"
    );
}
if let Some(err) = last_error {
    yield err;
}
// SAFETY: The constructor guarantees the provider list is non-empty, so the
// loop above always executes at least once and `last_err` is always `Some`.
Err(last_err.expect("providers list is non-empty"))
Panic branch (errors-handling.md §1, rust-documentation.md): .expect() and direct indexing self.0[0] (in bucket_size/provider_name below) are panic-inducing. The // SAFETY: line comments explain the invariant, but per project guidelines these trait impl methods should have # Panics doc sections.
Alternatively, consider refactoring to eliminate the panic entirely. For example, storing first + rest in the struct would make the non-empty guarantee visible in the type system:
pub struct BlockStreamerWithFallback<T: BlockStreamer> {
primary: T,
fallbacks: Vec<T>,
}

This would remove the need for runtime `.expect()` and `[0]` indexing entirely.
fn bucket_size(&self) -> Option<NonZeroU64> {
    // SAFETY: The constructor guarantees the provider list is non-empty.
    self.0[0].bucket_size()
}

fn provider_name(&self) -> &str {
    // SAFETY: The constructor guarantees the provider list is non-empty.
    self.0[0].provider_name()
Question: provider_name() and bucket_size() always delegate to the first (primary) provider. This makes sense for bucket_size() (since all providers for the same dataset kind likely share the same bucket configuration), but is provider_name() semantically correct after a fallback has occurred?
For example, if provider A fails and B succeeds, metrics/logs using provider_name() will still report provider A's name. Consider whether this should reflect the currently-active provider, or if logging the primary is intentional (in which case a doc comment explaining this would help).
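If reflecting the currently-active provider is the desired behavior, one hypothetical shape (names invented for illustration, not the PR's API) is to track the index of the provider currently being streamed:

```rust
/// Hypothetical sketch: remember which provider the fallback wrapper is
/// currently using, so `provider_name()` can reflect it after a fallback.
struct FallbackState {
    names: Vec<String>,
    active: usize, // index of the provider currently streaming
}

impl FallbackState {
    /// Move to the next provider; returns false when the list is exhausted.
    fn advance(&mut self) -> bool {
        if self.active + 1 < self.names.len() {
            self.active += 1;
            true
        } else {
            false
        }
    }

    /// Name of the provider currently in use (primary until a fallback occurs).
    fn provider_name(&self) -> &str {
        &self.names[self.active]
    }
}
```

The fallback loop would call `advance()` on each recoverable failure, so logs and metrics emitted afterwards name the provider actually serving the stream.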
}

#[tokio::test]
async fn fallback_tries_next_provider_on_failure() {
Testing (test-functions.md): Per project guidelines, tests must:
- Follow the `//* Given`, `//* When`, `//* Then` comment structure
- Use the `<function_name>_<scenario>_<expected_outcome>` naming convention
- Test exactly one function per test

This test exercises both `latest_block` and `block_stream` in a single test. Consider splitting into separate tests, e.g.:
- `latest_block_all_providers_fail_except_last_returns_last_successful`
- `block_stream_recoverable_errors_falls_through_to_working_provider`
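For illustration, a minimal test following that structure. The function under test is an invented stand-in, not the PR's actual client API:

```rust
// Hypothetical stand-in for the real client: returns the first successful
// provider's latest block, trying providers in order.
fn latest_block(provider_results: &[Result<u64, String>]) -> Option<u64> {
    provider_results.iter().find_map(|r| r.as_ref().ok().copied())
}

#[test]
fn latest_block_all_providers_fail_except_last_returns_last_successful() {
    //* Given
    let results = vec![
        Err("provider A unreachable".to_string()),
        Err("provider B unreachable".to_string()),
        Ok(42),
    ];

    //* When
    let block = latest_block(&results);

    //* Then
    assert_eq!(block, Some(42));
}
```

Each test then exercises exactly one function, and the name encodes function, scenario, and expected outcome.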
}

#[tokio::test]
async fn fatal_error_aborts_without_fallback() {
Testing (test-functions.md): Same as the above test — missing //* Given, //* When, //* Then structure, and the name should follow <function_name>_<scenario>_<expected_outcome> format. E.g.: block_stream_fatal_error_aborts_without_trying_next_provider.
/// Providers were found but client creation failed for all of them.
///
/// This occurs when providers matching the kind-network pair exist and their
/// environment variable substitution succeeds, but every client initialization
/// fails (e.g., all endpoints are unreachable).
#[error("all provider client creations failed for kind '{kind}' and network '{network}'")]
AllClientCreationsFailed { kind: String, network: NetworkId },
Error chain (errors-reporting.md §4, §6): AllClientCreationsFailed discards all underlying errors — it contains only kind and network strings. When operators encounter this error, they have no programmatic access to why client creation failed (only the warn-level logs remain).
Consider capturing at least the last error as a #[source] field, or collecting all errors into a Vec. For example:
#[error("all provider client creations failed for kind '{kind}' and network '{network}'")]
AllClientCreationsFailed {
kind: String,
network: NetworkId,
#[source]
last_error: Box<CreateClientError>,
},

@@ -150,38 +152,71 @@ where
 ) -> Option<(ProviderName, ProviderResolvedConfigRaw)> {
     let kind = kind.as_ref();
 
-    // Collect matching providers with their names
-    let mut matching_providers = self
-        .get_all()
-        .await
-        .iter()
-        .filter_map(|(name, config)| {
-            // Extract kind and network from config
-            let header = config.try_into_config::<ConfigHeaderWithNetwork>().ok()?;
-            if header.kind == kind && header.network == network {
-                Some((name.clone(), config.clone()))
-            } else {
-                None
+    // The BTreeMap iterator yields entries in lexicographic order by ProviderName.
+    for (name, config) in self.get_all().await.iter() {
+        let header = match config.try_into_config::<ConfigHeaderWithNetwork>() {
+            Ok(h) => h,
+            Err(_) => continue,
+        };
+        if header.kind != kind || header.network != network {
+            continue;
+        }
+        match config.with_env_substitution() {
+            Ok(resolved) => {
+                tracing::debug!(
+                    provider_name = %name,
+                    provider_kind = %kind,
+                    provider_network = %network,
+                    "successfully selected provider with environment substitution"
+                );
+                return Some((name.clone(), resolved));
             }
-        })
-        .collect::<Vec<_>>();
-
-    if matching_providers.is_empty() {
-        return None;
+            Err(err) => {
+                tracing::warn!(
+                    provider_name = %name,
+                    provider_kind = %kind,
+                    provider_network = %network,
+                    error = %err,
+                    error_source = logging::error_source(&err),
+                    "environment variable substitution failed for provider, trying next"
+                );
+            }
+        }
     }
-
-    matching_providers.shuffle(&mut rand::rng());
+    None
 }
Code duplication: find_providers and find_provider share nearly identical logic (iterate BTreeMap, parse header, filter kind/network, try env substitution). The only difference is find_provider returns the first success while find_providers collects all.
Consider having find_provider delegate to find_providers to reduce duplication:
pub async fn find_provider(
&self,
kind: impl AsRef<str>,
network: &NetworkId,
) -> Option<(ProviderName, ProviderResolvedConfigRaw)> {
self.find_providers(kind, network).await.into_iter().next()
}

This would keep both APIs while eliminating the duplicated iteration/filtering/substitution logic.
let mut client = BlockStreamerWithFallback::new(clients).map_err(|_err| {
    Error::CreateBlockStreamClient(
        amp_providers_registry::CreateClientError::AllClientCreationsFailed {
            kind: kind_str,
            network: network.clone(),
        },
    )
})?;
Error fabrication concern: The EmptyProvidersError from BlockStreamerWithFallback::new is discarded via |_err| and replaced with a fabricated AllClientCreationsFailed. But this code path should be unreachable — create_block_stream_clients already guarantees a non-empty Vec on success (it returns Err(AllClientCreationsFailed) when all creations fail, and Err(ProviderNotFound) when none exist).
If this is truly defensive, consider either:
- Adding a comment explaining why the guard is needed despite the upstream guarantee
- Using `.expect("create_block_stream_clients guarantees non-empty")`, since this is a logic invariant, not a runtime condition
- Removing the `BlockStreamerWithFallback::new` error handling if you can rely on the upstream guarantee
.iter()
.map(|c| c.provider_name().to_string())
.collect();
let mut client = BlockStreamerWithFallback::new(clients).map_err(|_err| {
Missing with_retry(): The previous code wrapped the client with .with_retry(), which provided automatic retry with progressive backoff on recoverable errors. The new BlockStreamerWithFallback tries each provider once then gives up.
This means a transient error (e.g., a brief network blip affecting all providers simultaneously) that previously would have been retried will now fail the job. Was the removal of retry intentional? If so, it might be worth noting in the PR description. If not, consider composing the two: wrap each individual provider with retry, or wrap the fallback wrapper itself with retry:
// Option A: retry each provider individually
let clients: Vec<_> = clients.into_iter().map(|c| c.with_retry()).collect();
let mut client = BlockStreamerWithFallback::new(clients)...
// Option B: retry the whole fallback chain
// (would need BlockStreamerWithFallback to implement the retry wrapper)
shiyasmohd left a comment
LGTM. Added some suggestions as comments.
My only concern is that this approach forces us to have a fallback provider.
E.g., if only one provider is configured and it fails, the job process fails instantly without retry (correct me if I'm wrong). The job will then retry only when the scheduler retries it the next time. My suggestion would be to add 3 or 5 retries with fallback. Leaving this decision to you; this could be added in a follow-up PR or in this PR itself.
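The suggestion above could be sketched as a bounded outer retry loop around the whole fallback pass. This is a hypothetical illustration (`with_retries` and `try_all` are invented names, not the PR's API), ignoring backoff and the fatal/recoverable distinction for brevity:

```rust
/// Hypothetical sketch: run one full pass over the ordered provider list
/// (`try_all`) up to `max_passes` times, so a transient error that hits every
/// provider at once is retried instead of failing the job immediately.
fn with_retries<T, E>(
    max_passes: usize,
    mut try_all: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut last_err = None;
    for _pass in 0..max_passes {
        match try_all() {
            Ok(v) => return Ok(v),
            // A real implementation would add progressive backoff here and
            // abort immediately on fatal (non-recoverable) errors.
            Err(e) => last_err = Some(e),
        }
    }
    Err(last_err.expect("max_passes must be > 0"))
}
```

With `max_passes = 3` and a single configured provider, a brief outage gets three chances before the job fails and waits for the scheduler.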
/// is tried.
///
/// Returns the provider name and resolved configuration.
pub async fn find_provider(
This fn could return `self.find_providers(kind, network).await.into_iter().next()`
| pub async fn create_block_stream_client( | ||
| &self, | ||
| kind: impl AsRef<str>, |
this fn is dead code now
BlockStreamerWithFallbackthat iterates providers in order, falling back to the next on recoverable errors and aborting on fatalfind_providersandcreate_block_stream_clientstoProvidersRegistryreturning all matching providers in name orderAllClientCreationsFailederror variant to distinguish fromProviderNotFoundwhen providers exist but all fail to connectranddependency fromproviders-registry