Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ members = [
"crates/core/providers-solana/gen",
"crates/core/providers-static",
"crates/core/providers-static/gen",
"crates/core/trace-report",
"crates/core/verification",
"crates/core/worker-core",
"crates/core/worker-datasets-derived",
Expand Down
5 changes: 4 additions & 1 deletion crates/bin/ampctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ amp-worker-core = { path = "../../core/worker-core" }
anyhow.workspace = true
clap.workspace = true
console = "0.16.1"
chrono.workspace = true
datasets-common = { path = "../../core/datasets-common" }
datasets-derived = { path = "../../core/datasets-derived" }
datasets-raw = { path = "../../core/datasets-raw" }
evm-rpc-datasets = { path = "../../extractors/evm-rpc" }
firehose-datasets = { path = "../../extractors/firehose" }
flate2 = "1"
inferno = "0.12"
monitoring = { path = "../../core/monitoring" }
object_store.workspace = true
reqwest.workspace = true
Expand All @@ -34,8 +37,8 @@ serde_json.workspace = true
solana-datasets = { path = "../../extractors/solana" }
tempo-datasets = { path = "../../extractors/tempo" }
thiserror.workspace = true
trace-report = { path = "../../core/trace-report" }
tokio.workspace = true
urlencoding = "2"
toml.workspace = true
tracing.workspace = true
url.workspace = true
Expand Down
99 changes: 99 additions & 0 deletions crates/bin/ampctl/src/cmd/trace.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
mod fetch;
mod flamegraph;
mod jaeger;
mod report;
mod search;
mod time;

use std::path::Path;

use anyhow::{Context, Result};
use flamegraph::FlamegraphConfig;

const SERVICE: &str = "tracing";

/// Root span names used to slice a fetched trace down to one report type.
pub mod roots {
    /// Roots for query-path reports.
    pub const QUERY: &[&str] = &["do_get"];
    /// Roots for derived-dataset pipeline reports.
    pub const DERIVED_DATASET: &[&str] = &[
        "execute_microbatch",
        "next_microbatch_range",
        "write",
        "close",
        "register",
        "send_location_change_notif",
    ];
    /// Roots for raw-dataset extraction reports.
    pub const RAW_DATASET: &[&str] = &["run_range"];
}

#[derive(Debug, clap::Subcommand)]
pub enum Commands {
/// Generate a performance report (flamegraphs, folded stacks, span tree)
Expand Down Expand Up @@ -72,3 +94,80 @@ pub fn parse_filters(filters: &[String]) -> anyhow::Result<Vec<(String, String)>
})
.collect()
}

/// Generate a full report: trace JSON, wallclock SVG, busy SVG, folded stacks.
///
/// All artifacts are written under `output_dir`, named `{prefix}_*`. Progress
/// is reported on stderr so stdout stays clean for scripting.
fn generate_report(trace: &jaeger::Trace, output_dir: &Path, prefix: &str) -> Result<()> {
    std::fs::create_dir_all(output_dir)
        .with_context(|| format!("creating {}", output_dir.display()))?;

    // Save the raw trace so the report can be regenerated offline later.
    let json_path = output_dir.join(format!("{prefix}_trace.json.gz"));
    save_trace_gz(trace, &json_path)?;
    eprintln!("Wrote trace to {}", json_path.display());

    // One SVG per time mode: wallclock (elapsed) and busy time.
    for (use_busy_time, file_label, log_label) in [
        (false, "wallclock", "wallclock"),
        (true, "busy", "busy-time"),
    ] {
        let config = FlamegraphConfig {
            min_duration_us: 0,
            use_busy_time,
        };
        let svg_path = output_dir.join(format!("{prefix}_{file_label}.svg"));
        flamegraph::generate(trace, &svg_path, &config)?;
        eprintln!("Wrote {log_label} flamegraph to {}", svg_path.display());
    }

    // Folded stacks (wallclock mode) for use with external flamegraph tooling.
    let folded_config = FlamegraphConfig {
        min_duration_us: 0,
        use_busy_time: false,
    };
    let folded_path = output_dir.join(format!("{prefix}_folded.txt"));
    let folded = flamegraph::build_folded(trace, &folded_config)?;
    // Context here keeps this write's errors consistent with create_dir_all above.
    std::fs::write(&folded_path, folded.join("\n"))
        .with_context(|| format!("writing {}", folded_path.display()))?;
    eprintln!("Wrote folded stacks to {}", folded_path.display());

    Ok(())
}

/// Load a trace from a JSON or JSON.gz file.
///
/// Gzip decoding is triggered by a `.gz` filename suffix; any other file is
/// parsed as plain JSON. All errors carry the offending path for diagnosis.
fn load_trace(path: &Path) -> Result<jaeger::Trace> {
    let bytes = std::fs::read(path).with_context(|| format!("reading {}", path.display()))?;
    let parsed = if path.to_string_lossy().ends_with(".gz") {
        use std::io::Read;

        use flate2::read::GzDecoder;
        let mut decoder = GzDecoder::new(&bytes[..]);
        let mut json = String::new();
        decoder
            .read_to_string(&mut json)
            .with_context(|| format!("decompressing {}", path.display()))?;
        serde_json::from_str(&json)
    } else {
        serde_json::from_slice(&bytes)
    };
    parsed.with_context(|| format!("parsing trace JSON from {}", path.display()))
}

/// Save a trace as gzipped JSON.
fn save_trace_gz(trace: &jaeger::Trace, path: &Path) -> Result<()> {
use std::io::Write;

use flate2::{Compression, write::GzEncoder};
let json = serde_json::to_vec(trace)?;
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&json)?;
std::fs::write(path, encoder.finish()?)?;
Ok(())
}

