-
Notifications
You must be signed in to change notification settings - Fork 6
feat(datasets-raw): add provider fallback for block streaming #1995
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -250,3 +250,274 @@ where | |||||||||||||||||||||||||||||||||||
| self.0.provider_name() | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| /// A [`BlockStreamer`] wrapper that tries each provider once on recoverable errors. | ||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||
| /// Wraps an ordered list of providers. On a recoverable error, the system immediately | ||||||||||||||||||||||||||||||||||||
| /// tries the next provider. After all providers have been tried once without success, | ||||||||||||||||||||||||||||||||||||
| /// the last recoverable error is yielded (for `block_stream`) or returned (for | ||||||||||||||||||||||||||||||||||||
| /// `latest_block`). | ||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||
| /// Fatal errors always immediately abort the stream regardless of remaining providers. | ||||||||||||||||||||||||||||||||||||
| #[derive(Clone)] | ||||||||||||||||||||||||||||||||||||
| pub struct BlockStreamerWithFallback<T: BlockStreamer>(Vec<T>); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| /// Error returned when attempting to create a [`BlockStreamerWithFallback`] with no providers. | ||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||
| /// This occurs when `BlockStreamerWithFallback::new` is called with an empty provider list. | ||||||||||||||||||||||||||||||||||||
| /// Callers should ensure at least one provider is available before constructing the wrapper. | ||||||||||||||||||||||||||||||||||||
| #[derive(Debug, thiserror::Error)] | ||||||||||||||||||||||||||||||||||||
| #[error("providers list must not be empty")] | ||||||||||||||||||||||||||||||||||||
| pub struct EmptyProvidersError; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| impl<T: BlockStreamer> BlockStreamerWithFallback<T> { | ||||||||||||||||||||||||||||||||||||
| /// Creates a fallback wrapper from an ordered list of providers. | ||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||
| /// The first provider in the list is the primary. On a recoverable error, the | ||||||||||||||||||||||||||||||||||||
| /// system immediately tries the next provider. After all providers have been tried | ||||||||||||||||||||||||||||||||||||
| /// once without success, the last recoverable error is yielded/returned. | ||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||
| /// Returns an error if `providers` is empty. | ||||||||||||||||||||||||||||||||||||
| pub fn new(providers: Vec<T>) -> Result<Self, EmptyProvidersError> { | ||||||||||||||||||||||||||||||||||||
| if providers.is_empty() { | ||||||||||||||||||||||||||||||||||||
| return Err(EmptyProvidersError); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| Ok(Self(providers)) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| impl<T> BlockStreamer for BlockStreamerWithFallback<T> | ||||||||||||||||||||||||||||||||||||
| where | ||||||||||||||||||||||||||||||||||||
| T: BlockStreamer + Send + Sync, | ||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||
| async fn block_stream( | ||||||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||||
| start: BlockNum, | ||||||||||||||||||||||||||||||||||||
| end: BlockNum, | ||||||||||||||||||||||||||||||||||||
| ) -> impl Stream<Item = Result<Rows, BlockStreamError>> + Send { | ||||||||||||||||||||||||||||||||||||
| let providers = self.0; | ||||||||||||||||||||||||||||||||||||
| let mut next_block = start; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| async_stream::stream! { | ||||||||||||||||||||||||||||||||||||
| // Try each provider in order. On a recoverable error, immediately | ||||||||||||||||||||||||||||||||||||
| // try the next provider. If all providers fail, yield the last error. | ||||||||||||||||||||||||||||||||||||
| let mut last_error = None; | ||||||||||||||||||||||||||||||||||||
| for provider in &providers { | ||||||||||||||||||||||||||||||||||||
| let inner_stream = provider.clone().block_stream(next_block, end).await; | ||||||||||||||||||||||||||||||||||||
| futures::pin_mut!(inner_stream); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Drain this provider's stream until it ends or errors. | ||||||||||||||||||||||||||||||||||||
| let mut errored = false; | ||||||||||||||||||||||||||||||||||||
| while let Some(row_result) = inner_stream.next().await { | ||||||||||||||||||||||||||||||||||||
| match row_result.as_ref() { | ||||||||||||||||||||||||||||||||||||
| Ok(rows) => { | ||||||||||||||||||||||||||||||||||||
| next_block = rows.block_num() + 1; | ||||||||||||||||||||||||||||||||||||
| yield row_result; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| Err(BlockStreamError::Fatal(err)) => { | ||||||||||||||||||||||||||||||||||||
| tracing::error!( | ||||||||||||||||||||||||||||||||||||
| block = %next_block, | ||||||||||||||||||||||||||||||||||||
| provider_name = provider.provider_name(), | ||||||||||||||||||||||||||||||||||||
| error = %err, | ||||||||||||||||||||||||||||||||||||
| error_source = monitoring::logging::error_source(err.as_ref()), | ||||||||||||||||||||||||||||||||||||
| "fatal block stream error" | ||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||
| yield row_result; | ||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| Err(BlockStreamError::Recoverable(err)) => { | ||||||||||||||||||||||||||||||||||||
| tracing::warn!( | ||||||||||||||||||||||||||||||||||||
| block = %next_block, | ||||||||||||||||||||||||||||||||||||
| provider_name = provider.provider_name(), | ||||||||||||||||||||||||||||||||||||
| error = %err, | ||||||||||||||||||||||||||||||||||||
| error_source = monitoring::logging::error_source(err.as_ref()), | ||||||||||||||||||||||||||||||||||||
| "block streaming failed on provider" | ||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||
| errored = true; | ||||||||||||||||||||||||||||||||||||
| last_error = Some(row_result); | ||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| if !errored { | ||||||||||||||||||||||||||||||||||||
| // Stream ended normally (no more blocks). Done. | ||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // All providers failed for this block. Yield the last error. | ||||||||||||||||||||||||||||||||||||
| if let Some(err) = last_error { | ||||||||||||||||||||||||||||||||||||
| tracing::error!( | ||||||||||||||||||||||||||||||||||||
| block = %next_block, | ||||||||||||||||||||||||||||||||||||
| "all providers exhausted" | ||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||
|
Comment on lines
+349
to
+354
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logging (logging-errors.md §1, §2): This
Suggested change
|
||||||||||||||||||||||||||||||||||||
| yield err; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| async fn latest_block( | ||||||||||||||||||||||||||||||||||||
| &mut self, | ||||||||||||||||||||||||||||||||||||
| finalized: bool, | ||||||||||||||||||||||||||||||||||||
| ) -> Result<Option<BlockNum>, LatestBlockError> { | ||||||||||||||||||||||||||||||||||||
| let mut last_err = None; | ||||||||||||||||||||||||||||||||||||
| for provider in &self.0 { | ||||||||||||||||||||||||||||||||||||
| match provider.clone().latest_block(finalized).await { | ||||||||||||||||||||||||||||||||||||
| Ok(block) => return Ok(block), | ||||||||||||||||||||||||||||||||||||
| Err(err) => { | ||||||||||||||||||||||||||||||||||||
| tracing::warn!( | ||||||||||||||||||||||||||||||||||||
| provider_name = provider.provider_name(), | ||||||||||||||||||||||||||||||||||||
| error = %err, | ||||||||||||||||||||||||||||||||||||
| error_source = monitoring::logging::error_source(err.as_ref()), | ||||||||||||||||||||||||||||||||||||
| "failed to get latest block from provider" | ||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||
| last_err = Some(err); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| // SAFETY: The constructor guarantees the provider list is non-empty, so the | ||||||||||||||||||||||||||||||||||||
| // loop above always executes at least once and `last_err` is always `Some`. | ||||||||||||||||||||||||||||||||||||
| Err(last_err.expect("providers list is non-empty")) | ||||||||||||||||||||||||||||||||||||
|
Comment on lines
+379
to
+381
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Panic branch (errors-handling.md §1, rust-documentation.md): Alternatively, consider refactoring to eliminate the panic entirely. For example, storing pub struct BlockStreamerWithFallback<T: BlockStreamer> {
primary: T,
fallbacks: Vec<T>,
}This would remove the need for runtime |
||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| fn bucket_size(&self) -> Option<NonZeroU64> { | ||||||||||||||||||||||||||||||||||||
| // SAFETY: The constructor guarantees the provider list is non-empty. | ||||||||||||||||||||||||||||||||||||
| self.0[0].bucket_size() | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| fn provider_name(&self) -> &str { | ||||||||||||||||||||||||||||||||||||
| // SAFETY: The constructor guarantees the provider list is non-empty. | ||||||||||||||||||||||||||||||||||||
| self.0[0].provider_name() | ||||||||||||||||||||||||||||||||||||
|
Comment on lines
+384
to
+391
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: For example, if provider A fails and B succeeds, metrics/logs using |
||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| #[cfg(test)] | ||||||||||||||||||||||||||||||||||||
| mod tests { | ||||||||||||||||||||||||||||||||||||
| use std::sync::{Arc, Mutex}; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| use futures::StreamExt as _; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| use super::*; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| /// A minimal [`BlockStreamer`] mock for testing fallback behavior. | ||||||||||||||||||||||||||||||||||||
| #[derive(Clone)] | ||||||||||||||||||||||||||||||||||||
| struct MockStreamer { | ||||||||||||||||||||||||||||||||||||
| name: &'static str, | ||||||||||||||||||||||||||||||||||||
| latest: Result<Option<BlockNum>, &'static str>, | ||||||||||||||||||||||||||||||||||||
| /// If set, `block_stream` yields this single error then ends. The `Arc<Mutex<Option<_>>>` | ||||||||||||||||||||||||||||||||||||
| /// allows the mock to be `Clone` while ensuring only one clone consumes the error. | ||||||||||||||||||||||||||||||||||||
| block_stream_error: Option<Arc<Mutex<Option<BlockStreamError>>>>, | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| impl MockStreamer { | ||||||||||||||||||||||||||||||||||||
| fn succeeds(name: &'static str, block: BlockNum) -> Self { | ||||||||||||||||||||||||||||||||||||
| Self { | ||||||||||||||||||||||||||||||||||||
| name, | ||||||||||||||||||||||||||||||||||||
| latest: Ok(Some(block)), | ||||||||||||||||||||||||||||||||||||
| block_stream_error: None, | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| fn fails_recoverable(name: &'static str, msg: &'static str) -> Self { | ||||||||||||||||||||||||||||||||||||
| Self { | ||||||||||||||||||||||||||||||||||||
| name, | ||||||||||||||||||||||||||||||||||||
| latest: Err(msg), | ||||||||||||||||||||||||||||||||||||
| block_stream_error: Some(Arc::new(Mutex::new(Some( | ||||||||||||||||||||||||||||||||||||
| BlockStreamError::Recoverable(msg.into()), | ||||||||||||||||||||||||||||||||||||
| )))), | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| fn fails_fatal(name: &'static str, msg: &'static str) -> Self { | ||||||||||||||||||||||||||||||||||||
| Self { | ||||||||||||||||||||||||||||||||||||
| name, | ||||||||||||||||||||||||||||||||||||
| latest: Err(msg), | ||||||||||||||||||||||||||||||||||||
| block_stream_error: Some(Arc::new(Mutex::new(Some(BlockStreamError::Fatal( | ||||||||||||||||||||||||||||||||||||
| msg.into(), | ||||||||||||||||||||||||||||||||||||
| ))))), | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| impl BlockStreamer for MockStreamer { | ||||||||||||||||||||||||||||||||||||
| async fn block_stream( | ||||||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||||
| _start: BlockNum, | ||||||||||||||||||||||||||||||||||||
| _end: BlockNum, | ||||||||||||||||||||||||||||||||||||
| ) -> impl Stream<Item = Result<Rows, BlockStreamError>> + Send { | ||||||||||||||||||||||||||||||||||||
| let item = self | ||||||||||||||||||||||||||||||||||||
| .block_stream_error | ||||||||||||||||||||||||||||||||||||
| .and_then(|slot| slot.lock().ok()?.take()); | ||||||||||||||||||||||||||||||||||||
| futures::stream::iter(item.map(Err).into_iter()) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| async fn latest_block( | ||||||||||||||||||||||||||||||||||||
| &mut self, | ||||||||||||||||||||||||||||||||||||
| _finalized: bool, | ||||||||||||||||||||||||||||||||||||
| ) -> Result<Option<BlockNum>, LatestBlockError> { | ||||||||||||||||||||||||||||||||||||
| self.latest | ||||||||||||||||||||||||||||||||||||
| .map_err(|msg| -> LatestBlockError { msg.into() }) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| fn bucket_size(&self) -> Option<NonZeroU64> { | ||||||||||||||||||||||||||||||||||||
| None | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| fn provider_name(&self) -> &str { | ||||||||||||||||||||||||||||||||||||
| self.name | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| #[tokio::test] | ||||||||||||||||||||||||||||||||||||
| async fn fallback_tries_next_provider_on_failure() { | ||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Testing (test-functions.md): Per project guidelines, tests must:
This test exercises both
|
||||||||||||||||||||||||||||||||||||
| let make_providers = || { | ||||||||||||||||||||||||||||||||||||
| vec![ | ||||||||||||||||||||||||||||||||||||
| MockStreamer::fails_recoverable("provider_a", "timeout"), | ||||||||||||||||||||||||||||||||||||
| MockStreamer::fails_recoverable("provider_b", "rate limited"), | ||||||||||||||||||||||||||||||||||||
| MockStreamer::succeeds("provider_c", 100), | ||||||||||||||||||||||||||||||||||||
| ] | ||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // latest_block: should fall through to provider_c. | ||||||||||||||||||||||||||||||||||||
| let mut client = | ||||||||||||||||||||||||||||||||||||
| BlockStreamerWithFallback::new(make_providers()).expect("providers list is non-empty"); | ||||||||||||||||||||||||||||||||||||
| let result = client.latest_block(false).await; | ||||||||||||||||||||||||||||||||||||
| assert_eq!( | ||||||||||||||||||||||||||||||||||||
| result.expect("should succeed via third provider"), | ||||||||||||||||||||||||||||||||||||
| Some(100) | ||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // block_stream: providers a and b yield recoverable errors, then the | ||||||||||||||||||||||||||||||||||||
| // last recoverable error is yielded when all providers are exhausted | ||||||||||||||||||||||||||||||||||||
| // (provider_c's block_stream returns an empty stream, so it "succeeds" | ||||||||||||||||||||||||||||||||||||
| // with no items -- but after the two recoverable errors, the fallback | ||||||||||||||||||||||||||||||||||||
| // loop reaches c which produces no error, and the stream ends normally). | ||||||||||||||||||||||||||||||||||||
| // We verify no fatal abort occurs and the stream ends. | ||||||||||||||||||||||||||||||||||||
| let client = | ||||||||||||||||||||||||||||||||||||
| BlockStreamerWithFallback::new(make_providers()).expect("providers list is non-empty"); | ||||||||||||||||||||||||||||||||||||
| let results: Vec<_> = client.block_stream(0, 10).await.collect().await; | ||||||||||||||||||||||||||||||||||||
| assert!( | ||||||||||||||||||||||||||||||||||||
| results.is_empty(), | ||||||||||||||||||||||||||||||||||||
| "stream should end normally after falling through to a working provider" | ||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| #[tokio::test] | ||||||||||||||||||||||||||||||||||||
| async fn fatal_error_aborts_without_fallback() { | ||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Testing (test-functions.md): Same as the above test — missing |
||||||||||||||||||||||||||||||||||||
| // block_stream: fatal error from provider_a should abort immediately | ||||||||||||||||||||||||||||||||||||
| // without trying provider_b. | ||||||||||||||||||||||||||||||||||||
| let providers = vec![ | ||||||||||||||||||||||||||||||||||||
| MockStreamer::fails_fatal("provider_a", "invalid data"), | ||||||||||||||||||||||||||||||||||||
| MockStreamer::succeeds("provider_b", 100), | ||||||||||||||||||||||||||||||||||||
| ]; | ||||||||||||||||||||||||||||||||||||
| let client = | ||||||||||||||||||||||||||||||||||||
| BlockStreamerWithFallback::new(providers).expect("providers list is non-empty"); | ||||||||||||||||||||||||||||||||||||
| let results: Vec<_> = client.block_stream(0, 10).await.collect().await; | ||||||||||||||||||||||||||||||||||||
| assert_eq!(results.len(), 1, "should yield exactly one item"); | ||||||||||||||||||||||||||||||||||||
| assert!( | ||||||||||||||||||||||||||||||||||||
| matches!(&results[0], Err(BlockStreamError::Fatal(_))), | ||||||||||||||||||||||||||||||||||||
| "should be a fatal error" | ||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -184,6 +184,14 @@ pub enum CreateClientError { | |
| #[error("no provider found for kind '{kind}' and network '{network}'")] | ||
| ProviderNotFound { kind: String, network: NetworkId }, | ||
|
|
||
| /// Providers were found but client creation failed for all of them. | ||
| /// | ||
| /// This occurs when providers matching the kind-network pair exist and their | ||
| /// environment variable substitution succeeds, but every client initialization | ||
| /// fails (e.g., all endpoints are unreachable). | ||
| #[error("all provider client creations failed for kind '{kind}' and network '{network}'")] | ||
| AllClientCreationsFailed { kind: String, network: NetworkId }, | ||
|
Comment on lines
+187
to
+193
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error chain (errors-reporting.md §4, §6): Consider capturing at least the last error as a #[error("all provider client creations failed for kind '{kind}' and network '{network}'")]
AllClientCreationsFailed {
kind: String,
network: NetworkId,
#[source]
last_error: Box<CreateClientError>,
}, |
||
|
|
||
| /// Unsupported provider kind. | ||
| /// | ||
| /// This occurs when the provider kind does not match any supported raw dataset type. | ||
|
|
@@ -197,6 +205,7 @@ impl crate::retryable::RetryableErrorExt for CreateClientError { | |
| Self::ConfigParse { .. } => false, | ||
| Self::ProviderClient { source, .. } => source.is_retryable(), | ||
| Self::ProviderNotFound { .. } => false, | ||
| Self::AllClientCreationsFailed { .. } => true, | ||
| Self::UnsupportedKind { .. } => false, | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug/Logic concern: When provider A succeeds for blocks 0–5, then provider B fails with a recoverable error on block 6, the fallback continues to provider C starting from block 6. But if provider C's stream also ends normally (produces no items because it too has no data past block 5), the loop falls through to the
if !errored { return; }path and the stream ends — silently dropping the recoverable error from provider B.The user never sees that block 6 failed on B, and C simply produced nothing. The
last_errorfrom B is never yielded. This could cause the caller to believe all blocks in [start, end] were successfully streamed when in fact the range was incomplete.Consider: after a provider ends normally with
!erroredbutnext_block <= end, should that be treated as a partial success requiring fallback to the next provider rather than a normal completion?