From fe2818eeedbce5f3055779d4e9c7638027394d0d Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sat, 21 Mar 2026 21:09:33 +0100 Subject: [PATCH 1/2] add custom exex for block pushing --- Cargo.lock | 260 +++++++- Cargo.toml | 8 + README.md | 38 ++ bin/ev-reth/Cargo.toml | 4 + bin/ev-reth/examples/block_logger.rs | 57 ++ bin/ev-reth/examples/remote_consumer.rs | 90 +++ bin/ev-reth/src/main.rs | 24 +- crates/exex-remote/Cargo.toml | 23 + crates/exex-remote/build.rs | 26 + crates/exex-remote/proto/remote_exex.proto | 16 + crates/exex-remote/src/codec.rs | 169 +++++ crates/exex-remote/src/error.rs | 15 + crates/exex-remote/src/lib.rs | 46 ++ crates/exex-remote/src/types.rs | 255 ++++++++ crates/node/Cargo.toml | 7 + crates/node/src/args.rs | 66 +- crates/node/src/exex.rs | 601 ++++++++++++++++++ crates/node/src/lib.rs | 3 + crates/tests/src/lib.rs | 2 + crates/tests/src/test_remote_exex_contract.rs | 31 + 20 files changed, 1715 insertions(+), 26 deletions(-) create mode 100644 bin/ev-reth/examples/block_logger.rs create mode 100644 bin/ev-reth/examples/remote_consumer.rs create mode 100644 crates/exex-remote/Cargo.toml create mode 100644 crates/exex-remote/build.rs create mode 100644 crates/exex-remote/proto/remote_exex.proto create mode 100644 crates/exex-remote/src/codec.rs create mode 100644 crates/exex-remote/src/error.rs create mode 100644 crates/exex-remote/src/lib.rs create mode 100644 crates/exex-remote/src/types.rs create mode 100644 crates/node/src/exex.rs create mode 100644 crates/tests/src/test_remote_exex_contract.rs diff --git a/Cargo.lock b/Cargo.lock index 2c4de487..7160e95b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -507,7 +507,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower", + "tower 0.5.3", "tracing", "wasmtimer", ] @@ -554,7 +554,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower", + "tower 0.5.3", "tracing", "url", "wasmtimer", @@ -860,7 +860,7 @@ dependencies = [ "serde_json", "thiserror 2.0.18", "tokio", - "tower", + "tower 0.5.3", "tracing", "url", "wasmtimer", @@ -877,7 +877,7 @@ dependencies = [ "itertools 0.14.0", "reqwest", "serde_json", - "tower", + "tower 0.5.3", "tracing", "url", ] @@ -1439,6 +1439,53 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.3", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "backon" version = "1.6.0" @@ -2935,6 +2982,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "ev-exex-remote" +version = "0.1.0" +dependencies = [ + "alloy-primitives", + "bincode", + "prost 0.13.5", + "serde", + "thiserror 2.0.18", + "tonic 0.12.3", + "tonic-build", +] + [[package]] name = "ev-node" version = "0.1.0" @@ -2950,10 +3010,13 @@ dependencies = [ "alloy-rpc-types", "alloy-rpc-types-engine", "alloy-rpc-types-eth", + "alloy-signer", + "alloy-signer-local", "async-trait", "c-kzg", "clap", "ev-common", + "ev-exex-remote", "ev-primitives", "ev-revm", "evolve-ev-reth", @@ -2976,6 +3039,7 @@ dependencies = [ "reth-evm", "reth-evm-ethereum", "reth-execution-types", + "reth-exex", "reth-node-api", "reth-node-builder", "reth-node-core", @@ -3005,6 +3069,8 @@ dependencies = [ "tempfile", "thiserror 2.0.18", "tokio", + "tokio-stream", + "tonic 0.12.3", "tracing", "tracing-subscriber 0.3.23", ] @@ -3053,11 +3119,14 @@ dependencies = [ "alloy-rpc-types", "clap", "ev-common", + "ev-exex-remote", "ev-node", "ev-precompiles", + "ev-primitives", "ev-revm", "evolve-ev-reth", "eyre", + "futures", "reth-basic-payload-builder", "reth-chainspec", "reth-cli-util", @@ -3068,6 +3137,7 @@ dependencies = [ "reth-ethereum-forks", "reth-ethereum-payload-builder", "reth-ethereum-primitives", + "reth-exex", "reth-node-api", "reth-node-builder", "reth-payload-builder", @@ -3322,6 +3392,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.9" @@ -4423,7 +4499,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-stream", - "tower", + "tower 0.5.3", "tracing", "wasm-bindgen-futures", ] @@ -4447,7 +4523,7 @@ dependencies = [ "serde_json", "thiserror 2.0.18", "tokio", - "tower", + "tower 0.5.3", "url", ] @@ -4487,7 +4563,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tower", + "tower 0.5.3", "tracing", ] @@ -4512,7 +4588,7 @@ dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", "jsonrpsee-types", - "tower", + "tower 0.5.3", ] [[package]] @@ -4525,7 +4601,7 @@ dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", "jsonrpsee-types", - "tower", + "tower 0.5.3", "url", ] @@ -4876,6 +4952,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.8.0" @@ -5087,6 +5169,12 @@ dependencies = [ "unsigned-varint", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "nom" version = "7.1.3" @@ -5479,11 +5567,11 @@ dependencies = [ "opentelemetry-http", "opentelemetry-proto", "opentelemetry_sdk", - "prost", + "prost 0.14.3", "reqwest", "thiserror 2.0.18", "tokio", - "tonic", + "tonic 0.14.5", "tracing", ] @@ -5495,8 +5583,8 @@ checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ "opentelemetry", "opentelemetry_sdk", - "prost", - "tonic", + "prost 0.14.3", + "tonic 0.14.5", "tonic-prost", ] @@ -5644,6 +5732,16 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap 2.13.0", +] + [[package]] name = "pharos" version = "0.5.3" @@ -5946,6 +6044,16 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive 0.13.5", +] + [[package]] name = "prost" version = "0.14.3" @@ -5953,7 +6061,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.14.3", +] + +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck", + "itertools 0.14.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost 0.13.5", + "prost-types", + "regex", + "syn 2.0.117", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.117", ] [[package]] @@ -5969,6 +6110,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost 0.13.5", +] + [[package]] name = "quanta" version = "0.12.6" @@ -6408,7 +6558,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-util", - "tower", + "tower 0.5.3", "tower-http", "tower-service", "url", @@ -7648,7 +7798,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tower", + "tower 0.5.3", "tracing", ] @@ -8117,7 +8267,7 @@ dependencies = [ "reth-tasks", "tikv-jemalloc-ctl", "tokio", - "tower", + "tower 0.5.3", "tracing", ] @@ -8491,7 +8641,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-util", - "tower", + "tower 0.5.3", "tower-http", "tracing", ] @@ -8648,7 +8798,7 @@ dependencies = [ "http", "jsonrpsee-http-client", "pin-project", - "tower", + "tower 0.5.3", "tower-http", "tracing", ] @@ -10520,6 +10670,36 @@ version = "1.0.7+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f17aaa1c6e3dc22b1da4b6bba97d066e354c7945cac2f7852d4e4e7ca7a6b56d" +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost 0.13.5", + "socket2 0.5.10", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic" version = "0.14.5" @@ -10540,12 +10720,26 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-stream", - "tower", + "tower 0.5.3", "tower-layer", "tower-service", "tracing", ] +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.117", +] + [[package]] name = "tonic-prost" version = "0.14.5" @@ -10553,8 +10747,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a55376a0bbaa4975a3f10d009ad763d8f4108f067c7c2e74f3001fb49778d309" dependencies = [ "bytes", - "prost", - "tonic", + "prost 0.14.3", + "tonic 0.14.5", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", ] [[package]] @@ -10601,7 +10815,7 @@ dependencies = [ "pin-project-lite", "tokio", "tokio-util", - "tower", + "tower 0.5.3", "tower-layer", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 3a3d8245..c096c238 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "bin/ev-dev", "bin/ev-reth", "crates/common", + "crates/exex-remote", "crates/ev-primitives", "crates/evolve", "crates/node", @@ -68,9 +69,11 @@ reth-rpc-engine-api = { git = "https://github.com/paradigmxyz/reth.git", tag = " reth-rpc = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.11.3" } reth-rpc-convert = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.11.3" } reth-codecs = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.11.3" } +reth-exex = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.11.3", features = ["serde"] } ev-revm = { path = "crates/ev-revm" } ev-primitives = { path = "crates/ev-primitives" } +ev-exex-remote = { path = "crates/exex-remote" } # Consensus dependencies @@ -150,6 +153,11 @@ rand = "0.10" tempfile = "3.10" hex = "0.4" url = "2.5" +bincode = "1.3.3" +prost = "0.13" +tokio-stream = "0.1" +tonic = { version = "0.12", features = ["transport"] } +tonic-build = "0.12" [workspace.lints] rust.missing_debug_implementations = "warn" diff --git a/README.md b/README.md index 72681e36..a922c30f 100644 --- a/README.md +++ b/README.md @@ -217,6 +217,44 @@ curl -X POST http://localhost:8545 \ } ``` +### Remote ExEx Streaming + +ev-reth now includes an opt-in remote ExEx stream for Atlas-style consumers. The goal is to push +canonical execution notifications instead of forcing Atlas to poll `eth_getBlockByNumber` and +`eth_getBlockReceipts` for every block. + +Enable it with: + +```bash +./target/release/ev-reth node \ + --remote-exex-grpc-listen-addr 127.0.0.1:10000 \ + --remote-exex-buffer 64 +``` + +The stream carries full blocks, receipts, logs, and EvNode sponsor metadata, so the consumer +should raise gRPC limits aggressively: + +```rust +let mut client = RemoteExExClient::connect("http://127.0.0.1:10000") + .await? + .max_encoding_message_size(usize::MAX) + .max_decoding_message_size(usize::MAX); +``` + +Operationally, the stream should be treated as best-effort: + +- The node should emit `FinishedHeight` after enqueueing the notification, not after client ack. +- Slow subscribers should be bounded by a finite buffer and dropped instead of blocking block production. +- Atlas should keep its existing polling path as a fallback during rollout. + +Atlas-oriented notes: + +- The pushed notification should preserve `EvTxEnvelope` transactions and recovered `feePayer` metadata. +- Reorg and revert notifications must remain explicit, so Atlas does not assume append-only delivery. +- DA enrichment stays out of scope for the first version. +- See `bin/ev-reth/examples/block_logger.rs` for a minimal in-process ExEx example. +- See `bin/ev-reth/examples/remote_consumer.rs` for a minimal gRPC subscriber example. + ## Architecture ### Modular Design diff --git a/bin/ev-reth/Cargo.toml b/bin/ev-reth/Cargo.toml index 102062cc..fc8a7191 100644 --- a/bin/ev-reth/Cargo.toml +++ b/bin/ev-reth/Cargo.toml @@ -18,6 +18,8 @@ ev-node = { path = "../../crates/node" } ev-common = { path = "../../crates/common" } ev-revm.workspace = true evolve-ev-reth = { path = "../../crates/evolve" } +ev-exex-remote.workspace = true +ev-primitives.workspace = true ev-precompiles = { path = "../../crates/ev-precompiles" } @@ -40,6 +42,7 @@ reth-trie-db.workspace = true reth-consensus.workspace = true reth-ethereum-primitives.workspace = true reth-ethereum-forks.workspace = true +reth-exex.workspace = true # Alloy dependencies alloy-network.workspace = true @@ -57,6 +60,7 @@ clap = { workspace = true, features = ["derive", "env"] } serde = { workspace = true, features = ["derive"] } thiserror.workspace = true url.workspace = true +futures.workspace = true # Reth OTLP tracing reth-tracing-otlp.workspace = true diff --git a/bin/ev-reth/examples/block_logger.rs b/bin/ev-reth/examples/block_logger.rs new file mode 100644 index 00000000..372b96b4 --- /dev/null +++ b/bin/ev-reth/examples/block_logger.rs @@ -0,0 +1,57 @@ +//! Minimal in-process `ExEx` example for ev-reth. +//! +//! This mirrors the Reth `ExEx` pattern but keeps the handler local and small. + +use clap::Parser; +use ev_node::{EvolveArgs, EvolveChainSpecParser, EvolveNode}; +use ev_primitives::EvPrimitives; +use futures::TryStreamExt; +use reth_ethereum::node::api::{FullNodeComponents, NodeTypes}; +use reth_ethereum_cli::Cli; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use tracing::info; + +async fn block_logger(mut ctx: ExExContext) -> eyre::Result<()> +where + Node: FullNodeComponents>, +{ + while let Some(notification) = ctx.notifications.try_next().await? { + match notification { + ExExNotification::ChainCommitted { new } => { + info!( + committed_range = ?new.range(), + committed_tip = ?new.tip().num_hash(), + "received committed chain" + ); + ctx.events + .send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; + } + ExExNotification::ChainReorged { old, new } => { + info!( + from_range = ?old.range(), + to_range = ?new.range(), + "received reorg" + ); + ctx.events + .send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; + } + ExExNotification::ChainReverted { old } => { + info!(reverted_range = ?old.range(), "received revert"); + } + } + } + + Ok(()) +} + +fn main() -> eyre::Result<()> { + Cli::::parse().run(|builder, _evolve_args| async move { + let handle = builder + .node(EvolveNode::new()) + .install_exex("block-logger", |ctx| async move { Ok(block_logger(ctx)) }) + .launch() + .await?; + + handle.node_exit_future.await + }) +} diff --git a/bin/ev-reth/examples/remote_consumer.rs b/bin/ev-reth/examples/remote_consumer.rs new file mode 100644 index 00000000..f5d0bb30 --- /dev/null +++ b/bin/ev-reth/examples/remote_consumer.rs @@ -0,0 +1,90 @@ +//! Minimal remote `ExEx` consumer example for ev-reth. +use ev_exex_remote::{ + decode_notification_envelope, wire::RemoteNotificationV1, NotificationEnvelope, + RemoteExExClient, SubscribeRequest, +}; +use tracing::info; + +fn summarize(notification: &RemoteNotificationV1) -> (&'static str, usize, usize, usize) { + match notification { + RemoteNotificationV1::ChainCommitted { blocks, .. } => { + let tx_count = blocks.iter().map(|block| block.transactions.len()).sum(); + let sponsor_count = blocks + .iter() + .flat_map(|block| &block.transactions) + .filter(|tx| tx.fee_payer.is_some()) + .count(); + ("commit", blocks.len(), tx_count, sponsor_count) + } + RemoteNotificationV1::ChainReorged { + reverted_blocks, + committed_blocks, + .. + } => ( + "reorg", + reverted_blocks.len() + committed_blocks.len(), + reverted_blocks + .iter() + .map(|block| block.transactions.len()) + .sum::() + + committed_blocks + .iter() + .map(|block| block.transactions.len()) + .sum::(), + reverted_blocks + .iter() + .flat_map(|block| &block.transactions) + .filter(|tx| tx.fee_payer.is_some()) + .count() + + committed_blocks + .iter() + .flat_map(|block| &block.transactions) + .filter(|tx| tx.fee_payer.is_some()) + .count(), + ), + RemoteNotificationV1::ChainReverted { + reverted_blocks, .. + } => { + let tx_count = reverted_blocks + .iter() + .map(|block| block.transactions.len()) + .sum(); + let sponsor_count = reverted_blocks + .iter() + .flat_map(|block| &block.transactions) + .filter(|tx| tx.fee_payer.is_some()) + .count(); + ("revert", reverted_blocks.len(), tx_count, sponsor_count) + } + } +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let endpoint = std::env::var("REMOTE_EXEX_ENDPOINT") + .unwrap_or_else(|_| "http://127.0.0.1:10000".to_string()); + + let mut client = RemoteExExClient::connect(endpoint) + .await? + .max_encoding_message_size(usize::MAX) + .max_decoding_message_size(usize::MAX); + + let mut stream = client.subscribe(SubscribeRequest {}).await?.into_inner(); + while let Some(message) = stream.message().await? { + let message: NotificationEnvelope = message; + let notification = decode_notification_envelope(&message)?; + let (kind, block_count, tx_count, sponsor_count) = summarize(¬ification); + + info!( + schema_version = message.schema_version, + encoding = ?message.encoding, + kind, + block_count, + tx_count, + sponsor_count, + "received remote ExEx notification" + ); + } + + Ok(()) +} diff --git a/bin/ev-reth/src/main.rs b/bin/ev-reth/src/main.rs index 79c35162..5f00c902 100644 --- a/bin/ev-reth/src/main.rs +++ b/bin/ev-reth/src/main.rs @@ -16,7 +16,10 @@ use tracing::info; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; use url::Url; -use ev_node::{log_startup, EvolveArgs, EvolveChainSpecParser, EvolveNode}; +use ev_node::{ + log_startup, remote_exex_task, spawn_remote_exex_grpc_server, EvolveArgs, + EvolveChainSpecParser, EvolveNode, RemoteExExConfig, REMOTE_EXEX_ID, +}; #[global_allocator] static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator(); @@ -96,8 +99,15 @@ fn main() { init_tracing(); if let Err(err) = - Cli::::parse().run(|builder, _evolve_args| async move { + Cli::::parse().run(|builder, evolve_args| async move { log_startup(); + let remote_exex_config = evolve_args.remote_exex_grpc_listen_addr.map(|listen_addr| { + RemoteExExConfig::new(listen_addr, evolve_args.remote_exex_buffer) + }); + let remote_notifications = remote_exex_config.as_ref().map(|config| { + std::sync::Arc::new(tokio::sync::broadcast::channel(config.buffer).0) + }); + let remote_notifications_for_exex = remote_notifications.clone(); let handle = builder .node(EvolveNode::new()) .extend_rpc_modules(move |ctx| { @@ -110,9 +120,19 @@ fn main() { ctx.modules.merge_configured(evolve_txpool.into_rpc())?; Ok(()) }) + .install_exex_if(remote_exex_config.is_some(), REMOTE_EXEX_ID, move |ctx| { + let notifications = remote_notifications_for_exex + .expect("remote exex notifications should be configured"); + async move { Ok(remote_exex_task(ctx, notifications)) } + }) .launch() .await?; + if let (Some(config), Some(notifications)) = (remote_exex_config, remote_notifications) + { + spawn_remote_exex_grpc_server(&handle.node.task_executor, config, notifications); + } + info!("=== EV-RETH: Node launched successfully with ev-reth payload builder ==="); handle.node_exit_future.await }) diff --git a/crates/exex-remote/Cargo.toml b/crates/exex-remote/Cargo.toml new file mode 100644 index 00000000..423f4cbf --- /dev/null +++ b/crates/exex-remote/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "ev-exex-remote" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Shared remote ExEx transport types for ev-reth" + +[dependencies] +alloy-primitives = { workspace = true, features = ["serde"] } +bincode.workspace = true +serde.workspace = true +thiserror.workspace = true +tonic.workspace = true +prost.workspace = true + +[build-dependencies] +tonic-build.workspace = true + +[lints] +workspace = true diff --git a/crates/exex-remote/build.rs b/crates/exex-remote/build.rs new file mode 100644 index 00000000..eabedeb5 --- /dev/null +++ b/crates/exex-remote/build.rs @@ -0,0 +1,26 @@ +//! Build script for the shared remote `ExEx` transport crate. + +use std::process::Command; + +fn main() { + println!("cargo:rerun-if-changed=proto/remote_exex.proto"); + + if std::env::var_os("PROTOC").is_none() { + if let Ok(output) = Command::new("which").arg("protoc").output() { + if output.status.success() { + if let Ok(path) = String::from_utf8(output.stdout) { + let path = path.trim(); + if !path.is_empty() { + std::env::set_var("PROTOC", path); + } + } + } + } + } + + tonic_build::configure() + .build_server(true) + .build_client(true) + .compile_protos(&["proto/remote_exex.proto"], &["proto"]) + .expect("failed to compile remote exex proto"); +} diff --git a/crates/exex-remote/proto/remote_exex.proto b/crates/exex-remote/proto/remote_exex.proto new file mode 100644 index 00000000..29636f74 --- /dev/null +++ b/crates/exex-remote/proto/remote_exex.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package exex.remote.v1; + +service RemoteExEx { + rpc Subscribe(SubscribeRequest) returns (stream NotificationEnvelope); +} + +message SubscribeRequest {} + +message NotificationEnvelope { + uint32 schema_version = 1; + string encoding = 2; + bytes payload = 3; +} + diff --git a/crates/exex-remote/src/codec.rs b/crates/exex-remote/src/codec.rs new file mode 100644 index 00000000..7e584a8c --- /dev/null +++ b/crates/exex-remote/src/codec.rs @@ -0,0 +1,169 @@ +use crate::{ + error::CodecError, + proto::NotificationEnvelope, + types::{RemoteNotificationV1, REMOTE_EXEX_SCHEMA_VERSION_V1}, +}; +use bincode::Options; + +/// Bincode encoding identifier stored in the transport envelope. +pub const REMOTE_EXEX_ENCODING_BINCODE_V1: &str = "bincode/v1"; +/// Schema version for the v1 remote notification payload. +pub const REMOTE_EXEX_SCHEMA_VERSION: u32 = REMOTE_EXEX_SCHEMA_VERSION_V1; + +fn bincode_options() -> impl Options { + bincode::DefaultOptions::new().with_fixint_encoding() +} + +/// Returns the schema version used by the remote `ExEx` payloads. +pub const fn remote_notification_schema_version() -> u32 { + REMOTE_EXEX_SCHEMA_VERSION +} + +/// Encodes a remote notification into a bincode byte vector. +pub fn encode_remote_notification( + notification: &RemoteNotificationV1, +) -> Result, CodecError> { + Ok(bincode_options().serialize(notification)?) +} + +/// Decodes a remote notification from a bincode byte slice. +pub fn decode_remote_notification(bytes: &[u8]) -> Result { + Ok(bincode_options().deserialize(bytes)?) +} + +/// Wraps a remote notification in the protobuf envelope. +pub fn encode_notification_envelope( + notification: &RemoteNotificationV1, +) -> Result { + Ok(NotificationEnvelope { + schema_version: REMOTE_EXEX_SCHEMA_VERSION, + encoding: REMOTE_EXEX_ENCODING_BINCODE_V1.to_string(), + payload: encode_remote_notification(notification)?, + }) +} + +/// Decodes a protobuf envelope into a remote notification. +pub fn decode_notification_envelope( + envelope: &NotificationEnvelope, +) -> Result { + if envelope.schema_version != REMOTE_EXEX_SCHEMA_VERSION { + return Err(CodecError::InvalidEnvelope("unexpected schema version")); + } + if envelope.encoding != REMOTE_EXEX_ENCODING_BINCODE_V1 { + return Err(CodecError::InvalidEnvelope("unexpected encoding")); + } + decode_remote_notification(&envelope.payload) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::{ + RemoteBlockMetadataV1, RemoteBlockRangeV1, RemoteBlockV1, RemoteCallV1, RemoteLogV1, + RemoteReceiptV1, RemoteTransactionV1, + }; + use alloy_primitives::{Address, Bytes, B256, U256}; + + fn sample_notification() -> RemoteNotificationV1 { + let tx = RemoteTransactionV1::new( + B256::repeat_byte(0x11), + Address::repeat_byte(0x22), + 0x76, + 7, + 21_000, + Some(1_000), + 1_500, + Some(500), + Some(Address::repeat_byte(0x33)), + U256::from(99u64), + Bytes::from_static(b"input"), + Bytes::from_static(b"raw"), + Some(Address::repeat_byte(0x44)), + vec![RemoteCallV1 { + to: Some(Address::repeat_byte(0x55)), + value: U256::from(7u64), + input: Bytes::from_static(b"call"), + }], + ); + let receipt = RemoteReceiptV1::new( + B256::repeat_byte(0x11), + true, + 21_000, + 21_000, + None, + vec![RemoteLogV1 { + address: Address::repeat_byte(0x66), + topics: vec![B256::repeat_byte(0x77)], + data: Bytes::from_static(b"log"), + log_index: 0, + transaction_log_index: Some(0), + }], + Some(Address::repeat_byte(0x44)), + ); + let block = RemoteBlockV1::new( + RemoteBlockMetadataV1 { + number: 42, + hash: B256::repeat_byte(0x88), + parent_hash: B256::repeat_byte(0x99), + timestamp: 1_700_000_000, + gas_limit: 30_000_000, + gas_used: 21_000, + fee_recipient: Address::repeat_byte(0xaa), + base_fee_per_gas: Some(1), + }, + vec![tx], + vec![receipt], + ); + + RemoteNotificationV1::ChainCommitted { + range: RemoteBlockRangeV1::new(42, 42), + blocks: vec![block], + } + } + + #[test] + fn notification_roundtrip() { + let notification = sample_notification(); + let encoded = encode_remote_notification(¬ification).expect("encode"); + let decoded = decode_remote_notification(&encoded).expect("decode"); + assert_eq!(notification, decoded); + } + + #[test] + fn envelope_roundtrip() { + let notification = sample_notification(); + let envelope = encode_notification_envelope(¬ification).expect("envelope"); + assert_eq!(envelope.schema_version, REMOTE_EXEX_SCHEMA_VERSION); + assert_eq!(envelope.encoding, REMOTE_EXEX_ENCODING_BINCODE_V1); + + let decoded = decode_notification_envelope(&envelope).expect("decode envelope"); + assert_eq!(notification, decoded); + } + + #[test] + fn reorg_and_revert_variants_roundtrip() { + let block = match sample_notification() { + RemoteNotificationV1::ChainCommitted { blocks, .. } => { + blocks.into_iter().next().unwrap() + } + _ => unreachable!("sample notification should be committed"), + }; + + let reorg = RemoteNotificationV1::ChainReorged { + reverted: RemoteBlockRangeV1::new(40, 40), + reverted_blocks: vec![block.clone()], + committed: RemoteBlockRangeV1::new(40, 40), + committed_blocks: vec![block.clone()], + }; + let revert = RemoteNotificationV1::ChainReverted { + reverted: RemoteBlockRangeV1::new(39, 40), + reverted_blocks: vec![block], + }; + + for notification in [reorg, revert] { + let envelope = encode_notification_envelope(¬ification).expect("encode"); + let decoded = decode_notification_envelope(&envelope).expect("decode"); + assert_eq!(notification, decoded); + } + } +} diff --git a/crates/exex-remote/src/error.rs b/crates/exex-remote/src/error.rs new file mode 100644 index 00000000..319f4752 --- /dev/null +++ b/crates/exex-remote/src/error.rs @@ -0,0 +1,15 @@ +use thiserror::Error; + +/// Errors returned when encoding or decoding remote `ExEx` payloads. +#[derive(Debug, Error)] +pub enum CodecError { + /// Bincode serialization or deserialization failed. + #[error("bincode codec error: {0}")] + Bincode(#[from] Box), + /// The notification envelope has unexpected schema or encoding metadata. + #[error("invalid notification envelope: {0}")] + InvalidEnvelope(&'static str), +} + +/// Decode-specific alias used by callers that only care about payload parsing. +pub type DecodeError = CodecError; diff --git a/crates/exex-remote/src/lib.rs b/crates/exex-remote/src/lib.rs new file mode 100644 index 00000000..7a3aba7d --- /dev/null +++ b/crates/exex-remote/src/lib.rs @@ -0,0 +1,46 @@ +//! Shared remote `ExEx` transport for ev-reth. +//! +//! This crate provides a minimal gRPC service definition plus serde-friendly +//! wire types for transporting canonical block execution events. + +mod codec; +mod error; +mod types; + +/// Generated gRPC/protobuf bindings. +#[allow( + missing_docs, + clippy::derive_partial_eq_without_eq, + clippy::doc_markdown, + clippy::missing_const_for_fn +)] +pub mod proto { + tonic::include_proto!("exex.remote.v1"); +} + +/// Stable user-facing re-exports for consumers of the wire contract. +pub mod wire { + pub use crate::{ + proto::NotificationEnvelope, + types::{ + RemoteBlockMetadataV1, RemoteBlockRangeV1, RemoteBlockV1, RemoteCallV1, RemoteLogV1, + RemoteNotificationV1, RemoteReceiptV1, RemoteTransactionV1, + }, + }; +} + +pub use codec::{ + decode_notification_envelope, decode_remote_notification, encode_notification_envelope, + encode_remote_notification, remote_notification_schema_version, + REMOTE_EXEX_ENCODING_BINCODE_V1, REMOTE_EXEX_SCHEMA_VERSION, +}; +pub use error::{CodecError, DecodeError}; +pub use proto::{ + remote_ex_ex_client::RemoteExExClient, + remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, + NotificationEnvelope, SubscribeRequest, +}; +pub use types::{ + RemoteBlockMetadataV1, RemoteBlockRangeV1, RemoteBlockV1, RemoteCallV1, RemoteLogV1, + RemoteNotificationV1, RemoteReceiptV1, RemoteTransactionV1, +}; diff --git a/crates/exex-remote/src/types.rs b/crates/exex-remote/src/types.rs new file mode 100644 index 00000000..4944efb3 --- /dev/null +++ b/crates/exex-remote/src/types.rs @@ -0,0 +1,255 @@ +use alloy_primitives::{Address, Bytes, B256, U256}; +use serde::{Deserialize, Serialize}; + +/// Schema version used by the remote `ExEx` stream. +pub(crate) const REMOTE_EXEX_SCHEMA_VERSION_V1: u32 = 1; + +/// Block range metadata used for committed, reverted, and reorg notifications. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RemoteBlockRangeV1 { + /// Inclusive starting block number. + pub start_block: u64, + /// Inclusive ending block number. + pub end_block: u64, +} + +impl RemoteBlockRangeV1 { + /// Creates a new block range and asserts the bounds are ordered. + pub const fn new(start_block: u64, end_block: u64) -> Self { + assert!(start_block <= end_block, "start block must be <= end block"); + Self { + start_block, + end_block, + } + } +} + +/// Extra block metadata useful to indexers. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RemoteBlockMetadataV1 { + /// Block number. + pub number: u64, + /// Block hash. + pub hash: B256, + /// Parent block hash. + pub parent_hash: B256, + /// Block timestamp. + pub timestamp: u64, + /// Block gas limit. + pub gas_limit: u64, + /// Gas used by the block. + pub gas_used: u64, + /// Fee recipient / beneficiary. + pub fee_recipient: Address, + /// Base fee per gas, if the chain uses EIP-1559 style pricing. + pub base_fee_per_gas: Option, +} + +/// Batch-call metadata carried for `EvNode` transactions. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RemoteCallV1 { + /// Call destination. + pub to: Option
, + /// ETH value attached to the call. + pub value: U256, + /// Calldata for the call. + pub input: Bytes, +} + +/// Transaction metadata and payload suitable for EV transactions. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RemoteTransactionV1 { + /// Transaction hash. + pub hash: B256, + /// Sender / recovered executor. + pub sender: Address, + /// EIP-2718 transaction type. + pub tx_type: u8, + /// Transaction nonce. + pub nonce: u64, + /// Gas limit requested by the transaction. + pub gas_limit: u64, + /// Legacy-style gas price, if present. + pub gas_price: Option, + /// EIP-1559 max fee per gas. + pub max_fee_per_gas: u128, + /// EIP-1559 priority fee cap. + pub max_priority_fee_per_gas: Option, + /// Transaction recipient, if any. + pub to: Option
, + /// ETH value transferred by the transaction. + pub value: U256, + /// Transaction input. + pub input: Bytes, + /// Raw encoded transaction bytes. + pub raw_2718: Bytes, + /// Optional recovered sponsor / fee payer. + pub fee_payer: Option
, + /// Batch call metadata for `EvNode` transactions. + pub calls: Vec, +} + +impl RemoteTransactionV1 { + /// Creates a new transaction payload and validates batch metadata when present. + #[allow(clippy::too_many_arguments)] + #[allow(clippy::missing_const_for_fn)] + pub fn new( + hash: B256, + sender: Address, + tx_type: u8, + nonce: u64, + gas_limit: u64, + gas_price: Option, + max_fee_per_gas: u128, + max_priority_fee_per_gas: Option, + to: Option
, + value: U256, + input: Bytes, + raw_2718: Bytes, + fee_payer: Option
, + calls: Vec, + ) -> Self { + Self { + hash, + sender, + tx_type, + nonce, + gas_limit, + gas_price, + max_fee_per_gas, + max_priority_fee_per_gas, + to, + value, + input, + raw_2718, + fee_payer, + calls, + } + } +} + +/// Receipt log metadata. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RemoteLogV1 { + /// Emitting contract. + pub address: Address, + /// Indexed topics. + pub topics: Vec, + /// Log data payload. + pub data: Bytes, + /// Block-local log index. + pub log_index: u64, + /// Transaction-local log index if available. + pub transaction_log_index: Option, +} + +/// Receipt payload with attached logs and EV metadata. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RemoteReceiptV1 { + /// Transaction hash associated with the receipt. + pub transaction_hash: B256, + /// Receipt status. + pub status: bool, + /// Gas used by the transaction. + pub gas_used: u64, + /// Cumulative gas used at this point in the block. + pub cumulative_gas_used: u64, + /// Contract address created by the transaction, if any. + pub contract_address: Option
, + /// Logs emitted by the transaction. + pub logs: Vec, + /// Optional recovered sponsor / fee payer. + pub fee_payer: Option
, +} + +impl RemoteReceiptV1 { + /// Creates a receipt payload and validates nothing beyond the type boundary. + #[allow(clippy::missing_const_for_fn)] + pub fn new( + transaction_hash: B256, + status: bool, + gas_used: u64, + cumulative_gas_used: u64, + contract_address: Option
, + logs: Vec, + fee_payer: Option
, + ) -> Self { + Self { + transaction_hash, + status, + gas_used, + cumulative_gas_used, + contract_address, + logs, + fee_payer, + } + } +} + +/// Block payload with transactions, receipts, and metadata. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RemoteBlockV1 { + /// Block metadata. + pub metadata: RemoteBlockMetadataV1, + /// Transactions in execution order. + pub transactions: Vec, + /// Receipts in execution order. + pub receipts: Vec, +} + +impl RemoteBlockV1 { + /// Creates a block payload and asserts the transaction/receipt counts match. + pub fn new( + metadata: RemoteBlockMetadataV1, + transactions: Vec, + receipts: Vec, + ) -> Self { + assert_eq!( + transactions.len(), + receipts.len(), + "transactions and receipts must have matching lengths" + ); + Self { + metadata, + transactions, + receipts, + } + } +} + +/// Remote notification variants carried over the transport. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum RemoteNotificationV1 { + /// Canonical chain extension. + ChainCommitted { + /// Inclusive committed block range. + range: RemoteBlockRangeV1, + /// Newly committed blocks. + blocks: Vec, + }, + /// Chain reorganization. + ChainReorged { + /// Reverted block range. + reverted: RemoteBlockRangeV1, + /// Reverted blocks from the old branch. + reverted_blocks: Vec, + /// Committed block range for the replacement branch. + committed: RemoteBlockRangeV1, + /// Newly committed replacement blocks. + committed_blocks: Vec, + }, + /// Explicit revert notification. + ChainReverted { + /// Inclusive reverted block range. + reverted: RemoteBlockRangeV1, + /// Reverted blocks from the old branch. + reverted_blocks: Vec, + }, +} + +impl RemoteNotificationV1 { + /// Returns the schema version associated with the wire format. + pub const fn schema_version() -> u32 { + REMOTE_EXEX_SCHEMA_VERSION_V1 + } +} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index a0910c92..2cef6c3b 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -14,6 +14,7 @@ ev-common = { path = "../common" } evolve-ev-reth = { path = "../evolve" } ev-revm = { path = "../ev-revm" } ev-primitives = { path = "../ev-primitives" } +ev-exex-remote.workspace = true # Reth dependencies reth-node-builder.workspace = true @@ -53,6 +54,8 @@ reth-rpc-eth-api.workspace = true reth-rpc-eth-types.workspace = true reth-engine-primitives.workspace = true reth-ethereum-primitives.workspace = true +reth-exex.workspace = true +reth-tasks.workspace = true # Alloy dependencies alloy-rpc-types.workspace = true @@ -77,6 +80,8 @@ thiserror.workspace = true async-trait.workspace = true futures.workspace = true clap.workspace = true +tokio-stream.workspace = true +tonic.workspace = true [dev-dependencies] # Test dependencies @@ -90,6 +95,8 @@ reth-tracing.workspace = true tempfile.workspace = true hex = "0.4" alloy-rlp.workspace = true +alloy-signer.workspace = true +alloy-signer-local.workspace = true tracing-subscriber = "0.3.23" [lints] diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index 65f45a2d..cf2dd7df 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -1,5 +1,69 @@ use clap::Args; +use std::net::SocketAddr; + +const DEFAULT_REMOTE_EXEX_BUFFER: usize = 1024; + +fn parse_remote_exex_buffer(value: &str) -> Result { + let parsed = value + .parse::() + .map_err(|err| format!("invalid buffer size: {err}"))?; + if parsed == 0 { + return Err("remote ExEx buffer must be greater than zero".to_string()); + } + Ok(parsed) +} /// Evolve CLI arguments (currently empty; reserved for future toggles). #[derive(Debug, Clone, Default, Args)] -pub struct EvolveArgs {} +pub struct EvolveArgs { + /// Listen address for the built-in Remote `ExEx` gRPC server. + #[arg(long, value_name = "SOCKET_ADDR")] + pub remote_exex_grpc_listen_addr: Option, + /// Bounded notification buffer shared by Remote `ExEx` subscribers. + #[arg( + long, + value_name = "N", + default_value_t = DEFAULT_REMOTE_EXEX_BUFFER, + value_parser = parse_remote_exex_buffer + )] + pub remote_exex_buffer: usize, +} + +#[cfg(test)] +mod tests { + use super::EvolveArgs; + use clap::Parser; + + #[derive(Debug, Parser)] + struct TestCli { + #[command(flatten)] + args: EvolveArgs, + } + + #[test] + fn remote_exex_defaults_to_disabled() { + let cli = TestCli::try_parse_from(["test"]).expect("default cli should parse"); + assert!(cli.args.remote_exex_grpc_listen_addr.is_none()); + assert_eq!(cli.args.remote_exex_buffer, 1024); + } + + #[test] + fn remote_exex_flags_parse() { + let cli = TestCli::try_parse_from([ + "test", + "--remote-exex-grpc-listen-addr", + "127.0.0.1:30001", + "--remote-exex-buffer", + "16", + ]) + .expect("remote exex cli should parse"); + + assert_eq!( + cli.args + .remote_exex_grpc_listen_addr + .expect("listen address"), + "127.0.0.1:30001".parse().unwrap() + ); + assert_eq!(cli.args.remote_exex_buffer, 16); + } +} diff --git a/crates/node/src/exex.rs b/crates/node/src/exex.rs new file mode 100644 index 00000000..d0a4645c --- /dev/null +++ b/crates/node/src/exex.rs @@ -0,0 +1,601 @@ +use alloy_consensus::{ + transaction::{SignerRecoverable, TxHashRef}, + Transaction as _, TxReceipt, Typed2718, +}; +use alloy_eips::eip2718::Encodable2718; +use alloy_primitives::{Address, TxKind}; +use ev_exex_remote::{ + encode_notification_envelope, + proto::{remote_ex_ex_server::RemoteExEx, NotificationEnvelope, SubscribeRequest}, + wire::{ + RemoteBlockMetadataV1, RemoteBlockRangeV1, RemoteBlockV1, RemoteCallV1, RemoteLogV1, + RemoteNotificationV1, RemoteReceiptV1, RemoteTransactionV1, + }, + RemoteExExServer, +}; +use ev_primitives::{Call, EvPrimitives, EvTxEnvelope}; +use eyre::Result; +use futures::TryStreamExt; +use reth_execution_types::Chain; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_node_api::{FullNodeComponents, NodeTypes}; +use reth_tasks::TaskExecutor; +use std::{net::SocketAddr, sync::Arc}; +use tokio::sync::{broadcast, mpsc}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{transport::Server, Request, Response, Status}; +use tracing::{debug, info}; + +/// Stable identifier used when installing the built-in Atlas-style remote `ExEx`. +pub const REMOTE_EXEX_ID: &str = "atlas-remote-exex"; + +/// Shared best-effort notification fan-out for connected remote `ExEx` subscribers. +pub type RemoteNotificationSender = Arc>; + +/// Runtime configuration for the built-in remote `ExEx` bridge. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RemoteExExConfig { + /// Socket address where the gRPC service should listen. + pub grpc_listen_addr: SocketAddr, + /// Bounded broadcast capacity shared by live subscribers. + pub buffer: usize, +} + +impl RemoteExExConfig { + /// Creates a new remote `ExEx` configuration. + pub const fn new(grpc_listen_addr: SocketAddr, buffer: usize) -> Self { + Self { + grpc_listen_addr, + buffer, + } + } +} + +#[derive(Debug)] +struct RemoteExExService { + notifications: RemoteNotificationSender, +} + +#[tonic::async_trait] +impl RemoteExEx for RemoteExExService { + type SubscribeStream = ReceiverStream>; + + async fn subscribe( + &self, + _request: Request, + ) -> Result, Status> { + let mut notifications = self.notifications.subscribe(); + let (tx, rx) = mpsc::channel(1); + + tokio::spawn(async move { + loop { + match notifications.recv().await { + Ok(notification) => { + if tx.send(Ok(notification)).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + let status = Status::resource_exhausted(format!( + "remote exex subscriber lagged by {skipped} messages" + )); + let _ = tx.send(Err(status)).await; + break; + } + } + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} + +/// Spawns the best-effort gRPC server that streams remote `ExEx` notifications. +pub fn spawn_remote_exex_grpc_server( + task_executor: &TaskExecutor, + config: RemoteExExConfig, + notifications: RemoteNotificationSender, +) { + let listen_addr = config.grpc_listen_addr; + task_executor.spawn_critical_task( + "remote exex grpc server", + Box::pin(async move { + if let Err(err) = serve_remote_exex_grpc_server(listen_addr, notifications).await { + tracing::error!(%listen_addr, error = ?err, "remote exex gRPC server exited"); + } + }), + ); +} + +async fn serve_remote_exex_grpc_server( + listen_addr: SocketAddr, + notifications: RemoteNotificationSender, +) -> Result<()> { + info!(%listen_addr, "starting remote exex gRPC server"); + + Server::builder() + .add_service(RemoteExExServer::new(RemoteExExService { notifications })) + .serve(listen_addr) + .await?; + + Ok(()) +} + +/// Forwards `ExEx` notifications into the bounded remote subscriber broadcast channel. +pub async fn remote_exex_task( + mut ctx: ExExContext, + notifications: RemoteNotificationSender, +) -> Result<()> +where + Node: FullNodeComponents>, +{ + while let Some(notification) = ctx.notifications.try_next().await? { + let (remote_notification, finished_height) = match ¬ification { + ExExNotification::ChainCommitted { new } => ( + RemoteNotificationV1::ChainCommitted { + range: chain_range(new), + blocks: chain_blocks(new)?, + }, + Some(new.tip().num_hash()), + ), + ExExNotification::ChainReorged { old, new } => ( + RemoteNotificationV1::ChainReorged { + reverted: chain_range(old), + reverted_blocks: chain_blocks(old)?, + committed: chain_range(new), + committed_blocks: chain_blocks(new)?, + }, + Some(new.tip().num_hash()), + ), + ExExNotification::ChainReverted { old } => ( + RemoteNotificationV1::ChainReverted { + reverted: chain_range(old), + reverted_blocks: chain_blocks(old)?, + }, + None, + ), + }; + + let envelope = encode_notification_envelope(&remote_notification)?; + match notifications.send(envelope) { + Ok(receivers) => { + debug!(receivers, "queued remote exex notification"); + } + Err(_) => { + debug!("remote exex notification dropped because no subscribers are connected"); + } + } + + if let Some(finished_height) = finished_height { + ctx.events + .send(ExExEvent::FinishedHeight(finished_height))?; + } + } + + Ok(()) +} + +fn chain_range(chain: &Chain) -> RemoteBlockRangeV1 { + let range = chain.range(); + RemoteBlockRangeV1::new(*range.start(), *range.end()) +} + +fn chain_blocks(chain: &Chain) -> Result> { + chain + .blocks_and_receipts() + .map(remote_block) + .collect::>>() +} + +fn remote_block( + (block, receipts): ( + &reth_primitives_traits::RecoveredBlock, + &Vec, + ), +) -> Result { + let metadata = RemoteBlockMetadataV1 { + number: block.header().number, + hash: block.hash(), + parent_hash: block.header().parent_hash, + timestamp: block.header().timestamp, + gas_limit: block.header().gas_limit, + gas_used: block.header().gas_used, + fee_recipient: block.header().beneficiary, + base_fee_per_gas: block.header().base_fee_per_gas.map(u128::from), + }; + + let txs = block.body().transactions.as_slice(); + if txs.len() != receipts.len() { + eyre::bail!( + "transaction/receipt mismatch for block {}: {} txs vs {} receipts", + metadata.number, + txs.len(), + receipts.len() + ); + } + + let mut block_log_index = 0u64; + let mut previous_cumulative_gas_used = 0u64; + let mut remote_txs = Vec::with_capacity(txs.len()); + let mut remote_receipts = Vec::with_capacity(receipts.len()); + + for (tx, receipt) in txs.iter().zip(receipts.iter()) { + let sender = tx.recover_signer().map_err(|err| { + eyre::eyre!("failed to recover signer for tx {:?}: {err}", tx.tx_hash()) + })?; + let fee_payer = fee_payer(tx, sender); + + remote_txs.push(remote_transaction(tx, sender, fee_payer)); + remote_receipts.push(remote_receipt( + tx, + receipt, + fee_payer, + &mut block_log_index, + &mut previous_cumulative_gas_used, + )); + } + + Ok(RemoteBlockV1::new(metadata, remote_txs, remote_receipts)) +} + +fn remote_transaction( + tx: &EvTxEnvelope, + sender: Address, + fee_payer: Option
, +) -> RemoteTransactionV1 { + RemoteTransactionV1::new( + *tx.tx_hash(), + sender, + tx.ty(), + tx.nonce(), + tx.gas_limit(), + tx.gas_price(), + tx.max_fee_per_gas(), + tx.max_priority_fee_per_gas(), + match tx.kind() { + TxKind::Call(to) => Some(to), + TxKind::Create => None, + }, + tx.value(), + tx.input().clone(), + tx.encoded_2718().into(), + fee_payer, + batch_calls(tx), + ) +} + +fn remote_receipt( + tx: &EvTxEnvelope, + receipt: &ev_primitives::Receipt, + fee_payer: Option
, + block_log_index: &mut u64, + previous_cumulative_gas_used: &mut u64, +) -> RemoteReceiptV1 { + let mut tx_log_index = 0u64; + let logs = receipt + .logs() + .iter() + .map(|log| { + let remote_log = RemoteLogV1 { + address: log.address, + topics: log.data.topics().to_vec(), + data: log.data.data.clone(), + log_index: *block_log_index, + transaction_log_index: Some(tx_log_index), + }; + *block_log_index += 1; + tx_log_index += 1; + remote_log + }) + .collect(); + let gas_used = receipt + .cumulative_gas_used + .saturating_sub(*previous_cumulative_gas_used); + *previous_cumulative_gas_used = receipt.cumulative_gas_used; + + RemoteReceiptV1::new( + *tx.tx_hash(), + receipt.status(), + gas_used, + receipt.cumulative_gas_used, + None, + logs, + fee_payer, + ) +} + +fn fee_payer(tx: &EvTxEnvelope, sender: Address) -> Option
{ + match tx { + EvTxEnvelope::EvNode(ev) => ev + .tx() + .fee_payer_signature + .as_ref() + .and_then(|signature| ev.tx().recover_sponsor(sender, signature).ok()), + EvTxEnvelope::Ethereum(_) => None, + } +} + +fn batch_calls(tx: &EvTxEnvelope) -> Vec { + match tx { + EvTxEnvelope::EvNode(ev) => ev.tx().calls.iter().map(remote_call).collect(), + EvTxEnvelope::Ethereum(_) => Vec::new(), + } +} + +fn remote_call(call: &Call) -> RemoteCallV1 { + RemoteCallV1 { + to: match call.to { + TxKind::Call(address) => Some(address), + TxKind::Create => None, + }, + value: call.value, + input: call.input.clone(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_consensus::{SignableTransaction, TxLegacy}; + use alloy_eips::eip2930::AccessList; + use alloy_primitives::{Address, Bytes, Signature, B256, U256}; + use alloy_signer::SignerSync; + use alloy_signer_local::PrivateKeySigner; + use ev_exex_remote::{ + decode_notification_envelope, encode_notification_envelope, RemoteExExClient, + }; + use ev_primitives::{Call, EvNodeTransaction}; + use reth_primitives::Transaction; + use tokio::{ + sync::broadcast, + time::{sleep, timeout, Duration}, + }; + use tokio_stream::StreamExt; + + #[test] + fn remote_exex_config_is_constructible() { + let config = RemoteExExConfig::new("127.0.0.1:30001".parse().unwrap(), 32); + assert_eq!(config.grpc_listen_addr, "127.0.0.1:30001".parse().unwrap()); + assert_eq!(config.buffer, 32); + } + + #[test] + fn fee_payer_recovers_only_for_evnode_transactions() { + let signed = alloy_consensus::Signed::new_unhashed( + Transaction::Legacy(TxLegacy { + chain_id: Some(1234), + nonce: 0, + gas_price: 1, + gas_limit: 21_000, + to: TxKind::Create, + value: U256::ZERO, + input: Bytes::default(), + }), + Signature::test_signature(), + ); + let tx = EvTxEnvelope::Ethereum(reth_ethereum_primitives::TransactionSigned::from(signed)); + assert_eq!(fee_payer(&tx, Address::ZERO), None); + } + + #[test] + fn remote_transaction_captures_sponsored_evnode_metadata() { + let executor = PrivateKeySigner::from_slice(&[7u8; 32]).expect("executor key"); + let sponsor = PrivateKeySigner::from_slice(&[9u8; 32]).expect("sponsor key"); + let executor_address = executor.address(); + let sponsor_address = sponsor.address(); + + let tx = EvNodeTransaction { + chain_id: 1234, + nonce: 3, + max_priority_fee_per_gas: 1_000_000_000, + max_fee_per_gas: 2_000_000_000, + gas_limit: 100_000, + calls: vec![Call { + to: TxKind::Call(Address::repeat_byte(0x44)), + value: U256::from(123u64), + input: Bytes::from_static(b"call"), + }], + access_list: AccessList::default(), + fee_payer_signature: None, + }; + + let executor_sig = executor + .sign_hash_sync(&tx.signature_hash()) + .expect("executor signature"); + let mut signed = tx.into_signed(executor_sig); + let sponsor_sig = sponsor + .sign_hash_sync(&signed.tx().sponsor_signing_hash(executor_address)) + .expect("sponsor signature"); + signed.tx_mut().fee_payer_signature = Some(sponsor_sig); + + let envelope = EvTxEnvelope::EvNode(signed); + let remote = remote_transaction( + &envelope, + executor_address, + fee_payer(&envelope, executor_address), + ); + + assert_eq!(remote.sender, executor_address); + assert_eq!(remote.fee_payer, Some(sponsor_address)); + assert_eq!(remote.calls.len(), 1); + assert_eq!(remote.calls[0].to, Some(Address::repeat_byte(0x44))); + } + + #[tokio::test] + async fn lagging_subscriber_receives_resource_exhausted() { + let notifications = Arc::new(broadcast::channel(1).0); + let service = RemoteExExService { + notifications: notifications.clone(), + }; + + let response = service + .subscribe(Request::new(SubscribeRequest {})) + .await + .expect("subscribe"); + let mut stream = response.into_inner(); + + notifications + .send(NotificationEnvelope { + schema_version: 1, + encoding: "bincode/v1".to_string(), + payload: vec![1], + }) + .expect("first notification"); + tokio::task::yield_now().await; + + notifications + .send(NotificationEnvelope { + schema_version: 1, + encoding: "bincode/v1".to_string(), + payload: vec![2], + }) + .expect("second notification"); + tokio::task::yield_now().await; + + notifications + .send(NotificationEnvelope { + schema_version: 1, + encoding: "bincode/v1".to_string(), + payload: vec![3], + }) + .expect("third notification"); + notifications + .send(NotificationEnvelope { + schema_version: 1, + encoding: "bincode/v1".to_string(), + payload: vec![4], + }) + .expect("fourth notification"); + + let first = stream + .next() + .await + .expect("stream item") + .expect("first notification should arrive"); + assert_eq!(first.payload, vec![1]); + + let second = stream + .next() + .await + .expect("stream item") + .expect("second notification should arrive"); + assert_eq!(second.payload, vec![2]); + + let lagged = stream + .next() + .await + .expect("stream item") + .expect_err("lagged error"); + assert_eq!(lagged.code(), tonic::Code::ResourceExhausted); + } + + #[tokio::test] + async fn grpc_server_streams_notifications_to_subscribers() { + let notifications = Arc::new(broadcast::channel(4).0); + let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind test listener"); + let listen_addr = listener.local_addr().expect("listener addr"); + drop(listener); + + let server = tokio::spawn(serve_remote_exex_grpc_server( + listen_addr, + notifications.clone(), + )); + + let endpoint = format!("http://{listen_addr}"); + let mut client = loop { + match RemoteExExClient::connect(endpoint.clone()).await { + Ok(client) => break client, + Err(_) => sleep(Duration::from_millis(25)).await, + } + }; + + let mut stream = client + .subscribe(SubscribeRequest {}) + .await + .expect("subscribe") + .into_inner(); + + let notification = RemoteNotificationV1::ChainCommitted { + range: RemoteBlockRangeV1::new(7, 7), + blocks: vec![RemoteBlockV1::new( + RemoteBlockMetadataV1 { + number: 7, + hash: B256::repeat_byte(7), + parent_hash: B256::repeat_byte(6), + timestamp: 7, + gas_limit: 30_000_000, + gas_used: 21_000, + fee_recipient: Address::repeat_byte(8), + base_fee_per_gas: Some(1), + }, + vec![], + vec![], + )], + }; + + notifications + .send(encode_notification_envelope(¬ification).expect("encode envelope")) + .expect("queued notification"); + + let received = timeout(Duration::from_secs(5), stream.message()) + .await + .expect("notification timeout") + .expect("stream should remain healthy") + .expect("notification payload"); + + let decoded = decode_notification_envelope(&received).expect("decode envelope"); + assert_eq!(decoded, notification); + + server.abort(); + let _ = server.await; + } + + #[test] + fn encoded_envelope_roundtrips() { + let notification = RemoteNotificationV1::ChainCommitted { + range: RemoteBlockRangeV1::new(1, 1), + blocks: vec![RemoteBlockV1::new( + RemoteBlockMetadataV1 { + number: 1, + hash: B256::repeat_byte(1), + parent_hash: B256::ZERO, + timestamp: 1, + gas_limit: 30_000_000, + gas_used: 21_000, + fee_recipient: Address::repeat_byte(2), + base_fee_per_gas: Some(1), + }, + vec![RemoteTransactionV1::new( + B256::repeat_byte(3), + Address::repeat_byte(4), + 0x76, + 0, + 21_000, + None, + 1, + Some(1), + None, + U256::ZERO, + Bytes::default(), + Bytes::default(), + None, + vec![], + )], + vec![RemoteReceiptV1::new( + B256::repeat_byte(3), + true, + 21_000, + 21_000, + None, + vec![], + None, + )], + )], + }; + + let envelope = encode_notification_envelope(¬ification).expect("encode envelope"); + let decoded = decode_notification_envelope(&envelope).expect("decode envelope"); + assert_eq!(decoded, notification); + } +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 3007acdb..a82acd50 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -21,6 +21,8 @@ pub mod error; pub mod evm_executor; /// Executor wiring for EV aware execution. pub mod executor; +/// Execution extension support for remote consumers. +pub mod exex; /// Node composition and payload types. pub mod node; /// Payload service integration. @@ -47,6 +49,7 @@ pub use chainspec::EvolveChainSpecParser; pub use config::{ConfigError, EvolvePayloadBuilderConfig}; pub use error::EvolveEngineError; pub use executor::{build_evm_config, EvolveEvmConfig, EvolveExecutorBuilder}; +pub use exex::{remote_exex_task, spawn_remote_exex_grpc_server, RemoteExExConfig, REMOTE_EXEX_ID}; pub use node::{log_startup, EvolveEngineTypes, EvolveNode, EvolveNodeAddOns}; pub use payload_service::{EvolveEnginePayloadBuilder, EvolvePayloadBuilderBuilder}; pub use payload_types::EvBuiltPayload; diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs index 1022d18f..35510a2c 100644 --- a/crates/tests/src/lib.rs +++ b/crates/tests/src/lib.rs @@ -11,6 +11,8 @@ pub(crate) mod e2e_tests; mod test_deploy_allowlist; #[cfg(test)] mod test_evolve_engine_api; +#[cfg(test)] +mod test_remote_exex_contract; // Re-export common test utilities pub use common::*; diff --git a/crates/tests/src/test_remote_exex_contract.rs b/crates/tests/src/test_remote_exex_contract.rs new file mode 100644 index 00000000..8f6f6a26 --- /dev/null +++ b/crates/tests/src/test_remote_exex_contract.rs @@ -0,0 +1,31 @@ +//! Contract-style documentation tests for the remote `ExEx` stream. +//! +//! This file is intentionally standalone so the main implementation can wire it +//! into `crates/tests/src/lib.rs` later without rewriting the checks. + +#[test] +fn block_logger_example_mentions_finished_height() { + let source = include_str!("../../../bin/ev-reth/examples/block_logger.rs"); + assert!(source.contains("ExExEvent::FinishedHeight")); + assert!(source.contains("install_exex(\"block-logger\"")); + assert!(source.contains("committed_range")); +} + +#[test] +fn remote_consumer_example_mentions_message_limits() { + let source = include_str!("../../../bin/ev-reth/examples/remote_consumer.rs"); + assert!(source.contains("max_encoding_message_size(usize::MAX)")); + assert!(source.contains("max_decoding_message_size(usize::MAX)")); + assert!(source.contains("RemoteNotificationV1")); + assert!(source.contains("NotificationEnvelope")); + assert!(source.contains("sponsor_count")); + assert!(source.contains("fee_payer")); +} + +#[test] +fn readme_mentions_best_effort_streaming() { + let readme = include_str!("../../../README.md"); + assert!(readme.contains("best-effort")); + assert!(readme.contains("Remote ExEx")); + assert!(readme.contains("Atlas")); +} From 1805011d7d4ad545465dc61e329171a85fa7f2d5 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sat, 21 Mar 2026 21:10:35 +0100 Subject: [PATCH 2/2] comments --- README.md | 15 +++++++++++++++ crates/exex-remote/src/lib.rs | 10 ++++++++++ crates/exex-remote/src/types.rs | 8 ++++++++ 3 files changed, 33 insertions(+) diff --git a/README.md b/README.md index a922c30f..790859b3 100644 --- a/README.md +++ b/README.md @@ -241,6 +241,21 @@ let mut client = RemoteExExClient::connect("http://127.0.0.1:10000") .max_decoding_message_size(usize::MAX); ``` +The stream intentionally uses ev-reth-owned wire types instead of exposing raw internal Reth or +EV structs directly: + +- The gRPC boundary is a contract for consumers such as Atlas, not an in-process Rust API. +- Reusing `Chain`, `EvTxEnvelope`, or raw receipt types directly would couple + Atlas to ev-reth's internal crate graph, serde layout, and exact Reth version. +- The remote types are narrower and Atlas-oriented: they preserve commit/reorg/revert semantics + and include fields Atlas needs directly, such as paired receipts/logs, recovered `feePayer`, + and batch-call metadata. +- The envelope keeps protobuf small and versioned while still carrying `raw_2718` bytes, so + consumers can fall back to full transaction decoding when needed. + +In short, the dedicated wire schema exists to make the push stream explicit and evolvable, while +leaving ev-reth free to change internal execution types without silently breaking Atlas. + Operationally, the stream should be treated as best-effort: - The node should emit `FinishedHeight` after enqueueing the notification, not after client ack. diff --git a/crates/exex-remote/src/lib.rs b/crates/exex-remote/src/lib.rs index 7a3aba7d..61346919 100644 --- a/crates/exex-remote/src/lib.rs +++ b/crates/exex-remote/src/lib.rs @@ -2,6 +2,16 @@ //! //! This crate provides a minimal gRPC service definition plus serde-friendly //! wire types for transporting canonical block execution events. +//! +//! The transport intentionally does not reuse ev-reth's internal execution types directly. +//! This is a wire contract for external consumers such as Atlas, not an in-process API. +//! Using dedicated remote types keeps the stream versionable and decoupled from ev-reth's +//! internal crate graph, serde layout, and exact Reth version. +//! +//! The remote schema is also narrower and consumer-oriented. It preserves explicit +//! commit/reorg/revert semantics and carries EV-specific derived fields that Atlas needs +//! directly, such as paired receipts/logs, recovered fee-payer metadata, batch-call data, +//! and raw EIP-2718 transaction bytes for fallback decoding. mod codec; mod error; diff --git a/crates/exex-remote/src/types.rs b/crates/exex-remote/src/types.rs index 4944efb3..9427b2f6 100644 --- a/crates/exex-remote/src/types.rs +++ b/crates/exex-remote/src/types.rs @@ -1,3 +1,11 @@ +//! Versioned wire types for the remote `ExEx` stream. +//! +//! These types deliberately mirror only the externally relevant execution data. +//! They are not aliases of ev-reth's in-process `Chain` or receipt/transaction +//! structs because the stream needs a stable, consumer-facing schema with explicit semantics. +//! The remote forms can therefore evolve independently while still preserving raw transaction +//! bytes and EV-specific derived metadata. + use alloy_primitives::{Address, Bytes, B256, U256}; use serde::{Deserialize, Serialize};