From 5677b016ce5c8e51b56b228cc4e0703e59e139d1 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Fri, 20 Mar 2026 12:05:26 +0100 Subject: [PATCH] perf: stream DA SSE updates as each RPC call completes Previously, notify_da_updates was called once after the entire batch of 100 blocks had been processed, so SSE clients would wait for the slowest RPC in the batch. Now each result is broadcast immediately via for_each as soon as its ev-node call returns. --- .../atlas-server/src/indexer/da_worker.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/backend/crates/atlas-server/src/indexer/da_worker.rs b/backend/crates/atlas-server/src/indexer/da_worker.rs index 97c40cf..c71276c 100644 --- a/backend/crates/atlas-server/src/indexer/da_worker.rs +++ b/backend/crates/atlas-server/src/indexer/da_worker.rs @@ -137,7 +137,9 @@ impl DaWorker { let client = &self.client; let rate_limiter = &self.rate_limiter; - let results: Vec> = stream::iter(blocks) + let mut total_updated = 0usize; + + stream::iter(blocks) .map(|(block_number,)| async move { rate_limiter.until_ready().await; match client.get_da_status(block_number as u64).await { @@ -176,13 +178,16 @@ impl DaWorker { } }) .buffer_unordered(self.concurrency) - .collect() + .for_each(|result| { + if let Some(update) = result { + total_updated += 1; + self.notify_da_updates(std::slice::from_ref(&update)); + } + std::future::ready(()) + }) .await; - let updates: Vec = results.into_iter().flatten().collect(); - self.notify_da_updates(&updates); - - Ok(updates.len()) + Ok(total_updated) } }