/// Merge multiple Jaeger traces into one.
///
/// The first trace serves as the base; spans and processes from the remaining
/// traces are folded into it. Panics if `traces` is empty (callers guard
/// against this before calling).
fn merge_traces(traces: Vec<jaeger::Trace>) -> jaeger::Trace {
    traces
        .into_iter()
        .reduce(|mut acc, next| {
            acc.spans.extend(next.spans);
            acc.processes.extend(next.processes);
            acc
        })
        .expect("at least one trace")
}
6 changes: 2 additions & 4 deletions crates/bin/ampctl/src/cmd/trace/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::path::PathBuf;

use trace_report::jaeger;

use super::{RemoteArgs, basic_auth};
use super::{RemoteArgs, basic_auth, jaeger};

#[derive(Debug, clap::Args)]
pub struct Args {
Expand All @@ -22,7 +20,7 @@ pub async fn run(args: Args) -> anyhow::Result<()> {
jaeger::fetch_trace(&args.remote.jaeger_url, &args.trace_id, auth.as_deref()).await?;

if args.output.to_string_lossy().ends_with(".gz") {
trace_report::save_trace_gz(&trace, &args.output)?;
super::save_trace_gz(&trace, &args.output)?;
} else {
let json = serde_json::to_vec(&trace)?;
std::fs::write(&args.output, json)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, fs::File, io::BufWriter, path::Path};
use anyhow::{Context, Result};
use inferno::flamegraph::{self, Options};

use crate::jaeger::{Span, Trace};
use super::jaeger::{Span, Trace};

pub struct FlamegraphConfig {
pub min_duration_us: u64,
Expand Down
100 changes: 76 additions & 24 deletions crates/bin/ampctl/src/cmd/trace/report.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::path::PathBuf;

use trace_report::time::parse_time;
use anyhow::bail;

use super::{FilterArgs, RemoteArgs, SERVICE, basic_auth, parse_filters};
use super::{FilterArgs, RemoteArgs, SERVICE, basic_auth, parse_filters, time::parse_time};

/// Input source: Jaeger search or local file.
#[derive(Debug, clap::Args, Clone)]
Expand Down Expand Up @@ -47,42 +47,94 @@ pub enum Commands {

pub async fn run(command: Commands) -> anyhow::Result<()> {
let (input, output_dir, root_spans, prefix) = match &command {
Commands::Query { input, output_dir } => {
(input, output_dir, trace_report::roots::QUERY, "query")
}
Commands::Query { input, output_dir } => (input, output_dir, super::roots::QUERY, "query"),
Commands::DerivedDataset { input, output_dir } => (
input,
output_dir,
trace_report::roots::DERIVED_DATASET,
super::roots::DERIVED_DATASET,
"derived_dataset",
),
Commands::RawDataset { input, output_dir } => (
input,
output_dir,
trace_report::roots::RAW_DATASET,
"raw_dataset",
),
Commands::RawDataset { input, output_dir } => {
(input, output_dir, super::roots::RAW_DATASET, "raw_dataset")
}
};

let tags = parse_filters(&input.filter.filters)?;
let after = input.filter.after.as_deref().map(parse_time).transpose()?;
let before = input.filter.before.as_deref().map(parse_time).transpose()?;
let auth = basic_auth();

let trace = trace_report::fetch_and_filter(
&input.remote.jaeger_url,
auth.as_deref(),
&tags,
after,
before,
input.filter.limit,
SERVICE,
let full_trace = match &input.file {
Some(path) => {
let trace = super::load_trace(path)?;
eprintln!("Loaded {} spans from {}", trace.spans.len(), path.display());
trace
}
None => {
let primary_operation = root_spans.first().copied();
// Search without tag filters — VictoriaTraces returns severely
// truncated spans when server-side tag filters are active.
// Tags are applied client-side below.
let params = super::jaeger::SearchParams {
service: SERVICE,
operation: primary_operation,
limit: input.filter.limit,
tags: &[],
start_us: after,
end_us: before,
};
let traces =
super::jaeger::search_traces(&input.remote.jaeger_url, &params, auth.as_deref())
.await?;

if traces.is_empty() {
bail!("no traces found matching filters");
}

// Client-side tag filtering: keep only traces where at least one
// span has ALL the requested tag key=value pairs.
let traces = if tags.is_empty() {
traces
} else {
let filtered: Vec<_> = traces
.into_iter()
.filter(|trace| {
trace.spans.iter().any(|span| {
tags.iter().all(|(key, value)| {
span.tags.iter().any(|tag| {
tag.key == *key
&& tag.value.as_str().is_some_and(|v| v == value)
})
})
})
})
.collect();
if filtered.is_empty() {
bail!(
"no traces matched tag filters: {}",
tags.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join(", ")
);
}
filtered
};

let merged = super::merge_traces(traces);
eprintln!("Fetched {} spans from Jaeger", merged.spans.len());
merged
}
};

let trace = full_trace.filter_to_subtrees(root_spans);
eprintln!(
"Filtered to {} spans (roots: {:?})",
trace.spans.len(),
root_spans,
input.file.as_deref(),
)
.await?;
);

trace_report::generate_report(&trace, output_dir, prefix)?;
super::generate_report(&trace, output_dir, prefix)?;

Ok(())
}
6 changes: 2 additions & 4 deletions crates/bin/ampctl/src/cmd/trace/search.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use trace_report::{
jaeger,
use super::{
FilterArgs, RemoteArgs, SERVICE, basic_auth, jaeger, parse_filters,
time::{format_epoch_secs, parse_time},
};

use super::{FilterArgs, RemoteArgs, SERVICE, basic_auth, parse_filters};

#[derive(Debug, clap::Args)]
pub struct Args {
#[command(flatten)]
Expand Down
Loading
Loading