feat(message_bus): implement async fire-and-forget transport for VSR consensus#3134

Open
hubcio wants to merge 1 commit into master from bus

Conversation

@hubcio
Contributor

@hubcio hubcio commented Apr 15, 2026

The previous cache/connection.rs was a single-stream blob that
held a RefCell across .await, awaited kernel write
completion in the send path, and serialized fan-out through one
shared lock. Under VSR pipelining a slow peer stalled sends to
every other peer in the same dispatch round, killing parallel
quorum collection. Reentrant sends to the same peer would also
panic on BorrowMutError.

Rebuilt the crate around a per-connection writer task that
drains a bounded async-channel mpsc and submits batches via a
single write_vectored_all syscall, with the dup(2)-split read
half handled by an independent reader task. send_to_* is now
sync try_send under the async fn shell: zero awaits, returns
SendError::Backpressure on full so VSR can recover via WAL
retransmission or view-change timeouts.
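The send-path contract above can be sketched with std's bounded channel standing in for the async-channel mpsc; `send_to_peer`, the frame type, and the error enum shape here are illustrative, not the crate's actual API:

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

#[derive(Debug, PartialEq)]
enum SendError {
    Backpressure,
    Closed,
}

// Sketch: the send path never awaits. A full queue surfaces
// immediately as Backpressure so VSR can fall back to WAL
// retransmission or view-change timeouts.
fn send_to_peer(tx: &SyncSender<Vec<u8>>, frame: Vec<u8>) -> Result<(), SendError> {
    tx.try_send(frame).map_err(|e| match e {
        TrySendError::Full(_) => SendError::Backpressure,
        TrySendError::Disconnected(_) => SendError::Closed,
    })
}
```

Because `try_send` neither blocks nor awaits, a full queue is reported synchronously and the caller decides whether to rely on retransmission.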

The lifecycle module owns a root Shutdown / ShutdownToken plus
a ConnectionRegistry that tracks the per-peer Sender and
both task handles for graceful drain. The directional rule
(lower id dials, higher id accepts) eliminates the dialed-
both-ways race without a tiebreaker. TCP_NODELAY is set on
every socket, and Message::into_frozen() removes the per-send
memcpy. Three new integration tests prove the architectural
properties: backpressure, vectored batching, and head-of-line
freedom under a saturated peer.
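The directional rule reduces to a single predicate; a minimal sketch (the function name is hypothetical):

```rust
// Sketch of the directional rule: for any pair of distinct replica
// ids, exactly one side dials, so the dialed-both-ways race cannot
// occur and no tiebreaker is needed.
fn should_dial(my_id: u16, peer_id: u16) -> bool {
    my_id < peer_id // lower id dials, higher id accepts
}
```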

track_background early-returns when the shutdown token is
already triggered: shutdown drains the background-task vec
exactly once, so a handle pushed after the drain would be
leaked. Tasks spawned post-shutdown have already observed the
cancellation by the time they reach the tracker, so dropping
the handle is the right thing.
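The early-return invariant can be sketched like this, with a simplified stand-in for the real Shutdown type (single-threaded Cell/RefCell, matching the crate's thread-per-core style; names and fields are illustrative):

```rust
use std::cell::{Cell, RefCell};

struct TaskHandle(u32); // stand-in for a spawned task's join handle

struct Shutdown {
    triggered: Cell<bool>,
    background: RefCell<Vec<TaskHandle>>,
}

impl Shutdown {
    // Sketch: shutdown drains `background` exactly once, so a handle
    // pushed after the drain would leak. A task spawned post-shutdown
    // has already observed cancellation, so dropping its handle here
    // is safe.
    fn track_background(&self, handle: TaskHandle) {
        if self.triggered.get() {
            return; // drop the handle instead of leaking it
        }
        self.background.borrow_mut().push(handle);
    }

    fn shutdown(&self) -> Vec<TaskHandle> {
        self.triggered.set(true);
        self.background.borrow_mut().drain(..).collect()
    }
}
```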

The compio stream framing helpers live in message_bus::framing
(formerly message_bus::codec, renamed to avoid the collision
with binary_protocol::codec, which is the sans-IO WireEncode /
WireDecode trait module). MESSAGE_ALIGN is no longer duplicated:
it is promoted to pub in iggy_binary_protocol::consensus and the
trivial message_to_frozen wrapper is dropped in favour of calling
Message::into_frozen() at the writer-task call sites directly.

Consensus, metadata, partitions, shard, and simulator are
updated to the new MessageBus shape; core/server is left
untouched as legacy code. Cluster config gains the per-node
fields the bus needs to identify replicas (replica_id,
tcp_replica port).

@hubcio hubcio changed the title feat(message_bus): async fire-and-forget transport for VSR consensus feat(message_bus): implement async fire-and-forget transport for VSR consensus Apr 15, 2026
@codecov

codecov bot commented Apr 15, 2026

Codecov Report

❌ Patch coverage is 84.36041% with 335 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.62%. Comparing base (2d6562b) to head (d3d93fa).

Files with missing lines Patch % Lines
core/shard/src/router.rs 0.00% 47 Missing ⚠️
core/shard/src/coordinator.rs 84.89% 39 Missing and 3 partials ⚠️
core/message_bus/src/installer.rs 85.38% 27 Missing and 5 partials ⚠️
core/message_bus/src/replica_io.rs 64.10% 26 Missing and 2 partials ⚠️
...e/message_bus/src/lifecycle/connection_registry.rs 91.58% 23 Missing and 3 partials ⚠️
core/shard/src/lib.rs 73.91% 21 Missing and 3 partials ⚠️
core/message_bus/src/cache/connection.rs 91.07% 18 Missing and 1 partial ⚠️
core/binary_protocol/src/consensus/iobuf.rs 57.57% 14 Missing ⚠️
core/message_bus/src/framing.rs 82.92% 11 Missing and 3 partials ⚠️
core/message_bus/src/lib.rs 94.49% 7 Missing and 6 partials ⚠️
... and 13 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3134      +/-   ##
============================================
- Coverage     73.17%   70.62%   -2.56%     
  Complexity      943      943              
============================================
  Files          1123     1137      +14     
  Lines         97892    99773    +1881     
  Branches      75065    76966    +1901     
============================================
- Hits          71632    70461    -1171     
- Misses        23671    26861    +3190     
+ Partials       2589     2451     -138     
Components Coverage Δ
Rust Core 74.59% <84.36%> (+0.53%) ⬆️
Java SDK 62.30% <ø> (ø)
C# SDK 15.34% <ø> (-54.07%) ⬇️
Python SDK 81.43% <ø> (ø)
Node SDK 91.52% <ø> (+0.21%) ⬆️
Go SDK 39.41% <ø> (ø)
Files with missing lines Coverage Δ
core/binary_protocol/src/consensus/message.rs 49.45% <ø> (ø)
core/message_bus/src/config.rs 100.00% <100.00%> (ø)
core/message_bus/src/socket_opts.rs 100.00% <100.00%> (ø)
core/partitions/src/iggy_partitions.rs 43.65% <ø> (ø)
core/partitions/src/journal.rs 32.85% <100.00%> (+6.00%) ⬆️
core/shard/src/config.rs 100.00% <100.00%> (ø)
core/simulator/src/lib.rs 86.22% <100.00%> (ø)
core/simulator/src/replica.rs 100.00% <100.00%> (ø)
core/message_bus/src/lifecycle/shutdown.rs 98.55% <98.55%> (ø)
core/consensus/src/impls.rs 74.36% <66.66%> (-0.37%) ⬇️
... and 21 more

... and 103 files with indirect coverage changes


@hubcio hubcio force-pushed the bus branch 4 times, most recently from 73636f4 to 2bf0782 on April 17, 2026 10:38

Adds the message_bus crate: a compio + io_uring TCP transport that
backs the VSR consensus plane. Replicas and SDK clients both flow
through per-peer bounded mpsc queues drained by dedicated writer
tasks that coalesce up to MAX_BATCH messages into a single writev
syscall. A slow peer cannot stall sends to other peers; VSR's WAL /
timeout machinery covers drops.
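The coalescing step reduces to draining up to MAX_BATCH queued frames per writer wakeup before issuing one vectored write; a minimal sketch using std's channel in place of the async queue (the MAX_BATCH value here is assumed for illustration, not the crate's constant):

```rust
use std::sync::mpsc::Receiver;

const MAX_BATCH: usize = 16; // assumed cap for this sketch

// Sketch: drain at most MAX_BATCH queued frames. The returned batch
// is what the writer task would hand to a single writev-style
// write_vectored_all call.
fn drain_batch(rx: &Receiver<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut batch = Vec::with_capacity(MAX_BATCH);
    while batch.len() < MAX_BATCH {
        match rx.try_recv() {
            Ok(frame) => batch.push(frame),
            Err(_) => break, // queue empty (or closed): flush what we have
        }
    }
    batch
}
```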

Connection lifecycle, fd transfer, and shard-0 routing:

- Shard 0 is the sole binder of the replica / client listeners and
  the sole outbound dialer for higher-id peers. On each accept or
  successful dial the ShardZeroCoordinator dups the fd via
  fcntl(F_DUPFD_CLOEXEC), ships it to the target shard via an
  inter-shard ShardFrame channel, and drops its own stream so only
  the target's wrapped fd keeps the socket alive.

- TaggedSender<R> newtype lifts the senders[i].shard_id() == i
  invariant from a doc comment to a ctor assertion
  (assert_sender_ordering). Permuted sender Vecs now fail at
  bootstrap instead of silently misrouting every setup / mapping
  / forward frame.
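  A minimal sketch of that ctor assertion (types are simplified; the
  real TaggedSender<R> is generic over the frame type):

```rust
// Sketch: lift the "senders[i] belongs to shard i" invariant from a
// doc comment into a constructor assertion, so a permuted Vec fails
// at bootstrap instead of misrouting frames.
struct ShardSender {
    shard_id: u16,
}

struct TaggedSenders(Vec<ShardSender>);

impl TaggedSenders {
    fn new(senders: Vec<ShardSender>) -> Self {
        for (i, s) in senders.iter().enumerate() {
            assert_eq!(
                s.shard_id as usize, i,
                "sender at index {i} is tagged for shard {}", s.shard_id
            );
        }
        Self(senders)
    }
}
```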

- Authoritative replica_id -> owning_shard snapshot lives on the
  coordinator as [Option<u16>; 256]. A periodic refresh task
  re-broadcasts the snapshot so shards whose inbox was full when
  the original ReplicaMappingUpdate went out recover on the next
  tick. ConnectionLost handler invokes broadcast_mapping_clear
  cluster-wide.

- install_{replica,client}_stream reserves the registry key before
  spawning reader/writer tasks and aborts both halves (in-band
  install_aborted Cell) when the insert loses a duplicate-key
  race. Previously the losing reader stayed alive and could
  clobber the winner's mapping via notify_connection_lost or
  misroute on_request via the wrong client_id.

- notify_connection_lost fires exactly once per disconnect via a
  shared one-shot Cell<bool> guard across the writer and reader
  halves.
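  The one-shot guard reduces to a replace-and-check on the shared
  Cell<bool>; a sketch with illustrative names:

```rust
use std::cell::Cell;
use std::rc::Rc;

// Sketch: writer and reader halves share one `fired` flag. Whichever
// half observes the disconnect first fires the notification; the
// second call sees the old value `true` and becomes a no-op.
fn notify_connection_lost(fired: &Rc<Cell<bool>>, notify_count: &Cell<u32>) {
    if fired.replace(true) {
        return; // already notified by the other half
    }
    notify_count.set(notify_count.get() + 1);
}
```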

- periodic_reconnect skips peers already present in
  bus.replicas() or mapped via owning_shard. Previously the
  dialer's contains check fired only at install time, after the
  wasted dial; reconnect storms against already-connected peers
  are eliminated.

- Partition retransmit sends Frozen directly from the journal
  instead of parse-reparse + per-target 4 KiB memcpy. The
  .expect("partition journal entry must contain valid prepare")
  that would panic the shard on a corrupted journal entry is gone.

- ConnectionLost handler on shard 0 calls
  coordinator.forget_mapping + broadcast_mapping_clear before
  touching local state, so other shards stop routing via a dead
  replica immediately.

Transport knobs:

- MessageBusConfig (max_batch, max_message_size,
  peer_queue_capacity, reconnect_period, keepalive_idle /
  _interval / _retries, close_peer_timeout) replaces the set of
  hardcoded module-level consts. Threads through
  IggyMessageBus::new / ::with_config, writer_task::run,
  framing::read_message, socket_opts::
  apply_keepalive_for_connection, installer, and connector.

- CoordinatorConfig (refresh_period plus two shard-0 skip flags)
  replaces the hardcoded refresh cadence and the prior
  shard-0-always RR. Default: skip shard 0 for replicas (long-lived
  steady flows offload to peer shards), keep shard 0 in the client
  RR (short-lived flows benefit from shard-0 parallelism). Both
  flags are configurable. rr_pick handles the skip logic and
  degrades safely when total_shards == 1.
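  A sketch of that skip/degrade logic (a counter-based round robin;
  the name and exact arithmetic are illustrative, not the crate's
  rr_pick):

```rust
// Sketch: round-robin over shards, optionally skipping shard 0 for
// long-lived flows, degrading safely to shard 0 when it is the only
// shard.
fn rr_pick(counter: &mut usize, total_shards: u16, skip_shard_zero: bool) -> u16 {
    if skip_shard_zero && total_shards > 1 {
        // cycle over shards 1..total_shards
        let pick = 1 + (*counter % (total_shards as usize - 1)) as u16;
        *counter += 1;
        pick
    } else {
        let pick = (*counter % total_shards as usize) as u16;
        *counter += 1;
        pick
    }
}
```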

- Both config types live in-crate under core/message_bus/src/config.rs
  and core/shard/src/config.rs with TODO markers pointing to their
  eventual home under core/configs once downstream server bootstrap
  that constructs the transport from ServerConfig lands.
  MAX_MESSAGE_SIZE and DEFAULT_RECONNECT_PERIOD retained as pub
  consts for test / bench ergonomics; other consts deleted.

Lifecycle and send-path correctness:

- Send-path borrow: ConnectionRegistry::borrow_sender and
  ReplicaRegistry::borrow_sender return Option<Ref<'_, BusSender>>
  via Ref::filter_map instead of a cloned Sender. try_send takes
  &self, so the prior clone was an atomic RMW per send on the
  inner Arc<State>. Zero atomic RMW on the fast path now.
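  A sketch of the borrow-instead-of-clone fast path, with a plain map
  standing in for the registry internals (BusSender here is a
  placeholder type):

```rust
use std::cell::{Ref, RefCell};
use std::collections::HashMap;

struct BusSender(u16); // stand-in for the per-peer sender

struct ConnectionRegistry {
    peers: RefCell<HashMap<u16, BusSender>>,
}

impl ConnectionRegistry {
    // Sketch: return Ref<'_, BusSender> via Ref::filter_map so the
    // fast path borrows the sender in place. Cloning a channel
    // Sender instead would be an atomic RMW on its inner Arc on
    // every send.
    fn borrow_sender(&self, peer: u16) -> Option<Ref<'_, BusSender>> {
        Ref::filter_map(self.peers.borrow(), |m| m.get(&peer)).ok()
    }
}
```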

- Graceful shutdown drains clients + replicas before background
  tasks (accept loops, reconnect periodic, coordinator refresh).
  A misbehaving background task can no longer consume the full
  timeout budget and force writer tasks to tear down mid
  write_vectored_all.

- drain_entries splits the per-entry deadline. Writer gets the
  full shared deadline; reader gets at least READER_DRAIN_FLOOR
  (250 ms) or half of what remains after the writer returns. Slow
  writers no longer force-cancel the reader.
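  The reader's budget can be sketched directly from that rule (the
  helper name is illustrative; the 250 ms floor matches the value
  stated above):

```rust
use std::time::Duration;

const READER_DRAIN_FLOOR: Duration = Duration::from_millis(250);

// Sketch: the writer gets the full shared deadline first; the reader
// then gets at least the floor, or half of whatever the writer left
// over, whichever is larger.
fn reader_budget(remaining_after_writer: Duration) -> Duration {
    READER_DRAIN_FLOOR.max(remaining_after_writer / 2)
}
```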

- IggyMessageBus::set_replica_forward_fn / set_client_forward_fn
  take &self + RefCell<Option<ShardForwardFn>>, matching the
  sibling set_connection_lost_fn / set_shard_mapping pattern.
  Production wraps the bus in Rc, so the prior &mut self
  signature was unreachable post-wrap.

Security + robustness:

- dup_fd uses fcntl(F_DUPFD_CLOEXEC) so no future fork+exec child
  can inherit replica or client sockets. close_fd logs non-EINTR
  failures that previously went silent.

- SendError::DupFailed payload is i32 (errno), not String. The
  only heap-allocating variant is on a resource-exhaustion path
  and is now gone.

- replica_listener::handshake carries a TODO(IGGY-112) marker:
  current check validates cluster_id and directional id bound
  only; no shared secret / mTLS / version negotiation.
  Acceptable on the assumed trusted network; hardening tracked
  under IGGY-112.

Tests:

- Integration tests across tcp_client_roundtrip, replica_roundtrip,
  directional_connection, reconnect, reconnect_skip_connected,
  shard_zero_gating, head_of_line, vectored_batch, backpressure,
  connection_lost_notify, duplicate_client_id, graceful_shutdown.

- Unit tests for framing, connection_registry drain budgets,
  coordinator RR with the four skip-flag combinations,
  fd_transfer FD_CLOEXEC assertion, and ShardZeroCoordinator
  sender-ordering panic.

cargo build, cargo fmt --all, cargo clippy --workspace --all-targets,
and cargo nextest run --workspace (2284 passed, 96 skipped) all green.
