perf: stream DA SSE updates as each RPC call completes#39

Open
pthmas wants to merge 1 commit into main from pthmas/faster-da-loop
Conversation

@pthmas
Collaborator

@pthmas pthmas commented Mar 20, 2026

Summary

  • DA SSE updates are now broadcast immediately as each ev-node RPC call returns, rather than waiting for the entire batch to finish
  • Previously, SSE clients waited for the slowest of 100 concurrent RPC calls before receiving any update; now the first result appears as soon as the fastest call in the batch completes
  • Change is a one-line swap: replace `.collect().await` plus an end-of-batch `notify_da_updates` with a `.for_each` that calls `notify_da_updates` per result
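
The swap described above can be sketched in miniature. This is a hedged, synchronous simulation: the real code drives an async stream of ev-node RPC results, but the structural change (collect-then-notify vs. notify-per-result with a running counter) is the same. All names here other than `notify_da_updates` and `total_updated` are illustrative, not the PR's actual signatures.

```rust
// Stand-in for the SSE broadcast: record each notification in a log.
fn notify_da_updates(log: &mut Vec<u64>, update: u64) {
    log.push(update);
}

// Before: buffer every result, then notify at end of batch.
fn batch_notify(results: impl Iterator<Item = Option<u64>>, log: &mut Vec<u64>) -> usize {
    let collected: Vec<Option<u64>> = results.collect();
    let mut total_updated = 0;
    for update in collected.into_iter().flatten() {
        total_updated += 1;
        notify_da_updates(log, update);
    }
    total_updated
}

// After: notify immediately as each result arrives, tracking a running count.
fn streaming_notify(results: impl Iterator<Item = Option<u64>>, log: &mut Vec<u64>) -> usize {
    let mut total_updated = 0;
    results.for_each(|r| {
        if let Some(update) = r {
            total_updated += 1;
            notify_da_updates(log, update);
        }
    });
    total_updated
}

fn main() {
    let batch = vec![Some(7), None, Some(9)];
    let mut log = Vec::new();
    let n = streaming_notify(batch.into_iter(), &mut log);
    assert_eq!(n, 2);
    assert_eq!(log, vec![7, 9]);
}
```

In the synchronous sketch both versions produce identical output; the difference only matters under async concurrency, where `for_each` over the stream emits each notification as soon as its RPC resolves instead of after the slowest call in the batch.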

Test plan

  • All existing unit and integration tests pass (cargo test --workspace)
  • Clippy clean (cargo clippy --workspace -- -D warnings)
  • Verify in staging that DA status columns on the blocks page populate progressively rather than all at once

Summary by CodeRabbit

  • Performance
    • Optimized data availability worker to process block updates more efficiently with improved notification handling.

Previously, notify_da_updates was called once after the entire batch of
100 blocks had been processed, so SSE clients would wait for the slowest
RPC in the batch. Now each result is broadcast immediately via for_each
as soon as its ev-node call returns.
@coderabbitai

coderabbitai bot commented Mar 20, 2026

📝 Walkthrough

Walkthrough

The process_blocks function in the DA worker is refactored to shift DA update notifications from end-of-batch accumulation to immediate per-block notification, eliminating intermediate buffering and using a running counter for tracking.

Changes

Cohort / File(s) Summary
DA Worker Notification Flow
backend/crates/atlas-server/src/indexer/da_worker.rs
Modified process_blocks to change notification timing from batch-end to per-updated-block; removed intermediate Vec<Option<DaSseUpdate>> accumulation and instead iterate with for_each, immediately calling notify_da_updates for each successful update while tracking a running total_updated counter.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • tac0turtle

Poem

🐰 Hop, hop—one block at a time now!
No waiting for the batch, we notify the now,
Stream and counter dance in sync,
Per-update whispers in a blink!

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Description Check: ✅ Passed. Check skipped; CodeRabbit’s high-level summary is enabled.
  • Title Check: ✅ Passed. The title directly and accurately describes the main performance improvement: streaming DA SSE updates progressively as each RPC call completes instead of batching them.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which meets the required threshold of 80.00%.


@pthmas pthmas marked this pull request as ready for review March 20, 2026 11:09

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/crates/atlas-server/src/indexer/da_worker.rs (1)

140-190: ⚠️ Potential issue | 🟡 Minor

Add a same-file regression test for the progressive path.

The current #[cfg(test)] block still only exercises DaWorker::notify_da_updates with prebuilt batches. There is no test verifying that lines 181-185 increment total_updated only for Some(_) and emit a notification per completed result, so the code could regress back to end-of-batch flushing unnoticed. If DaWorker::process_blocks is too hard to unit-test directly, extract the Some(update) branch into a small helper and cover that instead.

As per coding guidelines, backend/crates/**/*.rs: add unit tests for new logic in a `#[cfg(test)] mod tests` block in the same file; run with cargo test --workspace.
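
The helper extraction the reviewer suggests could look roughly like the following. This is a hedged sketch: `handle_da_result` is the reviewer's hypothetical name, `DaSseUpdate` is simplified to a one-field struct, and the SSE broadcast channel is replaced by a plain Vec so the logic is testable without the real worker.

```rust
// Simplified stand-ins for the real types in da_worker.rs.
#[derive(Debug, PartialEq)]
struct DaSseUpdate(u64);

struct DaWorker {
    // Stand-in for the SSE broadcast channel: records every notification.
    notified: Vec<DaSseUpdate>,
}

impl DaWorker {
    fn notify_da_updates(&mut self, update: DaSseUpdate) {
        self.notified.push(update);
    }

    // Extracted helper: bumps the counter and notifies only for Some(_),
    // so the Some/None branch can be unit-tested in isolation.
    fn handle_da_result(&mut self, total_updated: &mut usize, result: Option<DaSseUpdate>) {
        if let Some(update) = result {
            *total_updated += 1;
            self.notify_da_updates(update);
        }
    }
}

fn main() {
    let mut worker = DaWorker { notified: Vec::new() };
    let mut total_updated = 0;
    for r in [Some(DaSseUpdate(1)), None, Some(DaSseUpdate(2))] {
        worker.handle_da_result(&mut total_updated, r);
    }
    assert_eq!(total_updated, 2);
    assert_eq!(worker.notified.len(), 2);
}
```

A `#[cfg(test)] mod tests` in da_worker.rs could then assert exactly what the review asks for: the counter increments only for Some(_) and one notification is emitted per completed result.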

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-server/src/indexer/da_worker.rs` around lines 140 - 190,
Add a same-file unit test that exercises the progressive path where each
Some(DaSseUpdate) increments the counter and triggers notify_da_updates instead
of only testing batch flush; if DaWorker::process_blocks (the function
containing the stream, total_updated, and self.notify_da_updates) is hard to
call in a unit test, extract the Some(update) handling into a small helper
method (e.g. DaWorker::handle_da_result or apply_da_update) that increments the
counter and calls notify_da_updates, then add a #[cfg(test)] mod tests in this
file to unit-test that helper by passing simulated Some(update) and None values
and asserting total_updated increments only for Some and that notify_da_updates
is called once per Some. Ensure the test lives in the same file and runs under
cargo test --workspace.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/crates/atlas-server/src/indexer/da_worker.rs`:
- Around line 181-185: Add a unit test in the #[cfg(test)] mod tests that
exercises process_blocks (the loop around processed > 0) and notify_da_updates:
instantiate a DaWorker with a bounded broadcast channel matching da_events_tx
capacity (256), simulate catch-up by sending many completed updates (respecting
BATCH_SIZE = 100) and assert the broadcast sender does not panic or drop beyond
its 256-message capacity; specifically verify concurrent completions from
process_blocks result in at most 256 outstanding notifications. If the test
reproduces the pressure, either increase the da_events_tx capacity or change
notify_da_updates to coalesce multiple updates into a single broadcast (e.g.,
batch several updates before calling notify_da_updates) so that the backlog
window remains comparable to the previous batch-based behavior; focus edits on
the notify_da_updates call-site and the run/process_blocks loop to implement
batching or adjust channel creation for da_events_tx.

---

Outside diff comments:
In `@backend/crates/atlas-server/src/indexer/da_worker.rs`:
- Around line 140-190: Add a same-file unit test that exercises the progressive
path where each Some(DaSseUpdate) increments the counter and triggers
notify_da_updates instead of only testing batch flush; if
DaWorker::process_blocks (the function containing the stream, total_updated, and
self.notify_da_updates) is hard to call in a unit test, extract the Some(update)
handling into a small helper method (e.g. DaWorker::handle_da_result or
apply_da_update) that increments the counter and calls notify_da_updates, then
add a #[cfg(test)] mod tests in this file to unit-test that helper by passing
simulated Some(update) and None values and asserting total_updated increments
only for Some and that notify_da_updates is called once per Some. Ensure the
test lives in the same file and runs under cargo test --workspace.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 12cb28bc-ea5b-472f-94b4-5ac99bdc6d7f

📥 Commits

Reviewing files that changed from the base of the PR and between ba5b44e and 5677b01.

📒 Files selected for processing (1)
  • backend/crates/atlas-server/src/indexer/da_worker.rs

@pthmas pthmas self-assigned this Mar 20, 2026