
OPT: Skip upgrade_record match when upgrade not needed#123

Open
wtn wants to merge 1 commit into databento:dev from wtn:upgrade

Conversation


@wtn wtn commented Apr 16, 2026

While profiling a local replay scenario, I saw DbnFsm::upgrade_record taking a meaningful share of decode CPU on current-version data decoded with VersionUpgradePolicy::AsIs, a workload where the function has nothing to do. The per-record match over (version, policy, rtype) runs and falls through every arm.

The proposed change caches whether the (policy, version) combo can ever trigger an upgrade. When it can't, process() skips the dispatch and transitions straight to Consume.
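A minimal sketch of that cached predicate, assuming the three upgrade policies and v1/v2/v3 input versions (the function and variant names here are illustrative assumptions, not the PR's exact code):

```rust
// Hypothetical sketch of the cached (policy, version) predicate; not the
// PR's exact code. Assumes DBN input versions 1-3 and three policies.
#[derive(Clone, Copy, PartialEq)]
enum VersionUpgradePolicy {
    AsIs,
    UpgradeToV2,
    UpgradeToV3,
}

/// Returns whether this (policy, version) combination can ever rewrite a record.
fn compute_needs_upgrade(policy: VersionUpgradePolicy, input_version: u8) -> bool {
    match policy {
        // AsIs never upgrades, regardless of input version.
        VersionUpgradePolicy::AsIs => false,
        // Upgrading only applies when the input predates the target version.
        VersionUpgradePolicy::UpgradeToV2 => input_version < 2,
        VersionUpgradePolicy::UpgradeToV3 => input_version < 3,
    }
}

fn main() {
    // Current-version data decoded as-is: the fast path applies.
    assert!(!compute_needs_upgrade(VersionUpgradePolicy::AsIs, 3));
    // Old data under an upgrade policy: the per-record match is still needed.
    assert!(compute_needs_upgrade(VersionUpgradePolicy::UpgradeToV3, 1));
    println!("ok");
}
```

The flag is computed once per (policy, version) change instead of once per record, which is what makes the hot loop a single branch.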

Implementation, benchmark, and reproducer script

Implementation

upgrade_record_with_version is the match in question. Its arms only fire on old-version data under a non-AsIs policy (v1→v2 InstrumentDefMsg, v2→v3 StatMsg, etc.); for current-version data or AsIs, every arm is skipped and the function returns the record unchanged. The work is wasted on the common path.

Approach:

  1. Add a cached needs_upgrade: bool on DbnFsm.
  2. Refresh it at every site that writes upgrade_policy or input_dbn_version: new, Default, the builder's build, set_input_dbn_version, set_upgrade_policy, decode_prelude (metadata path), reset.
  3. In process() and process_multiple(), when !needs_upgrade, skip the upgrade_record call and the compat-buffer bookkeeping, transitioning to Consume with a RecordRef built directly from the main buffer.

No public API change; upgrade_record stays in place for the slow path.
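The refresh-at-every-mutation-site pattern from step 2 can be sketched as follows; the struct is heavily simplified and the method names are assumptions drawn from the description above, not the PR's actual fsm.rs:

```rust
// Simplified sketch of the caching pattern; not the PR's actual code.
#[derive(Clone, Copy, PartialEq)]
enum VersionUpgradePolicy {
    AsIs,
    UpgradeToV2,
    UpgradeToV3,
}

struct DbnFsm {
    input_dbn_version: u8,
    upgrade_policy: VersionUpgradePolicy,
    // Cached so the per-record hot loop tests one bool instead of
    // matching over (version, policy, rtype).
    needs_upgrade: bool,
}

impl DbnFsm {
    fn refresh_needs_upgrade(&mut self) {
        self.needs_upgrade = match self.upgrade_policy {
            VersionUpgradePolicy::AsIs => false,
            VersionUpgradePolicy::UpgradeToV2 => self.input_dbn_version < 2,
            VersionUpgradePolicy::UpgradeToV3 => self.input_dbn_version < 3,
        };
    }

    // Every site that writes either field recomputes the cache.
    fn set_input_dbn_version(&mut self, version: u8) {
        self.input_dbn_version = version;
        self.refresh_needs_upgrade();
    }

    fn set_upgrade_policy(&mut self, policy: VersionUpgradePolicy) {
        self.upgrade_policy = policy;
        self.refresh_needs_upgrade();
    }

    // Hot loop: one branch per record instead of a full match.
    fn dispatch(&self) -> &'static str {
        if self.needs_upgrade { "upgrade_record" } else { "consume directly" }
    }
}

fn main() {
    let mut fsm = DbnFsm {
        input_dbn_version: 3,
        upgrade_policy: VersionUpgradePolicy::AsIs,
        needs_upgrade: false,
    };
    fsm.refresh_needs_upgrade();
    assert_eq!(fsm.dispatch(), "consume directly");
    fsm.set_upgrade_policy(VersionUpgradePolicy::UpgradeToV3);
    fsm.set_input_dbn_version(1);
    assert_eq!(fsm.dispatch(), "upgrade_record");
}
```

Routing every write through a setter that refreshes the cache is what keeps the flag consistent; a stale cache would silently skip upgrades, which is why the PR's tests walk each mutation site.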

Benchmark

macOS sample(1) over a 5 s window of a 395 MiB MBO replay: upgrade_record took ~14% of decode-loop CPU on upstream main. After PR #122 lands, the same absolute time is ~19% of the smaller total. Expected throughput gain on AsIs workloads: 15-25%.

Synthetic decode_record_ref benchmark, peak throughput over N=2M records, 30 iters, arm64-darwin25:

schema    record size   before        after         Δ
trade     48 B          5.15 GB/s     6.34 GB/s     +23%
mbo       56 B          5.78 GB/s     7.27 GB/s     +26%
mbp1      80 B          7.33 GB/s     8.94 GB/s     +22%
mbp10     368 B         19.67 GB/s    21.71 GB/s    +10%
ohlcv1s   56 B          5.73 GB/s     7.22 GB/s     +26%

Reproducer

Copy the script below into rust/dbn/examples/decode_bench.rs, then:

cargo run --release --example decode_bench -p dbn

Defaults: N=1_000_000 records × 5 schemas, 20 timed iters, 3 warmup. Override via env vars (N, ITERS, WARMUP).

For a before/after comparison: run on main for a baseline, git switch to this branch, re-run.

//! Synthetic `DbnRecordDecoder::decode_record_ref` throughput benchmark.
//!
//! Encodes `N` default records of several schemas into an in-memory buffer,
//! then runs a timed decode loop over each. Reports per-iteration min and
//! median time, peak and median throughput in GB/s, and records/sec.
//!
//! `N`, `ITERS`, and `WARMUP` are read from the environment (defaults:
//! 1_000_000 / 20 / 3). Records are default-constructed; only record size
//! affects decode throughput.
//!
//! ```sh
//! cargo run --release --example decode_bench -p dbn
//! ```

use std::env;
use std::hint::black_box;
use std::time::Instant;

use dbn::decode::{DbnRecordDecoder, DecodeRecordRef};
use dbn::encode::{DbnEncodable, DbnRecordEncoder, EncodeRecord};
use dbn::{MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg, Schema, TradeMsg};

fn env_usize(key: &str, default: usize) -> usize {
    env::var(key)
        .ok()
        .and_then(|v| v.replace('_', "").parse().ok())
        .unwrap_or(default)
}

fn encode_repeated<R: DbnEncodable>(n: usize, rec: &R) -> Vec<u8> {
    let mut buf = Vec::with_capacity(n * std::mem::size_of::<R>());
    let mut enc = DbnRecordEncoder::new(&mut buf);
    for _ in 0..n {
        enc.encode_record(rec).unwrap();
    }
    buf
}

fn bench(name: &str, record_size: usize, bytes: &[u8], iters: usize, warmup: usize) {
    let mut timings = Vec::with_capacity(iters);
    let mut records_per_iter: u64 = 0;
    for i in 0..(iters + warmup) {
        let mut decoder = DbnRecordDecoder::new(bytes);
        let mut count: u64 = 0;
        let t0 = Instant::now();
        while let Ok(Some(rec)) = decoder.decode_record_ref() {
            black_box(&rec);
            count += 1;
        }
        let elapsed = t0.elapsed();
        black_box(count);
        if i >= warmup {
            timings.push(elapsed.as_secs_f64());
            records_per_iter = count;
        }
    }
    timings.sort_by(|a, b| a.partial_cmp(b).unwrap());
    let min = timings[0];
    let med = timings[timings.len() / 2];
    let bytes_per_iter = bytes.len();
    let gbps_med = (bytes_per_iter as f64) / med / 1e9;
    let gbps_min = (bytes_per_iter as f64) / min / 1e9;
    let recs_per_s = (records_per_iter as f64) / med;
    println!(
        "{:<8} rec={:>4}B  count={:>10}  bytes={:>10}  min={:>8.4}ms  med={:>8.4}ms  {:>6.2} GB/s (peak {:>6.2})  {:>10.2} Mrec/s",
        name,
        record_size,
        records_per_iter,
        bytes_per_iter,
        min * 1e3,
        med * 1e3,
        gbps_med,
        gbps_min,
        recs_per_s / 1e6,
    );
}

fn main() {
    let n = env_usize("N", 1_000_000);
    let iters = env_usize("ITERS", 20);
    let warmup = env_usize("WARMUP", 3);

    println!(
        "DBN decode bench (synthetic) — N={n}, iters={iters}, warmup={warmup} — dbn v{}",
        env!("CARGO_PKG_VERSION"),
    );
    println!("records default-constructed; only layout size affects decoder throughput");
    println!();

    let trade = TradeMsg::default();
    let trade_bytes = encode_repeated(n, &trade);
    bench("trade", std::mem::size_of::<TradeMsg>(), &trade_bytes, iters, warmup);

    let mbo = MboMsg::default();
    let mbo_bytes = encode_repeated(n, &mbo);
    bench("mbo", std::mem::size_of::<MboMsg>(), &mbo_bytes, iters, warmup);

    let mbp1 = Mbp1Msg::default();
    let mbp1_bytes = encode_repeated(n, &mbp1);
    bench("mbp1", std::mem::size_of::<Mbp1Msg>(), &mbp1_bytes, iters, warmup);

    let mbp10 = Mbp10Msg::default();
    let mbp10_bytes = encode_repeated(n, &mbp10);
    bench("mbp10", std::mem::size_of::<Mbp10Msg>(), &mbp10_bytes, iters, warmup);

    let ohlcv = OhlcvMsg::default_for_schema(Schema::Ohlcv1S);
    let ohlcv_bytes = encode_repeated(n, &ohlcv);
    bench("ohlcv1s", std::mem::size_of::<OhlcvMsg>(), &ohlcv_bytes, iters, warmup);
}

Type of change

  • Performance optimization

How has this change been tested?

  • test_compute_needs_upgrade: parameterized rstest (11 cases) covering each policy and version combination (including unknown version).
  • test_needs_upgrade_refreshed_on_setters: walks each mutation site (set_input_dbn_version, set_upgrade_policy, reset) and asserts the cache stays consistent.
  • Existing test_dbn_identity rstest matrix covers both branches: (v3, AsIs)/(v3, UpgradeToV3)/(v2, UpgradeToV2) hit the fast path; (v1, UpgradeToV2)/(v1, UpgradeToV3)/(v2, UpgradeToV3) hit the slow path.
  • cargo test -p dbn --features async --lib passes
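The parameterized cases above can be approximated without rstest as a table-driven loop; the predicate is restated here as an assumption about the PR's logic (and the case list is a subset, not all 11 cases):

```rust
// Table-driven approximation of the test matrix described above.
// The predicate mirrors the optimization's assumed (policy, version) logic.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Policy {
    AsIs,
    UpgradeToV2,
    UpgradeToV3,
}

fn needs_upgrade(policy: Policy, version: u8) -> bool {
    match policy {
        Policy::AsIs => false,
        Policy::UpgradeToV2 => version < 2,
        Policy::UpgradeToV3 => version < 3,
    }
}

fn main() {
    // (policy, input version, expected slow path?)
    let cases = [
        (Policy::AsIs, 1, false),
        (Policy::AsIs, 2, false),
        (Policy::AsIs, 3, false),        // fast path per test_dbn_identity
        (Policy::UpgradeToV2, 1, true),  // slow path: v1 -> v2
        (Policy::UpgradeToV2, 2, false), // fast path: already v2
        (Policy::UpgradeToV3, 1, true),  // slow path: v1 -> v3
        (Policy::UpgradeToV3, 2, true),  // slow path: v2 -> v3
        (Policy::UpgradeToV3, 3, false), // fast path: already v3
    ];
    for (policy, version, expect_upgrade) in cases {
        assert_eq!(
            needs_upgrade(policy, version),
            expect_upgrade,
            "policy={policy:?} version={version}"
        );
    }
    println!("all cases pass");
}
```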

Checklist

  • My code builds locally with no new warnings (scripts/build.sh)
  • My code follows the style guidelines (scripts/lint.sh and scripts/format.sh)
  • New and existing unit tests pass locally with my changes (scripts/test.sh)
  • I have made corresponding changes to the documentation (not applicable; internal)
  • I have added tests that prove my fix is effective

Declaration

I confirm this contribution is made under an Apache 2.0 license and that I have the authority necessary to make this contribution on behalf of its copyright owner.


@threecgreen threecgreen left a comment


This is a nice change. I'm also seeing 6-15% improvement in throughput for uncompressed (lowest with mbp-10) and 2-9% improvement for zstd-compressed.

You should update the CHANGELOG to mention this optimization, and please "credit: " yourself, as in previous PRs.

@wtn wtn marked this pull request as ready for review April 17, 2026 01:16
@wtn wtn changed the base branch from main to dev April 17, 2026 15:39
@wtn wtn requested a review from threecgreen April 17, 2026 15:57
@threecgreen

LGTM. I will commit this change and it'll be mirrored to GitHub.
