diff --git a/Cargo.lock b/Cargo.lock index 0f844698d..d79b3a111 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1195,6 +1195,7 @@ dependencies = [ "futures", "governor 0.10.4", "headers", + "memmap2", "monitoring", "pretty_assertions", "prost", @@ -6932,6 +6933,15 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "memmap2" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714098028fe011992e1c3962653c96b2d578c4b4bce9036e15ff220319b1e0e3" +dependencies = [ + "libc", +] + [[package]] name = "memmem" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index b88f55cd5..1cf0d68ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,6 +116,7 @@ hex = "0.4.3" indicatif = "0.18" indoc = "2.0.6" itertools = "0.14.0" +memmap2 = "0.9" opentelemetry = { version = "0.31", features = ["trace"] } opentelemetry-otlp = { version = "0.31", features = [ "grpc-tonic", diff --git a/crates/core/providers-solana/Cargo.toml b/crates/core/providers-solana/Cargo.toml index 2dc5f4d3c..b4ca4fb4a 100644 --- a/crates/core/providers-solana/Cargo.toml +++ b/crates/core/providers-solana/Cargo.toml @@ -23,6 +23,7 @@ datasets-raw = { path = "../datasets-raw" } futures.workspace = true fs-err.workspace = true governor.workspace = true +memmap2.workspace = true headers = { workspace = true } monitoring = { path = "../monitoring" } prost.workspace = true diff --git a/crates/core/providers-solana/examples/solana_car_download.rs b/crates/core/providers-solana/examples/solana_car_download.rs index 02f30515b..3c5012558 100644 --- a/crates/core/providers-solana/examples/solana_car_download.rs +++ b/crates/core/providers-solana/examples/solana_car_download.rs @@ -11,6 +11,7 @@ use std::{ time::{Duration, Instant}, }; +use amp_providers_solana::old_faithful::{car_download_url, car_filename}; use anyhow::Context; use 
backon::{ExponentialBuilder, Retryable}; use clap::Parser; @@ -89,7 +90,7 @@ async fn main() -> anyhow::Result<()> { .build()?; for epoch in cli.start_epoch..=end_epoch { - let dest = output_dir.join(format!("epoch-{epoch}.car")); + let dest = output_dir.join(car_filename(epoch)); let result = (|| ensure_car_file_exists(epoch, &reqwest, &dest)) .retry(ExponentialBuilder::new().without_max_times()) .when(should_retry) @@ -287,13 +288,6 @@ enum CarDownloadError { PartialDownloadNotSupported, } -/// Generates the Old Faithful CAR download URL for the given epoch. -/// -/// Reference: . -fn car_download_url(epoch: solana_clock::Epoch) -> String { - format!("https://files.old-faithful.net/{epoch}/epoch-{epoch}.car") -} - fn log_download_progress( epoch: solana_clock::Epoch, bytes_downloaded: u64, diff --git a/crates/core/providers-solana/examples/solana_compare.rs b/crates/core/providers-solana/examples/solana_compare.rs index f18619780..49c39a4fe 100644 --- a/crates/core/providers-solana/examples/solana_compare.rs +++ b/crates/core/providers-solana/examples/solana_compare.rs @@ -7,7 +7,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration}; use amp_providers_solana::{ commitment_config, config::SolanaProviderConfig, non_empty_of1_slot, non_empty_rpc_slot, - of1_client, rpc_client, tables, + old_faithful, rpc_client, tables, }; use anyhow::Context; use backon::{ExponentialBuilder, Retryable}; @@ -81,11 +81,13 @@ async fn main() -> anyhow::Result<()> { commitment: Some(commitment_config(provider_cfg.commitment)), }; - let of1_stream = of1_client::stream( + let of1_stream = old_faithful::stream( start_slot, end_slot, reqwest, rpc_client.clone(), + // Local CAR archive source, None = use official archive. + None, get_block_config, // Metrics, we don't need to record them. 
None, diff --git a/crates/core/providers-solana/src/client.rs b/crates/core/providers-solana/src/client.rs index 9cd8cfa2b..a15877a02 100644 --- a/crates/core/providers-solana/src/client.rs +++ b/crates/core/providers-solana/src/client.rs @@ -10,6 +10,7 @@ use std::{collections::HashSet, sync::Mutex}; use std::{ num::{NonZeroU32, NonZeroU64}, + path::PathBuf, str::FromStr, sync::Arc, }; @@ -27,7 +28,7 @@ use solana_clock::Slot; use crate::{ config::UseArchive, error::{SlotConversionError, SlotConversionResult}, - metrics, of1_client, rpc_client, tables, + metrics, old_faithful, rpc_client, tables, }; /// Marker string that appears in Solana transaction log message lists when logs @@ -44,6 +45,7 @@ pub struct Client { network: NetworkId, provider_name: ProviderName, use_archive: UseArchive, + archive_dir: Option, commitment: CommitmentConfig, /// Monitor which epochs are currently being streamed to prevent overlapping /// streams. This is only used in debug builds as an additional safety check @@ -61,6 +63,7 @@ impl Client { network: NetworkId, provider_name: ProviderName, use_archive: UseArchive, + archive_dir: Option, commitment: CommitmentConfig, meter: Option<&monitoring::telemetry::metrics::Meter>, ) -> Self { @@ -94,6 +97,7 @@ impl Client { network, provider_name, use_archive, + archive_dir, commitment, #[cfg(debug_assertions)] epochs_in_progress: Arc::new(Mutex::new(HashSet::new())), @@ -102,7 +106,7 @@ impl Client { /// Core implementation of the block streaming logic, parameterized over the historical block stream source. /// For production use-cases the historical block stream will come from the Old Faithful archive (using the - /// [`of1_client`] module) and for testing we can provide a custom stream of decoded slots directly. + /// [`old_faithful`] module) and for testing we can provide a custom stream of decoded slots directly. /// /// NOTE: The reason this is marked as `pub` is because it is used in integration tests /// in the `tests` crate. 
@@ -116,7 +120,7 @@ impl Client { get_transaction_config: rpc_client::rpc_config::RpcTransactionConfig, ) -> impl Stream> where - T: Stream>, + T: Stream>, { async_stream::stream! { // Helper macro to simplify error handling and early returns in the stream. @@ -286,16 +290,17 @@ impl BlockStreamer for Client { let metrics = self .metrics .clone() - .map(|registry| of1_client::MetricsContext { + .map(|registry| old_faithful::metrics::Context { registry, provider: self.provider_name.clone(), network: self.network.clone(), }); - of1_client::stream( + old_faithful::stream( start, end, self.reqwest.clone(), self.main_rpc_client.clone(), + self.archive_dir.clone(), get_block_config, metrics, #[cfg(debug_assertions)] @@ -337,12 +342,12 @@ impl BlockStreamer for Client { } } -/// Converts [of1_client::DecodedSlot] to [tables::NonEmptySlot]. This conversion can fail if any +/// Converts [old_faithful::DecodedSlot] to [tables::NonEmptySlot]. This conversion can fail if any /// of the decoded fields do not match the expected format/values. pub fn non_empty_of1_slot( - slot: of1_client::DecodedSlot, + slot: old_faithful::DecodedSlot, ) -> SlotConversionResult { - let of1_client::DecodedSlot { + let old_faithful::DecodedSlot { slot, parent_slot, blockhash, @@ -625,7 +630,7 @@ mod tests { use url::Url; use super::Client; - use crate::{config::UseArchive, of1_client, rpc_client}; + use crate::{config::UseArchive, old_faithful, rpc_client}; #[tokio::test] async fn historical_blocks_only() { @@ -647,6 +652,7 @@ mod tests { network, provider_name, UseArchive::Auto, + None, // Archive source CommitmentConfig::finalized(), None, // Meter ); @@ -657,7 +663,7 @@ mod tests { // Stream the entire range as historical blocks. let historical = async_stream::stream! 
{ for slot in start..=end { - yield Ok(of1_client::DecodedSlot::dummy(slot)); + yield Ok(old_faithful::DecodedSlot::dummy(slot)); } }; diff --git a/crates/core/providers-solana/src/config.rs b/crates/core/providers-solana/src/config.rs index b905102de..e9e35e2c6 100644 --- a/crates/core/providers-solana/src/config.rs +++ b/crates/core/providers-solana/src/config.rs @@ -1,4 +1,4 @@ -use std::num::NonZeroU32; +use std::{num::NonZeroU32, path::PathBuf}; use amp_providers_common::{network_id::NetworkId, redacted::Redacted}; use headers::{HeaderName, HeaderValue}; @@ -40,10 +40,17 @@ pub struct SolanaProviderConfig { /// Optional rate limit for RPC calls per second. pub max_rpc_calls_per_second: Option, - /// Controls when to use the Solana archive for historical data. + /// Controls when to use the Old Faithful CAR archive for historical data. #[serde(default)] pub use_archive: UseArchive, + /// Optional local path to the Old Faithful CAR archive directory for historical data. + /// + /// The directory is expected to have pre-downloaded CAR files that follow the naming + /// pattern established in [crate::old_faithful::car_filename]. This will be ensured + /// if the `solana-car-download` example is used to populate the directory. + pub archive_dir: Option, + /// Commitment level for RPC requests. #[serde(default)] pub commitment: CommitmentLevel, diff --git a/crates/core/providers-solana/src/error.rs b/crates/core/providers-solana/src/error.rs index e0b21afa4..67551515e 100644 --- a/crates/core/providers-solana/src/error.rs +++ b/crates/core/providers-solana/src/error.rs @@ -2,8 +2,6 @@ use amp_providers_common::network_id::NetworkId; use datasets_raw::{client::BlockStreamError, rows::TableRowError}; use yellowstone_faithful_car_parser as car_parser; -use crate::of1_client; - pub type SlotConversionResult = Result; pub type RowConversionResult = Result; @@ -253,7 +251,7 @@ impl SlotConversionError { /// (Content Addressable aRchive) files. 
These errors cover RPC communication, /// file handling, and CAR parsing failures. #[derive(Debug, thiserror::Error)] -pub enum Of1StreamError { +pub enum OldFaithfulStreamError { /// Failed to communicate with the Solana RPC client. /// /// This occurs when querying the RPC for slot information or block data, @@ -267,7 +265,7 @@ pub enum Of1StreamError { /// CAR files, which may include HTTP errors, unsupported range requests, or /// other file access problems. #[error("failed to stream CAR file")] - FileStream(#[source] of1_client::CarReaderError), + FileStream(#[source] CarReaderError), /// Encountered an unexpected node type while reading a block from CAR. /// @@ -376,37 +374,67 @@ pub enum Of1StreamError { BlocktimeOverflow { slot: u64, blocktime: u64 }, } -impl From for BlockStreamError { - fn from(value: Of1StreamError) -> Self { +impl From for BlockStreamError { + fn from(value: OldFaithfulStreamError) -> Self { // There is no catch-all here on purpose, to force consideration of // each error type when mapping to recoverable vs fatal. match value { - Of1StreamError::FileStream(of1_client::CarReaderError::Io(_)) - | Of1StreamError::FileStream(of1_client::CarReaderError::Http( + OldFaithfulStreamError::FileStream(CarReaderError::Io(_)) + | OldFaithfulStreamError::FileStream(CarReaderError::Http( reqwest::StatusCode::NOT_FOUND, )) - | Of1StreamError::FileStream(of1_client::CarReaderError::RangeRequestUnsupported) - | Of1StreamError::UnexpectedNode { .. } - | Of1StreamError::MissingNode { .. } - | Of1StreamError::RewardSlotMismatch { .. } - | Of1StreamError::Zstd { .. } - | Of1StreamError::Bincode(_) - | Of1StreamError::DecodeField { .. } - | Of1StreamError::NodeParse(_) - | Of1StreamError::DataframeReassembly(_) - | Of1StreamError::DecodeBase58(_) - | Of1StreamError::TryIntoArray { .. } - | Of1StreamError::BlocktimeOverflow { .. 
} => BlockStreamError::Fatal(value.into()), - - Of1StreamError::RpcClient(_) - | Of1StreamError::FileStream(of1_client::CarReaderError::Http(_)) - | Of1StreamError::FileStream(of1_client::CarReaderError::Reqwest(_)) => { + | OldFaithfulStreamError::FileStream(CarReaderError::RangeRequestUnsupported) + | OldFaithfulStreamError::UnexpectedNode { .. } + | OldFaithfulStreamError::MissingNode { .. } + | OldFaithfulStreamError::RewardSlotMismatch { .. } + | OldFaithfulStreamError::Zstd { .. } + | OldFaithfulStreamError::Bincode(_) + | OldFaithfulStreamError::DecodeField { .. } + | OldFaithfulStreamError::NodeParse(_) + | OldFaithfulStreamError::DataframeReassembly(_) + | OldFaithfulStreamError::DecodeBase58(_) + | OldFaithfulStreamError::TryIntoArray { .. } + | OldFaithfulStreamError::BlocktimeOverflow { .. } => { + BlockStreamError::Fatal(value.into()) + } + + OldFaithfulStreamError::RpcClient(_) + | OldFaithfulStreamError::FileStream(CarReaderError::Http(_)) + | OldFaithfulStreamError::FileStream(CarReaderError::Reqwest(_)) => { BlockStreamError::Recoverable(value.into()) } } } } +/// Errors that can occur during streaming of Solana blocks from Old Faithful v1 (OF1) CAR files. +#[derive(Debug, thiserror::Error)] +pub enum CarReaderError { + /// IO error when reading the CAR file. + /// + /// This can occur due to network issues, file corruption, or other problems when + /// accessing the CAR file. + #[error("IO error: {0}")] + Io(#[source] std::io::Error), + /// Reqwest error when connecting to or reading from the CAR file. + /// + /// This can occur due to network issues, timeouts, or other problems when making + /// HTTP requests to access the CAR file. + #[error("Reqwest error: {0}")] + Reqwest(#[source] reqwest::Error), + /// The server responded with an HTTP error status code when trying to access the CAR file. + /// + /// This can occur due to network issues, server problems, or if the CAR file is not available. 
+ #[error("HTTP error: {0}")] + Http(reqwest::StatusCode), + /// The server does not support HTTP range requests. + /// + /// This is a non-recoverable error because the Old Faithful reader relies on range requests + /// to be able to resume interrupted downloads without re-downloading the entire CAR. + #[error("server does not support range requests")] + RangeRequestUnsupported, +} + /// Error during Solana client initialization or operation. #[derive(Debug, thiserror::Error)] pub enum ClientError { diff --git a/crates/core/providers-solana/src/lib.rs b/crates/core/providers-solana/src/lib.rs index c3ed7db73..7dc8cc608 100644 --- a/crates/core/providers-solana/src/lib.rs +++ b/crates/core/providers-solana/src/lib.rs @@ -27,7 +27,7 @@ pub mod config; pub mod error; pub mod kind; pub mod metrics; -pub mod of1_client; +pub mod old_faithful; pub mod rpc_client; pub mod tables; @@ -103,6 +103,7 @@ pub fn client( config.network, name, config.use_archive, + config.archive_dir, commitment, meter, ); diff --git a/crates/core/providers-solana/src/of1_client.rs b/crates/core/providers-solana/src/of1_client.rs deleted file mode 100644 index ad83f6fec..000000000 --- a/crates/core/providers-solana/src/of1_client.rs +++ /dev/null @@ -1,799 +0,0 @@ -#[cfg(debug_assertions)] -use std::{collections::HashSet, sync::Mutex}; -use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; - -use amp_providers_common::{network_id::NetworkId, provider_name::ProviderName}; -use futures::{FutureExt, Stream, StreamExt}; -use yellowstone_faithful_car_parser as car_parser; - -use crate::{error::Of1StreamError, metrics, rpc_client}; - -const TX_STATUS_META_FIELD: &str = "transaction status meta"; -const BLOCK_REWARDS_FIELD: &str = "block rewards"; - -pub type DecodedTransactionStatusMeta = DecodedField< - solana_storage_proto::confirmed_block::TransactionStatusMeta, - solana_storage_proto::StoredTransactionStatusMetaVersioned, ->; - -pub type DecodedBlockRewards = DecodedField< - 
solana_storage_proto::confirmed_block::Rewards, - solana_storage_proto::StoredExtendedRewards, ->; - -pub enum DecodedField { - Proto(P), - Bincode(B), -} - -#[derive(Default)] -pub struct DecodedSlot { - pub slot: solana_clock::Slot, - pub parent_slot: solana_clock::Slot, - pub blockhash: [u8; 32], - pub prev_blockhash: [u8; 32], - pub block_height: Option, - pub blocktime: i64, - pub transactions: Vec, - pub transaction_metas: Vec>, - pub block_rewards: Option, -} - -impl DecodedSlot { - /// Create a dummy `DecodedSlot` with the given slot number and default values for all - /// other fields. This can be used for testing or as a placeholder when only the slot - /// number is relevant. - /// - /// NOTE: The reason this is marked as `pub` is because it is used in integration tests - /// in the `tests` crate. - #[doc(hidden)] - pub fn dummy(slot: solana_clock::Slot) -> Self { - Self { - slot, - parent_slot: slot.saturating_sub(1), - ..Default::default() - } - } -} - -/// Context for OF1 streaming that can be passed to functions that need to report metrics. -#[derive(Debug, Clone)] -pub struct MetricsContext { - pub provider: ProviderName, - pub network: NetworkId, - pub registry: Arc, -} - -/// Create a stream of decoded slots for the given epoch by reading from the -/// corresponding CAR file downloaded from the Old Faithful archive. -#[allow(clippy::too_many_arguments)] -pub fn stream( - start: solana_clock::Slot, - end: solana_clock::Slot, - reqwest: Arc, - solana_rpc_client: Arc, - get_block_config: rpc_client::rpc_config::RpcBlockConfig, - metrics: Option, - #[cfg(debug_assertions)] epochs_in_progress: Arc>>, -) -> impl Stream> { - async_stream::stream! { - // Pre-fetch the initial previous block hash via JSON-RPC so that we don't have to - // (potentially) read multiple CAR files to find it. - let mut prev_blockhash = if start == 0 { - // Known previous blockhash for genesis mainnet block. 
- bs58::decode("4sGjMW1sUnHzSxGspuhpqLDx6wiyjNtZAMdL4VZHirAn") - .into_vec() - .map(TryInto::try_into) - .map_err(Of1StreamError::DecodeBase58)? - .map_err(|vec: Vec<_>| Of1StreamError::TryIntoArray { - expected_len: 32, actual_len: vec.len() - })? - } else { - let mut slot = start; - loop { - let metrics = metrics.clone().map(|m| m.registry); - let resp = solana_rpc_client - .get_block(slot, get_block_config, metrics) - .await; - - match resp { - Ok(block) => { - break bs58::decode(block.previous_blockhash) - .into_vec() - .map(TryInto::try_into) - .map_err(Of1StreamError::DecodeBase58)? - .map_err(|vec: Vec<_>| Of1StreamError::TryIntoArray { - expected_len: 32, actual_len: vec.len() - })?; - } - Err(e) if rpc_client::is_block_missing_err(&e) => slot += 1, - Err(e) => { - yield Err(Of1StreamError::RpcClient(e)); - return; - } - } - } - }; - - let start_epoch = start / solana_clock::DEFAULT_SLOTS_PER_EPOCH; - let end_epoch = end / solana_clock::DEFAULT_SLOTS_PER_EPOCH; - - for epoch in start_epoch..=end_epoch { - #[cfg(debug_assertions)] - let _guard = epoch_supervision::Guard::new(epochs_in_progress.as_ref(), epoch); - - let reader = CarReader::new( - epoch, - reqwest.clone(), - metrics.clone() - ); - let mut node_reader = car_parser::node::NodeReader::new(reader); - - while let Some(slot) = read_next_slot(&mut node_reader, prev_blockhash) - .await - .transpose() - { - let slot = match slot { - Ok(slot) => slot, - // IO errors from the node reader could come from the underlying `CarReader`. - // Try to downcast to `CarReaderError` to determine how to map into `Of1StreamError`. - // - // NOTE: There should be no retry logic here because the `CarReader` should - // handle all retry logic internally and only return an error when a non-recoverable - // error occurs. - Err(Of1StreamError::NodeParse(car_parser::node::NodeError::Io(io_err))) => { - match io_err.downcast::() { - // No more CAR files available, not an error. 
- Ok(CarReaderError::Http(reqwest::StatusCode::NOT_FOUND)) => {}, - // Non-recoverable error from the `CarReader`. - Ok(car_err) => { - yield Err(Of1StreamError::FileStream(car_err)); - } - // Non-recoverable IO error from the `NodeParser`. - Err(io_err) => { - let original_err = Of1StreamError::NodeParse( - car_parser::node::NodeError::Io(io_err) - ); - yield Err(original_err); - } - }; - return; - } - Err(e) => { - yield Err(e); - return; - } - }; - prev_blockhash = slot.blockhash; - - if slot.slot < start { - // Skip blocks before the start of the requested range. - continue; - } - - match slot.slot.cmp(&end) { - std::cmp::Ordering::Less => { - yield Ok(slot); - } - std::cmp::Ordering::Equal => { - yield Ok(slot); - return; - } - std::cmp::Ordering::Greater => { - return; - } - } - } - } - } -} - -/// Errors that can occur during streaming of Solana blocks from Old Faithful -/// v1 (OF1) CAR files. -#[derive(Debug, thiserror::Error)] -pub enum CarReaderError { - /// IO error when reading the CAR file. - /// - /// This can occur due to network issues, file corruption, or other problems when - /// accessing the CAR file. - #[error("IO error: {0}")] - Io(#[source] std::io::Error), - /// Reqwest error when connecting to or reading from the CAR file. - /// - /// This can occur due to network issues, timeouts, or other problems when making - /// HTTP requests to access the CAR file. - #[error("Reqwest error: {0}")] - Reqwest(#[source] reqwest::Error), - /// The server responded with an HTTP error status code when trying to access the CAR file. - /// - /// This can occur due to network issues, server problems, or if the CAR file is not available. - #[error("HTTP error: {0}")] - Http(reqwest::StatusCode), - /// The server does not support HTTP range requests. - /// - /// This is a non-recoverable error because the [`CarReader`] relies on range - /// requests to be able to resume interrupted downloads without re-downloading - /// the entire CAR. 
- #[error("server does not support range requests")] - RangeRequestUnsupported, -} - -/// Read an entire block worth of nodes from the given node reader and decode them into -/// a [DecodedSlot]. -/// -/// Inspired by the Old Faithful CAR parser example: -/// -async fn read_next_slot( - node_reader: &mut car_parser::node::NodeReader, - prev_blockhash: [u8; 32], -) -> Result, Of1StreamError> { - // Once we reach `Node::Block`, the node map will contain all of the nodes needed to reassemble - // that block. - let nodes = car_parser::node::Nodes::read_until_block(node_reader) - .await - .map_err(Of1StreamError::NodeParse)?; - - let block = match nodes.nodes.last() { - // Expected block node. - Some((_, car_parser::node::Node::Block(block))) => block, - // Reached end of CAR file. - None | Some((_, car_parser::node::Node::Epoch(_))) => return Ok(None), - Some((cid, node)) => { - return Err(Of1StreamError::UnexpectedNode { - kind: node.kind(), - cid: (*cid).into(), - }); - } - }; - - let mut transactions = Vec::new(); - let mut transaction_metas = Vec::new(); - - for entry_cid in &block.entries { - let Some(car_parser::node::Node::Entry(entry)) = nodes.nodes.get(entry_cid) else { - return Err(Of1StreamError::MissingNode { - expected: "entry", - cid: entry_cid.to_string(), - }); - }; - for tx_cid in &entry.transactions { - let Some(car_parser::node::Node::Transaction(tx)) = nodes.nodes.get(tx_cid) else { - return Err(Of1StreamError::MissingNode { - expected: "transaction", - cid: tx_cid.to_string(), - }); - }; - - nodes - .reassemble_dataframes(&tx.data) - .map_err(Of1StreamError::DataframeReassembly) - .and_then(|tx_df| bincode::deserialize(&tx_df).map_err(Of1StreamError::Bincode)) - .map(|tx| { - transactions.push(tx); - })?; - nodes - .reassemble_dataframes(&tx.metadata) - .map_err(Of1StreamError::DataframeReassembly) - .and_then(|meta_df| { - if meta_df.is_empty() { - Ok(None) - } else { - zstd_decompress(TX_STATUS_META_FIELD, &meta_df) - .and_then(|meta| 
decode_tx_meta(block.slot, &meta)) - .map(Some) - } - }) - .map(|meta| { - transaction_metas.push(meta); - })?; - } - } - - let block_rewards = nodes - .nodes - .get(&block.rewards) - .map(|rewards| { - let car_parser::node::Node::Rewards(rewards) = rewards else { - return Err(Of1StreamError::UnexpectedNode { - kind: rewards.kind(), - cid: block.rewards.to_string(), - }); - }; - if rewards.slot != block.slot { - return Err(Of1StreamError::RewardSlotMismatch { - expected: block.slot, - found: rewards.slot, - }); - } - - nodes - .reassemble_dataframes(&rewards.data) - .map_err(Of1StreamError::DataframeReassembly) - .and_then(|block_rewards_df| { - zstd_decompress(BLOCK_REWARDS_FIELD, &block_rewards_df) - }) - .and_then(|rewards_df| { - decode_proto_or_bincode(block.slot, BLOCK_REWARDS_FIELD, rewards_df.as_slice()) - }) - }) - .transpose()?; - - let blockhash = - { - // Hash of the last entry has the same value as that block's `blockhash` in - // CAR files. - let last_entry_cid = block.entries.last().expect("at least one entry"); - let last_entry_node = nodes.nodes.get(last_entry_cid); - let Some(car_parser::node::Node::Entry(last_entry)) = last_entry_node else { - return Err(Of1StreamError::MissingNode { - expected: "entry", - cid: last_entry_cid.to_string(), - }); - }; - last_entry.hash.clone().try_into().map_err(|vec: Vec| { - Of1StreamError::TryIntoArray { - expected_len: 32, - actual_len: vec.len(), - } - })? - }; - - let blocktime = - block - .meta - .blocktime - .try_into() - .map_err(|_| Of1StreamError::BlocktimeOverflow { - slot: block.slot, - blocktime: block.meta.blocktime, - })?; - - let block = DecodedSlot { - slot: block.slot, - parent_slot: block.meta.parent_slot, - blockhash, - prev_blockhash, - block_height: block.meta.block_height, - blocktime, - transactions, - transaction_metas, - block_rewards, - }; - - Ok(Some(block)) -} - -/// Attempt to decode a field read from a CAR file as either protobuf or bincode encoded. 
-/// Fail if both decoding attempts fail. All fields that need to be decoded this way are -/// ZSTD compressed, so the input data to this function is expected to already be -/// decompressed. -/// -/// For some epochs transaction metadata / block rewards are stored as protobuf encoded, -/// while for others they are stored as bincode encoded. This function handles both cases. -fn decode_proto_or_bincode( - slot: solana_clock::Slot, - field_name: &'static str, - decompressed_input: &[u8], -) -> Result, Of1StreamError> -where - P: prost::Message + Default, - B: serde::de::DeserializeOwned, -{ - match prost::Message::decode(decompressed_input).map(DecodedField::Proto) { - Ok(data_proto) => Ok(data_proto), - Err(prost_err) => { - match bincode::deserialize(decompressed_input).map(DecodedField::Bincode) { - Ok(data_bincode) => Ok(data_bincode), - Err(bincode_err) => { - let err = Of1StreamError::DecodeField { - slot, - field_name, - prost_err: prost_err.to_string(), - bincode_err: bincode_err.to_string(), - }; - Err(err) - } - } - } - } -} - -/// Decode transaction metadata that may be encoded in either protobuf or bincode format, -/// depending on the epoch. Bincode deserialization handles multiple legacy formats internally -/// via [`solana_storage_proto::StoredTransactionStatusMetaVersioned`]. -/// -/// Transaction metadata passed in should already be ZSTD decompressed. -fn decode_tx_meta( - slot: solana_clock::Slot, - decompressed_tx_meta: &[u8], -) -> Result { - // Try protobuf first. - match prost::Message::decode(decompressed_tx_meta) { - Ok(proto_meta) => Ok(DecodedField::Proto(proto_meta)), - Err(prost_err) => { - // Try all bincode versions (current, legacy v2, legacy v1). 
- match solana_storage_proto::StoredTransactionStatusMetaVersioned::from_bincode( - decompressed_tx_meta, - ) { - Ok(meta) => Ok(DecodedField::Bincode(meta)), - Err(bincode_err) => { - let err = Of1StreamError::DecodeField { - slot, - field_name: TX_STATUS_META_FIELD, - prost_err: prost_err.to_string(), - bincode_err: bincode_err.to_string(), - }; - // Logging the full decompressed transaction metadata can be helpful for - // debugging decoding issues, even though it can be large and clutter the - // logs. - tracing::error!( - data = ?decompressed_tx_meta, - error = ?err, - error_source = monitoring::logging::error_source(&err), - "failed to decode transaction status meta" - ); - - Err(err) - } - } - } - } -} - -fn zstd_decompress(field_name: &'static str, input: &[u8]) -> Result, Of1StreamError> { - zstd::decode_all(input).map_err(|err| Of1StreamError::Zstd { - field_name, - error: err.to_string(), - }) -} - -type ConnectFuture = Pin> + Send>>; -type ByteStream = Pin> + Send>>; -type BackoffFuture = Pin>; - -struct ByteStreamMonitor { - epoch: solana_clock::Epoch, - bytes_read_chunk: u64, - started_at: std::time::Instant, - provider: ProviderName, - network: NetworkId, - registry: Arc, -} - -impl ByteStreamMonitor { - const BYTES_READ_RECORD_THRESHOLD: u64 = 10 * 1024 * 1024; // 10 MiB - - /// Record the number of bytes read and report to metrics if the reporting threshold is reached. - fn record_bytes_read(&mut self, n: u64) { - self.bytes_read_chunk += n; - - if self.bytes_read_chunk >= Self::BYTES_READ_RECORD_THRESHOLD { - self.registry.record_of1_car_download_bytes( - self.bytes_read_chunk, - self.epoch, - &self.provider, - &self.network, - ); - self.bytes_read_chunk = 0; - } - } - - /// Record any remaining bytes read that didn't reach the reporting threshold. 
- fn flush_bytes_read(&mut self) { - if self.bytes_read_chunk > 0 { - self.registry.record_of1_car_download_bytes( - self.bytes_read_chunk, - self.epoch, - &self.provider, - &self.network, - ); - self.bytes_read_chunk = 0; - } - } - - fn record_car_download(&mut self) { - let elapsed = self.started_at.elapsed().as_secs_f64(); - self.registry - .record_of1_car_download(elapsed, self.epoch, &self.provider, &self.network); - self.flush_bytes_read(); - } - - fn record_car_download_error(&mut self) { - self.registry - .record_of1_car_download_error(self.epoch, &self.provider, &self.network); - } -} - -impl Drop for ByteStreamMonitor { - fn drop(&mut self) { - self.flush_bytes_read(); - } -} - -struct MonitoredByteStream { - stream: ByteStream, - monitor: Option, -} - -impl MonitoredByteStream { - fn new( - stream: impl Stream> + Send + 'static, - epoch: solana_clock::Epoch, - metrics: Option, - ) -> Self { - let stream = Box::pin(stream); - let monitor = metrics.map(|metrics| ByteStreamMonitor { - epoch, - bytes_read_chunk: 0, - started_at: std::time::Instant::now(), - provider: metrics.provider, - network: metrics.network, - registry: metrics.registry, - }); - Self { stream, monitor } - } -} - -impl Stream for MonitoredByteStream { - type Item = ::Item; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let this = self.get_mut(); - let poll = this.stream.poll_next_unpin(cx); - - if let Some(m) = this.monitor.as_mut() { - match &poll { - std::task::Poll::Ready(Some(Ok(bytes))) => { - m.record_bytes_read(bytes.len() as u64); - } - std::task::Poll::Ready(Some(Err(_))) => { - m.record_car_download_error(); - } - std::task::Poll::Ready(None) => { - m.record_car_download(); - } - _ => {} - } - } - - poll - } -} - -enum ReaderState { - /// A single in-flight HTTP request to (re)connect. - Connect(ConnectFuture), - /// We have an active byte stream. 
- Stream(MonitoredByteStream), - /// We are waiting until a backoff deadline before attempting reconnect. - Backoff(BackoffFuture), -} - -struct CarReader { - url: String, - epoch: solana_clock::Epoch, - reqwest: Arc, - state: ReaderState, - overflow: Vec, - bytes_read_total: u64, - - // Backoff control - reconnect_attempt: u32, - max_backoff: Duration, - base_backoff: Duration, - - metrics: Option, -} - -impl CarReader { - fn new( - epoch: solana_clock::Epoch, - reqwest: Arc, - metrics: Option, - ) -> Self { - let url = car_download_url(epoch); - let connect_fut = get_with_range_header(reqwest.clone(), url.clone(), 0); - - Self { - url, - epoch, - reqwest, - state: ReaderState::Connect(Box::pin(connect_fut)), - overflow: Vec::new(), - bytes_read_total: 0, - reconnect_attempt: 0, - base_backoff: Duration::from_millis(100), - max_backoff: Duration::from_secs(30), - metrics, - } - } - - fn schedule_backoff(&mut self, err: CarReaderError) { - self.reconnect_attempt = self.reconnect_attempt.saturating_add(1); - let backoff = compute_backoff(self.base_backoff, self.max_backoff, self.reconnect_attempt); - - let backoff_str = format!("{:.1}s", backoff.as_secs_f32()); - tracing::warn!( - epoch = self.epoch, - bytes_read = self.bytes_read_total, - attempt = self.reconnect_attempt, - error = ?err, - error_source = monitoring::logging::error_source(&err), - backoff = %backoff_str, - "CAR reader failed; scheduled retry" - ); - - let fut = tokio::time::sleep(backoff); - self.state = ReaderState::Backoff(Box::pin(fut)); - } -} - -impl tokio::io::AsyncRead for CarReader { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - let this = self.get_mut(); - - // Drain overflow first. 
- if !this.overflow.is_empty() { - let to_copy = this.overflow.len().min(buf.remaining()); - buf.put_slice(&this.overflow[..to_copy]); - this.overflow.drain(..to_copy); - return std::task::Poll::Ready(Ok(())); - } - - // Retry loop, return on successful read, EOF, or non-recoverable error (RangeRequestUnsupported). - loop { - match &mut this.state { - ReaderState::Connect(fut) => match fut.as_mut().poll(cx) { - std::task::Poll::Ready(Ok(resp)) => { - let status = resp.status(); - // Handle error codes. - match status { - reqwest::StatusCode::NOT_FOUND => { - let err = std::io::Error::other(CarReaderError::Http(status)); - return std::task::Poll::Ready(Err(err)); - } - status if !status.is_success() => { - this.schedule_backoff(CarReaderError::Http(status)); - continue; - } - _ => {} - } - - // Handle partial content. - if this.bytes_read_total > 0 - && status != reqwest::StatusCode::PARTIAL_CONTENT - { - let e = std::io::Error::other(CarReaderError::RangeRequestUnsupported); - return std::task::Poll::Ready(Err(e)); - } - - // Initial connection succeeded, start reading the byte stream. - this.reconnect_attempt = 0; - let stream = MonitoredByteStream::new( - resp.bytes_stream(), - this.epoch, - this.metrics.clone(), - ); - this.state = ReaderState::Stream(stream); - } - std::task::Poll::Ready(Err(e)) => { - this.schedule_backoff(CarReaderError::Reqwest(e)); - } - std::task::Poll::Pending => return std::task::Poll::Pending, - }, - ReaderState::Stream(stream) => match stream.poll_next_unpin(cx) { - // Reached EOF. - std::task::Poll::Ready(None) => { - return std::task::Poll::Ready(Ok(())); - } - // Read some bytes, account for possible overflow. 
- std::task::Poll::Ready(Some(Ok(bytes))) => { - let n_read = bytes.len(); - let to_copy = n_read.min(buf.remaining()); - - buf.put_slice(&bytes[..to_copy]); - this.overflow.extend_from_slice(&bytes[to_copy..]); - this.bytes_read_total += n_read as u64; - - return std::task::Poll::Ready(Ok(())); - } - std::task::Poll::Ready(Some(Err(e))) => { - this.schedule_backoff(CarReaderError::Reqwest(e)); - } - std::task::Poll::Pending => return std::task::Poll::Pending, - }, - ReaderState::Backoff(fut) => match fut.poll_unpin(cx) { - std::task::Poll::Ready(()) => { - let fut = get_with_range_header( - this.reqwest.clone(), - this.url.clone(), - this.bytes_read_total, - ); - this.state = ReaderState::Connect(Box::pin(fut)); - } - std::task::Poll::Pending => return std::task::Poll::Pending, - }, - } - } - } -} - -async fn get_with_range_header( - reqwest: Arc, - url: String, - offset: u64, -) -> Result { - let mut req = reqwest.get(&url); - if offset > 0 { - req = req.header(reqwest::header::RANGE, format!("bytes={offset}-")); - } - - req.send().await -} - -fn compute_backoff(base: Duration, cap: Duration, attempt: u32) -> Duration { - // attempt=1 => base, attempt=2 => 2*base, attempt=3 => 4*base, ... - let factor = 1u64 << attempt.saturating_sub(1).min(30); - let backoff = base.saturating_mul(factor as u32); - backoff.min(cap) -} - -/// Generates the Old Faithful CAR download URL for the given epoch. -/// -/// Reference: . -fn car_download_url(epoch: solana_clock::Epoch) -> String { - format!("https://files.old-faithful.net/{epoch}/epoch-{epoch}.car") -} - -#[cfg(debug_assertions)] -mod epoch_supervision { - use super::{HashSet, Mutex}; - - /// Guard that tracks in-progress epochs to detect overlapping Solana streams in debug builds. - /// - /// # Panics - /// - /// Panics if an attempt is made to [create](Guard::new) a guard for an epoch that is already - /// in progress, or if a guard is dropped for an epoch that is not currently in progress. 
- pub struct Guard<'a> { - epoch: solana_clock::Epoch, - in_progress_epochs: &'a Mutex>, - } - - impl<'a> Guard<'a> { - pub fn new( - in_progress_epochs: &'a Mutex>, - epoch: solana_clock::Epoch, - ) -> Self { - let mut epochs_in_progress = in_progress_epochs.lock().unwrap(); - let is_new = epochs_in_progress.insert(epoch); - assert!( - is_new, - "epoch {epoch} already in progress, overlapping Solana streams are not allowed" - ); - Self { - epoch, - in_progress_epochs, - } - } - } - - impl<'a> Drop for Guard<'a> { - fn drop(&mut self) { - let mut epochs_in_progress = self.in_progress_epochs.lock().unwrap(); - let removed = epochs_in_progress.remove(&self.epoch); - assert!( - removed, - "epoch {} was not in progress during drop, this should never happen", - self.epoch - ); - } - } -} diff --git a/crates/core/providers-solana/src/old_faithful.rs b/crates/core/providers-solana/src/old_faithful.rs new file mode 100644 index 000000000..c8fb811d0 --- /dev/null +++ b/crates/core/providers-solana/src/old_faithful.rs @@ -0,0 +1,188 @@ +//! Old Faithful v1 (OF1) block streaming for Solana. +//! +//! Streams decoded Solana blocks from [Old Faithful] CAR archive files, either +//! from a remote HTTP source or from a local directory of pre-downloaded files. +//! Each epoch is stored as a single CAR file containing a DAG of blocks, +//! entries, transactions, and rewards that are reassembled into [`DecodedSlot`] +//! values. +//! +//! 
[Old Faithful]: https://docs.old-faithful.net + +mod decode; +#[cfg(debug_assertions)] +mod epoch_supervision; +pub mod metrics; +mod reader; + +#[cfg(debug_assertions)] +use std::{collections::HashSet, sync::Mutex}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; + +pub use decode::{DecodedBlockRewards, DecodedField, DecodedSlot, DecodedTransactionStatusMeta}; +use futures::Stream; +use yellowstone_faithful_car_parser as car_parser; + +use self::{decode::read_and_decode_slot, metrics::MonitoredAsyncRead}; +use crate::{ + error::{CarReaderError, OldFaithfulStreamError}, + rpc_client, +}; + +/// Create a stream of decoded slots for the given epoch by reading from the +/// corresponding CAR file downloaded from the Old Faithful archive. +#[allow(clippy::too_many_arguments)] +pub fn stream( + start: solana_clock::Slot, + end: solana_clock::Slot, + reqwest: Arc, + solana_rpc_client: Arc, + archive_dir: Option, + get_block_config: rpc_client::rpc_config::RpcBlockConfig, + metrics: Option, + #[cfg(debug_assertions)] epochs_in_progress: Arc>>, +) -> impl Stream> { + async_stream::stream! { + // Pre-fetch the initial previous block hash via JSON-RPC so that we don't have to + // (potentially) read multiple CAR files to find it. + let mut prev_blockhash = if start == 0 { + // Known previous blockhash for genesis mainnet block. + bs58::decode("4sGjMW1sUnHzSxGspuhpqLDx6wiyjNtZAMdL4VZHirAn") + .into_vec() + .map(TryInto::try_into) + .map_err(OldFaithfulStreamError::DecodeBase58)? + .map_err(|vec: Vec<_>| OldFaithfulStreamError::TryIntoArray { + expected_len: 32, actual_len: vec.len() + })? + } else { + let mut slot = start; + loop { + let metrics = metrics.clone().map(|m| m.registry); + let resp = solana_rpc_client + .get_block(slot, get_block_config, metrics) + .await; + + match resp { + Ok(block) => { + break bs58::decode(block.previous_blockhash) + .into_vec() + .map(TryInto::try_into) + .map_err(OldFaithfulStreamError::DecodeBase58)? 
+ .map_err(|vec: Vec<_>| OldFaithfulStreamError::TryIntoArray { + expected_len: 32, actual_len: vec.len() + })?; + } + Err(e) if rpc_client::is_block_missing_err(&e) => slot += 1, + Err(e) => { + yield Err(OldFaithfulStreamError::RpcClient(e)); + return; + } + } + } + }; + + let start_epoch = start / solana_clock::DEFAULT_SLOTS_PER_EPOCH; + let end_epoch = end / solana_clock::DEFAULT_SLOTS_PER_EPOCH; + + for epoch in start_epoch..=end_epoch { + #[cfg(debug_assertions)] + let _guard = epoch_supervision::Guard::new(epochs_in_progress.as_ref(), epoch); + + + let mut node_reader = make_reader(epoch, reqwest.clone(), archive_dir.as_deref()) + .map(|r| MonitoredAsyncRead::new(r, epoch, metrics.clone())) + .map(car_parser::node::NodeReader::new) + .map_err(OldFaithfulStreamError::FileStream)?; + + while let Some(slot) = read_and_decode_slot(&mut node_reader, prev_blockhash) + .await + .transpose() + { + let slot = match slot { + Ok(slot) => slot, + // IO errors from the node reader could come from the underlying CAR reader. + // Try to downcast to `CarReaderError` to determine how to map into + // `OldFaithfulStreamError`. + // + // NOTE: There should be no retry logic here because the CAR reader should handle + // all retry logic internally and only return an error when a non-recoverable + // error occurs. + Err(OldFaithfulStreamError::NodeParse(car_parser::node::NodeError::Io(io_err))) => { + match io_err.downcast::() { + // No more CAR files available, not an error. + Ok(CarReaderError::Http(reqwest::StatusCode::NOT_FOUND)) => {}, + // Non-recoverable error from the `CarReader`. + Ok(car_err) => { + yield Err(OldFaithfulStreamError::FileStream(car_err)); + } + // Non-recoverable IO error from the `NodeParser`. 
+ Err(io_err) => { + let original_err = OldFaithfulStreamError::NodeParse( + car_parser::node::NodeError::Io(io_err) + ); + yield Err(original_err); + } + }; + return; + } + Err(e) => { + yield Err(e); + return; + } + }; + prev_blockhash = slot.blockhash; + + if slot.slot < start { + // Skip blocks before the start of the requested range. + continue; + } + + match slot.slot.cmp(&end) { + std::cmp::Ordering::Less => { + yield Ok(slot); + } + std::cmp::Ordering::Equal => { + yield Ok(slot); + return; + } + std::cmp::Ordering::Greater => { + return; + } + } + } + } + } +} + +/// Generates a filename that follows a consistent naming convention for Old Faithful CAR files. +/// +/// Reference: <https://docs.old-faithful.net>. +pub fn car_filename(epoch: solana_clock::Epoch) -> String { + format!("epoch-{epoch}.car") +} + +/// Generates the Old Faithful CAR download URL for the given epoch. +/// +/// Reference: <https://docs.old-faithful.net>. +pub fn car_download_url(epoch: solana_clock::Epoch) -> String { + format!("https://files.old-faithful.net/{epoch}/epoch-{epoch}.car") +} + +fn make_reader( + epoch: solana_clock::Epoch, + reqwest: Arc, + archive_dir: Option<&Path>, +) -> Result, CarReaderError> { + match archive_dir { + Some(dir) => { + let r = reader::local(epoch, dir).map_err(CarReaderError::Io)?; + Ok(Box::new(r)) + } + None => { + let r = reader::remote(epoch, reqwest); + Ok(Box::new(r)) + } + } +} diff --git a/crates/core/providers-solana/src/old_faithful/decode.rs b/crates/core/providers-solana/src/old_faithful/decode.rs new file mode 100644 index 000000000..8614353d9 --- /dev/null +++ b/crates/core/providers-solana/src/old_faithful/decode.rs @@ -0,0 +1,291 @@ +//! Decoding of CAR file nodes into [`DecodedSlot`] values. +//! +//! CAR files store block data as a DAG of nodes (blocks, entries, transactions, +//! rewards). This module reassembles those nodes, decompresses ZSTD-compressed +//! fields, and decodes protobuf or bincode payloads into the types used by the +//! rest of the pipeline. 
+ +use yellowstone_faithful_car_parser as car_parser; + +use crate::error::OldFaithfulStreamError; + +const TX_STATUS_META_FIELD: &str = "transaction status meta"; +const BLOCK_REWARDS_FIELD: &str = "block rewards"; + +pub type DecodedTransactionStatusMeta = DecodedField< + solana_storage_proto::confirmed_block::TransactionStatusMeta, + solana_storage_proto::StoredTransactionStatusMetaVersioned, +>; + +pub type DecodedBlockRewards = DecodedField< + solana_storage_proto::confirmed_block::Rewards, + solana_storage_proto::StoredExtendedRewards, +>; + +pub enum DecodedField { + Proto(P), + Bincode(B), +} + +#[derive(Default)] +pub struct DecodedSlot { + pub slot: solana_clock::Slot, + pub parent_slot: solana_clock::Slot, + pub blockhash: [u8; 32], + pub prev_blockhash: [u8; 32], + pub block_height: Option, + pub blocktime: i64, + pub transactions: Vec, + pub transaction_metas: Vec>, + pub block_rewards: Option, +} + +impl DecodedSlot { + /// Create a dummy `DecodedSlot` with the given slot number and default values for all + /// other fields. This can be used for testing or as a placeholder when only the slot + /// number is relevant. + /// + /// NOTE: The reason this is marked as `pub` is because it is used in integration tests + /// in the `tests` crate. + #[doc(hidden)] + pub fn dummy(slot: solana_clock::Slot) -> Self { + Self { + slot, + parent_slot: slot.saturating_sub(1), + ..Default::default() + } + } +} + +/// Read an entire block worth of nodes from the given node reader and decode them into +/// a [DecodedSlot]. +/// +/// Inspired by the Old Faithful CAR parser example: +/// +pub(crate) async fn read_and_decode_slot( + node_reader: &mut car_parser::node::NodeReader, + prev_blockhash: [u8; 32], +) -> Result, OldFaithfulStreamError> { + // Once we reach `Node::Block`, the node map will contain all of the nodes needed to reassemble + // that block. 
+ let nodes = car_parser::node::Nodes::read_until_block(node_reader) + .await + .map_err(OldFaithfulStreamError::NodeParse)?; + + let block = match nodes.nodes.last() { + // Expected block node. + Some((_, car_parser::node::Node::Block(block))) => block, + // Reached end of CAR file. + None | Some((_, car_parser::node::Node::Epoch(_))) => return Ok(None), + Some((cid, node)) => { + return Err(OldFaithfulStreamError::UnexpectedNode { + kind: node.kind(), + cid: (*cid).into(), + }); + } + }; + + let mut transactions = Vec::new(); + let mut transaction_metas = Vec::new(); + + for entry_cid in &block.entries { + let Some(car_parser::node::Node::Entry(entry)) = nodes.nodes.get(entry_cid) else { + return Err(OldFaithfulStreamError::MissingNode { + expected: "entry", + cid: entry_cid.to_string(), + }); + }; + for tx_cid in &entry.transactions { + let Some(car_parser::node::Node::Transaction(tx)) = nodes.nodes.get(tx_cid) else { + return Err(OldFaithfulStreamError::MissingNode { + expected: "transaction", + cid: tx_cid.to_string(), + }); + }; + + nodes + .reassemble_dataframes(&tx.data) + .map_err(OldFaithfulStreamError::DataframeReassembly) + .and_then(|tx_df| { + bincode::deserialize(&tx_df).map_err(OldFaithfulStreamError::Bincode) + }) + .map(|tx| { + transactions.push(tx); + })?; + nodes + .reassemble_dataframes(&tx.metadata) + .map_err(OldFaithfulStreamError::DataframeReassembly) + .and_then(|meta_df| { + if meta_df.is_empty() { + Ok(None) + } else { + zstd_decompress(TX_STATUS_META_FIELD, &meta_df) + .and_then(|meta| decode_tx_status_meta(block.slot, &meta)) + .map(Some) + } + }) + .map(|meta| { + transaction_metas.push(meta); + })?; + } + } + + let block_rewards = nodes + .nodes + .get(&block.rewards) + .map(|rewards| { + let car_parser::node::Node::Rewards(rewards) = rewards else { + return Err(OldFaithfulStreamError::UnexpectedNode { + kind: rewards.kind(), + cid: block.rewards.to_string(), + }); + }; + if rewards.slot != block.slot { + return 
Err(OldFaithfulStreamError::RewardSlotMismatch { + expected: block.slot, + found: rewards.slot, + }); + } + + nodes + .reassemble_dataframes(&rewards.data) + .map_err(OldFaithfulStreamError::DataframeReassembly) + .and_then(|block_rewards_df| { + zstd_decompress(BLOCK_REWARDS_FIELD, &block_rewards_df) + }) + .and_then(|rewards_df| { + decode_proto_or_bincode(block.slot, BLOCK_REWARDS_FIELD, rewards_df.as_slice()) + }) + }) + .transpose()?; + + let blockhash = { + // Hash of the last entry has the same value as that block's `blockhash` in + // CAR files. + let last_entry_cid = block.entries.last().expect("at least one entry"); + let last_entry_node = nodes.nodes.get(last_entry_cid); + let Some(car_parser::node::Node::Entry(last_entry)) = last_entry_node else { + return Err(OldFaithfulStreamError::MissingNode { + expected: "entry", + cid: last_entry_cid.to_string(), + }); + }; + last_entry.hash.clone().try_into().map_err(|vec: Vec| { + OldFaithfulStreamError::TryIntoArray { + expected_len: 32, + actual_len: vec.len(), + } + })? + }; + + let blocktime = + block + .meta + .blocktime + .try_into() + .map_err(|_| OldFaithfulStreamError::BlocktimeOverflow { + slot: block.slot, + blocktime: block.meta.blocktime, + })?; + + let block = DecodedSlot { + slot: block.slot, + parent_slot: block.meta.parent_slot, + blockhash, + prev_blockhash, + block_height: block.meta.block_height, + blocktime, + transactions, + transaction_metas, + block_rewards, + }; + + Ok(Some(block)) +} + +/// Attempt to decode a field read from a CAR file as either protobuf or bincode encoded. +/// Fail if both decoding attempts fail. All fields that need to be decoded this way are +/// ZSTD compressed, so the input data to this function is expected to already be +/// decompressed. +/// +/// For some epochs transaction metadata / block rewards are stored as protobuf encoded, +/// while for others they are stored as bincode encoded. This function handles both cases. 
+fn decode_proto_or_bincode( + slot: solana_clock::Slot, + field_name: &'static str, + decompressed_input: &[u8], +) -> Result, OldFaithfulStreamError> +where + P: prost::Message + Default, + B: serde::de::DeserializeOwned, +{ + match prost::Message::decode(decompressed_input).map(DecodedField::Proto) { + Ok(data_proto) => Ok(data_proto), + Err(prost_err) => { + match bincode::deserialize(decompressed_input).map(DecodedField::Bincode) { + Ok(data_bincode) => Ok(data_bincode), + Err(bincode_err) => { + let err = OldFaithfulStreamError::DecodeField { + slot, + field_name, + prost_err: prost_err.to_string(), + bincode_err: bincode_err.to_string(), + }; + Err(err) + } + } + } + } +} + +/// Decode transaction metadata that may be encoded in either protobuf or bincode format, +/// depending on the epoch. Bincode deserialization handles multiple legacy formats internally +/// via [`solana_storage_proto::StoredTransactionStatusMetaVersioned`]. +/// +/// Transaction metadata passed in should already be ZSTD decompressed. +fn decode_tx_status_meta( + slot: solana_clock::Slot, + decompressed_tx_meta: &[u8], +) -> Result { + // Try protobuf first. + match prost::Message::decode(decompressed_tx_meta) { + Ok(proto_meta) => Ok(DecodedField::Proto(proto_meta)), + Err(prost_err) => { + // Try all bincode versions (current, legacy v2, legacy v1). + match solana_storage_proto::StoredTransactionStatusMetaVersioned::from_bincode( + decompressed_tx_meta, + ) { + Ok(meta) => Ok(DecodedField::Bincode(meta)), + Err(bincode_err) => { + let err = OldFaithfulStreamError::DecodeField { + slot, + field_name: TX_STATUS_META_FIELD, + prost_err: prost_err.to_string(), + bincode_err: bincode_err.to_string(), + }; + // Logging the full decompressed transaction metadata can be helpful for + // debugging decoding issues, even though it can be large and clutter the + // logs. 
+ tracing::error!( + data = ?decompressed_tx_meta, + error = ?err, + error_source = monitoring::logging::error_source(&err), + "failed to decode transaction status meta" + ); + + Err(err) + } + } + } + } +} + +fn zstd_decompress( + field_name: &'static str, + input: &[u8], +) -> Result, OldFaithfulStreamError> { + zstd::decode_all(input).map_err(|err| OldFaithfulStreamError::Zstd { + field_name, + error: err.to_string(), + }) +} diff --git a/crates/core/providers-solana/src/old_faithful/epoch_supervision.rs b/crates/core/providers-solana/src/old_faithful/epoch_supervision.rs new file mode 100644 index 000000000..7be3fe032 --- /dev/null +++ b/crates/core/providers-solana/src/old_faithful/epoch_supervision.rs @@ -0,0 +1,48 @@ +//! Debug-only guard that detects overlapping epoch streams. +//! +//! When two streams attempt to process the same epoch concurrently, the +//! [`Guard`] will panic, surfacing the bug early in development rather than +//! producing silently corrupted data. + +use std::{collections::HashSet, sync::Mutex}; + +/// Guard that tracks in-progress epochs to detect overlapping Solana streams in debug builds. +/// +/// # Panics +/// +/// Panics if an attempt is made to [create](Guard::new) a guard for an epoch that is already +/// in progress, or if a guard is dropped for an epoch that is not currently in progress. 
+pub(crate) struct Guard<'a> { + epoch: solana_clock::Epoch, + in_progress_epochs: &'a Mutex>, +} + +impl<'a> Guard<'a> { + pub(crate) fn new( + in_progress_epochs: &'a Mutex>, + epoch: solana_clock::Epoch, + ) -> Self { + let mut epochs_in_progress = in_progress_epochs.lock().unwrap(); + let is_new = epochs_in_progress.insert(epoch); + assert!( + is_new, + "epoch {epoch} already in progress, overlapping Solana streams are not allowed" + ); + Self { + epoch, + in_progress_epochs, + } + } +} + +impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + let mut epochs_in_progress = self.in_progress_epochs.lock().unwrap(); + let removed = epochs_in_progress.remove(&self.epoch); + assert!( + removed, + "epoch {} was not in progress during drop, this should never happen", + self.epoch + ); + } +} diff --git a/crates/core/providers-solana/src/old_faithful/metrics.rs b/crates/core/providers-solana/src/old_faithful/metrics.rs new file mode 100644 index 000000000..92e553c6b --- /dev/null +++ b/crates/core/providers-solana/src/old_faithful/metrics.rs @@ -0,0 +1,126 @@ +//! Metrics instrumentation for Old Faithful CAR file streaming. +//! +//! Provides [`MonitoredAsyncRead`], an [`AsyncRead`](tokio::io::AsyncRead) +//! wrapper that records byte throughput, download duration, and error counts +//! for each epoch's CAR file read. + +use std::{pin::Pin, sync::Arc}; + +use amp_providers_common::{network_id::NetworkId, provider_name::ProviderName}; + +/// Context for OF1 streaming that can be passed to functions that need to report metrics. 
+#[derive(Debug, Clone)] +pub struct Context { + pub provider: ProviderName, + pub network: NetworkId, + pub registry: Arc, +} + +pub(crate) struct MonitoredAsyncRead { + inner: R, + monitor: Option, +} + +impl MonitoredAsyncRead { + pub(crate) fn new(inner: R, epoch: solana_clock::Epoch, metrics: Option) -> Self { + let monitor = metrics.map(|metrics| AsyncReadMonitor { + epoch, + bytes_read_chunk: 0, + started_at: std::time::Instant::now(), + provider: metrics.provider, + network: metrics.network, + registry: metrics.registry, + }); + Self { inner, monitor } + } +} + +impl tokio::io::AsyncRead for MonitoredAsyncRead { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + let before = buf.filled().len(); + let result = Pin::new(&mut this.inner).poll_read(cx, buf); + + if let Some(m) = this.monitor.as_mut() { + match &result { + std::task::Poll::Ready(Ok(())) => { + let bytes_read = (buf.filled().len() - before) as u64; + if bytes_read > 0 { + m.record_bytes_read(bytes_read); + } else { + m.record_car_download(); + } + } + std::task::Poll::Ready(Err(_)) => { + m.record_car_download_error(); + } + std::task::Poll::Pending => {} + } + } + + result + } +} + +struct AsyncReadMonitor { + epoch: solana_clock::Epoch, + bytes_read_chunk: u64, + started_at: std::time::Instant, + provider: ProviderName, + network: NetworkId, + registry: Arc, +} + +impl AsyncReadMonitor { + const BYTES_READ_RECORD_THRESHOLD: u64 = 10 * 1024 * 1024; // 10 MiB + + /// Record the number of bytes read and report to metrics if the reporting threshold is reached. 
+ fn record_bytes_read(&mut self, n: u64) { + self.bytes_read_chunk += n; + + if self.bytes_read_chunk >= Self::BYTES_READ_RECORD_THRESHOLD { + self.registry.record_of1_car_download_bytes( + self.bytes_read_chunk, + self.epoch, + &self.provider, + &self.network, + ); + self.bytes_read_chunk = 0; + } + } + + /// Record any remaining bytes read that didn't reach the reporting threshold. + fn flush_bytes_read(&mut self) { + if self.bytes_read_chunk > 0 { + self.registry.record_of1_car_download_bytes( + self.bytes_read_chunk, + self.epoch, + &self.provider, + &self.network, + ); + self.bytes_read_chunk = 0; + } + } + + fn record_car_download(&mut self) { + let elapsed = self.started_at.elapsed().as_secs_f64(); + self.registry + .record_of1_car_download(elapsed, self.epoch, &self.provider, &self.network); + self.flush_bytes_read(); + } + + fn record_car_download_error(&mut self) { + self.registry + .record_of1_car_download_error(self.epoch, &self.provider, &self.network); + } +} + +impl Drop for AsyncReadMonitor { + fn drop(&mut self) { + self.flush_bytes_read(); + } +} diff --git a/crates/core/providers-solana/src/old_faithful/reader.rs b/crates/core/providers-solana/src/old_faithful/reader.rs new file mode 100644 index 000000000..8dbbefdfa --- /dev/null +++ b/crates/core/providers-solana/src/old_faithful/reader.rs @@ -0,0 +1,237 @@ +//! [`AsyncRead`](tokio::io::AsyncRead) implementations for reading CAR files. +//! +//! Provides two readers: +//! - [`LocalCarReader`]: memory-maps a local CAR file for zero-copy reads. +//! - [`RemoteCarReader`]: streams a CAR file over HTTP with automatic retry, +//! exponential backoff, and resumption via `Range` headers. 
+ +use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; + +use futures::{FutureExt, Stream, StreamExt}; + +use crate::{ + error::CarReaderError, + old_faithful::{car_download_url, car_filename}, +}; + +type ConnectFuture = Pin> + Send>>; +type ByteStream = Pin> + Send>>; +type BackoffFuture = Pin>; + +pub(crate) fn remote( + epoch: solana_clock::Epoch, + reqwest: Arc, +) -> impl tokio::io::AsyncRead + Send { + RemoteCarReader::new(epoch, reqwest) +} + +pub(crate) fn local( + epoch: solana_clock::Epoch, + archive_dir: &std::path::Path, +) -> std::io::Result { + LocalCarReader::new(epoch, archive_dir) +} + +enum ReaderState { + /// A single in-flight HTTP request to (re)connect. + Connect(ConnectFuture), + /// We have an active byte stream. + Stream(ByteStream), + /// We are waiting until a backoff deadline before attempting reconnect. + Backoff(BackoffFuture), +} + +struct RemoteCarReader { + epoch: solana_clock::Epoch, + reqwest: Arc, + state: ReaderState, + overflow: Vec, + bytes_read_total: u64, + + // Backoff control + reconnect_attempt: u32, + max_backoff: Duration, + base_backoff: Duration, +} + +impl RemoteCarReader { + fn new(epoch: solana_clock::Epoch, reqwest: Arc) -> Self { + let connect_fut = request_car_file_with_offset(reqwest.clone(), epoch, 0); + + Self { + epoch, + reqwest, + state: ReaderState::Connect(Box::pin(connect_fut)), + overflow: Vec::new(), + bytes_read_total: 0, + reconnect_attempt: 0, + base_backoff: Duration::from_millis(100), + max_backoff: Duration::from_secs(30), + } + } + + fn schedule_backoff(&mut self, err: CarReaderError) { + let backoff = { + let attempts_made = self.reconnect_attempt.min(30); + // attempts_made=0 => base, attempts_made=1 => 2*base, attempts_made=2 => 4*base, ... 
+ let factor = 1 << attempts_made; + let backoff = self.base_backoff.saturating_mul(factor as u32); + backoff.min(self.max_backoff) + }; + self.reconnect_attempt = self.reconnect_attempt.saturating_add(1); + + let backoff_str = format!("{:.1}s", backoff.as_secs_f32()); + tracing::warn!( + epoch = self.epoch, + bytes_read = self.bytes_read_total, + attempt = self.reconnect_attempt, + error = ?err, + error_source = monitoring::logging::error_source(&err), + backoff = %backoff_str, + "CAR reader failed; scheduled retry" + ); + + let fut = tokio::time::sleep(backoff); + self.state = ReaderState::Backoff(Box::pin(fut)); + } +} + +impl tokio::io::AsyncRead for RemoteCarReader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + + // Drain overflow first. + if !this.overflow.is_empty() { + let to_copy = this.overflow.len().min(buf.remaining()); + buf.put_slice(&this.overflow[..to_copy]); + this.overflow.drain(..to_copy); + return std::task::Poll::Ready(Ok(())); + } + + // Retry loop, return on successful read, EOF, or non-recoverable error (RangeRequestUnsupported). + loop { + match &mut this.state { + ReaderState::Connect(fut) => match fut.as_mut().poll(cx) { + std::task::Poll::Ready(Ok(resp)) => { + let status = resp.status(); + // Handle error codes. + match status { + reqwest::StatusCode::NOT_FOUND => { + let err = std::io::Error::other(CarReaderError::Http(status)); + return std::task::Poll::Ready(Err(err)); + } + status if !status.is_success() => { + this.schedule_backoff(CarReaderError::Http(status)); + continue; + } + _ => {} + } + + // Handle partial content. + if this.bytes_read_total > 0 + && status != reqwest::StatusCode::PARTIAL_CONTENT + { + let e = std::io::Error::other(CarReaderError::RangeRequestUnsupported); + return std::task::Poll::Ready(Err(e)); + } + + // Initial connection succeeded, start reading the byte stream. 
+ this.reconnect_attempt = 0; + this.state = ReaderState::Stream(Box::pin(resp.bytes_stream())); + } + std::task::Poll::Ready(Err(e)) => { + this.schedule_backoff(CarReaderError::Reqwest(e)); + } + std::task::Poll::Pending => return std::task::Poll::Pending, + }, + ReaderState::Stream(stream) => match stream.poll_next_unpin(cx) { + // Reached EOF. + std::task::Poll::Ready(None) => { + return std::task::Poll::Ready(Ok(())); + } + // Read some bytes, account for possible overflow. + std::task::Poll::Ready(Some(Ok(bytes))) => { + let n_read = bytes.len(); + let to_copy = n_read.min(buf.remaining()); + + buf.put_slice(&bytes[..to_copy]); + this.overflow.extend_from_slice(&bytes[to_copy..]); + this.bytes_read_total += n_read as u64; + + return std::task::Poll::Ready(Ok(())); + } + std::task::Poll::Ready(Some(Err(e))) => { + this.schedule_backoff(CarReaderError::Reqwest(e)); + } + std::task::Poll::Pending => return std::task::Poll::Pending, + }, + ReaderState::Backoff(fut) => match fut.poll_unpin(cx) { + std::task::Poll::Ready(()) => { + let fut = request_car_file_with_offset( + this.reqwest.clone(), + this.epoch, + this.bytes_read_total, + ); + this.state = ReaderState::Connect(Box::pin(fut)); + } + std::task::Poll::Pending => return std::task::Poll::Pending, + }, + } + } + } +} + +struct LocalCarReader { + cursor: std::io::Cursor, +} + +impl LocalCarReader { + fn new(epoch: solana_clock::Epoch, archive_dir: &std::path::Path) -> std::io::Result { + let path = archive_dir.join(car_filename(epoch)); + let file = fs_err::File::open(&path)?; + // SAFETY: We rely on the file not being modified while mapped. This is acceptable + // because CAR archive files are immutable once written. + let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? 
}; + Ok(Self { + cursor: std::io::Cursor::new(mmap), + }) + } +} + +impl tokio::io::AsyncRead for LocalCarReader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + let pos = this.cursor.position() as usize; + let src = this.cursor.get_ref(); + + if pos >= src.len() { + return std::task::Poll::Ready(Ok(())); + } + + let remaining = &src[pos..]; + let to_copy = remaining.len().min(buf.remaining()); + buf.put_slice(&remaining[..to_copy]); + this.cursor.set_position((pos + to_copy) as u64); + std::task::Poll::Ready(Ok(())) + } +} + +async fn request_car_file_with_offset( + reqwest: Arc, + epoch: solana_clock::Epoch, + offset: u64, +) -> Result { + let mut req = reqwest.get(car_download_url(epoch)); + if offset > 0 { + req = req.header(reqwest::header::RANGE, format!("bytes={offset}-")); + } + req.send().await +} diff --git a/crates/core/providers-solana/src/tables/block_rewards.rs b/crates/core/providers-solana/src/tables/block_rewards.rs index b5104a40b..06076d8fb 100644 --- a/crates/core/providers-solana/src/tables/block_rewards.rs +++ b/crates/core/providers-solana/src/tables/block_rewards.rs @@ -14,7 +14,7 @@ use solana_clock::Slot; use crate::{ error::{RowConversionError, RowConversionResult}, - of1_client, rpc_client, tables, + old_faithful, rpc_client, tables, }; pub const TABLE_NAME: &str = "block_rewards"; @@ -170,17 +170,17 @@ pub struct BlockRewards { impl BlockRewards { pub(crate) fn from_of1_rewards( slot: Slot, - rewards: Option, + rewards: Option, ) -> RowConversionResult { let rewards: Vec = rewards .map(|rewards| { let rewards = match rewards { - of1_client::DecodedField::Proto(proto_rewards) => proto_rewards + old_faithful::DecodedField::Proto(proto_rewards) => proto_rewards .rewards .into_iter() .map(TryInto::try_into) .collect::>()?, - of1_client::DecodedField::Bincode(bincode_rewards) => { + 
old_faithful::DecodedField::Bincode(bincode_rewards) => { bincode_rewards.into_iter().map(Into::into).collect() } }; diff --git a/crates/core/providers-solana/src/tables/transactions.rs b/crates/core/providers-solana/src/tables/transactions.rs index 55b2d6267..368946f84 100644 --- a/crates/core/providers-solana/src/tables/transactions.rs +++ b/crates/core/providers-solana/src/tables/transactions.rs @@ -19,7 +19,7 @@ use solana_clock::Slot; use crate::{ error::{RowConversionError, RowConversionResult}, - of1_client, rpc_client, tables, + old_faithful, rpc_client, tables, }; pub const TABLE_NAME: &str = "transactions"; @@ -139,16 +139,16 @@ impl Transaction { tx_index: u32, tx_version: TransactionVersion, of1_tx_signatures: Vec, - of1_tx_meta: Option, + of1_tx_meta: Option, ) -> RowConversionResult { let signatures = of1_tx_signatures.iter().map(|s| s.to_string()).collect(); let transaction_status_meta = of1_tx_meta .map(|meta| match meta { - of1_client::DecodedTransactionStatusMeta::Proto(proto_meta) => { + old_faithful::DecodedTransactionStatusMeta::Proto(proto_meta) => { TransactionStatusMeta::from_proto_meta(slot, tx_index, proto_meta) } - of1_client::DecodedTransactionStatusMeta::Bincode(stored_meta) => { + old_faithful::DecodedTransactionStatusMeta::Bincode(stored_meta) => { TransactionStatusMeta::from_stored_meta(slot, tx_index, stored_meta.into()) } }) diff --git a/crates/extractors/solana/README.md b/crates/extractors/solana/README.md index 5e523d68b..18dd424f5 100644 --- a/crates/extractors/solana/README.md +++ b/crates/extractors/solana/README.md @@ -60,7 +60,16 @@ This extractor treats Solana slots as block numbers for compatibility with the ` ```toml kind = "solana" network = "mainnet" -of1_car_directory = "path/to/local/car/files" + +# Archive mode: "always" (default), "auto", or "never" +# - "always": Always use archive, even for recent data +# - "auto": RPC for recent slots, archive for historical +# - "never": Never use archive, RPC-only mode 
+use_archive = "always" + +# Archive dir: Optional local directory for pre-downloaded +# CAR files (if not using Old Faithful directly) +# archive_dir = "path/to/pre-downloaded/car/files" [rpc_provider_info] url = "https://api.mainnet-beta.solana.com" @@ -70,6 +79,8 @@ url = "https://api.mainnet-beta.solana.com" # [fallback_rpc_provider_info] # Optional: used to fill truncated log messages # url = "https://another-rpc.example.com" # auth_token = "your-token" + +max_rpc_calls_per_second = 50 ``` **Configuration Options**: @@ -81,10 +92,9 @@ url = "https://api.mainnet-beta.solana.com" | `rpc_provider_info.auth_header` | No | Custom header name for auth (default: `Authorization: Bearer`) | | `rpc_provider_info.auth_token` | No | Authentication token for RPC requests | | `fallback_rpc_provider_info` | No | Fallback RPC endpoint for filling truncated log messages (same fields as `rpc_provider_info`) | -| `of1_car_directory` | Yes | Local directory for caching Old Faithful CAR files | | `max_rpc_calls_per_second` | No | Rate limit for RPC calls (applies to main RPC only) | -| `keep_of1_car_files` | No | Whether to retain downloaded CAR files after use (default: `false`) | | `use_archive` | No | Archive usage mode: `always` (default), `never`, or `auto` | +| `archive_dir` | No | Directory for pre-downloaded CAR files (if not using Old Faithful downloads) | | `start_block` | No | Starting slot number for extraction (set in the manifest) | | `finalized_blocks_only` | No | Whether to only extract finalized blocks (set in the manifest) | | `commitment` | No | Commitment level for Solana RPC requests: `finalized` (default), `processed` or `confirmed` | @@ -97,11 +107,11 @@ The extractor downloads epoch-based CAR (Content Addressable aRchive) files from - **File Format**: `epoch-.car` - **Epoch Size**: 432,000 slots per epoch (~2 days at 400ms slot time) -CAR files are automatically downloaded on-demand and cached locally. 
Downloads support resumption on failure and exponential backoff retries. When `keep_of1_car_files` is `false`, files are deleted once no longer needed. +CAR files are streamed from the archive and processed in memory. The extractor also supports using pre-downloaded CAR files from a local directory specified by `archive_dir`. This allows users to manage their own archive downloads and avoid redundant network usage. ### Warning -Due to the large size of Solana CAR files, ensure sufficient disk space is available in the specified `of1_car_directory`. +Due to the large size of Solana CAR files, ensure sufficient disk space is available in the specified `archive_dir`, if choosing pre-downloaded CAR files route. ## Utilities @@ -109,6 +119,10 @@ Due to the large size of Solana CAR files, ensure sufficient disk space is avail A companion example (`examples/solana_compare.rs`) that compares block data from Old Faithful CAR files against the RPC endpoint for the same epoch. Useful for validating data consistency between the two sources. +### solana-car-download + +A utility for downloading Solana epoch CAR files directly from Old Faithful, with support for resuming interrupted downloads and retrying on failures. This can be used to pre-populate the `archive_dir` with CAR files before running the extractor. 
+ ## JSON Schema Generation JSON schemas for Solana dataset manifests can be generated using the companion `solana-gen` crate: diff --git a/docs/code/extractors.md b/docs/code/extractors.md index f376f4f21..c55b055af 100644 --- a/docs/code/extractors.md +++ b/docs/code/extractors.md @@ -191,7 +191,7 @@ pub struct ProviderConfig { **Extractor-specific fields (examples):** - EVM-RPC: `concurrent_request_limit`, `rpc_batch_size`, `rate_limit_per_minute`, `fetch_receipts_per_tx` -- Solana: `rpc_provider_url`, `of1_car_directory`, `keep_of1_car_files`, `use_archive` +- Solana: `rpc_provider_info`, `use_archive`, `archive_dir` - Firehose: `token` (authentication) ## FACTORY FUNCTION PATTERN diff --git a/docs/feat/provider-solana.md b/docs/feat/provider-solana.md index 8b3f2e0bc..f6309e9be 100644 --- a/docs/feat/provider-solana.md +++ b/docs/feat/provider-solana.md @@ -39,16 +39,18 @@ For the complete field reference, see the [config schema](../schemas/providers/s kind = "solana" network = "mainnet" rpc_provider_url = "${SOLANA_MAINNET_RPC_URL}" -of1_car_directory = "${SOLANA_OF1_CAR_DIRECTORY}" # Archive mode: "always" (default), "auto", or "never" # - "always": Always use archive, even for recent data -# - "auto": RPC for recent slots (last 10k), archive for historical +# - "auto": RPC for recent slots, archive for historical # - "never": Never use archive, RPC-only mode use_archive = "always" +# Archive dir: Optional local directory for pre-downloaded +# CAR files (if not using Old Faithful directly) +# archive_dir = "path/to/pre-downloaded/car/files" + max_rpc_calls_per_second = 50 -keep_of1_car_files = false ``` ## Architecture @@ -64,21 +66,21 @@ The provider supports three archive modes controlled by the `use_archive` config ``` Historical Data Real-time Data ↓ ↓ -Old Faithful Archive → CAR files → RPC endpoint - ↓ ↓ - Download epoch CAR getBlock calls +Old Faithful Archive → CAR files → RPC endpoint ↓ ↓ - Process locally Stream blocks + Stream epoch CAR getBlock 
calls ↓ ↓ - Delete CAR (optional) Continuous sync + Process in-memory Stream blocks + ↓ + Continuous sync ``` ### Data Sources -| Stage | Source | Data | -|-------|--------|------| +| Stage | Source | Data | +| ---------- | --------------------------------------- | --------------------------------- | | Historical | Old Faithful (`files.old-faithful.net`) | Archived epoch CAR files (~745GB) | -| Real-time | Solana RPC | Live blocks via JSON-RPC | +| Real-time | Solana RPC | Live blocks via JSON-RPC | ## Usage @@ -86,19 +88,15 @@ Old Faithful Archive → CAR files → RPC endpoint ```bash export SOLANA_MAINNET_RPC_URL="https://api.mainnet-beta.solana.com" -export SOLANA_OF1_CAR_DIRECTORY="/data/solana/car" ``` ### CAR File Management -By default, CAR files are deleted after processing to save disk space: +Solana extractor supports reading pre-downloaded CAR files from a local directory. +To use this feature, set the `archive_dir` configuration field to the path where your CAR files are stored: ```toml -# Delete CAR files after processing (default) -keep_of1_car_files = false - -# Retain CAR files for debugging or reprocessing -keep_of1_car_files = true +archive_dir = "path/to/pre-downloaded/car/files" ``` ### Rate Limiting @@ -114,12 +112,12 @@ max_rpc_calls_per_second = 50 ### Extracted Tables -| Table | Key Fields | -|-------|------------| +| Table | Key Fields | +| --------------- | ------------------------------------------------------- | | `block_headers` | slot, parent_slot, block_hash, block_height, block_time | -| `transactions` | slot, tx_index, signatures, status, fee, balances | -| `messages` | slot, tx_index, message fields | -| `instructions` | slot, tx_index, program_id_index, accounts, data | +| `transactions` | slot, tx_index, signatures, status, fee, balances | +| `messages` | slot, tx_index, message fields | +| `instructions` | slot, tx_index, program_id_index, accounts, data | ### Slot Handling diff --git a/docs/schemas/providers/solana.spec.json 
b/docs/schemas/providers/solana.spec.json index e519e3a3a..4615f9a16 100644 --- a/docs/schemas/providers/solana.spec.json +++ b/docs/schemas/providers/solana.spec.json @@ -4,6 +4,13 @@ "description": "Solana provider configuration for parsing TOML config.\n\nThis structure defines the parameters required to connect to a Solana\nRPC endpoint for blockchain data extraction. The `kind` field validates\nthat the config belongs to a `solana` provider at deserialization time.", "type": "object", "properties": { + "archive_dir": { + "description": "Optional local path to the Old Faithful CAR archive directory for historical data.\n\nThe directory is expected to have pre-downloaded CAR files that follow the naming\npattern established in [crate::old_faithful::car_filename]. This will be ensured\nif the `solana-car-download` example is used to populate the directory.", + "type": [ + "string", + "null" + ] + }, "commitment": { "description": "Commitment level for RPC requests.", "$ref": "#/$defs/CommitmentLevel" @@ -41,7 +48,7 @@ "$ref": "#/$defs/RpcProviderConnectionConfig" }, "use_archive": { - "description": "Controls when to use the Solana archive for historical data.", + "description": "Controls when to use the Old Faithful CAR archive for historical data.", "$ref": "#/$defs/UseArchive" } }, diff --git a/tests/src/tests/it_solana_historical_to_json_rpc_transition.rs b/tests/src/tests/it_solana_historical_to_json_rpc_transition.rs index 8924f2ab7..15e7689b3 100644 --- a/tests/src/tests/it_solana_historical_to_json_rpc_transition.rs +++ b/tests/src/tests/it_solana_historical_to_json_rpc_transition.rs @@ -1,7 +1,7 @@ use amp_providers_solana::{ Client, config::UseArchive, - of1_client, + old_faithful, rpc_client::{self, rpc_config::CommitmentConfig}, }; use futures::TryStreamExt; @@ -27,6 +27,7 @@ async fn historical_to_json_rpc_transition() { network, provider_name, UseArchive::Auto, + None, // Archive source CommitmentConfig::finalized(), None, // Metrics ); @@ -38,7 
+39,7 @@ async fn historical_to_json_rpc_transition() { // Stream part of the range as historical blocks. let historical = async_stream::stream! { for slot in start..=historical_end { - yield Ok(of1_client::DecodedSlot::dummy(slot)); + yield Ok(old_faithful::DecodedSlot::dummy(slot)); } };