From 5214deca3820c642f9b06cce7a2c4f576a390d3a Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 20 Mar 2026 16:41:34 +0000 Subject: [PATCH 1/5] fix(core): better tracing of metadata cache --- crates/core/data-store/src/lib.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/crates/core/data-store/src/lib.rs b/crates/core/data-store/src/lib.rs index 0914d1e23..a10f6a56a 100644 --- a/crates/core/data-store/src/lib.rs +++ b/crates/core/data-store/src/lib.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use amp_object_store::{ObjectStoreCreationError, url::ObjectStoreUrl}; +use tracing::Instrument; use bytes::Bytes; use datafusion::{ arrow::datatypes::SchemaRef, @@ -834,6 +835,7 @@ impl DataStore { /// /// The `schema` parameter is required to compute DataFusion statistics from the parquet /// metadata. + #[tracing::instrument(skip_all, fields(%file_id, cache_hit))] pub async fn get_cached_parquet_metadata( &self, file_id: FileId, @@ -845,9 +847,13 @@ impl DataStore { let entry = cache .get_or_fetch(&file_id, || async move { // Cache miss, fetch from database - let footer = metadata_db::files::get_footer_bytes(&metadata_db, file_id) - .await - .map_err(GetCachedMetadataError::FetchFooter)?; + let footer = async { + metadata_db::files::get_footer_bytes(&metadata_db, file_id) + .await + .map_err(GetCachedMetadataError::FetchFooter) + } + .instrument(tracing::info_span!("fetch_footer_from_db")) + .await?; let metadata = Arc::new( ParquetMetaDataReader::new() @@ -869,6 +875,9 @@ impl DataStore { .await .map_err(GetCachedMetadataError::CacheError)?; + let cache_hit = matches!(entry.source(), Source::Memory); + tracing::Span::current().record("cache_hit", cache_hit); + if let Some(metrics) = &self.cache_metrics { match entry.source() { Source::Memory => metrics.hits.inc(), From e7e602a28a6972835de9c4d99dd22bcc0e381b1a Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 20 Mar 2026 16:41:52 +0000 Subject: [PATCH 2/5] fix(ampctl): filter spans client side --- crates/core/trace-report/src/lib.rs | 48 ++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/crates/core/trace-report/src/lib.rs b/crates/core/trace-report/src/lib.rs index d07aebab0..6100a1574 100644 --- a/crates/core/trace-report/src/lib.rs +++ b/crates/core/trace-report/src/lib.rs @@ -105,6 +105,16 @@ pub fn merge_traces(traces: Vec) -> jaeger::Trace { } /// Search Jaeger and produce a filtered trace, or load from a file. +/// +/// Tag filters (`--filter key=value`) are applied **client-side** rather than +/// being passed to the Jaeger search API. This is because VictoriaTraces +/// returns significantly fewer spans per trace when server-side tag filters +/// are active, causing incomplete results for long-running traces. +/// +/// The flow is: +/// 1. Search Jaeger by operation + time range (no tag filter) +/// 2. Keep only traces where at least one span matches all tag filters +/// 3. Filter to subtrees rooted at the configured root span names #[expect(clippy::too_many_arguments)] pub async fn fetch_and_filter( jaeger_url: &str, @@ -125,11 +135,12 @@ pub async fn fetch_and_filter( } None => { let primary_operation = root_spans.first().copied(); + // Search without tag filters — apply them client-side below let params = jaeger::SearchParams { service, operation: primary_operation, limit, - tags: filters, + tags: &[], start_us: after, end_us: before, }; @@ -137,6 +148,41 @@ pub async fn fetch_and_filter( if traces.is_empty() { bail!("no traces found matching filters"); } + + // Client-side tag filtering: keep only traces where at least one + // span has ALL the requested tag key=value pairs. + let traces = if filters.is_empty() { + traces + } else { + let filtered: Vec<_> = traces + .into_iter() + .filter(|trace| { + trace.spans.iter().any(|span| { + filters.iter().all(|(key, value)| { + span.tags.iter().any(|tag| { + tag.key == *key + && tag.value.as_str().map_or_else( + || tag.value.to_string() == *value, + |v| v == value, + ) + }) + }) + }) + }) + .collect(); + if filtered.is_empty() { + bail!( + "no traces matched tag filters: {}", + filters + .iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(", ") + ); + } + filtered + }; + let merged = merge_traces(traces); eprintln!("Fetched {} spans from Jaeger", merged.spans.len()); merged From 6f02901005ff4c5559f44bb073ab723807ef0144 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 20 Mar 2026 16:50:43 +0000 Subject: [PATCH 3/5] refactor(ampctl): move `trace-report` into ampctl --- Cargo.lock | 19 +- Cargo.toml | 1 - crates/bin/ampctl/Cargo.toml | 5 +- crates/bin/ampctl/src/cmd/trace.rs | 105 +++++++++ crates/bin/ampctl/src/cmd/trace/fetch.rs | 5 +- .../ampctl/src/cmd/trace}/flamegraph.rs | 2 +- .../ampctl/src/cmd/trace}/jaeger.rs | 0 crates/bin/ampctl/src/cmd/trace/report.rs | 96 +++++++-- crates/bin/ampctl/src/cmd/trace/search.rs | 7 +- .../src => bin/ampctl/src/cmd/trace}/time.rs | 0 .../src => bin/ampctl/src/cmd/trace}/tree.rs | 2 +- crates/core/trace-report/Cargo.toml | 15 -- crates/core/trace-report/src/lib.rs | 199 ------------------ 13 files changed, 199 insertions(+), 257 deletions(-) rename crates/{core/trace-report/src => bin/ampctl/src/cmd/trace}/flamegraph.rs (99%) rename crates/{core/trace-report/src => bin/ampctl/src/cmd/trace}/jaeger.rs (100%) rename crates/{core/trace-report/src => bin/ampctl/src/cmd/trace}/time.rs (100%) rename crates/{core/trace-report/src => bin/ampctl/src/cmd/trace}/tree.rs (99%) delete mode 100644 crates/core/trace-report/Cargo.toml delete mode 100644 crates/core/trace-report/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 0f844698d..11ef46e25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1359,6 +1359,7 @@ dependencies = [ "amp-object-store", "amp-worker-core", "anyhow", + "chrono", "clap", "console", "datasets-common", @@ -1366,6 +1367,8 @@ dependencies = [ "datasets-raw", "evm-rpc-datasets", "firehose-datasets", + "flate2", + "inferno", "monitoring", "object_store", "reqwest 0.13.2", @@ -1377,9 +1380,9 @@ dependencies = [ "thiserror 2.0.18", "tokio", "toml", - "trace-report", "tracing", "url", + "urlencoding", "vergen-gitcl", "verification", "worker", @@ -12906,20 +12909,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" -[[package]] -name = "trace-report" -version = "0.1.0" -dependencies = [ - "anyhow", - "chrono", - "flate2", - "inferno", - "reqwest 0.13.2", - "serde", - "serde_json", - "urlencoding", -] - [[package]] name = "tracing" version = "0.1.44" diff --git a/Cargo.toml b/Cargo.toml index b88f55cd5..6599b53ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,6 @@ members = [ "crates/core/providers-solana/gen", "crates/core/providers-static", "crates/core/providers-static/gen", - "crates/core/trace-report", "crates/core/verification", "crates/core/worker-core", "crates/core/worker-datasets-derived", diff --git a/crates/bin/ampctl/Cargo.toml b/crates/bin/ampctl/Cargo.toml index 57f472ba2..a1fbcfbd0 100644 --- a/crates/bin/ampctl/Cargo.toml +++ b/crates/bin/ampctl/Cargo.toml @@ -20,11 +20,14 @@ amp-worker-core = { path = "../../core/worker-core" } anyhow.workspace = true clap.workspace = true console = "0.16.1" +chrono.workspace = true datasets-common = { path = "../../core/datasets-common" } datasets-derived = { path = "../../core/datasets-derived" } datasets-raw = { path = "../../core/datasets-raw" } evm-rpc-datasets = { path = "../../extractors/evm-rpc" } firehose-datasets = { path = "../../extractors/firehose" } +flate2 = "1" +inferno = "0.12" monitoring = { path = "../../core/monitoring" } object_store.workspace = true reqwest.workspace = true @@ -34,8 +37,8 @@ serde_json.workspace = true solana-datasets = { path = "../../extractors/solana" } tempo-datasets = { path = "../../extractors/tempo" } thiserror.workspace = true -trace-report = { path = "../../core/trace-report" } tokio.workspace = true +urlencoding = "2" toml.workspace = true tracing.workspace = true url.workspace = true diff --git a/crates/bin/ampctl/src/cmd/trace.rs b/crates/bin/ampctl/src/cmd/trace.rs index b21246767..ad9a687bc 100644 --- a/crates/bin/ampctl/src/cmd/trace.rs +++ b/crates/bin/ampctl/src/cmd/trace.rs @@ -1,9 +1,32 @@ mod fetch; +mod flamegraph; +mod jaeger; mod report; mod search; +mod time; +mod tree; + +use std::path::Path; + +use anyhow::{Context, Result}; +use flamegraph::FlamegraphConfig; const SERVICE: &str = "tracing"; +/// Root span names for each report type. +pub mod roots { + pub const QUERY: &[&str] = &["do_get"]; + pub const DERIVED_DATASET: &[&str] = &[ + "execute_microbatch", + "next_microbatch_range", + "write", + "close", + "register", + "send_location_change_notif", + ]; + pub const RAW_DATASET: &[&str] = &["run_range"]; +} + #[derive(Debug, clap::Subcommand)] pub enum Commands { /// Generate a performance report (flamegraphs, folded stacks, span tree) @@ -72,3 +95,85 @@ pub fn parse_filters(filters: &[String]) -> anyhow::Result }) .collect() } + +/// Generate a full report: trace JSON, wallclock SVG, busy SVG, folded stacks. +fn generate_report(trace: &jaeger::Trace, output_dir: &Path, prefix: &str) -> Result<()> { + std::fs::create_dir_all(output_dir) + .with_context(|| format!("creating {}", output_dir.display()))?; + + // Save trace + let json_path = output_dir.join(format!("{prefix}_trace.json.gz")); + save_trace_gz(trace, &json_path)?; + eprintln!("Wrote trace to {}", json_path.display()); + + // Span tree + eprintln!("\n--- Span tree ---"); + tree::print_tree(trace); + eprintln!("---\n"); + + // Wallclock flamegraph + let wallclock_config = FlamegraphConfig { + min_duration_us: 0, + use_busy_time: false, + }; + let wallclock_path = output_dir.join(format!("{prefix}_wallclock.svg")); + flamegraph::generate(trace, &wallclock_path, &wallclock_config)?; + eprintln!("Wrote wallclock flamegraph to {}", wallclock_path.display()); + + // Busy-time flamegraph + let busy_config = FlamegraphConfig { + min_duration_us: 0, + use_busy_time: true, + }; + let busy_path = output_dir.join(format!("{prefix}_busy.svg")); + flamegraph::generate(trace, &busy_path, &busy_config)?; + eprintln!("Wrote busy-time flamegraph to {}", busy_path.display()); + + // Folded stacks + let folded_path = output_dir.join(format!("{prefix}_folded.txt")); + let folded = flamegraph::build_folded(trace, &wallclock_config)?; + let folded_text = folded.join("\n"); + std::fs::write(&folded_path, &folded_text)?; + eprintln!("Wrote folded stacks to {}", folded_path.display()); + + Ok(()) +} + +/// Load a trace from a JSON or JSON.gz file. +fn load_trace(path: &Path) -> Result { + let bytes = std::fs::read(path).with_context(|| format!("reading {}", path.display()))?; + if path.to_string_lossy().ends_with(".gz") { + use std::io::Read; + + use flate2::read::GzDecoder; + let mut decoder = GzDecoder::new(&bytes[..]); + let mut json = String::new(); + decoder.read_to_string(&mut json)?; + Ok(serde_json::from_str(&json)?) + } else { + Ok(serde_json::from_slice(&bytes)?) + } +} + +/// Save a trace as gzipped JSON. +fn save_trace_gz(trace: &jaeger::Trace, path: &Path) -> Result<()> { + use std::io::Write; + + use flate2::{Compression, write::GzEncoder}; + let json = serde_json::to_vec(trace)?; + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&json)?; + std::fs::write(path, encoder.finish()?)?; + Ok(()) +} + +/// Merge multiple Jaeger traces into one. +fn merge_traces(traces: Vec) -> jaeger::Trace { + let mut iter = traces.into_iter(); + let mut merged = iter.next().expect("at least one trace"); + for trace in iter { + merged.spans.extend(trace.spans); + merged.processes.extend(trace.processes); + } + merged +} diff --git a/crates/bin/ampctl/src/cmd/trace/fetch.rs b/crates/bin/ampctl/src/cmd/trace/fetch.rs index 451576fea..174bca136 100644 --- a/crates/bin/ampctl/src/cmd/trace/fetch.rs +++ b/crates/bin/ampctl/src/cmd/trace/fetch.rs @@ -1,7 +1,6 @@ use std::path::PathBuf; -use trace_report::jaeger; - +use super::jaeger; use super::{RemoteArgs, basic_auth}; #[derive(Debug, clap::Args)] @@ -22,7 +21,7 @@ pub async fn run(args: Args) -> anyhow::Result<()> { jaeger::fetch_trace(&args.remote.jaeger_url, &args.trace_id, auth.as_deref()).await?; if args.output.to_string_lossy().ends_with(".gz") { - trace_report::save_trace_gz(&trace, &args.output)?; + super::save_trace_gz(&trace, &args.output)?; } else { let json = serde_json::to_vec(&trace)?; std::fs::write(&args.output, json)?; diff --git a/crates/core/trace-report/src/flamegraph.rs b/crates/bin/ampctl/src/cmd/trace/flamegraph.rs similarity index 99% rename from crates/core/trace-report/src/flamegraph.rs rename to crates/bin/ampctl/src/cmd/trace/flamegraph.rs index 33f833ae7..93f3e34a0 100644 --- a/crates/core/trace-report/src/flamegraph.rs +++ b/crates/bin/ampctl/src/cmd/trace/flamegraph.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, fs::File, io::BufWriter, path::Path}; use anyhow::{Context, Result}; use inferno::flamegraph::{self, Options}; -use crate::jaeger::{Span, Trace}; +use super::jaeger::{Span, Trace}; pub struct FlamegraphConfig { pub min_duration_us: u64, diff --git a/crates/core/trace-report/src/jaeger.rs b/crates/bin/ampctl/src/cmd/trace/jaeger.rs similarity index 100% rename from crates/core/trace-report/src/jaeger.rs rename to crates/bin/ampctl/src/cmd/trace/jaeger.rs diff --git a/crates/bin/ampctl/src/cmd/trace/report.rs b/crates/bin/ampctl/src/cmd/trace/report.rs index 024b8a196..8bc9536f0 100644 --- a/crates/bin/ampctl/src/cmd/trace/report.rs +++ b/crates/bin/ampctl/src/cmd/trace/report.rs @@ -1,7 +1,8 @@ use std::path::PathBuf; -use trace_report::time::parse_time; +use anyhow::bail; +use super::time::parse_time; use super::{FilterArgs, RemoteArgs, SERVICE, basic_auth, parse_filters}; /// Input source: Jaeger search or local file. @@ -48,18 +49,18 @@ pub enum Commands { pub async fn run(command: Commands) -> anyhow::Result<()> { let (input, output_dir, root_spans, prefix) = match &command { Commands::Query { input, output_dir } => { - (input, output_dir, trace_report::roots::QUERY, "query") + (input, output_dir, super::roots::QUERY, "query") } Commands::DerivedDataset { input, output_dir } => ( input, output_dir, - trace_report::roots::DERIVED_DATASET, + super::roots::DERIVED_DATASET, "derived_dataset", ), Commands::RawDataset { input, output_dir } => ( input, output_dir, - trace_report::roots::RAW_DATASET, + super::roots::RAW_DATASET, "raw_dataset", ), }; @@ -69,20 +70,83 @@ pub async fn run(command: Commands) -> anyhow::Result<()> { let before = input.filter.before.as_deref().map(parse_time).transpose()?; let auth = basic_auth(); - let trace = trace_report::fetch_and_filter( - &input.remote.jaeger_url, - auth.as_deref(), - &tags, - after, - before, - input.filter.limit, - SERVICE, + let full_trace = match &input.file { + Some(path) => { + let trace = super::load_trace(path)?; + eprintln!("Loaded {} spans from {}", trace.spans.len(), path.display()); + trace + } + None => { + let primary_operation = root_spans.first().copied(); + // Search without tag filters — VictoriaTraces returns severely + // truncated spans when server-side tag filters are active. + // Tags are applied client-side below. + let params = super::jaeger::SearchParams { + service: SERVICE, + operation: primary_operation, + limit: input.filter.limit, + tags: &[], + start_us: after, + end_us: before, + }; + let traces = super::jaeger::search_traces( + &input.remote.jaeger_url, + ¶ms, + auth.as_deref(), + ) + .await?; + + if traces.is_empty() { + bail!("no traces found matching filters"); + } + + // Client-side tag filtering: keep only traces where at least one + // span has ALL the requested tag key=value pairs. + let traces = if tags.is_empty() { + traces + } else { + let filtered: Vec<_> = traces + .into_iter() + .filter(|trace| { + trace.spans.iter().any(|span| { + tags.iter().all(|(key, value)| { + span.tags.iter().any(|tag| { + tag.key == *key + && tag.value.as_str().map_or_else( + || tag.value.to_string() == *value, + |v| v == value, + ) + }) + }) + }) + }) + .collect(); + if filtered.is_empty() { + bail!( + "no traces matched tag filters: {}", + tags.iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(", ") + ); + } + filtered + }; + + let merged = super::merge_traces(traces); + eprintln!("Fetched {} spans from Jaeger", merged.spans.len()); + merged + } + }; + + let trace = full_trace.filter_to_subtrees(root_spans); + eprintln!( + "Filtered to {} spans (roots: {:?})", + trace.spans.len(), root_spans, - input.file.as_deref(), - ) - .await?; + ); - trace_report::generate_report(&trace, output_dir, prefix)?; + super::generate_report(&trace, output_dir, prefix)?; Ok(()) } diff --git a/crates/bin/ampctl/src/cmd/trace/search.rs b/crates/bin/ampctl/src/cmd/trace/search.rs index 6dbdac000..793ebea62 100644 --- a/crates/bin/ampctl/src/cmd/trace/search.rs +++ b/crates/bin/ampctl/src/cmd/trace/search.rs @@ -1,8 +1,5 @@ -use trace_report::{ - jaeger, - time::{format_epoch_secs, parse_time}, -}; - +use super::jaeger; +use super::time::{format_epoch_secs, parse_time}; use super::{FilterArgs, RemoteArgs, SERVICE, basic_auth, parse_filters}; #[derive(Debug, clap::Args)] diff --git a/crates/core/trace-report/src/time.rs b/crates/bin/ampctl/src/cmd/trace/time.rs similarity index 100% rename from crates/core/trace-report/src/time.rs rename to crates/bin/ampctl/src/cmd/trace/time.rs diff --git a/crates/core/trace-report/src/tree.rs b/crates/bin/ampctl/src/cmd/trace/tree.rs similarity index 99% rename from crates/core/trace-report/src/tree.rs rename to crates/bin/ampctl/src/cmd/trace/tree.rs index c7a93ae51..3497ca317 100644 --- a/crates/core/trace-report/src/tree.rs +++ b/crates/bin/ampctl/src/cmd/trace/tree.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::jaeger; +use super::jaeger; pub fn print_tree(trace: &jaeger::Trace) { let mut children: HashMap<&str, Vec<&jaeger::Span>> = HashMap::new(); diff --git a/crates/core/trace-report/Cargo.toml b/crates/core/trace-report/Cargo.toml deleted file mode 100644 index 873affb9f..000000000 --- a/crates/core/trace-report/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "trace-report" -edition.workspace = true -version.workspace = true -license-file.workspace = true - -[dependencies] -anyhow.workspace = true -chrono.workspace = true -flate2 = "1" -inferno = "0.12" -reqwest.workspace = true -serde.workspace = true -serde_json.workspace = true -urlencoding = "2" diff --git a/crates/core/trace-report/src/lib.rs b/crates/core/trace-report/src/lib.rs deleted file mode 100644 index 6100a1574..000000000 --- a/crates/core/trace-report/src/lib.rs +++ /dev/null @@ -1,199 +0,0 @@ -pub mod flamegraph; -pub mod jaeger; -pub mod time; -pub mod tree; - -use std::path::Path; - -use anyhow::{Context, Result, bail}; -use flamegraph::FlamegraphConfig; - -/// Root span names for each report type. -pub mod roots { - pub const QUERY: &[&str] = &["do_get"]; - pub const DERIVED_DATASET: &[&str] = &[ - "execute_microbatch", - "next_microbatch_range", - "write", - "close", - "register", - "send_location_change_notif", - ]; - pub const RAW_DATASET: &[&str] = &["run_range"]; -} - -/// Generate a full report: trace JSON, wallclock SVG, busy SVG, folded stacks. -pub fn generate_report(trace: &jaeger::Trace, output_dir: &Path, prefix: &str) -> Result<()> { - std::fs::create_dir_all(output_dir) - .with_context(|| format!("creating {}", output_dir.display()))?; - - // Save trace - let json_path = output_dir.join(format!("{prefix}_trace.json.gz")); - save_trace_gz(trace, &json_path)?; - eprintln!("Wrote trace to {}", json_path.display()); - - // Span tree - eprintln!("\n--- Span tree ---"); - tree::print_tree(trace); - eprintln!("---\n"); - - // Wallclock flamegraph - let wallclock_config = FlamegraphConfig { - min_duration_us: 0, - use_busy_time: false, - }; - let wallclock_path = output_dir.join(format!("{prefix}_wallclock.svg")); - flamegraph::generate(trace, &wallclock_path, &wallclock_config)?; - eprintln!("Wrote wallclock flamegraph to {}", wallclock_path.display()); - - // Busy-time flamegraph - let busy_config = FlamegraphConfig { - min_duration_us: 0, - use_busy_time: true, - }; - let busy_path = output_dir.join(format!("{prefix}_busy.svg")); - flamegraph::generate(trace, &busy_path, &busy_config)?; - eprintln!("Wrote busy-time flamegraph to {}", busy_path.display()); - - // Folded stacks - let folded_path = output_dir.join(format!("{prefix}_folded.txt")); - let folded = flamegraph::build_folded(trace, &wallclock_config)?; - let folded_text = folded.join("\n"); - std::fs::write(&folded_path, &folded_text)?; - eprintln!("Wrote folded stacks to {}", folded_path.display()); - - Ok(()) -} - -/// Load a trace from a JSON or JSON.gz file. -pub fn load_trace(path: &Path) -> Result { - let bytes = std::fs::read(path).with_context(|| format!("reading {}", path.display()))?; - if path.to_string_lossy().ends_with(".gz") { - use std::io::Read; - - use flate2::read::GzDecoder; - let mut decoder = GzDecoder::new(&bytes[..]); - let mut json = String::new(); - decoder.read_to_string(&mut json)?; - Ok(serde_json::from_str(&json)?) - } else { - Ok(serde_json::from_slice(&bytes)?) - } -} - -/// Save a trace as gzipped JSON. -pub fn save_trace_gz(trace: &jaeger::Trace, path: &Path) -> Result<()> { - use std::io::Write; - - use flate2::{Compression, write::GzEncoder}; - let json = serde_json::to_vec(trace)?; - let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); - encoder.write_all(&json)?; - std::fs::write(path, encoder.finish()?)?; - Ok(()) -} - -/// Merge multiple Jaeger traces into one. -pub fn merge_traces(traces: Vec) -> jaeger::Trace { - let mut iter = traces.into_iter(); - let mut merged = iter.next().expect("at least one trace"); - for trace in iter { - merged.spans.extend(trace.spans); - merged.processes.extend(trace.processes); - } - merged -} - -/// Search Jaeger and produce a filtered trace, or load from a file. -/// -/// Tag filters (`--filter key=value`) are applied **client-side** rather than -/// being passed to the Jaeger search API. This is because VictoriaTraces -/// returns significantly fewer spans per trace when server-side tag filters -/// are active, causing incomplete results for long-running traces. -/// -/// The flow is: -/// 1. Search Jaeger by operation + time range (no tag filter) -/// 2. Keep only traces where at least one span matches all tag filters -/// 3. Filter to subtrees rooted at the configured root span names -#[expect(clippy::too_many_arguments)] -pub async fn fetch_and_filter( - jaeger_url: &str, - basic_auth: Option<&str>, - filters: &[(String, String)], - after: Option, - before: Option, - limit: u32, - service: &str, - root_spans: &[&str], - file: Option<&Path>, -) -> Result { - let full_trace = match file { - Some(path) => { - let trace = load_trace(path)?; - eprintln!("Loaded {} spans from {}", trace.spans.len(), path.display()); - trace - } - None => { - let primary_operation = root_spans.first().copied(); - // Search without tag filters — apply them client-side below - let params = jaeger::SearchParams { - service, - operation: primary_operation, - limit, - tags: &[], - start_us: after, - end_us: before, - }; - let traces = jaeger::search_traces(jaeger_url, ¶ms, basic_auth).await?; - if traces.is_empty() { - bail!("no traces found matching filters"); - } - - // Client-side tag filtering: keep only traces where at least one - // span has ALL the requested tag key=value pairs. - let traces = if filters.is_empty() { - traces - } else { - let filtered: Vec<_> = traces - .into_iter() - .filter(|trace| { - trace.spans.iter().any(|span| { - filters.iter().all(|(key, value)| { - span.tags.iter().any(|tag| { - tag.key == *key - && tag.value.as_str().map_or_else( - || tag.value.to_string() == *value, - |v| v == value, - ) - }) - }) - }) - }) - .collect(); - if filtered.is_empty() { - bail!( - "no traces matched tag filters: {}", - filters - .iter() - .map(|(k, v)| format!("{k}={v}")) - .collect::>() - .join(", ") - ); - } - filtered - }; - - let merged = merge_traces(traces); - eprintln!("Fetched {} spans from Jaeger", merged.spans.len()); - merged - } - }; - - let filtered = full_trace.filter_to_subtrees(root_spans); - eprintln!( - "Filtered to {} spans (roots: {:?})", - filtered.spans.len(), - root_spans, - ); - Ok(filtered) -} From 998f90d6fb9fac58de3c90406ebeb170e0cf47b6 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 20 Mar 2026 16:54:07 +0000 Subject: [PATCH 4/5] fix(ampctl): stop printing span tree as part of report --- crates/bin/ampctl/src/cmd/trace.rs | 6 -- crates/bin/ampctl/src/cmd/trace/tree.rs | 84 ------------------------- 2 files changed, 90 deletions(-) delete mode 100644 crates/bin/ampctl/src/cmd/trace/tree.rs diff --git a/crates/bin/ampctl/src/cmd/trace.rs b/crates/bin/ampctl/src/cmd/trace.rs index ad9a687bc..595483a7b 100644 --- a/crates/bin/ampctl/src/cmd/trace.rs +++ b/crates/bin/ampctl/src/cmd/trace.rs @@ -4,7 +4,6 @@ mod jaeger; mod report; mod search; mod time; -mod tree; use std::path::Path; @@ -106,11 +105,6 @@ fn generate_report(trace: &jaeger::Trace, output_dir: &Path, prefix: &str) -> Re save_trace_gz(trace, &json_path)?; eprintln!("Wrote trace to {}", json_path.display()); - // Span tree - eprintln!("\n--- Span tree ---"); - tree::print_tree(trace); - eprintln!("---\n"); - // Wallclock flamegraph let wallclock_config = FlamegraphConfig { min_duration_us: 0, diff --git a/crates/bin/ampctl/src/cmd/trace/tree.rs b/crates/bin/ampctl/src/cmd/trace/tree.rs deleted file mode 100644 index 3497ca317..000000000 --- a/crates/bin/ampctl/src/cmd/trace/tree.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::collections::HashMap; - -use super::jaeger; - -pub fn print_tree(trace: &jaeger::Trace) { - let mut children: HashMap<&str, Vec<&jaeger::Span>> = HashMap::new(); - for span in &trace.spans { - for reference in &span.references { - children - .entry(reference.span_id.as_str()) - .or_default() - .push(span); - } - } - for kids in children.values_mut() { - kids.sort_by_key(|s| s.start_time); - } - - let span_ids: std::collections::HashSet<&str> = - trace.spans.iter().map(|s| s.span_id.as_str()).collect(); - let roots: Vec<&jaeger::Span> = trace - .spans - .iter() - .filter(|s| { - s.references.is_empty() - || s.references - .iter() - .all(|r| !span_ids.contains(r.span_id.as_str())) - }) - .collect(); - - for root in roots { - show(root, &children, 0); - } -} - -fn tag_u64(span: &jaeger::Span, key: &str) -> Option { - span.tags.iter().find(|t| t.key == key).and_then(|t| { - t.value - .as_u64() - .or_else(|| t.value.as_i64().map(|v| v as u64)) - }) -} - -fn show(span: &jaeger::Span, children: &HashMap<&str, Vec<&jaeger::Span>>, indent: usize) { - let mut label = span.operation_name.clone(); - for tag in &span.tags { - if matches!(tag.key.as_str(), "start_block" | "end_block" | "query") - && let Some(v) = tag.value.as_str() - { - let v = if v.len() > 50 { &v[..50] } else { v }; - label.push_str(&format!(" {}={}", tag.key, v)); - } - } - - let dur_ms = span.duration as f64 / 1000.0; - let busy_ns = tag_u64(span, "busy_ns"); - let busy_str = busy_ns - .map(|b| format!(", busy={:.1}ms", b as f64 / 1_000_000.0)) - .unwrap_or_default(); - - let kids = children.get(span.span_id.as_str()); - let kids_dur: u64 = kids - .map(|k| k.iter().map(|s| s.duration).sum()) - .unwrap_or(0); - let overflow = if kids_dur > span.duration { - " !!OVERFLOW" - } else { - "" - }; - let self_dur = span.duration.saturating_sub(kids_dur) as f64 / 1000.0; - - println!( - "{:indent$}{label} ({dur_ms:.1}ms, self={self_dur:.1}ms{busy_str}{overflow})", - "", - indent = indent * 2 - ); - - if let Some(kids) = kids { - for child in kids { - show(child, children, indent + 1); - } - } -} From a4c950e76ef70894dbffe5ff72df06ae596ddbec Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 20 Mar 2026 16:54:32 +0000 Subject: [PATCH 5/5] chore(ampctl): fmt --- crates/bin/ampctl/src/cmd/trace/fetch.rs | 3 +-- crates/bin/ampctl/src/cmd/trace/report.rs | 30 +++++++---------------- crates/bin/ampctl/src/cmd/trace/search.rs | 7 +++--- crates/core/data-store/src/lib.rs | 2 +- crates/core/monitoring/src/logging.rs | 1 - 5 files changed, 15 insertions(+), 28 deletions(-) diff --git a/crates/bin/ampctl/src/cmd/trace/fetch.rs b/crates/bin/ampctl/src/cmd/trace/fetch.rs index 174bca136..ede2abf14 100644 --- a/crates/bin/ampctl/src/cmd/trace/fetch.rs +++ b/crates/bin/ampctl/src/cmd/trace/fetch.rs @@ -1,7 +1,6 @@ use std::path::PathBuf; -use super::jaeger; -use super::{RemoteArgs, basic_auth}; +use super::{RemoteArgs, basic_auth, jaeger}; #[derive(Debug, clap::Args)] pub struct Args { diff --git a/crates/bin/ampctl/src/cmd/trace/report.rs b/crates/bin/ampctl/src/cmd/trace/report.rs index 8bc9536f0..ea052abb1 100644 --- a/crates/bin/ampctl/src/cmd/trace/report.rs +++ b/crates/bin/ampctl/src/cmd/trace/report.rs @@ -2,8 +2,7 @@ use std::path::PathBuf; use anyhow::bail; -use super::time::parse_time; -use super::{FilterArgs, RemoteArgs, SERVICE, basic_auth, parse_filters}; +use super::{FilterArgs, RemoteArgs, SERVICE, basic_auth, parse_filters, time::parse_time}; /// Input source: Jaeger search or local file. #[derive(Debug, clap::Args, Clone)] @@ -48,21 +47,16 @@ pub enum Commands { pub async fn run(command: Commands) -> anyhow::Result<()> { let (input, output_dir, root_spans, prefix) = match &command { - Commands::Query { input, output_dir } => { - (input, output_dir, super::roots::QUERY, "query") - } + Commands::Query { input, output_dir } => (input, output_dir, super::roots::QUERY, "query"), Commands::DerivedDataset { input, output_dir } => ( input, output_dir, super::roots::DERIVED_DATASET, "derived_dataset", ), - Commands::RawDataset { input, output_dir } => ( - input, - output_dir, - super::roots::RAW_DATASET, - "raw_dataset", - ), + Commands::RawDataset { input, output_dir } => { + (input, output_dir, super::roots::RAW_DATASET, "raw_dataset") + } }; let tags = parse_filters(&input.filter.filters)?; @@ -89,12 +83,9 @@ pub async fn run(command: Commands) -> anyhow::Result<()> { start_us: after, end_us: before, }; - let traces = super::jaeger::search_traces( - &input.remote.jaeger_url, - ¶ms, - auth.as_deref(), - ) - .await?; + let traces = + super::jaeger::search_traces(&input.remote.jaeger_url, ¶ms, auth.as_deref()) + .await?; if traces.is_empty() { bail!("no traces found matching filters"); @@ -112,10 +103,7 @@ pub async fn run(command: Commands) -> anyhow::Result<()> { tags.iter().all(|(key, value)| { span.tags.iter().any(|tag| { tag.key == *key - && tag.value.as_str().map_or_else( - || tag.value.to_string() == *value, - |v| v == value, - ) + && tag.value.as_str().is_some_and(|v| v == value) }) }) }) diff --git a/crates/bin/ampctl/src/cmd/trace/search.rs b/crates/bin/ampctl/src/cmd/trace/search.rs index 793ebea62..1a5cb887c 100644 --- a/crates/bin/ampctl/src/cmd/trace/search.rs +++ b/crates/bin/ampctl/src/cmd/trace/search.rs @@ -1,6 +1,7 @@ -use super::jaeger; -use super::time::{format_epoch_secs, parse_time}; -use super::{FilterArgs, RemoteArgs, SERVICE, basic_auth, parse_filters}; +use super::{ + FilterArgs, RemoteArgs, SERVICE, basic_auth, jaeger, parse_filters, + time::{format_epoch_secs, parse_time}, +}; #[derive(Debug, clap::Args)] pub struct Args { diff --git a/crates/core/data-store/src/lib.rs b/crates/core/data-store/src/lib.rs index a10f6a56a..85dccd443 100644 --- a/crates/core/data-store/src/lib.rs +++ b/crates/core/data-store/src/lib.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use amp_object_store::{ObjectStoreCreationError, url::ObjectStoreUrl}; -use tracing::Instrument; use bytes::Bytes; use datafusion::{ arrow::datatypes::SchemaRef, @@ -30,6 +29,7 @@ use metadata_db::{ }; use monitoring::telemetry::metrics::{Counter, Gauge, Meter}; use object_store::{ObjectMeta, ObjectStore, buffered::BufWriter, path::Path}; +use tracing::Instrument; use url::Url; use uuid::Uuid; diff --git a/crates/core/monitoring/src/logging.rs b/crates/core/monitoring/src/logging.rs index 670b06971..35432863a 100644 --- a/crates/core/monitoring/src/logging.rs +++ b/crates/core/monitoring/src/logging.rs @@ -103,7 +103,6 @@ const AMP_CRATES: &[&str] = &[ "solana_storage_proto", "tempo_datasets", "tests", - "trace_report", "verification", "worker", ];