diff --git a/Cargo.lock b/Cargo.lock index b43ff87c4..bb3617ecc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1169,7 +1169,6 @@ dependencies = [ "monitoring", "object_store", "parking_lot", - "rand 0.9.2", "thiserror 2.0.18", "tokio", "toml", diff --git a/crates/core/datasets-raw/src/client.rs b/crates/core/datasets-raw/src/client.rs index 304b2f316..a4150e582 100644 --- a/crates/core/datasets-raw/src/client.rs +++ b/crates/core/datasets-raw/src/client.rs @@ -250,3 +250,274 @@ where self.0.provider_name() } } + +/// A [`BlockStreamer`] wrapper that tries each provider once on recoverable errors. +/// +/// Wraps an ordered list of providers. On a recoverable error, the system immediately +/// tries the next provider. After all providers have been tried once without success, +/// the last recoverable error is yielded (for `block_stream`) or returned (for +/// `latest_block`). +/// +/// Fatal errors always immediately abort the stream regardless of remaining providers. +#[derive(Clone)] +pub struct BlockStreamerWithFallback(Vec); + +/// Error returned when attempting to create a [`BlockStreamerWithFallback`] with no providers. +/// +/// This occurs when `BlockStreamerWithFallback::new` is called with an empty provider list. +/// Callers should ensure at least one provider is available before constructing the wrapper. +#[derive(Debug, thiserror::Error)] +#[error("providers list must not be empty")] +pub struct EmptyProvidersError; + +impl BlockStreamerWithFallback { + /// Creates a fallback wrapper from an ordered list of providers. + /// + /// The first provider in the list is the primary. On a recoverable error, the + /// system immediately tries the next provider. After all providers have been tried + /// once without success, the last recoverable error is yielded/returned. + /// + /// Returns an error if `providers` is empty. + pub fn new(providers: Vec) -> Result { + if providers.is_empty() { + return Err(EmptyProvidersError); + } + Ok(Self(providers)) + } +} + +impl BlockStreamer for BlockStreamerWithFallback +where + T: BlockStreamer + Send + Sync, +{ + async fn block_stream( + self, + start: BlockNum, + end: BlockNum, + ) -> impl Stream> + Send { + let providers = self.0; + let mut next_block = start; + + async_stream::stream! { + // Try each provider in order. On a recoverable error, immediately + // try the next provider. If all providers fail, yield the last error. + let mut last_error = None; + for provider in &providers { + let inner_stream = provider.clone().block_stream(next_block, end).await; + futures::pin_mut!(inner_stream); + + // Drain this provider's stream until it ends or errors. + let mut errored = false; + while let Some(row_result) = inner_stream.next().await { + match row_result.as_ref() { + Ok(rows) => { + next_block = rows.block_num() + 1; + yield row_result; + } + Err(BlockStreamError::Fatal(err)) => { + tracing::error!( + block = %next_block, + provider_name = provider.provider_name(), + error = %err, + error_source = monitoring::logging::error_source(err.as_ref()), + "fatal block stream error" + ); + yield row_result; + return; + } + Err(BlockStreamError::Recoverable(err)) => { + tracing::warn!( + block = %next_block, + provider_name = provider.provider_name(), + error = %err, + error_source = monitoring::logging::error_source(err.as_ref()), + "block streaming failed on provider" + ); + errored = true; + last_error = Some(row_result); + break; + } + } + } + + if !errored { + // Stream ended normally (no more blocks). Done. + return; + } + } + + // All providers failed for this block. Yield the last error. + if let Some(err) = last_error { + tracing::error!( + block = %next_block, + "all providers exhausted" + ); + yield err; + } + } + } + + async fn latest_block( + &mut self, + finalized: bool, + ) -> Result, LatestBlockError> { + let mut last_err = None; + for provider in &self.0 { + match provider.clone().latest_block(finalized).await { + Ok(block) => return Ok(block), + Err(err) => { + tracing::warn!( + provider_name = provider.provider_name(), + error = %err, + error_source = monitoring::logging::error_source(err.as_ref()), + "failed to get latest block from provider" + ); + last_err = Some(err); + } + } + } + // SAFETY: The constructor guarantees the provider list is non-empty, so the + // loop above always executes at least once and `last_err` is always `Some`. + Err(last_err.expect("providers list is non-empty")) + } + + fn bucket_size(&self) -> Option { + // SAFETY: The constructor guarantees the provider list is non-empty. + self.0[0].bucket_size() + } + + fn provider_name(&self) -> &str { + // SAFETY: The constructor guarantees the provider list is non-empty. + self.0[0].provider_name() + } +} + +#[cfg(test)] +mod tests { + use std::sync::{Arc, Mutex}; + + use futures::StreamExt as _; + + use super::*; + + /// A minimal [`BlockStreamer`] mock for testing fallback behavior. + #[derive(Clone)] + struct MockStreamer { + name: &'static str, + latest: Result, &'static str>, + /// If set, `block_stream` yields this single error then ends. The `Arc>>` + /// allows the mock to be `Clone` while ensuring only one clone consumes the error. + block_stream_error: Option>>>, + } + + impl MockStreamer { + fn succeeds(name: &'static str, block: BlockNum) -> Self { + Self { + name, + latest: Ok(Some(block)), + block_stream_error: None, + } + } + + fn fails_recoverable(name: &'static str, msg: &'static str) -> Self { + Self { + name, + latest: Err(msg), + block_stream_error: Some(Arc::new(Mutex::new(Some( + BlockStreamError::Recoverable(msg.into()), + )))), + } + } + + fn fails_fatal(name: &'static str, msg: &'static str) -> Self { + Self { + name, + latest: Err(msg), + block_stream_error: Some(Arc::new(Mutex::new(Some(BlockStreamError::Fatal( + msg.into(), + ))))), + } + } + } + + impl BlockStreamer for MockStreamer { + async fn block_stream( + self, + _start: BlockNum, + _end: BlockNum, + ) -> impl Stream> + Send { + let item = self + .block_stream_error + .and_then(|slot| slot.lock().ok()?.take()); + futures::stream::iter(item.map(Err).into_iter()) + } + + async fn latest_block( + &mut self, + _finalized: bool, + ) -> Result, LatestBlockError> { + self.latest + .map_err(|msg| -> LatestBlockError { msg.into() }) + } + + fn bucket_size(&self) -> Option { + None + } + + fn provider_name(&self) -> &str { + self.name + } + } + + #[tokio::test] + async fn fallback_tries_next_provider_on_failure() { + let make_providers = || { + vec![ + MockStreamer::fails_recoverable("provider_a", "timeout"), + MockStreamer::fails_recoverable("provider_b", "rate limited"), + MockStreamer::succeeds("provider_c", 100), + ] + }; + + // latest_block: should fall through to provider_c. + let mut client = + BlockStreamerWithFallback::new(make_providers()).expect("providers list is non-empty"); + let result = client.latest_block(false).await; + assert_eq!( + result.expect("should succeed via third provider"), + Some(100) + ); + + // block_stream: providers a and b yield recoverable errors, then the + // last recoverable error is yielded when all providers are exhausted + // (provider_c's block_stream returns an empty stream, so it "succeeds" + // with no items -- but after the two recoverable errors, the fallback + // loop reaches c which produces no error, and the stream ends normally). + // We verify no fatal abort occurs and the stream ends. + let client = + BlockStreamerWithFallback::new(make_providers()).expect("providers list is non-empty"); + let results: Vec<_> = client.block_stream(0, 10).await.collect().await; + assert!( + results.is_empty(), + "stream should end normally after falling through to a working provider" + ); + } + + #[tokio::test] + async fn fatal_error_aborts_without_fallback() { + // block_stream: fatal error from provider_a should abort immediately + // without trying provider_b. + let providers = vec![ + MockStreamer::fails_fatal("provider_a", "invalid data"), + MockStreamer::succeeds("provider_b", 100), + ]; + let client = + BlockStreamerWithFallback::new(providers).expect("providers list is non-empty"); + let results: Vec<_> = client.block_stream(0, 10).await.collect().await; + assert_eq!(results.len(), 1, "should yield exactly one item"); + assert!( + matches!(&results[0], Err(BlockStreamError::Fatal(_))), + "should be a fatal error" + ); + } +} diff --git a/crates/core/providers-registry/Cargo.toml b/crates/core/providers-registry/Cargo.toml index 3c295e3ed..a595d6675 100644 --- a/crates/core/providers-registry/Cargo.toml +++ b/crates/core/providers-registry/Cargo.toml @@ -16,7 +16,6 @@ futures.workspace = true monitoring = { path = "../monitoring" } object_store.workspace = true parking_lot = "0.12.4" -rand.workspace = true thiserror.workspace = true tokio.workspace = true toml = { workspace = true, features = ["preserve_order"] } diff --git a/crates/core/providers-registry/src/client/block_stream.rs b/crates/core/providers-registry/src/client/block_stream.rs index f7fb60a60..d47fa6e2d 100644 --- a/crates/core/providers-registry/src/client/block_stream.rs +++ b/crates/core/providers-registry/src/client/block_stream.rs @@ -184,6 +184,14 @@ pub enum CreateClientError { #[error("no provider found for kind '{kind}' and network '{network}'")] ProviderNotFound { kind: String, network: NetworkId }, + /// Providers were found but client creation failed for all of them. + /// + /// This occurs when providers matching the kind-network pair exist and their + /// environment variable substitution succeeds, but every client initialization + /// fails (e.g., all endpoints are unreachable). + #[error("all provider client creations failed for kind '{kind}' and network '{network}'")] + AllClientCreationsFailed { kind: String, network: NetworkId }, + /// Unsupported provider kind. /// /// This occurs when the provider kind does not match any supported raw dataset type. @@ -197,6 +205,7 @@ impl crate::retryable::RetryableErrorExt for CreateClientError { Self::ConfigParse { .. } => false, Self::ProviderClient { source, .. } => source.is_retryable(), Self::ProviderNotFound { .. } => false, + Self::AllClientCreationsFailed { .. } => true, Self::UnsupportedKind { .. } => false, } } diff --git a/crates/core/providers-registry/src/lib.rs b/crates/core/providers-registry/src/lib.rs index 9a77935d7..c174b4c17 100644 --- a/crates/core/providers-registry/src/lib.rs +++ b/crates/core/providers-registry/src/lib.rs @@ -24,7 +24,6 @@ use amp_providers_evm_rpc::{kind::EvmRpcProviderKind, provider as evm_rpc_provid use datasets_common::network_id::NetworkId; use monitoring::{logging, telemetry::metrics::Meter}; use object_store::ObjectStore; -use rand::seq::SliceRandom as _; mod client; pub mod retryable; @@ -122,7 +121,7 @@ where /// Find a provider and create a block stream client for the given kind and network. /// /// This method combines provider lookup and client creation in one operation. - /// Providers are tried in random order to distribute load. + /// Providers are tried in deterministic order by name. pub async fn create_block_stream_client( &self, kind: impl AsRef, @@ -141,7 +140,10 @@ where /// Find a provider by kind and network, applying environment variable substitution. /// - /// Providers are tried in random order to distribute load. + /// Matching providers are tried in deterministic order by provider name (lexicographic). + /// If a provider's environment variable substitution fails, the next provider in order + /// is tried. + /// /// Returns the provider name and resolved configuration. pub async fn find_provider( &self, @@ -150,38 +152,71 @@ where ) -> Option<(ProviderName, ProviderResolvedConfigRaw)> { let kind = kind.as_ref(); - // Collect matching providers with their names - let mut matching_providers = self - .get_all() - .await - .iter() - .filter_map(|(name, config)| { - // Extract kind and network from config - let header = config.try_into_config::().ok()?; - if header.kind == kind && header.network == network { - Some((name.clone(), config.clone())) - } else { - None + // The BTreeMap iterator yields entries in lexicographic order by ProviderName. + for (name, config) in self.get_all().await.iter() { + let header = match config.try_into_config::() { + Ok(h) => h, + Err(_) => continue, + }; + if header.kind != kind || header.network != network { + continue; + } + match config.with_env_substitution() { + Ok(resolved) => { + tracing::debug!( + provider_name = %name, + provider_kind = %kind, + provider_network = %network, + "successfully selected provider with environment substitution" + ); + return Some((name.clone(), resolved)); } - }) - .collect::>(); - - if matching_providers.is_empty() { - return None; + Err(err) => { + tracing::warn!( + provider_name = %name, + provider_kind = %kind, + provider_network = %network, + error = %err, + error_source = logging::error_source(&err), + "environment variable substitution failed for provider, trying next" + ); + } + } } - matching_providers.shuffle(&mut rand::rng()); + None + } + + /// Find all providers matching a kind and network, applying environment variable substitution. + /// + /// Returns all matching providers in deterministic order by provider name (lexicographic). + /// Providers whose environment variable substitution fails are skipped with a warning. + /// This is useful when callers need a fallback list of providers to try in sequence. + pub async fn find_providers( + &self, + kind: impl AsRef, + network: &NetworkId, + ) -> Vec<(ProviderName, ProviderResolvedConfigRaw)> { + let kind = kind.as_ref(); + let mut result = Vec::new(); - 'try_find_provider: for (name, config) in matching_providers { + for (name, config) in self.get_all().await.iter() { + let header = match config.try_into_config::() { + Ok(h) => h, + Err(_) => continue, + }; + if header.kind != kind || header.network != network { + continue; + } match config.with_env_substitution() { Ok(resolved) => { tracing::debug!( provider_name = %name, provider_kind = %kind, provider_network = %network, - "successfully selected provider with environment substitution" + "resolved provider with environment substitution" ); - return Some((name, resolved)); + result.push((name.clone(), resolved)); } Err(err) => { tracing::warn!( @@ -190,14 +225,58 @@ where provider_network = %network, error = %err, error_source = logging::error_source(&err), - "environment variable substitution failed for provider, trying next" + "environment variable substitution failed for provider, skipping" ); - continue 'try_find_provider; } } } - None + result + } + + /// Find all providers and create block stream clients for the given kind and network. + /// + /// Returns all successfully created clients in deterministic order by provider name. + /// Providers whose client creation fails are skipped with a warning. + /// Returns an error only if no providers are found or all client creations fail. + pub async fn create_block_stream_clients( + &self, + kind: impl AsRef, + network: &NetworkId, + meter: Option<&Meter>, + ) -> Result, CreateClientError> { + let kind_str = kind.as_ref(); + let providers = self.find_providers(&kind, network).await; + if providers.is_empty() { + return Err(CreateClientError::ProviderNotFound { + kind: kind_str.to_string(), + network: network.clone(), + }); + } + + let mut clients = Vec::new(); + for (name, config) in providers { + match client::block_stream::create(name.clone(), config, meter).await { + Ok(c) => clients.push(c), + Err(err) => { + tracing::warn!( + provider_name = %name, + error = %err, + error_source = logging::error_source(&err), + "block stream client creation failed for provider" + ); + } + } + } + + if clients.is_empty() { + return Err(CreateClientError::AllClientCreationsFailed { + kind: kind_str.to_string(), + network: network.clone(), + }); + } + + Ok(clients) } /// Create an EVM RPC client for the given network. diff --git a/crates/core/worker-datasets-raw/src/job_impl.rs b/crates/core/worker-datasets-raw/src/job_impl.rs index 1edc0a3a8..36b6dd3ca 100644 --- a/crates/core/worker-datasets-raw/src/job_impl.rs +++ b/crates/core/worker-datasets-raw/src/job_impl.rs @@ -102,7 +102,7 @@ use datasets_common::{ table_name::TableName, }; use datasets_raw::{ - client::{BlockStreamer as _, BlockStreamerExt as _, LatestBlockError}, + client::{BlockStreamer as _, BlockStreamerWithFallback, LatestBlockError}, dataset::Dataset as RawDataset, }; @@ -243,16 +243,28 @@ pub async fn execute( let kind = dataset.kind(); let network = dataset.network(); - let mut client = ctx + let kind_str = kind.to_string(); + let clients = ctx .ethcall_udfs_cache .providers_registry() - .create_block_stream_client(kind, network, metrics.as_ref().map(|m| m.meter())) + .create_block_stream_clients(kind, network, metrics.as_ref().map(|m| m.meter())) .await - .map_err(Error::CreateBlockStreamClient)? - .with_retry(); + .map_err(Error::CreateBlockStreamClient)?; - let provider_name = client.provider_name().to_string(); - tracing::info!("connected to provider: {provider_name}"); + let provider_names: Vec<_> = clients + .iter() + .map(|c| c.provider_name().to_string()) + .collect(); + let mut client = BlockStreamerWithFallback::new(clients).map_err(|_err| { + Error::CreateBlockStreamClient( + amp_providers_registry::CreateClientError::AllClientCreationsFailed { + kind: kind_str, + network: network.clone(), + }, + ) + })?; + + tracing::info!(providers = %provider_names.join(", "), "connected to providers"); let start = dataset.start_block().unwrap_or(0); let resolved = resolve_end_block(&end, start, client.latest_block(finalized_blocks_only)) @@ -261,7 +273,7 @@ pub async fn execute( let end = match resolved { ResolvedEndBlock::NoDataAvailable => { - tracing::warn!("no blocks available from provider: {provider_name}"); + tracing::warn!(providers = %provider_names.join(", "), "no blocks available from providers"); return Ok(()); } ResolvedEndBlock::Continuous => None, diff --git a/docs/feat/provider-registry.md b/docs/feat/provider-registry.md index fc8f8a413..e69312ca2 100644 --- a/docs/feat/provider-registry.md +++ b/docs/feat/provider-registry.md @@ -29,14 +29,14 @@ The Providers Registry manages external data source provider configurations (RPC This document uses two key terms that describe different parts of the data pipeline: -- **Provider**: An external service or endpoint that supplies blockchain data (e.g., a Firehose gRPC server, an EVM JSON-RPC node, a Solana RPC endpoint). The registry stores *configurations* for these external services. +- **Provider**: An external service or endpoint that supplies blockchain data (e.g., a Firehose gRPC server, an EVM JSON-RPC node, a Solana RPC endpoint). The registry stores _configurations_ for these external services. - **Client**: Internal code that connects to a provider and consumes its data. Clients are created by the registry from provider configurations. Examples include `BlockStreamClient` (for extracting blocks) and underlying protocol-specific clients like `evm_rpc_datasets::Client` or `firehose_datasets::Client`. > [!NOTE] > **EVM RPC Provider naming**: The `EvmRpcProvider` type follows [Alloy's naming convention](https://docs.rs/alloy-provider/latest/alloy_provider/) where "provider" refers to the RPC interface abstraction. In this codebase, it functions as a client that consumes data from an external EVM RPC endpoint. -**Relationship**: Provider configurations define *where* to connect; clients implement *how* to connect and extract data. +**Relationship**: Provider configurations define _where_ to connect; clients implement _how_ to connect and extract data. ## Architecture @@ -67,30 +67,31 @@ The registry creates clients that consume data from external provider endpoints. ### Configuration Cache Strategy -Provider configurations are cached in memory using a lazy-loaded, write-through strategy. +Provider configurations are cached in memory using a lazy-loaded, write-through strategy. This avoids repeated object store reads for frequently accessed configurations, ensures writes remain consistent without complex invalidation logic, and provides robustness against object store connection issues once configurations are loaded. **Read Path (Lazy Loading)** + 1. On first read request, check if cache is empty 2. If empty, enumerate all configuration files from object store 3. Parse each file into provider configuration and populate cache atomically 4. Return data from cache (subsequent reads skip object store) **Write Path (Write-Through)** + 1. Registration: Write TOML to object store first, then update cache 2. Deletion: Remove from object store first (idempotent), then remove from cache ### Provider Selection Strategy -When multiple providers exist for the same kind and network, the registry uses random selection to distribute load. -This prevents any single provider from becoming a bottleneck and provides natural load balancing without additional coordination infrastructure. +When multiple providers exist for the same kind and network, the registry selects providers in deterministic order by name (lexicographic). +The first provider whose environment variable substitution succeeds is selected. Since provider names determine the ordering, operators can influence priority through naming (e.g., prefixing names with `01_`, `02_` to control the sequence). ### Environment Variable Substitution -Provider configurations support `${VAR}` syntax for referencing environment variables. +Provider configurations support `${VAR}` syntax for referencing environment variables. Substitution occurs at load time before parsing, allowing secrets like API tokens to be injected from the environment rather than stored in configuration files. - ## Implementation ### Caching Behavior diff --git a/docs/feat/provider.md b/docs/feat/provider.md index 42fc2549f..e37c9baba 100644 --- a/docs/feat/provider.md +++ b/docs/feat/provider.md @@ -35,26 +35,26 @@ Dataset Manifest → Provider Resolution → Provider Config → Blockchain Conn ### Benefits -| Benefit | Description | -|---------|-------------| -| **Reusability** | Multiple datasets share the same provider | -| **Flexibility** | Switch endpoints without modifying datasets | -| **Load Balancing** | Random selection among matching providers | -| **Security** | Credentials isolated from dataset definitions | +| Benefit | Description | +| ------------------ | ------------------------------------------------- | +| **Reusability** | Multiple datasets share the same provider | +| **Flexibility** | Switch endpoints without modifying datasets | +| **Prioritization** | Deterministic selection by provider name ordering | +| **Security** | Credentials isolated from dataset definitions | ### Resolution Flow 1. Dataset requests provider by `(kind, network)` tuple 2. System finds all matching providers -3. Providers are shuffled for load balancing +3. Providers are tried in deterministic order by name 4. Each is tried with environment variable substitution 5. First successful connection is used ## Provider Types -| Kind | Protocol | Use Case | -|------|----------|----------| -| `evm-rpc` | JSON-RPC | EVM-compatible chains via HTTP/WS/IPC | -| `firehose` | gRPC | High-throughput streaming from StreamingFast | -| `solana` | JSON-RPC + CAR | Solana blockchain with Old Faithful archive | -| `static` | Object Store | Static CSV-backed datasets via amp-object-store | +| Kind | Protocol | Use Case | +| ---------- | -------------- | ----------------------------------------------- | +| `evm-rpc` | JSON-RPC | EVM-compatible chains via HTTP/WS/IPC | +| `firehose` | gRPC | High-throughput streaming from StreamingFast | +| `solana` | JSON-RPC + CAR | Solana blockchain with Old Faithful archive | +| `static` | Object Store | Static CSV-backed datasets via amp-object-store |