Conversation
Claude finished @ZanCorDX's task in 52s
Review

Clean, small PR. The interface change from a single bid to a vector is straightforward. Points noted:

1. On-error bid loss in send_many.
2. Per-bid sends in the client adapter.
3. No capacity hint on the batch collection.

Overall this is a clean incremental change. The main point to consider is item 2: whether the per-bid send in the client adapter undermines the purpose of this PR.
Claude finished @ZanCorDX's task in 1m 13s
Review

The second commit looks good. Three inline comments posted:
No blocking issues found. The changes are clean and incremental.
```rust
pub fn send_many(&self, items: Vec<ItemTypeRPC>) -> Result<(), Error> {
    let mut some_sent = false;
    let mut publish_item_err = None;
    for item in items {
        match self.publish_item(item) {
            Ok(_) => some_sent = true,
            Err(err) => publish_item_err = Some(err),
        }
    }
    if some_sent {
        self.notifier.notify()?;
    }
    match publish_item_err {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
```
send_many silently swallows all errors except the last one. If publish_item fails on items 1 and 3 out of 5, only the error from item 3 is returned — the earlier failure is dropped. This makes debugging bid loss harder since you won't know how many items actually failed.
Consider collecting all errors or at least counting failures:
```rust
pub fn send_many(&self, items: Vec<ItemTypeRPC>) -> Result<(), Error> {
    let mut some_sent = false;
    let mut err_count = 0;
    let mut last_err = None;
    for item in items {
        match self.publish_item(item) {
            Ok(_) => some_sent = true,
            Err(err) => {
                err_count += 1;
                last_err = Some(err);
            }
        }
    }
    if some_sent {
        self.notifier.notify()?;
    }
    match last_err {
        Some(err) => {
            tracing::error!(err_count, "send_many: {} item(s) failed to publish", err_count);
            Err(err)
        }
        None => Ok(()),
    }
}
```
```rust
let mut bids = Vec::new();
if let Err(err) = scraped_bids_subscriber.poll(|sample| {
    inner.lock().update_new_bid(sample);
    bids.push(sample);
}) {
    error!(err=?err, "scraped_bids_subscriber poll failed.");
}
if !bids.is_empty() {
    inner.lock().update_new_bids(bids);
}
```
When poll errors partway through, bids will contain only the items collected before the error. These are still forwarded to update_new_bids, which is correct behavior, but the partial batch is silently sent without any indication of truncation. Consider logging the count of bids forwarded alongside the error so operators can correlate bid loss with poll failures.
```rust
fn observe_relay_bids(&self, bid_with_stats: Vec<ScrapedRelayBlockBidWithStats>) {
    for bid in bid_with_stats {
        self.scraped_bids_publisher.send(bid);
    }
}
```
The client adapter still sends bids one-by-one over the channel. The new send_many / try_recv batching on the publisher thread partially compensates for this, but if the publisher thread drains faster than bids arrive, each bid still results in a separate IPC publish + notify cycle. This is fine for now since the publisher-side batching via try_recv handles the hot path, but worth noting the optimization boundary.
Claude finished @ZanCorDX's task in 1m 13s
Review

The third commit is clean; no blocking issues found. Two inline comments posted:
```rust
let mut bids = vec![scraped_bid];
while let Ok(extra) = scraped_bids_rx.try_recv() {
    bids.push(extra);
}
```
The try_recv drain loop has no upper bound. Under sustained high bid throughput, this could accumulate an unbounded number of bids in memory before any are published, causing latency spikes and memory pressure. Consider capping the batch size:
```rust
let mut bids = vec![scraped_bid];
while bids.len() < 1024 {
    match scraped_bids_rx.try_recv() {
        Ok(extra) => bids.push(extra),
        Err(_) => break,
    }
}
```
```rust
let mut bids = Vec::new();
if let Err(err) = scraped_bids_subscriber.poll(|sample| {
    inner.lock().update_new_bid(sample);
    bids.push(sample);
}) {
    error!(err=?err, "scraped_bids_subscriber poll failed.");
}
if !bids.is_empty() {
    inner.lock().update_new_bids(bids);
}
```
When poll errors partway through, bids contains only the items collected before the error — these are still forwarded to update_new_bids. This is correct behavior, but without logging the batch size it's hard for operators to tell if bids were lost. Consider:
```rust
let mut bids = Vec::new();
if let Err(err) = scraped_bids_subscriber.poll(|sample| {
    inner.lock().update_new_bid(sample);
    bids.push(sample);
}) {
    error!(err=?err, bids_collected = bids.len(), "scraped_bids_subscriber poll failed.");
}
if !bids.is_empty() {
    inner.lock().update_new_bids(bids);
}
```
📝 Summary
This PR changes BiddingService::observe_relay_bids to take a vector of bids to allow bulk bid processing.
💡 Motivation and Context
This should allow the bidder to process bids more efficiently (e.g., generate a single bid of our own after processing the whole vector).
✅ I have completed the following steps:

- make lint
- make test