diff --git a/.github/scripts/check_version_sync.py b/.github/scripts/check_version_sync.py new file mode 100644 index 0000000..02ebb63 --- /dev/null +++ b/.github/scripts/check_version_sync.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import subprocess +import sys +import tomllib +from pathlib import Path + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Verify Cargo.toml version is aligned with git tags." + ) + parser.add_argument( + "--cargo-toml", + default="Cargo.toml", + help="Path to the Cargo.toml file to validate.", + ) + parser.add_argument( + "--tag", + help="Require Cargo.toml to match this exact git tag (for release workflows).", + ) + return parser.parse_args() + + +def normalize_version(raw: str) -> str: + value = raw.strip() + return value[1:] if value.startswith("v") else value + + +def parse_version(raw: str) -> tuple[int, ...]: + normalized = normalize_version(raw) + parts = normalized.split(".") + if not parts or any(not part.isdigit() for part in parts): + raise ValueError( + f"Unsupported version '{raw}'. Expected dotted numeric versions like 0.5.26." + ) + return tuple(int(part) for part in parts) + + +def read_cargo_version(cargo_toml: Path) -> str: + with cargo_toml.open("rb") as handle: + data = tomllib.load(handle) + try: + return str(data["package"]["version"]) + except KeyError as error: + raise KeyError(f"Missing [package].version in {cargo_toml}") from error + + +def latest_release_tag() -> str | None: + completed = subprocess.run( + ["git", "tag", "--list", "v*", "--sort=-v:refname"], + check=True, + capture_output=True, + text=True, + ) + for line in completed.stdout.splitlines(): + candidate = line.strip() + if candidate: + return candidate + return None + + +def main() -> int: + args = parse_args() + cargo_toml = Path(args.cargo_toml) + cargo_version = read_cargo_version(cargo_toml) + cargo_tuple = parse_version(cargo_version) + + if args.tag: + tag_version = normalize_version(args.tag) + if cargo_version != tag_version: + print( + f"Cargo.toml version {cargo_version} does not match release tag {args.tag}.", + file=sys.stderr, + ) + return 1 + print(f"Cargo.toml version {cargo_version} matches release tag {args.tag}.") + return 0 + + latest_tag = latest_release_tag() + if latest_tag is None: + print(f"Cargo.toml version {cargo_version} validated (no release tags found).") + return 0 + + latest_tuple = parse_version(latest_tag) + if cargo_tuple < latest_tuple: + print( + ( + f"Cargo.toml version {cargo_version} is behind the latest tag {latest_tag}. " + "Bump [package].version before merging." + ), + file=sys.stderr, + ) + return 1 + + print( + f"Cargo.toml version {cargo_version} is aligned with latest tag {latest_tag}." + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3d53b6e..ace1e06 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,13 +10,22 @@ env: CARGO_TERM_COLOR: always jobs: + version: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v5 + with: + fetch-depth: 0 + - name: Verify Cargo version is not behind tags + run: python3 .github/scripts/check_version_sync.py + lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Lint GitHub Actions uses: raven-actions/actionlint@v2 - - uses: actions/setup-node@v4 + - uses: actions/setup-node@v5 with: node-version: '20' - name: Build frontend @@ -33,7 +42,7 @@ jobs: security: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - uses: dtolnay/rust-toolchain@1.88.0 - uses: Swatinem/rust-cache@v2 - name: Install cargo-audit @@ -47,8 +56,8 @@ jobs: matrix: os: [ubuntu-latest, macos-latest, windows-latest] steps: - - uses: actions/checkout@v4 - - uses: actions/setup-node@v4 + - uses: actions/checkout@v5 + - uses: actions/setup-node@v5 with: node-version: '20' - name: Build frontend diff --git a/.github/workflows/diffscope.yml b/.github/workflows/diffscope.yml index 75449d1..e141b23 100644 --- a/.github/workflows/diffscope.yml +++ b/.github/workflows/diffscope.yml @@ -16,7 +16,7 @@ jobs: runs-on: ubuntu-latest if: github.event.pull_request.head.repo.full_name == github.repository && !github.event.pull_request.draft steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 with: fetch-depth: 0 ref: ${{ github.event.pull_request.head.sha }} diff --git a/.github/workflows/eval.yml b/.github/workflows/eval.yml index 09c2eca..92dff2e 100644 --- a/.github/workflows/eval.yml +++ b/.github/workflows/eval.yml @@ -32,7 +32,7 @@ jobs: echo "configured=false" >> "$GITHUB_OUTPUT" fi - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 if: ${{ steps.secret-check.outputs.configured == 'true' }} with: fetch-depth: 0 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 21a3b9f..99216af 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -21,7 +21,9 @@ jobs: contents: write steps: - name: Checkout code - uses: actions/checkout@v4 + uses: actions/checkout@v5 + - name: Verify Cargo version matches tag + run: python3 .github/scripts/check_version_sync.py --tag "${{ github.ref_name }}" - name: Extract version id: get_version @@ -99,10 +101,10 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v4 + uses: actions/checkout@v5 - name: Install Node.js - uses: actions/setup-node@v4 + uses: actions/setup-node@v5 with: node-version: '20' @@ -215,7 +217,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v4 + uses: actions/checkout@v5 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 diff --git a/Cargo.lock b/Cargo.lock index bd61b85..2fa1d1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -543,7 +543,7 @@ dependencies = [ [[package]] name = "diffscope" -version = "0.5.3" +version = "0.5.26" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index bb93410..2863eb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "diffscope" -version = "0.5.3" +version = "0.5.26" edition = "2021" rust-version = "1.88" authors = ["Jonathan Haas "] diff --git a/src/commands/eval/runner/execute.rs b/src/commands/eval/runner/execute.rs index df8889b..95c7ee4 100644 --- a/src/commands/eval/runner/execute.rs +++ b/src/commands/eval/runner/execute.rs @@ -40,7 +40,7 @@ pub(in super::super) async fn run_eval_fixture( EvalFixtureDagConfig { repro_validate, repro_max_comments, - artifact_context, + artifact_context: artifact_context.cloned(), }, ) .await?; diff --git a/src/commands/eval/runner/execute/dag.rs b/src/commands/eval/runner/execute/dag.rs index d10a5a4..39f9aa6 100644 --- a/src/commands/eval/runner/execute/dag.rs +++ b/src/commands/eval/runner/execute/dag.rs @@ -1,12 +1,13 @@ use anyhow::Result; -use futures::FutureExt; +use futures::{future::BoxFuture, FutureExt}; +use std::cell::RefCell; use tracing::debug; use crate::config; use crate::core; use crate::core::dag::{ - describe_dag, execute_dag, DagExecutionRecord, DagExecutionTrace, DagGraphContract, DagNode, - DagNodeContract, DagNodeExecutionHints, DagNodeKind, DagNodeSpec, + describe_dag, execute_dag_with_parallelism, DagExecutionRecord, DagExecutionTrace, + DagGraphContract, DagNode, DagNodeContract, DagNodeExecutionHints, DagNodeKind, DagNodeSpec, }; use crate::core::eval_benchmarks::FixtureResult as BenchmarkFixtureResult; use crate::review::review_diff_content_raw; @@ -46,10 +47,10 @@ impl DagNode for EvalFixtureStage { } } -pub(super) struct EvalFixtureDagConfig<'a> { +pub(super) struct EvalFixtureDagConfig { pub(super) repro_validate: bool, pub(super) repro_max_comments: usize, - pub(super) artifact_context: Option<&'a EvalFixtureArtifactContext>, + pub(super) artifact_context: Option, } pub(super) struct EvalFixtureExecutionOutcome { @@ -60,9 +61,9 @@ pub(super) struct EvalFixtureExecutionOutcome { pub(super) details: FixtureResultDetails, } -struct EvalFixtureDagContext<'a> { +struct EvalFixtureDagContext { prepared: PreparedFixtureExecution, - dag_config: EvalFixtureDagConfig<'a>, + dag_config: EvalFixtureDagConfig, comments: Vec, warnings: Vec, verification_report: Option, @@ -76,8 +77,36 @@ struct EvalFixtureDagContext<'a> { dag_traces: Vec, } -impl<'a> EvalFixtureDagContext<'a> { - fn new(prepared: PreparedFixtureExecution, dag_config: EvalFixtureDagConfig<'a>) -> Self { +#[derive(Debug)] +enum EvalFixtureStageOutput { + Review { + comments: Vec, + warnings: Vec, + verification_report: Option, + agent_activity: Option, + dag_traces: Vec, + }, + ExpectationMatching { + match_summary: FixtureMatchSummary, + failures: Vec, + }, + CommentCountValidation { + failures: Vec, + }, + BenchmarkMetrics { + benchmark_metrics: Option, + }, + ReproductionValidation { + reproduction_summary: Option, + warnings: Vec, + }, + ArtifactCapture { + artifact_path: Option, + }, +} + +impl EvalFixtureDagContext { + fn new(prepared: PreparedFixtureExecution, dag_config: EvalFixtureDagConfig) -> Self { Self { prepared, dag_config, @@ -127,21 +156,61 @@ impl<'a> EvalFixtureDagContext<'a> { pub(super) async fn execute_eval_fixture_dag( config: &config::Config, prepared: PreparedFixtureExecution, - dag_config: EvalFixtureDagConfig<'_>, + dag_config: EvalFixtureDagConfig, ) -> Result { let specs = build_stage_specs(dag_config.repro_validate); let dag_description = describe_dag(&specs); debug!(?dag_description, "Executing eval fixture DAG"); - let mut context = EvalFixtureDagContext::new(prepared, dag_config); - let records = execute_dag(&specs, &mut context, |stage, context| { - async move { execute_stage(stage, config, context).await }.boxed() - }) + let context = RefCell::new(EvalFixtureDagContext::new(prepared, dag_config)); + let records = execute_dag_with_parallelism( + &specs, + |stage| { + let mut context = context.borrow_mut(); + spawn_stage(stage, config, &mut context) + }, + |stage, output| { + let mut context = context.borrow_mut(); + apply_stage_output(stage, output, &mut context) + }, + ) .await?; + let mut context = context.into_inner(); rewrite_fixture_artifact_with_eval_trace(&mut context, &records).await?; context.into_outcome(records) } +fn stage_hints(stage: EvalFixtureStage) -> DagNodeExecutionHints { + match stage { + EvalFixtureStage::Review => DagNodeExecutionHints { + parallelizable: false, + retryable: true, + side_effects: false, + subgraph: Some("review_pipeline".to_string()), + }, + EvalFixtureStage::ExpectationMatching + | EvalFixtureStage::CommentCountValidation + | EvalFixtureStage::BenchmarkMetrics => DagNodeExecutionHints { + parallelizable: true, + retryable: true, + side_effects: false, + subgraph: None, + }, + EvalFixtureStage::ReproductionValidation => DagNodeExecutionHints { + parallelizable: true, + retryable: true, + side_effects: false, + subgraph: None, + }, + EvalFixtureStage::ArtifactCapture => DagNodeExecutionHints { + parallelizable: false, + retryable: true, + side_effects: true, + subgraph: None, + }, + } +} + pub(in super::super::super) fn describe_eval_fixture_graph( repro_validate: bool, ) -> DagGraphContract { @@ -166,12 +235,7 @@ pub(in super::super::super) fn describe_eval_fixture_graph( "verification_report".to_string(), "agent_activity".to_string(), ], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: Some("review_pipeline".to_string()), - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, EvalFixtureStage::ExpectationMatching => DagNodeContract { @@ -187,12 +251,7 @@ pub(in super::super::super) fn describe_eval_fixture_graph( .collect(), inputs: vec!["comments".to_string(), "fixture_expectations".to_string()], outputs: vec!["match_summary".to_string(), "failures".to_string()], - hints: DagNodeExecutionHints { - parallelizable: true, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, EvalFixtureStage::CommentCountValidation => DagNodeContract { @@ -211,12 +270,7 @@ pub(in super::super::super) fn describe_eval_fixture_graph( "failures".to_string(), ], outputs: vec!["failures".to_string()], - hints: DagNodeExecutionHints { - parallelizable: true, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, EvalFixtureStage::BenchmarkMetrics => DagNodeContract { @@ -236,12 +290,7 @@ pub(in super::super::super) fn describe_eval_fixture_graph( "failures".to_string(), ], outputs: vec!["benchmark_metrics".to_string()], - hints: DagNodeExecutionHints { - parallelizable: true, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, EvalFixtureStage::ReproductionValidation => DagNodeContract { @@ -261,12 +310,7 @@ pub(in super::super::super) fn describe_eval_fixture_graph( "comments".to_string(), ], outputs: vec!["reproduction_summary".to_string(), "warnings".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, EvalFixtureStage::ArtifactCapture => DagNodeContract { @@ -288,12 +332,7 @@ pub(in super::super::super) fn describe_eval_fixture_graph( "benchmark_metrics".to_string(), ], outputs: vec!["artifact_path".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: true, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, }) @@ -315,26 +354,31 @@ fn build_stage_specs(repro_validate: bool) -> Vec> DagNodeSpec { id: EvalFixtureStage::Review, dependencies: vec![], + hints: stage_hints(EvalFixtureStage::Review), enabled: true, }, DagNodeSpec { id: EvalFixtureStage::ExpectationMatching, dependencies: vec![EvalFixtureStage::Review], + hints: stage_hints(EvalFixtureStage::ExpectationMatching), enabled: true, }, DagNodeSpec { id: EvalFixtureStage::CommentCountValidation, dependencies: vec![EvalFixtureStage::ExpectationMatching], + hints: stage_hints(EvalFixtureStage::CommentCountValidation), enabled: true, }, DagNodeSpec { id: EvalFixtureStage::BenchmarkMetrics, dependencies: vec![EvalFixtureStage::CommentCountValidation], + hints: stage_hints(EvalFixtureStage::BenchmarkMetrics), enabled: true, }, DagNodeSpec { id: EvalFixtureStage::ReproductionValidation, dependencies: vec![EvalFixtureStage::Review], + hints: stage_hints(EvalFixtureStage::ReproductionValidation), enabled: repro_validate, }, DagNodeSpec { @@ -347,131 +391,236 @@ fn build_stage_specs(repro_validate: bool) -> Vec> } else { vec![EvalFixtureStage::BenchmarkMetrics] }, + hints: stage_hints(EvalFixtureStage::ArtifactCapture), enabled: true, }, ] } -async fn execute_stage( +fn spawn_stage( stage: EvalFixtureStage, config: &config::Config, - context: &mut EvalFixtureDagContext<'_>, -) -> Result<()> { + context: &mut EvalFixtureDagContext, +) -> Result>> { match stage { - EvalFixtureStage::Review => execute_review_stage(config, context).await, - EvalFixtureStage::ExpectationMatching => execute_expectation_stage(context), - EvalFixtureStage::CommentCountValidation => execute_comment_count_stage(context), - EvalFixtureStage::BenchmarkMetrics => execute_benchmark_metrics_stage(context), + EvalFixtureStage::Review => { + let diff_content = context.prepared.diff_content.clone(); + let repo_path = context.prepared.repo_path.clone(); + let config = config.clone(); + Ok(async move { + let review_result = + review_diff_content_raw(&diff_content, config, &repo_path).await?; + Ok(EvalFixtureStageOutput::Review { + verification_report: convert_verification_report( + review_result.verification_report, + ), + agent_activity: convert_agent_activity(review_result.agent_activity), + dag_traces: review_result.dag_traces, + comments: review_result.comments, + warnings: review_result.warnings, + }) + } + .boxed()) + } + EvalFixtureStage::ExpectationMatching => { + let expectations = context.prepared.fixture.expect.clone(); + let comments = context.comments.clone(); + Ok(async move { + let match_summary = evaluate_fixture_expectations(&expectations, &comments); + Ok(EvalFixtureStageOutput::ExpectationMatching { + failures: match_summary.failures.clone(), + match_summary, + }) + } + .boxed()) + } + EvalFixtureStage::CommentCountValidation => { + let Some(_) = context.match_summary.as_ref() else { + anyhow::bail!("comment count validation requires expectation matches"); + }; + let total_comments = context.total_comments; + let expectations = context.prepared.fixture.expect.clone(); + let mut failures = context.failures.clone(); + Ok(async move { + append_total_comment_failures(&mut failures, total_comments, &expectations); + Ok(EvalFixtureStageOutput::CommentCountValidation { failures }) + } + .boxed()) + } + EvalFixtureStage::BenchmarkMetrics => { + let Some(match_summary) = context.match_summary.clone() else { + anyhow::bail!("benchmark metrics require expectation matches"); + }; + let prepared = context.prepared.clone(); + let total_comments = context.total_comments; + let failures = context.failures.clone(); + Ok(async move { + Ok(EvalFixtureStageOutput::BenchmarkMetrics { + benchmark_metrics: build_benchmark_metrics( + &prepared, + total_comments, + &match_summary, + &failures, + ), + }) + } + .boxed()) + } EvalFixtureStage::ReproductionValidation => { - execute_reproduction_stage(config, context).await + let config = config.clone(); + let prepared = context.prepared.clone(); + let comments = context.comments.clone(); + let repro_max_comments = context.dag_config.repro_max_comments; + Ok(async move { + let reproduction_summary = maybe_run_reproduction_validation( + &config, + &prepared, + &comments, + repro_max_comments, + ) + .await?; + let warnings = reproduction_summary + .as_ref() + .map(build_reproduction_warnings) + .unwrap_or_default(); + Ok(EvalFixtureStageOutput::ReproductionValidation { + reproduction_summary, + warnings, + }) + } + .boxed()) + } + EvalFixtureStage::ArtifactCapture => { + let Some(match_summary) = context.match_summary.clone() else { + anyhow::bail!("artifact stage requires expectation matching output"); + }; + let prepared = context.prepared.clone(); + let artifact_context = context.dag_config.artifact_context.clone(); + let total_comments = context.total_comments; + let comments = context.comments.clone(); + let warnings = context.warnings.clone(); + let failures = context.failures.clone(); + let benchmark_metrics = context.benchmark_metrics.clone(); + let verification_report = context.verification_report.clone(); + let agent_activity = context.agent_activity.clone(); + let reproduction_summary = context.reproduction_summary.clone(); + let dag_traces = context.dag_traces.clone(); + Ok(async move { + let artifact_path = maybe_write_fixture_artifact(EvalFixtureArtifactInput { + context: artifact_context.as_ref(), + prepared: &prepared, + total_comments, + comments: &comments, + warnings: &warnings, + failures: &failures, + benchmark_metrics: benchmark_metrics.as_ref(), + rule_metrics: &match_summary.rule_metrics, + rule_summary: match_summary.rule_summary, + verification_report: verification_report.as_ref(), + agent_activity: agent_activity.as_ref(), + reproduction_summary: reproduction_summary.as_ref(), + dag_traces: &dag_traces, + }) + .await?; + Ok(EvalFixtureStageOutput::ArtifactCapture { artifact_path }) + } + .boxed()) } - EvalFixtureStage::ArtifactCapture => execute_artifact_stage(context).await, - } -} - -async fn execute_review_stage( - config: &config::Config, - context: &mut EvalFixtureDagContext<'_>, -) -> Result<()> { - let review_result = review_diff_content_raw( - &context.prepared.diff_content, - config.clone(), - &context.prepared.repo_path, - ) - .await?; - context.verification_report = convert_verification_report(review_result.verification_report); - context.agent_activity = convert_agent_activity(review_result.agent_activity); - context.dag_traces = review_result.dag_traces; - context.comments = review_result.comments; - context.warnings = review_result.warnings; - context.total_comments = context.comments.len(); - Ok(()) -} - -fn execute_expectation_stage(context: &mut EvalFixtureDagContext<'_>) -> Result<()> { - let match_summary = - evaluate_fixture_expectations(&context.prepared.fixture.expect, &context.comments); - context.failures = match_summary.failures.clone(); - context.match_summary = Some(match_summary); - Ok(()) -} - -fn execute_comment_count_stage(context: &mut EvalFixtureDagContext<'_>) -> Result<()> { - if context.match_summary.is_none() { - anyhow::bail!("comment count validation requires expectation matches"); } - append_total_comment_failures( - &mut context.failures, - context.total_comments, - &context.prepared.fixture.expect, - ); - Ok(()) } -fn execute_benchmark_metrics_stage(context: &mut EvalFixtureDagContext<'_>) -> Result<()> { - let Some(match_summary) = context.match_summary.as_ref() else { - anyhow::bail!("benchmark metrics require expectation matches"); - }; - context.benchmark_metrics = build_benchmark_metrics( - &context.prepared, - context.total_comments, - match_summary, - &context.failures, - ); - Ok(()) -} - -async fn execute_reproduction_stage( - config: &config::Config, - context: &mut EvalFixtureDagContext<'_>, +fn apply_stage_output( + stage: EvalFixtureStage, + output: EvalFixtureStageOutput, + context: &mut EvalFixtureDagContext, ) -> Result<()> { - context.reproduction_summary = maybe_run_reproduction_validation( - config, - &context.prepared, - &context.comments, - context.dag_config.repro_max_comments, - ) - .await?; - if let Some(summary) = context.reproduction_summary.as_ref() { - context - .warnings - .extend(summary.checks.iter().filter_map(|check| { - check.warning.as_ref().map(|warning| { - format!( - "reproduction validator for comment {} ({}) reported: {}", - check.comment_id, check.model, warning - ) - }) - })); + match (stage, output) { + ( + EvalFixtureStage::Review, + EvalFixtureStageOutput::Review { + comments, + warnings, + verification_report, + agent_activity, + dag_traces, + }, + ) => { + context.total_comments = comments.len(); + context.comments = comments; + context.warnings = warnings; + context.verification_report = verification_report; + context.agent_activity = agent_activity; + context.dag_traces = dag_traces; + Ok(()) + } + ( + EvalFixtureStage::ExpectationMatching, + EvalFixtureStageOutput::ExpectationMatching { + match_summary, + failures, + }, + ) => { + context.match_summary = Some(match_summary); + context.failures = failures; + Ok(()) + } + ( + EvalFixtureStage::CommentCountValidation, + EvalFixtureStageOutput::CommentCountValidation { failures }, + ) => { + context.failures = failures; + Ok(()) + } + ( + EvalFixtureStage::BenchmarkMetrics, + EvalFixtureStageOutput::BenchmarkMetrics { benchmark_metrics }, + ) => { + context.benchmark_metrics = benchmark_metrics; + Ok(()) + } + ( + EvalFixtureStage::ReproductionValidation, + EvalFixtureStageOutput::ReproductionValidation { + reproduction_summary, + warnings, + }, + ) => { + context.reproduction_summary = reproduction_summary; + context.warnings.extend(warnings); + Ok(()) + } + ( + EvalFixtureStage::ArtifactCapture, + EvalFixtureStageOutput::ArtifactCapture { artifact_path }, + ) => { + context.artifact_path = artifact_path; + Ok(()) + } + (stage, output) => anyhow::bail!( + "fixture DAG stage '{}' received incompatible output: {:?}", + stage.name(), + output + ), } - Ok(()) } -async fn execute_artifact_stage(context: &mut EvalFixtureDagContext<'_>) -> Result<()> { - let Some(match_summary) = context.match_summary.as_ref() else { - anyhow::bail!("artifact stage requires expectation matching output"); - }; - context.artifact_path = maybe_write_fixture_artifact(EvalFixtureArtifactInput { - context: context.dag_config.artifact_context, - prepared: &context.prepared, - total_comments: context.total_comments, - comments: &context.comments, - warnings: &context.warnings, - failures: &context.failures, - benchmark_metrics: context.benchmark_metrics.as_ref(), - rule_metrics: &match_summary.rule_metrics, - rule_summary: match_summary.rule_summary, - verification_report: context.verification_report.as_ref(), - agent_activity: context.agent_activity.as_ref(), - reproduction_summary: context.reproduction_summary.as_ref(), - dag_traces: &context.dag_traces, - }) - .await?; - Ok(()) +fn build_reproduction_warnings(summary: &EvalReproductionSummary) -> Vec { + summary + .checks + .iter() + .filter_map(|check| { + check.warning.as_ref().map(|warning| { + format!( + "reproduction validator for comment {} ({}) reported: {}", + check.comment_id, check.model, warning + ) + }) + }) + .collect() } async fn rewrite_fixture_artifact_with_eval_trace( - context: &mut EvalFixtureDagContext<'_>, + context: &mut EvalFixtureDagContext, eval_records: &[DagExecutionRecord], ) -> Result<()> { if context.artifact_path.is_none() { @@ -487,7 +636,7 @@ async fn rewrite_fixture_artifact_with_eval_trace( records: eval_records.to_vec(), }); context.artifact_path = maybe_write_fixture_artifact(EvalFixtureArtifactInput { - context: context.dag_config.artifact_context, + context: context.dag_config.artifact_context.as_ref(), prepared: &context.prepared, total_comments: context.total_comments, comments: &context.comments, @@ -542,6 +691,17 @@ mod tests { assert_eq!(artifact.dependencies, vec!["benchmark_metrics"]); } + #[test] + fn build_stage_specs_marks_reproduction_parallelizable() { + let specs = build_stage_specs(true); + let reproduction = specs + .iter() + .find(|spec| spec.id == EvalFixtureStage::ReproductionValidation) + .unwrap(); + + assert!(reproduction.hints.parallelizable); + } + #[test] fn eval_fixture_graph_contract_exposes_reproduction_outputs() { let graph = describe_eval_fixture_graph(true); diff --git a/src/commands/eval/runner/execute/loading.rs b/src/commands/eval/runner/execute/loading.rs index c719543..3e2c4a3 100644 --- a/src/commands/eval/runner/execute/loading.rs +++ b/src/commands/eval/runner/execute/loading.rs @@ -12,6 +12,7 @@ use super::super::super::{EvalFixture, EvalFixtureMetadata, LoadedEvalFixture}; use diff::load_diff_content; use repo::resolve_repo_path; +#[derive(Debug, Clone)] pub(super) struct PreparedFixtureExecution { pub(super) fixture_path: PathBuf, pub(super) fixture_name: String, diff --git a/src/commands/eval/runner/matching.rs b/src/commands/eval/runner/matching.rs index dd06a73..44a8c48 100644 --- a/src/commands/eval/runner/matching.rs +++ b/src/commands/eval/runner/matching.rs @@ -14,6 +14,7 @@ use required::collect_required_matches; use rules::build_rule_match_metrics; use unexpected::collect_unexpected_matches; +#[derive(Debug, Clone)] pub(super) struct FixtureMatchSummary { pub(super) failures: Vec, pub(super) required_matches: usize, diff --git a/src/core/dag.rs b/src/core/dag.rs index d6392dd..3c381be 100644 --- a/src/core/dag.rs +++ b/src/core/dag.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::hash::Hash; use std::time::Instant; +use tokio::task::JoinSet; pub trait DagNode: Clone + Eq + Hash { fn name(&self) -> &'static str; @@ -13,6 +14,7 @@ pub trait DagNode: Clone + Eq + Hash { pub struct DagNodeSpec { pub id: NodeId, pub dependencies: Vec, + pub hints: DagNodeExecutionHints, pub enabled: bool, } @@ -163,6 +165,119 @@ where Ok(records) } +pub async fn execute_dag_with_parallelism( + specs: &[DagNodeSpec], + mut spawn: SpawnFn, + mut apply: ApplyFn, +) -> Result> +where + NodeId: DagNode + Send + 'static, + TaskOutput: Send + 'static, + SpawnFn: FnMut(NodeId) -> Result>>, + ApplyFn: FnMut(NodeId, TaskOutput) -> Result<()>, +{ + let mut completed = HashSet::new(); + let mut in_flight = HashSet::new(); + let mut join_set = JoinSet::new(); + let mut recorded = Vec::with_capacity(specs.len()); + let mut launch_sequence = 0usize; + + while completed.len() < specs.len() { + let ready_indices = specs + .iter() + .enumerate() + .filter(|(_, candidate)| { + !completed.contains(&candidate.id) + && !in_flight.contains(&candidate.id) + && candidate + .dependencies + .iter() + .all(|dependency| completed.contains(dependency)) + }) + .map(|(index, _)| index) + .collect::>(); + + if join_set.is_empty() { + let Some(index) = ready_indices.first().copied() else { + anyhow::bail!("DAG has unresolved or cyclic dependencies"); + }; + let spec = specs[index].clone(); + if !spec.enabled { + recorded.push(( + launch_sequence, + DagExecutionRecord { + name: spec.id.name().to_string(), + enabled: false, + duration_ms: 0, + }, + )); + launch_sequence += 1; + completed.insert(spec.id); + continue; + } + if !spec.hints.parallelizable { + let started = Instant::now(); + let output = spawn(spec.id.clone())?.await?; + apply(spec.id.clone(), output)?; + recorded.push(( + launch_sequence, + DagExecutionRecord { + name: spec.id.name().to_string(), + enabled: true, + duration_ms: started.elapsed().as_millis() as u64, + }, + )); + launch_sequence += 1; + completed.insert(spec.id); + continue; + } + } + + for index in ready_indices.iter().copied() { + let spec = specs[index].clone(); + if !spec.enabled || !spec.hints.parallelizable { + continue; + } + let sequence = launch_sequence; + launch_sequence += 1; + let id = spec.id.clone(); + let future = spawn(id.clone())?; + let started = Instant::now(); + in_flight.insert(id.clone()); + join_set.spawn(async move { + let output = future.await; + (sequence, id, started.elapsed().as_millis() as u64, output) + }); + } + + if !join_set.is_empty() { + let Some(joined) = join_set.join_next().await else { + anyhow::bail!("DAG has unresolved or cyclic dependencies"); + }; + let (sequence, id, duration_ms, output) = joined + .map_err(|error| anyhow::anyhow!("parallel DAG task failed to join: {error}"))?; + let output = output?; + apply(id.clone(), output)?; + in_flight.remove(&id); + completed.insert(id.clone()); + recorded.push(( + sequence, + DagExecutionRecord { + name: id.name().to_string(), + enabled: true, + duration_ms, + }, + )); + continue; + } + + anyhow::bail!("DAG has unresolved or cyclic dependencies"); + } + + recorded.sort_by_key(|(sequence, _)| *sequence); + Ok(recorded.into_iter().map(|(_, record)| record).collect()) +} + pub fn plan_dag_execution( graph: &DagGraphContract, completed: &[String], @@ -248,6 +363,20 @@ fn derive_satisfied_nodes( mod tests { use super::*; use futures::FutureExt; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; + use std::time::Duration; + + fn hints(parallelizable: bool) -> DagNodeExecutionHints { + DagNodeExecutionHints { + parallelizable, + retryable: true, + side_effects: false, + subgraph: None, + } + } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] enum TestNode { @@ -272,16 +401,19 @@ mod tests { DagNodeSpec { id: TestNode::Root, dependencies: vec![], + hints: hints(false), enabled: true, }, DagNodeSpec { id: TestNode::Leaf, dependencies: vec![TestNode::Branch], + hints: hints(false), enabled: true, }, DagNodeSpec { id: TestNode::Branch, dependencies: vec![TestNode::Root], + hints: hints(false), enabled: true, }, ]; @@ -307,11 +439,13 @@ mod tests { DagNodeSpec { id: TestNode::Root, dependencies: vec![], + hints: hints(false), enabled: true, }, DagNodeSpec { id: TestNode::Leaf, dependencies: vec![TestNode::Root], + hints: hints(true), enabled: false, }, ]); @@ -422,4 +556,195 @@ mod tests { assert!(plan.nodes[1].ready); assert!(plan.nodes[1].unmet_dependencies.is_empty()); } + + #[tokio::test] + async fn execute_dag_with_parallelism_runs_ready_parallel_nodes_concurrently() { + let specs = vec![ + DagNodeSpec { + id: TestNode::Root, + dependencies: vec![], + hints: hints(false), + enabled: true, + }, + DagNodeSpec { + id: TestNode::Branch, + dependencies: vec![TestNode::Root], + hints: hints(true), + enabled: true, + }, + DagNodeSpec { + id: TestNode::Leaf, + dependencies: vec![TestNode::Root], + hints: hints(true), + enabled: true, + }, + ]; + let active = Arc::new(AtomicUsize::new(0)); + let max_active = Arc::new(AtomicUsize::new(0)); + let mut applied = Vec::new(); + + let records = execute_dag_with_parallelism( + &specs, + |node| { + let active = Arc::clone(&active); + let max_active = Arc::clone(&max_active); + Ok(async move { + if node != TestNode::Root { + let current = active.fetch_add(1, Ordering::SeqCst) + 1; + let observed_max = max_active.load(Ordering::SeqCst); + if current > observed_max { + max_active.store(current, Ordering::SeqCst); + } + tokio::time::sleep(Duration::from_millis(25)).await; + active.fetch_sub(1, Ordering::SeqCst); + } + Ok(node.name().to_string()) + } + .boxed()) + }, + |_, output| { + applied.push(output); + Ok(()) + }, + ) + .await + .unwrap(); + + assert_eq!(records.len(), 3); + assert_eq!(records[0].name, "root"); + assert_eq!(records[1].name, "branch"); + assert_eq!(records[2].name, "leaf"); + assert!(max_active.load(Ordering::SeqCst) >= 2); + assert!(applied.contains(&"branch".to_string())); + assert!(applied.contains(&"leaf".to_string())); + } + + #[tokio::test] + async fn execute_dag_with_parallelism_preserves_ready_spec_order_for_disabled_nodes() { + let specs = vec![ + DagNodeSpec { + id: TestNode::Root, + dependencies: vec![], + hints: hints(false), + enabled: true, + }, + DagNodeSpec { + id: TestNode::Branch, + dependencies: vec![TestNode::Root], + hints: hints(true), + enabled: true, + }, + DagNodeSpec { + id: TestNode::Leaf, + dependencies: vec![TestNode::Root], + hints: hints(true), + enabled: false, + }, + ]; + let mut applied = Vec::new(); + + let records = execute_dag_with_parallelism( + &specs, + |node| Ok(async move { Ok(node.name().to_string()) }.boxed()), + |_, output| { + applied.push(output); + Ok(()) + }, + ) + .await + .unwrap(); + + assert_eq!( + records + .iter() + .map(|record| record.name.as_str()) + .collect::>(), + vec!["root", "branch", "leaf"] + ); + assert_eq!(applied, vec!["root", "branch"]); + } + + #[tokio::test] + async fn execute_dag_with_parallelism_skips_non_parallel_ready_nodes_when_batching() { + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + enum MixedNode { + Root, + FastA, + Slow, + FastB, + } + + impl DagNode for MixedNode { + fn name(&self) -> &'static str { + match self { + Self::Root => "root", + Self::FastA => "fast_a", + Self::Slow => "slow", + Self::FastB => "fast_b", + } + } + } + + let specs = vec![ + DagNodeSpec { + id: MixedNode::Root, + dependencies: vec![], + hints: hints(false), + enabled: true, + }, + DagNodeSpec { + id: MixedNode::FastA, + dependencies: vec![MixedNode::Root], + hints: hints(true), + enabled: true, + }, + DagNodeSpec { + id: MixedNode::Slow, + dependencies: vec![MixedNode::Root], + hints: hints(false), + enabled: true, + }, + DagNodeSpec { + id: MixedNode::FastB, + dependencies: vec![MixedNode::Root], + hints: hints(true), + enabled: true, + }, + ]; + let active = Arc::new(AtomicUsize::new(0)); + let max_active = Arc::new(AtomicUsize::new(0)); + + let records = execute_dag_with_parallelism( + &specs, + |node| { + let active = Arc::clone(&active); + let max_active = Arc::clone(&max_active); + Ok(async move { + if matches!(node, MixedNode::FastA | MixedNode::FastB) { + let current = active.fetch_add(1, Ordering::SeqCst) + 1; + let observed_max = max_active.load(Ordering::SeqCst); + if current > observed_max { + max_active.store(current, Ordering::SeqCst); + } + tokio::time::sleep(Duration::from_millis(25)).await; + active.fetch_sub(1, Ordering::SeqCst); + } + Ok(node.name().to_string()) + } + .boxed()) + }, + |_, _| Ok(()), + ) + .await + .unwrap(); + + assert_eq!( + records + .iter() + .map(|record| record.name.as_str()) + .collect::>(), + vec!["root", "fast_a", "fast_b", "slow"] + ); + assert!(max_active.load(Ordering::SeqCst) >= 2); + } } diff --git a/src/review/pipeline/orchestrate/dag.rs b/src/review/pipeline/orchestrate/dag.rs index b63b312..78ef624 100644 --- a/src/review/pipeline/orchestrate/dag.rs +++ b/src/review/pipeline/orchestrate/dag.rs @@ -101,6 +101,29 @@ pub(super) async fn execute_review_pipeline_dag( Ok(result) } +fn stage_hints(stage: ReviewPipelineStage) -> DagNodeExecutionHints { + match stage { + ReviewPipelineStage::InitializeServices + | ReviewPipelineStage::BuildSession + | ReviewPipelineStage::PrepareJobs + | ReviewPipelineStage::Postprocess => DagNodeExecutionHints { + parallelizable: false, + retryable: true, + side_effects: false, + subgraph: match stage { + ReviewPipelineStage::Postprocess => Some("review_postprocess".to_string()), + _ => None, + }, + }, + ReviewPipelineStage::ExecuteJobs => DagNodeExecutionHints { + parallelizable: true, + retryable: true, + side_effects: false, + subgraph: None, + }, + } +} + pub(in super::super) fn describe_review_pipeline_graph() -> DagGraphContract { DagGraphContract { name: "review_pipeline".to_string(), @@ -117,12 +140,7 @@ pub(in super::super) fn describe_review_pipeline_graph() -> DagGraphContract { dependencies: vec![], inputs: vec!["config".to_string(), "repo_path".to_string()], outputs: vec!["pipeline_services".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(ReviewPipelineStage::InitializeServices), enabled: true, }, DagNodeContract { @@ -132,12 +150,7 @@ pub(in super::super) fn describe_review_pipeline_graph() -> DagGraphContract { dependencies: vec!["initialize_services".to_string()], inputs: vec!["diff_content".to_string(), "pipeline_services".to_string(), "progress_callback".to_string()], outputs: vec!["review_session".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(ReviewPipelineStage::BuildSession), enabled: true, }, DagNodeContract { @@ -147,12 +160,7 @@ pub(in super::super) fn describe_review_pipeline_graph() -> DagGraphContract { dependencies: vec!["build_session".to_string()], inputs: vec!["pipeline_services".to_string(), "review_session".to_string()], outputs: vec!["prepared_review_jobs".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(ReviewPipelineStage::PrepareJobs), enabled: true, }, DagNodeContract { @@ -162,12 +170,7 @@ pub(in super::super) fn describe_review_pipeline_graph() -> DagGraphContract { dependencies: vec!["prepare_jobs".to_string()], inputs: vec!["prepared_review_jobs".to_string(), "pipeline_services".to_string(), "review_session".to_string()], outputs: vec!["execution_summary".to_string()], - hints: DagNodeExecutionHints { - parallelizable: true, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(ReviewPipelineStage::ExecuteJobs), enabled: true, }, DagNodeContract { @@ -177,12 +180,7 @@ pub(in super::super) fn describe_review_pipeline_graph() -> DagGraphContract { dependencies: vec!["execute_jobs".to_string()], inputs: vec!["execution_summary".to_string(), "pipeline_services".to_string(), "review_session".to_string()], outputs: vec!["review_result".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: Some("review_postprocess".to_string()), - }, + hints: stage_hints(ReviewPipelineStage::Postprocess), enabled: true, }, ], @@ -194,26 +192,31 @@ fn build_review_pipeline_specs() -> Vec> { DagNodeSpec { id: ReviewPipelineStage::InitializeServices, dependencies: vec![], + hints: stage_hints(ReviewPipelineStage::InitializeServices), enabled: true, }, DagNodeSpec { id: ReviewPipelineStage::BuildSession, dependencies: vec![ReviewPipelineStage::InitializeServices], + hints: stage_hints(ReviewPipelineStage::BuildSession), enabled: true, }, DagNodeSpec { id: ReviewPipelineStage::PrepareJobs, dependencies: vec![ReviewPipelineStage::BuildSession], + hints: stage_hints(ReviewPipelineStage::PrepareJobs), enabled: true, }, DagNodeSpec { id: ReviewPipelineStage::ExecuteJobs, dependencies: vec![ReviewPipelineStage::PrepareJobs], + hints: stage_hints(ReviewPipelineStage::ExecuteJobs), enabled: true, }, DagNodeSpec { id: ReviewPipelineStage::Postprocess, dependencies: vec![ReviewPipelineStage::ExecuteJobs], + hints: stage_hints(ReviewPipelineStage::Postprocess), enabled: true, }, ] diff --git a/src/review/pipeline/postprocess/dag.rs b/src/review/pipeline/postprocess/dag.rs index 0e98461..5d99a2f 100644 --- a/src/review/pipeline/postprocess/dag.rs +++ b/src/review/pipeline/postprocess/dag.rs @@ -129,6 +129,23 @@ pub(super) async fn run_postprocess_dag( Ok(result) } +fn stage_hints(stage: ReviewPostprocessStage) -> DagNodeExecutionHints { + match stage { + ReviewPostprocessStage::SaveConventionStore => DagNodeExecutionHints { + parallelizable: false, + retryable: true, + side_effects: true, + subgraph: None, + }, + _ => DagNodeExecutionHints { + parallelizable: false, + retryable: true, + side_effects: false, + subgraph: None, + }, + } +} + pub(in super::super) fn describe_review_postprocess_graph( config: &crate::config::Config, has_convention_store_path: bool, @@ -149,12 +166,7 @@ pub(in super::super) fn describe_review_postprocess_graph( .collect(), inputs: vec!["comments".to_string()], outputs: vec!["comments".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, ReviewPostprocessStage::PluginPostProcessors => DagNodeContract { @@ -169,12 +181,7 @@ pub(in super::super) fn describe_review_postprocess_graph( .collect(), inputs: vec!["comments".to_string(), "repo_path".to_string()], outputs: vec!["comments".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, ReviewPostprocessStage::Verification => DagNodeContract { @@ -197,12 +204,7 @@ pub(in super::super) fn describe_review_postprocess_graph( "verification_report".to_string(), "warnings".to_string(), ], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, ReviewPostprocessStage::SemanticFeedback => DagNodeContract { @@ -222,12 +224,7 @@ pub(in super::super) fn describe_review_postprocess_graph( "embedding_adapter".to_string(), ], outputs: vec!["comments".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, ReviewPostprocessStage::FeedbackCalibration => DagNodeContract { @@ -243,12 +240,7 @@ pub(in super::super) fn describe_review_postprocess_graph( .collect(), inputs: vec!["comments".to_string(), "feedback_store".to_string()], outputs: vec!["comments".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, ReviewPostprocessStage::ReviewFilters => DagNodeContract { @@ -267,12 +259,7 @@ pub(in super::super) fn describe_review_postprocess_graph( "feedback_store".to_string(), ], outputs: vec!["comments".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, ReviewPostprocessStage::EnhancedFilters => DagNodeContract { @@ -290,12 +277,7 @@ pub(in super::super) fn describe_review_postprocess_graph( "enhanced_review_context".to_string(), ], outputs: vec!["comments".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, ReviewPostprocessStage::ConventionSuppression => DagNodeContract { @@ -313,12 +295,7 @@ pub(in super::super) fn describe_review_postprocess_graph( "comments".to_string(), "convention_suppressed_count".to_string(), ], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, ReviewPostprocessStage::PrioritizeFindings => DagNodeContract { @@ -334,12 +311,7 @@ pub(in super::super) fn describe_review_postprocess_graph( .collect(), inputs: vec!["comments".to_string()], outputs: vec!["comments".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: false, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, ReviewPostprocessStage::SaveConventionStore => DagNodeContract { @@ -358,12 +330,7 @@ pub(in super::super) fn describe_review_postprocess_graph( "convention_store_path".to_string(), ], outputs: vec!["convention_store_saved".to_string()], - hints: DagNodeExecutionHints { - parallelizable: false, - retryable: true, - side_effects: true, - subgraph: None, - }, + hints: stage_hints(spec.id), enabled: spec.enabled, }, }) @@ -392,51 +359,61 @@ fn build_postprocess_specs( DagNodeSpec { id: ReviewPostprocessStage::SpecializedDedup, dependencies: vec![], + hints: stage_hints(ReviewPostprocessStage::SpecializedDedup), enabled: true, }, DagNodeSpec { id: ReviewPostprocessStage::PluginPostProcessors, dependencies: vec![ReviewPostprocessStage::SpecializedDedup], + hints: stage_hints(ReviewPostprocessStage::PluginPostProcessors), enabled: true, }, DagNodeSpec { id: ReviewPostprocessStage::Verification, dependencies: vec![ReviewPostprocessStage::PluginPostProcessors], + hints: stage_hints(ReviewPostprocessStage::Verification), enabled: config.verification_pass, }, DagNodeSpec { id: ReviewPostprocessStage::SemanticFeedback, dependencies: vec![ReviewPostprocessStage::Verification], + hints: stage_hints(ReviewPostprocessStage::SemanticFeedback), enabled: config.semantic_feedback, }, DagNodeSpec { id: ReviewPostprocessStage::FeedbackCalibration, dependencies: vec![ReviewPostprocessStage::SemanticFeedback], + hints: stage_hints(ReviewPostprocessStage::FeedbackCalibration), enabled: config.enhanced_feedback, }, DagNodeSpec { id: ReviewPostprocessStage::ReviewFilters, dependencies: vec![ReviewPostprocessStage::FeedbackCalibration], + hints: stage_hints(ReviewPostprocessStage::ReviewFilters), enabled: true, }, DagNodeSpec { id: ReviewPostprocessStage::EnhancedFilters, dependencies: vec![ReviewPostprocessStage::ReviewFilters], + hints: stage_hints(ReviewPostprocessStage::EnhancedFilters), enabled: true, }, DagNodeSpec { id: ReviewPostprocessStage::ConventionSuppression, dependencies: vec![ReviewPostprocessStage::EnhancedFilters], + hints: stage_hints(ReviewPostprocessStage::ConventionSuppression), enabled: true, }, DagNodeSpec { id: ReviewPostprocessStage::PrioritizeFindings, dependencies: vec![ReviewPostprocessStage::ConventionSuppression], + hints: stage_hints(ReviewPostprocessStage::PrioritizeFindings), enabled: true, }, DagNodeSpec { id: ReviewPostprocessStage::SaveConventionStore, dependencies: vec![ReviewPostprocessStage::PrioritizeFindings], + hints: stage_hints(ReviewPostprocessStage::SaveConventionStore), enabled: has_convention_store_path, }, ]