
Bulk bid processing #892

Merged
ZanCorDX merged 3 commits into develop from dx/bids-bulk-new
Mar 16, 2026

Conversation

@ZanCorDX (Collaborator)

📝 Summary

This PR changes BiddingService::observe_relay_bids to take a vector of bids to allow bulk bid processing.

💡 Motivation and Context

This should allow the bidder to process bids more efficiently (e.g., generate a single bid of our own after processing the whole vector).
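As a rough sketch of what the interface change enables (type and trait names below are illustrative stand-ins, not the crate's actual definitions): receiving the whole batch lets the bidder look at all bids together instead of reacting to each one in isolation.

```rust
// Illustrative sketch only: `Bid`, `BiddingService`, and `MaxBidder` are
// hypothetical stand-ins for the real types in the bidding service crate.
#[derive(Debug, Clone)]
struct Bid {
    value: u64,
}

trait BiddingService {
    // Before (conceptually): fn observe_relay_bid(&mut self, bid: Bid);
    // After: the whole batch arrives at once, so a single derived bid can be
    // produced from the full vector.
    fn observe_relay_bids(&mut self, bids: Vec<Bid>);
}

struct MaxBidder {
    best: Option<u64>,
}

impl BiddingService for MaxBidder {
    fn observe_relay_bids(&mut self, bids: Vec<Bid>) {
        // One pass over the batch, keeping only the best observed value.
        for bid in bids {
            self.best = Some(self.best.map_or(bid.value, |b| b.max(bid.value)));
        }
    }
}

fn main() {
    let mut bidder = MaxBidder { best: None };
    bidder.observe_relay_bids(vec![Bid { value: 3 }, Bid { value: 7 }, Bid { value: 5 }]);
    println!("{:?}", bidder.best); // prints Some(7)
}
```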

✅ I have completed the following steps:

  • Run make lint
  • Run make test
  • Added tests (if applicable)

@ZanCorDX ZanCorDX requested a review from dvush as a code owner March 12, 2026 20:06
@claude (Contributor)

claude bot commented Mar 12, 2026

Claude finished @ZanCorDX's task in 52s


  • Read diff and changed files
  • Post review feedback

Review

Clean, small PR. The interface change from a single bid to a Vec is straightforward and all call sites are updated. A few observations:

1. On-error bid loss in spawn_scraped_bids_and_blocks_subscriber

In bidding_service_server_adapter.rs (lines 241-249), when poll returns an error partway through, bids collected before the error are still forwarded via update_new_bids. This is fine and arguably better than before (where an error mid-poll would lose all bids), but worth noting that partial batches will be sent on error — make sure downstream consumers handle variable-size batches correctly.

2. BiddingServiceClientAdapter::observe_relay_bids still sends one-by-one

In bidding_service_client_adapter.rs (lines 329-333), the client adapter loops and calls self.scraped_bids_publisher.send(bid) individually for each element. This means the bulk benefit doesn't propagate through the client→server IPC boundary — the server side will still receive individual bids. If the goal is end-to-end bulk processing, this adapter would need a bulk send as well.

3. No capacity hint on Vec::new()

In bidding_service_server_adapter.rs line 241, Vec::new() is used without a capacity hint. If typical poll batches have a known or estimable size, Vec::with_capacity(...) would avoid reallocations. Minor optimization, not blocking.
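A minimal illustration of the capacity-hint point (the batch size of 100 is arbitrary): a pre-sized vector does all its allocation up front, while `Vec::new()` starts at zero capacity and reallocates as it grows.

```rust
fn main() {
    // Vec::new() starts with zero capacity; pushes trigger reallocations
    // each time the length outgrows the current capacity.
    let mut grown: Vec<u64> = Vec::new();
    assert_eq!(grown.capacity(), 0);
    for i in 0..100 {
        grown.push(i);
    }

    // With a capacity hint, all 100 slots are allocated once up front.
    let mut hinted: Vec<u64> = Vec::with_capacity(100);
    let initial_capacity = hinted.capacity();
    assert!(initial_capacity >= 100);
    for i in 0..100 {
        hinted.push(i);
    }
    // Capacity is unchanged: the pushes never forced a reallocation.
    assert_eq!(hinted.capacity(), initial_capacity);
    println!("grown: {}, hinted: {}", grown.capacity(), hinted.capacity());
}
```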

Overall this is a clean incremental change. The main point to consider is item 2 — whether the per-bid send in the client adapter undermines the purpose of this PR.

@claude (Contributor)

claude bot commented Mar 13, 2026

Claude finished @ZanCorDX's task in 1m 13s


  • Read diff and changed files
  • Post review feedback

Review

The second commit (9dc8b21) addresses the main concern from the previous review — the publisher-side now batches via try_recv draining + send_many, reducing IPC notify overhead. The interface change and all call sites are consistent.

Three inline comments posted:

  1. send_many silently drops intermediate errors — only the last publish_item error is preserved. Earlier failures are overwritten. Suggested logging the error count for observability.

  2. Partial batch on poll error — when poll errors mid-stream in spawn_scraped_bids_and_blocks_subscriber, the already-collected bids are forwarded without any log of batch size vs expected, making bid loss hard to diagnose.

  3. Client adapter still sends one-by-one — noted as an optimization boundary, not blocking since the publisher-side try_recv batching compensates on the hot path.

No blocking issues found. The changes are clean and incremental.

Comment on lines +238 to +254
```rust
pub fn send_many(&self, items: Vec<ItemTypeRPC>) -> Result<(), Error> {
    let mut some_sent = false;
    let mut publish_item_err = None;
    for item in items {
        match self.publish_item(item) {
            Ok(_) => some_sent = true,
            Err(err) => publish_item_err = Some(err),
        }
    }
    if some_sent {
        self.notifier.notify()?;
    }
    match publish_item_err {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
```

send_many silently swallows all errors except the last one. If publish_item fails on items 1 and 3 out of 5, only the error from item 3 is returned — the earlier failure is dropped. This makes debugging bid loss harder since you won't know how many items actually failed.

Consider collecting all errors or at least counting failures:

Suggested change:

```rust
pub fn send_many(&self, items: Vec<ItemTypeRPC>) -> Result<(), Error> {
    let mut some_sent = false;
    let mut err_count = 0;
    let mut last_err = None;
    for item in items {
        match self.publish_item(item) {
            Ok(_) => some_sent = true,
            Err(err) => {
                err_count += 1;
                last_err = Some(err);
            }
        }
    }
    if some_sent {
        self.notifier.notify()?;
    }
    match last_err {
        Some(err) => {
            tracing::error!(err_count, "send_many: {} item(s) failed to publish", err_count);
            Err(err)
        }
        None => Ok(()),
    }
}
```

Comment on lines +241 to +249
```rust
let mut bids = Vec::new();
if let Err(err) = scraped_bids_subscriber.poll(|sample| {
    inner.lock().update_new_bid(sample);
    bids.push(sample);
}) {
    error!(err=?err, "scraped_bids_subscriber poll failed.");
}
if !bids.is_empty() {
    inner.lock().update_new_bids(bids);
}
```

When poll errors partway through, bids will contain only the items collected before the error. These are still forwarded to update_new_bids, which is correct behavior, but the partial batch is silently sent without any indication of truncation. Consider logging the count of bids forwarded alongside the error so operators can correlate bid loss with poll failures.

Comment on lines +329 to 333
fn observe_relay_bids(&self, bid_with_stats: Vec<ScrapedRelayBlockBidWithStats>) {
for bid in bid_with_stats {
self.scraped_bids_publisher.send(bid);
}
}

The client adapter still sends bids one-by-one over the channel. The new send_many / try_recv batching on the publisher thread partially compensates for this, but if the publisher thread drains faster than bids arrive, each bid still results in a separate IPC publish + notify cycle. This is fine for now since the publisher-side batching via try_recv handles the hot path, but worth noting the optimization boundary.
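The drain-then-batch pattern the comment refers to can be sketched with `std::sync::mpsc` as a stand-in for the real channel (the notify counter and `Bid`-as-`u32` are illustrative, not the actual IPC publisher): after one blocking `recv`, `try_recv` drains whatever is already queued, so N queued bids cost one publish + notify cycle instead of N.

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    // Simulate five bids arriving while the publisher thread was busy.
    for bid in 1..=5 {
        tx.send(bid).unwrap();
    }

    let mut notify_calls = 0;

    // One blocking recv wakes the loop, then try_recv drains the backlog
    // without blocking again.
    let first = rx.recv().unwrap();
    let mut bids = vec![first];
    while let Ok(extra) = rx.try_recv() {
        bids.push(extra);
    }
    // The whole backlog goes out as one batch: a single notify cycle.
    notify_calls += 1;

    assert_eq!(bids, vec![1, 2, 3, 4, 5]);
    assert_eq!(notify_calls, 1); // vs. 5 notifies with per-bid sends
    println!("batched {} bids with {} notify call(s)", bids.len(), notify_calls);
}
```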

@claude (Contributor)

claude bot commented Mar 13, 2026

Claude finished @ZanCorDX's task in 1m 13s


  • Read diff and changed files
  • Post review feedback

Review

The third commit (219c81e) adds the error trace improvement on top of the previous batching work. The overall change set is clean. Two inline comments posted:

  1. Unbounded try_recv drain in ScrapedBidsPublisher — The drain loop that batches bids from the mpsc channel has no cap. Under sustained high throughput this could accumulate unbounded bids before publishing, causing memory pressure and latency. Suggested capping batch size.

  2. Silent partial batch on poll error — When scraped_bids_subscriber.poll fails mid-stream, the partial batch is forwarded without logging its size, making bid loss hard to diagnose. Suggested including bids_collected count in the error log.

No blocking issues. The send_many error handling (logging count + returning last error) addresses the concern from the previous review correctly.

Comment on lines +292 to +295
```rust
let mut bids = vec![scraped_bid];
while let Ok(extra) = scraped_bids_rx.try_recv() {
    bids.push(extra);
}
```

The try_recv drain loop has no upper bound. Under sustained high bid throughput, this could accumulate an unbounded number of bids in memory before any are published, causing latency spikes and memory pressure. Consider capping the batch size:

Suggested change:

```rust
let mut bids = vec![scraped_bid];
while bids.len() < 1024 {
    match scraped_bids_rx.try_recv() {
        Ok(extra) => bids.push(extra),
        Err(_) => break,
    }
}
```

Comment on lines +241 to +249
```rust
let mut bids = Vec::new();
if let Err(err) = scraped_bids_subscriber.poll(|sample| {
    inner.lock().update_new_bid(sample);
    bids.push(sample);
}) {
    error!(err=?err, "scraped_bids_subscriber poll failed.");
}
if !bids.is_empty() {
    inner.lock().update_new_bids(bids);
}
```

When poll errors partway through, bids contains only the items collected before the error — these are still forwarded to update_new_bids. This is correct behavior, but without logging the batch size it's hard for operators to tell if bids were lost. Consider:

Suggested change:

```rust
let mut bids = Vec::new();
if let Err(err) = scraped_bids_subscriber.poll(|sample| {
    bids.push(sample);
}) {
    error!(err=?err, bids_collected = bids.len(), "scraped_bids_subscriber poll failed.");
}
if !bids.is_empty() {
    inner.lock().update_new_bids(bids);
}
```

@ZanCorDX ZanCorDX merged commit e4f61de into develop Mar 16, 2026
20 checks passed
@ZanCorDX ZanCorDX deleted the dx/bids-bulk-new branch March 16, 2026 15:04