Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ hex = "0.4.3"
indicatif = "0.18"
indoc = "2.0.6"
itertools = "0.14.0"
memmap2 = "0.9"
opentelemetry = { version = "0.31", features = ["trace"] }
opentelemetry-otlp = { version = "0.31", features = [
"grpc-tonic",
Expand Down
1 change: 1 addition & 0 deletions crates/core/providers-solana/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ datasets-raw = { path = "../datasets-raw" }
futures.workspace = true
fs-err.workspace = true
governor.workspace = true
memmap2.workspace = true
headers = { workspace = true }
monitoring = { path = "../monitoring" }
prost.workspace = true
Expand Down
10 changes: 2 additions & 8 deletions crates/core/providers-solana/examples/solana_car_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
time::{Duration, Instant},
};

use amp_providers_solana::old_faithful::{car_download_url, car_filename};
use anyhow::Context;
use backon::{ExponentialBuilder, Retryable};
use clap::Parser;
Expand Down Expand Up @@ -89,7 +90,7 @@ async fn main() -> anyhow::Result<()> {
.build()?;

for epoch in cli.start_epoch..=end_epoch {
let dest = output_dir.join(format!("epoch-{epoch}.car"));
let dest = output_dir.join(car_filename(epoch));
let result = (|| ensure_car_file_exists(epoch, &reqwest, &dest))
.retry(ExponentialBuilder::new().without_max_times())
.when(should_retry)
Expand Down Expand Up @@ -287,13 +288,6 @@ enum CarDownloadError {
PartialDownloadNotSupported,
}

/// Builds the download URL of the Old Faithful CAR file for `epoch`.
///
/// The URL has the shape `https://files.old-faithful.net/<epoch>/epoch-<epoch>.car`.
///
/// Reference: <https://docs.old-faithful.net/references/of1-files>.
fn car_download_url(epoch: solana_clock::Epoch) -> String {
    // Assemble the URL from its parts: host, per-epoch directory, file name.
    let host = "https://files.old-faithful.net";
    let file_name = format!("epoch-{epoch}.car");
    format!("{host}/{epoch}/{file_name}")
}

fn log_download_progress(
epoch: solana_clock::Epoch,
bytes_downloaded: u64,
Expand Down
6 changes: 4 additions & 2 deletions crates/core/providers-solana/examples/solana_compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration};

use amp_providers_solana::{
commitment_config, config::SolanaProviderConfig, non_empty_of1_slot, non_empty_rpc_slot,
of1_client, rpc_client, tables,
old_faithful, rpc_client, tables,
};
use anyhow::Context;
use backon::{ExponentialBuilder, Retryable};
Expand Down Expand Up @@ -81,11 +81,13 @@ async fn main() -> anyhow::Result<()> {
commitment: Some(commitment_config(provider_cfg.commitment)),
};

let of1_stream = of1_client::stream(
let of1_stream = old_faithful::stream(
start_slot,
end_slot,
reqwest,
rpc_client.clone(),
// Local CAR archive source, None = use official archive.
None,
get_block_config,
// Metrics, we don't need to record them.
None,
Expand Down
26 changes: 16 additions & 10 deletions crates/core/providers-solana/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::{collections::HashSet, sync::Mutex};
use std::{
num::{NonZeroU32, NonZeroU64},
path::PathBuf,
str::FromStr,
sync::Arc,
};
Expand All @@ -27,7 +28,7 @@ use solana_clock::Slot;
use crate::{
config::UseArchive,
error::{SlotConversionError, SlotConversionResult},
metrics, of1_client, rpc_client, tables,
metrics, old_faithful, rpc_client, tables,
};

/// Marker string that appears in Solana transaction log message lists when logs
Expand All @@ -44,6 +45,7 @@ pub struct Client {
network: NetworkId,
provider_name: ProviderName,
use_archive: UseArchive,
archive_dir: Option<PathBuf>,
commitment: CommitmentConfig,
/// Monitor which epochs are currently being streamed to prevent overlapping
/// streams. This is only used in debug builds as an additional safety check
Expand All @@ -61,6 +63,7 @@ impl Client {
network: NetworkId,
provider_name: ProviderName,
use_archive: UseArchive,
archive_dir: Option<PathBuf>,
commitment: CommitmentConfig,
meter: Option<&monitoring::telemetry::metrics::Meter>,
) -> Self {
Expand Down Expand Up @@ -94,6 +97,7 @@ impl Client {
network,
provider_name,
use_archive,
archive_dir,
commitment,
#[cfg(debug_assertions)]
epochs_in_progress: Arc::new(Mutex::new(HashSet::new())),
Expand All @@ -102,7 +106,7 @@ impl Client {

/// Core implementation of the block streaming logic, parameterized over the historical block stream source.
/// For production use-cases the historical block stream will come from the Old Faithful archive (using the
/// [`of1_client`] module) and for testing we can provide a custom stream of decoded slots directly.
/// [`old_faithful`] module) and for testing we can provide a custom stream of decoded slots directly.
///
/// NOTE: The reason this is marked as `pub` is because it is used in integration tests
/// in the `tests` crate.
Expand All @@ -116,7 +120,7 @@ impl Client {
get_transaction_config: rpc_client::rpc_config::RpcTransactionConfig,
) -> impl Stream<Item = Result<Rows, BlockStreamError>>
where
T: Stream<Item = Result<of1_client::DecodedSlot, BlockStreamError>>,
T: Stream<Item = Result<old_faithful::DecodedSlot, BlockStreamError>>,
{
async_stream::stream! {
// Helper macro to simplify error handling and early returns in the stream.
Expand Down Expand Up @@ -286,16 +290,17 @@ impl BlockStreamer for Client {
let metrics = self
.metrics
.clone()
.map(|registry| of1_client::MetricsContext {
.map(|registry| old_faithful::metrics::Context {
registry,
provider: self.provider_name.clone(),
network: self.network.clone(),
});
of1_client::stream(
old_faithful::stream(
start,
end,
self.reqwest.clone(),
self.main_rpc_client.clone(),
self.archive_dir.clone(),
get_block_config,
metrics,
#[cfg(debug_assertions)]
Expand Down Expand Up @@ -337,12 +342,12 @@ impl BlockStreamer for Client {
}
}

/// Converts [of1_client::DecodedSlot] to [tables::NonEmptySlot]. This conversion can fail if any
/// Converts [old_faithful::DecodedSlot] to [tables::NonEmptySlot]. This conversion can fail if any
/// of the decoded fields do not match the expected format/values.
pub fn non_empty_of1_slot(
slot: of1_client::DecodedSlot,
slot: old_faithful::DecodedSlot,
) -> SlotConversionResult<tables::NonEmptySlot> {
let of1_client::DecodedSlot {
let old_faithful::DecodedSlot {
slot,
parent_slot,
blockhash,
Expand Down Expand Up @@ -625,7 +630,7 @@ mod tests {
use url::Url;

use super::Client;
use crate::{config::UseArchive, of1_client, rpc_client};
use crate::{config::UseArchive, old_faithful, rpc_client};

#[tokio::test]
async fn historical_blocks_only() {
Expand All @@ -647,6 +652,7 @@ mod tests {
network,
provider_name,
UseArchive::Auto,
None, // Archive source
CommitmentConfig::finalized(),
None, // Meter
);
Expand All @@ -657,7 +663,7 @@ mod tests {
// Stream the entire range as historical blocks.
let historical = async_stream::stream! {
for slot in start..=end {
yield Ok(of1_client::DecodedSlot::dummy(slot));
yield Ok(old_faithful::DecodedSlot::dummy(slot));
}
};

Expand Down
11 changes: 9 additions & 2 deletions crates/core/providers-solana/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::num::NonZeroU32;
use std::{num::NonZeroU32, path::PathBuf};

use amp_providers_common::{network_id::NetworkId, redacted::Redacted};
use headers::{HeaderName, HeaderValue};
Expand Down Expand Up @@ -40,10 +40,17 @@ pub struct SolanaProviderConfig {
/// Optional rate limit for RPC calls per second.
pub max_rpc_calls_per_second: Option<NonZeroU32>,

/// Controls when to use the Solana archive for historical data.
/// Controls when to use the Old Faithful CAR archive for historical data.
#[serde(default)]
pub use_archive: UseArchive,

/// Optional path to a local directory of Old Faithful CAR files used for historical data.
/// When unset, the official Old Faithful archive is used instead.
///
/// The directory is expected to contain pre-downloaded CAR files named according to
/// [`crate::old_faithful::car_filename`]. A directory populated with the
/// `solana-car-download` example already follows this naming pattern.
pub archive_dir: Option<PathBuf>,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No validation on `archive_dir`: following the "Validate at the Edge" principle, consider validating that `archive_dir` (when provided) is an existing directory at deserialization or client construction time, rather than deferring discovery of a bad path to the first epoch read. A missing or non-directory path would currently produce a confusing `CarReaderError::Io` deep in the streaming pipeline.

This could be a simple check in `Client::new` or a validated newtype for the directory path.


/// Commitment level for RPC requests.
#[serde(default)]
pub commitment: CommitmentLevel,
Expand Down
76 changes: 52 additions & 24 deletions crates/core/providers-solana/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use amp_providers_common::network_id::NetworkId;
use datasets_raw::{client::BlockStreamError, rows::TableRowError};
use yellowstone_faithful_car_parser as car_parser;

use crate::of1_client;

pub type SlotConversionResult<T> = Result<T, SlotConversionError>;
pub type RowConversionResult<T> = Result<T, RowConversionError>;

Expand Down Expand Up @@ -253,7 +251,7 @@ impl SlotConversionError {
/// (Content Addressable aRchive) files. These errors cover RPC communication,
/// file handling, and CAR parsing failures.
#[derive(Debug, thiserror::Error)]
pub enum Of1StreamError {
pub enum OldFaithfulStreamError {
/// Failed to communicate with the Solana RPC client.
///
/// This occurs when querying the RPC for slot information or block data,
Expand All @@ -267,7 +265,7 @@ pub enum Of1StreamError {
/// CAR files, which may include HTTP errors, unsupported range requests, or
/// other file access problems.
#[error("failed to stream CAR file")]
FileStream(#[source] of1_client::CarReaderError),
FileStream(#[source] CarReaderError),

/// Encountered an unexpected node type while reading a block from CAR.
///
Expand Down Expand Up @@ -376,37 +374,67 @@ pub enum Of1StreamError {
BlocktimeOverflow { slot: u64, blocktime: u64 },
}

impl From<Of1StreamError> for BlockStreamError {
fn from(value: Of1StreamError) -> Self {
impl From<OldFaithfulStreamError> for BlockStreamError {
fn from(value: OldFaithfulStreamError) -> Self {
// There is no catch-all here on purpose, to force consideration of
// each error type when mapping to recoverable vs fatal.
match value {
Of1StreamError::FileStream(of1_client::CarReaderError::Io(_))
| Of1StreamError::FileStream(of1_client::CarReaderError::Http(
OldFaithfulStreamError::FileStream(CarReaderError::Io(_))
| OldFaithfulStreamError::FileStream(CarReaderError::Http(
reqwest::StatusCode::NOT_FOUND,
))
| Of1StreamError::FileStream(of1_client::CarReaderError::RangeRequestUnsupported)
| Of1StreamError::UnexpectedNode { .. }
| Of1StreamError::MissingNode { .. }
| Of1StreamError::RewardSlotMismatch { .. }
| Of1StreamError::Zstd { .. }
| Of1StreamError::Bincode(_)
| Of1StreamError::DecodeField { .. }
| Of1StreamError::NodeParse(_)
| Of1StreamError::DataframeReassembly(_)
| Of1StreamError::DecodeBase58(_)
| Of1StreamError::TryIntoArray { .. }
| Of1StreamError::BlocktimeOverflow { .. } => BlockStreamError::Fatal(value.into()),

Of1StreamError::RpcClient(_)
| Of1StreamError::FileStream(of1_client::CarReaderError::Http(_))
| Of1StreamError::FileStream(of1_client::CarReaderError::Reqwest(_)) => {
| OldFaithfulStreamError::FileStream(CarReaderError::RangeRequestUnsupported)
| OldFaithfulStreamError::UnexpectedNode { .. }
| OldFaithfulStreamError::MissingNode { .. }
| OldFaithfulStreamError::RewardSlotMismatch { .. }
| OldFaithfulStreamError::Zstd { .. }
| OldFaithfulStreamError::Bincode(_)
| OldFaithfulStreamError::DecodeField { .. }
| OldFaithfulStreamError::NodeParse(_)
| OldFaithfulStreamError::DataframeReassembly(_)
| OldFaithfulStreamError::DecodeBase58(_)
| OldFaithfulStreamError::TryIntoArray { .. }
| OldFaithfulStreamError::BlocktimeOverflow { .. } => {
BlockStreamError::Fatal(value.into())
}

OldFaithfulStreamError::RpcClient(_)
| OldFaithfulStreamError::FileStream(CarReaderError::Http(_))
| OldFaithfulStreamError::FileStream(CarReaderError::Reqwest(_)) => {
BlockStreamError::Recoverable(value.into())
}
}
}
}

/// Errors that can occur during streaming of Solana blocks from Old Faithful v1 (OF1) CAR files.
#[derive(Debug, thiserror::Error)]
pub enum CarReaderError {
/// IO error when reading the CAR file.
///
/// This can occur due to network issues, file corruption, or other problems when
/// accessing the CAR file.
#[error("IO error: {0}")]
Io(#[source] std::io::Error),
/// Reqwest error when connecting to or reading from the CAR file.
///
/// This can occur due to network issues, timeouts, or other problems when making
/// HTTP requests to access the CAR file.
#[error("Reqwest error: {0}")]
Reqwest(#[source] reqwest::Error),
/// The server responded with an HTTP error status code when trying to access the CAR file.
///
/// This can occur due to network issues, server problems, or if the CAR file is not available.
#[error("HTTP error: {0}")]
Http(reqwest::StatusCode),
/// The server does not support HTTP range requests.
///
/// This is a non-recoverable error because the Old Faithful reader relies on range requests
/// to be able to resume interrupted downloads without re-downloading the entire CAR.
#[error("server does not support range requests")]
RangeRequestUnsupported,
}

/// Error during Solana client initialization or operation.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
Expand Down
3 changes: 2 additions & 1 deletion crates/core/providers-solana/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub mod config;
pub mod error;
pub mod kind;
pub mod metrics;
pub mod of1_client;
pub mod old_faithful;
pub mod rpc_client;
pub mod tables;

Expand Down Expand Up @@ -103,6 +103,7 @@ pub fn client(
config.network,
name,
config.use_archive,
config.archive_dir,
commitment,
meter,
);
Expand Down
Loading
Loading