Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 237 additions & 23 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"bin/ev-dev",
"bin/ev-reth",
"crates/common",
"crates/exex-remote",
"crates/ev-primitives",
"crates/evolve",
"crates/node",
Expand Down Expand Up @@ -68,9 +69,11 @@ reth-rpc-engine-api = { git = "https://github.com/paradigmxyz/reth.git", tag = "
reth-rpc = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.11.3" }
reth-rpc-convert = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.11.3" }
reth-codecs = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.11.3" }
reth-exex = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.11.3", features = ["serde"] }

ev-revm = { path = "crates/ev-revm" }
ev-primitives = { path = "crates/ev-primitives" }
ev-exex-remote = { path = "crates/exex-remote" }


# Consensus dependencies
Expand Down Expand Up @@ -150,6 +153,11 @@ rand = "0.10"
tempfile = "3.10"
hex = "0.4"
url = "2.5"
bincode = "1.3.3"
prost = "0.13"
tokio-stream = "0.1"
tonic = { version = "0.12", features = ["transport"] }
tonic-build = "0.12"

[workspace.lints]
rust.missing_debug_implementations = "warn"
Expand Down
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,59 @@ curl -X POST http://localhost:8545 \
}
```

### Remote ExEx Streaming

ev-reth now includes an opt-in remote ExEx stream for Atlas-style consumers. The goal is to push
canonical execution notifications instead of forcing Atlas to poll `eth_getBlockByNumber` and
`eth_getBlockReceipts` for every block.

Enable it with:

```bash
./target/release/ev-reth node \
--remote-exex-grpc-listen-addr 127.0.0.1:10000 \
--remote-exex-buffer 64
```

The stream carries full blocks, receipts, logs, and EvNode sponsor metadata, so the consumer
should raise gRPC limits aggressively:

```rust
let mut client = RemoteExExClient::connect("http://127.0.0.1:10000")
.await?
.max_encoding_message_size(usize::MAX)
.max_decoding_message_size(usize::MAX);
```

The stream intentionally uses ev-reth-owned wire types instead of exposing raw internal Reth or
EV structs directly:

- The gRPC boundary is a contract for consumers such as Atlas, not an in-process Rust API.
- Reusing `Chain<EvPrimitives>`, `EvTxEnvelope`, or raw receipt types directly would couple
Atlas to ev-reth's internal crate graph, serde layout, and exact Reth version.
- The remote types are narrower and Atlas-oriented: they preserve commit/reorg/revert semantics
and include fields Atlas needs directly, such as paired receipts/logs, recovered `feePayer`,
and batch-call metadata.
- The envelope keeps protobuf small and versioned while still carrying `raw_2718` bytes, so
consumers can fall back to full transaction decoding when needed.

In short, the dedicated wire schema exists to make the push stream explicit and evolvable, while
leaving ev-reth free to change internal execution types without silently breaking Atlas.

Operationally, the stream should be treated as best-effort:

- The node should emit `FinishedHeight` after enqueueing the notification, not after client ack.
- Slow subscribers should be bounded by a finite buffer and dropped instead of blocking block production.
- Atlas should keep its existing polling path as a fallback during rollout.

Atlas-oriented notes:

- The pushed notification should preserve `EvTxEnvelope` transactions and recovered `feePayer` metadata.
- Reorg and revert notifications must remain explicit, so Atlas does not assume append-only delivery.
- DA enrichment stays out of scope for the first version.
- See `bin/ev-reth/examples/block_logger.rs` for a minimal in-process ExEx example.
- See `bin/ev-reth/examples/remote_consumer.rs` for a minimal gRPC subscriber example.

## Architecture

### Modular Design
Expand Down
4 changes: 4 additions & 0 deletions bin/ev-reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ ev-node = { path = "../../crates/node" }
ev-common = { path = "../../crates/common" }
ev-revm.workspace = true
evolve-ev-reth = { path = "../../crates/evolve" }
ev-exex-remote.workspace = true
ev-primitives.workspace = true

ev-precompiles = { path = "../../crates/ev-precompiles" }

Expand All @@ -40,6 +42,7 @@ reth-trie-db.workspace = true
reth-consensus.workspace = true
reth-ethereum-primitives.workspace = true
reth-ethereum-forks.workspace = true
reth-exex.workspace = true

# Alloy dependencies
alloy-network.workspace = true
Expand All @@ -57,6 +60,7 @@ clap = { workspace = true, features = ["derive", "env"] }
serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
url.workspace = true
futures.workspace = true

# Reth OTLP tracing
reth-tracing-otlp.workspace = true
Expand Down
57 changes: 57 additions & 0 deletions bin/ev-reth/examples/block_logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//! Minimal in-process `ExEx` example for ev-reth.
//!
//! This mirrors the Reth `ExEx` pattern but keeps the handler local and small.

use clap::Parser;
use ev_node::{EvolveArgs, EvolveChainSpecParser, EvolveNode};
use ev_primitives::EvPrimitives;
use futures::TryStreamExt;
use reth_ethereum::node::api::{FullNodeComponents, NodeTypes};
use reth_ethereum_cli::Cli;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use tracing::info;

async fn block_logger<Node>(mut ctx: ExExContext<Node>) -> eyre::Result<()>
where
Node: FullNodeComponents<Types: NodeTypes<Primitives = EvPrimitives>>,
{
while let Some(notification) = ctx.notifications.try_next().await? {
match notification {
ExExNotification::ChainCommitted { new } => {
info!(
committed_range = ?new.range(),
committed_tip = ?new.tip().num_hash(),
"received committed chain"
);
ctx.events
.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?;
}
ExExNotification::ChainReorged { old, new } => {
info!(
from_range = ?old.range(),
to_range = ?new.range(),
"received reorg"
);
ctx.events
.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?;
}
ExExNotification::ChainReverted { old } => {
info!(reverted_range = ?old.range(), "received revert");
}
}
}

Ok(())
}

fn main() -> eyre::Result<()> {
Cli::<EvolveChainSpecParser, EvolveArgs>::parse().run(|builder, _evolve_args| async move {
let handle = builder
.node(EvolveNode::new())
.install_exex("block-logger", |ctx| async move { Ok(block_logger(ctx)) })
.launch()
.await?;

handle.node_exit_future.await
})
}
90 changes: 90 additions & 0 deletions bin/ev-reth/examples/remote_consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//! Minimal remote `ExEx` consumer example for ev-reth.
use ev_exex_remote::{
decode_notification_envelope, wire::RemoteNotificationV1, NotificationEnvelope,
RemoteExExClient, SubscribeRequest,
};
use tracing::info;

fn summarize(notification: &RemoteNotificationV1) -> (&'static str, usize, usize, usize) {
match notification {
RemoteNotificationV1::ChainCommitted { blocks, .. } => {
let tx_count = blocks.iter().map(|block| block.transactions.len()).sum();
let sponsor_count = blocks
.iter()
.flat_map(|block| &block.transactions)
.filter(|tx| tx.fee_payer.is_some())
.count();
("commit", blocks.len(), tx_count, sponsor_count)
}
RemoteNotificationV1::ChainReorged {
reverted_blocks,
committed_blocks,
..
} => (
"reorg",
reverted_blocks.len() + committed_blocks.len(),
reverted_blocks
.iter()
.map(|block| block.transactions.len())
.sum::<usize>()
+ committed_blocks
.iter()
.map(|block| block.transactions.len())
.sum::<usize>(),
reverted_blocks
.iter()
.flat_map(|block| &block.transactions)
.filter(|tx| tx.fee_payer.is_some())
.count()
+ committed_blocks
.iter()
.flat_map(|block| &block.transactions)
.filter(|tx| tx.fee_payer.is_some())
.count(),
),
RemoteNotificationV1::ChainReverted {
reverted_blocks, ..
} => {
let tx_count = reverted_blocks
.iter()
.map(|block| block.transactions.len())
.sum();
let sponsor_count = reverted_blocks
.iter()
.flat_map(|block| &block.transactions)
.filter(|tx| tx.fee_payer.is_some())
.count();
("revert", reverted_blocks.len(), tx_count, sponsor_count)
}
}
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let endpoint = std::env::var("REMOTE_EXEX_ENDPOINT")
.unwrap_or_else(|_| "http://127.0.0.1:10000".to_string());

let mut client = RemoteExExClient::connect(endpoint)
.await?
.max_encoding_message_size(usize::MAX)
.max_decoding_message_size(usize::MAX);

let mut stream = client.subscribe(SubscribeRequest {}).await?.into_inner();
while let Some(message) = stream.message().await? {
let message: NotificationEnvelope = message;
let notification = decode_notification_envelope(&message)?;
let (kind, block_count, tx_count, sponsor_count) = summarize(&notification);

info!(
schema_version = message.schema_version,
encoding = ?message.encoding,
kind,
block_count,
tx_count,
sponsor_count,
"received remote ExEx notification"
);
}

Ok(())
}
24 changes: 22 additions & 2 deletions bin/ev-reth/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
use url::Url;

use ev_node::{log_startup, EvolveArgs, EvolveChainSpecParser, EvolveNode};
use ev_node::{
log_startup, remote_exex_task, spawn_remote_exex_grpc_server, EvolveArgs,
EvolveChainSpecParser, EvolveNode, RemoteExExConfig, REMOTE_EXEX_ID,
};

#[global_allocator]
static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();
Expand Down Expand Up @@ -96,8 +99,15 @@ fn main() {
init_tracing();

if let Err(err) =
Cli::<EvolveChainSpecParser, EvolveArgs>::parse().run(|builder, _evolve_args| async move {
Cli::<EvolveChainSpecParser, EvolveArgs>::parse().run(|builder, evolve_args| async move {
log_startup();
let remote_exex_config = evolve_args.remote_exex_grpc_listen_addr.map(|listen_addr| {
RemoteExExConfig::new(listen_addr, evolve_args.remote_exex_buffer)
});
let remote_notifications = remote_exex_config.as_ref().map(|config| {
std::sync::Arc::new(tokio::sync::broadcast::channel(config.buffer).0)
});
let remote_notifications_for_exex = remote_notifications.clone();
let handle = builder
.node(EvolveNode::new())
.extend_rpc_modules(move |ctx| {
Expand All @@ -110,9 +120,19 @@ fn main() {
ctx.modules.merge_configured(evolve_txpool.into_rpc())?;
Ok(())
})
.install_exex_if(remote_exex_config.is_some(), REMOTE_EXEX_ID, move |ctx| {
let notifications = remote_notifications_for_exex
.expect("remote exex notifications should be configured");
async move { Ok(remote_exex_task(ctx, notifications)) }
})
.launch()
.await?;

if let (Some(config), Some(notifications)) = (remote_exex_config, remote_notifications)
{
spawn_remote_exex_grpc_server(&handle.node.task_executor, config, notifications);
}

info!("=== EV-RETH: Node launched successfully with ev-reth payload builder ===");
handle.node_exit_future.await
})
Expand Down
23 changes: 23 additions & 0 deletions crates/exex-remote/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "ev-exex-remote"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Shared remote ExEx transport types for ev-reth"

[dependencies]
alloy-primitives = { workspace = true, features = ["serde"] }
bincode.workspace = true
serde.workspace = true
thiserror.workspace = true
tonic.workspace = true
prost.workspace = true

[build-dependencies]
tonic-build.workspace = true

[lints]
workspace = true
26 changes: 26 additions & 0 deletions crates/exex-remote/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//! Build script for the shared remote `ExEx` transport crate.

use std::process::Command;

fn main() {
println!("cargo:rerun-if-changed=proto/remote_exex.proto");

if std::env::var_os("PROTOC").is_none() {
if let Ok(output) = Command::new("which").arg("protoc").output() {
if output.status.success() {
if let Ok(path) = String::from_utf8(output.stdout) {
let path = path.trim();
if !path.is_empty() {
std::env::set_var("PROTOC", path);
}
}
}
}
}

tonic_build::configure()
.build_server(true)
.build_client(true)
.compile_protos(&["proto/remote_exex.proto"], &["proto"])
.expect("failed to compile remote exex proto");
}
16 changes: 16 additions & 0 deletions crates/exex-remote/proto/remote_exex.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package exex.remote.v1;

service RemoteExEx {
rpc Subscribe(SubscribeRequest) returns (stream NotificationEnvelope);
}

message SubscribeRequest {}

message NotificationEnvelope {
uint32 schema_version = 1;
string encoding = 2;
bytes payload = 3;
}

Loading
Loading