perf: stream DA SSE updates as each RPC call completes#39
perf: stream DA SSE updates as each RPC call completes#39
Conversation
Previously, notify_da_updates was called once after the entire batch of 100 blocks had been processed, so SSE clients would wait for the slowest RPC in the batch. Now each result is broadcast immediately via for_each as soon as its ev-node call returns.
📝 WalkthroughWalkthroughThe Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/crates/atlas-server/src/indexer/da_worker.rs (1)
140-190:⚠️ Potential issue | 🟡 MinorAdd a same-file regression test for the progressive path.
The current
#[cfg(test)]block still only exercisesDaWorker::notify_da_updateswith prebuilt batches. There is no test that Lines 181-185 incrementtotal_updatedonly forSome(_)and emit a notification per completed result, so this can regress back to end-of-batch flushing unnoticed. IfDaWorker::process_blocksis too hard to unit-test directly, extract theSome(update)branch into a tiny helper and cover that instead.As per coding guidelines,
backend/crates/**/*.rs: Add unit tests for new logic in a `#[cfg(test)] mod tests block in the same file; run with cargo test --workspace.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-server/src/indexer/da_worker.rs` around lines 140 - 190, Add a same-file unit test that exercises the progressive path where each Some(DaSseUpdate) increments the counter and triggers notify_da_updates instead of only testing batch flush; if DaWorker::process_blocks (the function containing the stream, total_updated, and self.notify_da_updates) is hard to call in a unit test, extract the Some(update) handling into a small helper method (e.g. DaWorker::handle_da_result or apply_da_update) that increments the counter and calls notify_da_updates, then add a #[cfg(test)] mod tests in this file to unit-test that helper by passing simulated Some(update) and None values and asserting total_updated increments only for Some and that notify_da_updates is called once per Some. Ensure the test lives in the same file and runs under cargo test --workspace.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-server/src/indexer/da_worker.rs`:
- Around line 181-185: Add a unit test in the #[cfg(test)] mod tests that
exercises process_blocks (the loop around processed > 0) and notify_da_updates:
instantiate a DaWorker with a bounded broadcast channel matching da_events_tx
capacity (256), simulate catch-up by sending many completed updates (respecting
BATCH_SIZE = 100) and assert the broadcast sender does not panic or drop beyond
its 256-message capacity; specifically verify concurrent completions from
process_blocks result in at most 256 outstanding notifications. If the test
reproduces the pressure, either increase the da_events_tx capacity or change
notify_da_updates to coalesce multiple updates into a single broadcast (e.g.,
batch several updates before calling notify_da_updates) so that the backlog
window remains comparable to the previous batch-based behavior; focus edits on
the notify_da_updates call-site and the run/process_blocks loop to implement
batching or adjust channel creation for da_events_tx.
---
Outside diff comments:
In `@backend/crates/atlas-server/src/indexer/da_worker.rs`:
- Around line 140-190: Add a same-file unit test that exercises the progressive
path where each Some(DaSseUpdate) increments the counter and triggers
notify_da_updates instead of only testing batch flush; if
DaWorker::process_blocks (the function containing the stream, total_updated, and
self.notify_da_updates) is hard to call in a unit test, extract the Some(update)
handling into a small helper method (e.g. DaWorker::handle_da_result or
apply_da_update) that increments the counter and calls notify_da_updates, then
add a #[cfg(test)] mod tests in this file to unit-test that helper by passing
simulated Some(update) and None values and asserting total_updated increments
only for Some and that notify_da_updates is called once per Some. Ensure the
test lives in the same file and runs under cargo test --workspace.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 12cb28bc-ea5b-472f-94b4-5ac99bdc6d7f
📒 Files selected for processing (1)
backend/crates/atlas-server/src/indexer/da_worker.rs
Summary
.collect().await+ end-of-batchnotify_da_updates→.for_eachthat callsnotify_da_updatesper resultTest plan
cargo test --workspace)cargo clippy --workspace -- -D warnings)Summary by CodeRabbit