Codecov Report

```
@@            Coverage Diff             @@
##            master    #3134     +/-  ##
===========================================
- Coverage    73.17%   70.62%   -2.56%
  Complexity     943      943
===========================================
  Files         1123     1137     +14
  Lines        97892    99773   +1881
  Branches     75065    76966   +1901
===========================================
- Hits         71632    70461   -1171
- Misses       23671    26861   +3190
+ Partials      2589     2451    -138
```
Adds the message_bus crate: a compio + io_uring TCP transport that
backs the VSR consensus plane. Replicas and SDK clients both flow
through per-peer bounded mpsc queues drained by dedicated writer
tasks that coalesce up to MAX_BATCH messages into a single writev
syscall. A slow peer cannot stall sends to other peers; VSR's WAL /
timeout machinery covers drops.
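The coalescing drain described above can be sketched as follows. This is a simplified synchronous model using std's mpsc (the crate itself drains a bounded async channel under compio, and `MAX_BATCH` here is an illustrative value, not the configured one):

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

const MAX_BATCH: usize = 16; // illustrative; the real bound comes from config

/// Drain up to MAX_BATCH already-queued messages so the writer task can
/// flush them with a single vectored write instead of one syscall each.
fn coalesce_batch(rx: &Receiver<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut batch = Vec::with_capacity(MAX_BATCH);
    while batch.len() < MAX_BATCH {
        match rx.try_recv() {
            Ok(msg) => batch.push(msg),
            // Queue empty or closed: submit what we have rather than wait.
            Err(TryRecvError::Empty | TryRecvError::Disconnected) => break,
        }
    }
    batch
}
```

Because the drain never blocks, a writer that finds the queue empty submits a partial batch immediately; a slow peer only backs up its own queue.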
Connection lifecycle, fd transfer, and shard-0 routing:
- Shard 0 is the sole binder of the replica / client listeners and
the sole outbound dialer for higher-id peers. On each accept or
successful dial the ShardZeroCoordinator dups the fd via
fcntl(F_DUPFD_CLOEXEC), ships it to the target shard via an
inter-shard ShardFrame channel, and drops its own stream so only
the target's wrapped fd keeps the socket alive.
- TaggedSender<R> newtype lifts the senders[i].shard_id() == i
invariant from a doc comment to a ctor assertion
(assert_sender_ordering). Permuted sender Vecs now fail at
bootstrap instead of silently misrouting every setup / mapping
/ forward frame.
- Authoritative replica_id -> owning_shard snapshot lives on the
coordinator as [Option<u16>; 256]. A periodic refresh task
re-broadcasts the snapshot so shards whose inbox was full when
the original ReplicaMappingUpdate went out recover on the next
tick. ConnectionLost handler invokes broadcast_mapping_clear
cluster-wide.
- install_{replica,client}_stream reserves the registry key before
spawning reader/writer tasks and aborts both halves (in-band
install_aborted Cell) when the insert loses a duplicate-key
race. Previously the losing reader stayed alive and could
clobber the winner's mapping via notify_connection_lost or
misroute on_request via the wrong client_id.
- notify_connection_lost fires exactly once per disconnect via a
shared one-shot Cell<bool> guard across the writer and reader
halves.
- periodic_reconnect skips peers already present in
bus.replicas() or mapped via owning_shard. Previously the
dialer's contains check only fired at install time, after the
dial had already been wasted; reconnect storms against
already-connected peers are eliminated.
- Partition retransmit sends Frozen directly from the journal
instead of parse-reparse + per-target 4 KiB memcpy. The
.expect("partition journal entry must contain valid prepare")
that would panic the shard on a corrupted journal entry is gone.
- ConnectionLost handler on shard 0 calls
coordinator.forget_mapping + broadcast_mapping_clear before
touching local state, so other shards stop routing via a dead
replica immediately.
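The sender-ordering invariant from the TaggedSender bullet can be sketched like this (the struct body is a hypothetical stand-in; only the `shard_id()` accessor and the assertion follow the description):

```rust
/// Stand-in for the real sender type: it knows which shard it targets.
struct TaggedSender {
    shard_id: u16,
}

impl TaggedSender {
    fn shard_id(&self) -> u16 {
        self.shard_id
    }
}

/// Lift the "senders[i] belongs to shard i" invariant into a
/// constructor-time assertion, so a permuted Vec panics at bootstrap
/// instead of silently misrouting frames later.
fn assert_sender_ordering(senders: &[TaggedSender]) {
    for (i, s) in senders.iter().enumerate() {
        assert_eq!(
            s.shard_id() as usize, i,
            "sender at index {i} reports shard {}", s.shard_id()
        );
    }
}
```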
Transport knobs:
- MessageBusConfig (max_batch, max_message_size,
peer_queue_capacity, reconnect_period, keepalive_idle /
_interval / _retries, close_peer_timeout) replaces the set of
hardcoded module-level consts. Threads through
IggyMessageBus::new / ::with_config, writer_task::run,
framing::read_message, socket_opts::apply_keepalive_for_connection,
installer, and connector.
- CoordinatorConfig (refresh_period plus two shard-0 skip flags)
replaces the hardcoded refresh cadence and the prior
shard-0-always RR. Default: skip shard 0 for replicas (long-lived
steady flows offload to peer shards), keep shard 0 in the client
RR (short-lived flows benefit from shard-0 parallelism). Both
flags are configurable. rr_pick handles the skip logic and
degrades safely when total_shards == 1.
- Both config types live in-crate under core/message_bus/src/config.rs
and core/shard/src/config.rs with TODO markers pointing to their
eventual home under core/configs once downstream server bootstrap
that constructs the transport from ServerConfig lands.
MAX_MESSAGE_SIZE and DEFAULT_RECONNECT_PERIOD retained as pub
consts for test / bench ergonomics; other consts deleted.
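The skip-aware round robin can be sketched roughly as follows, assuming a simple monotonic counter for the rotation state (the real rr_pick's state layout may differ):

```rust
/// Round-robin over shards, optionally skipping shard 0, but degrading
/// safely to shard 0 when it is the only shard.
fn rr_pick(counter: &mut u64, total_shards: u16, skip_shard_zero: bool) -> u16 {
    let pick = if skip_shard_zero && total_shards > 1 {
        // Rotate over shards 1..total_shards only.
        1 + (*counter % (total_shards as u64 - 1)) as u16
    } else {
        (*counter % total_shards as u64) as u16
    };
    *counter += 1;
    pick
}
```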
Lifecycle and send-path correctness:
- Send-path borrow: ConnectionRegistry::borrow_sender and
ReplicaRegistry::borrow_sender return Option<Ref<'_, BusSender>>
via Ref::filter_map instead of a cloned Sender. try_send takes
&self, so the prior clone was an atomic RMW per send on the
inner Arc<State>. Zero atomic RMW on the fast path now.
- Graceful shutdown drains clients + replicas before background
tasks (accept loops, reconnect periodic, coordinator refresh).
A misbehaving background task can no longer consume the full
timeout budget and force writer tasks to tear down in the
middle of a write_vectored_all.
- drain_entries splits the per-entry deadline. Writer gets the
full shared deadline; reader gets at least READER_DRAIN_FLOOR
(250 ms) or half of what remains after the writer returns. Slow
writers no longer force-cancel the reader.
- IggyMessageBus::set_replica_forward_fn / set_client_forward_fn
take &self + RefCell<Option<ShardForwardFn>>, matching the
sibling set_connection_lost_fn / set_shard_mapping pattern.
Production wraps the bus in Rc, so the prior &mut self
signature was unreachable post-wrap.
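The zero-clone borrow from the send-path bullet can be sketched with `Ref::filter_map` (BusSender is a stand-in type here, and the registry's real key type may differ):

```rust
use std::cell::{Ref, RefCell};
use std::collections::HashMap;

struct BusSender; // stand-in for the real per-peer sender

struct ConnectionRegistry {
    senders: RefCell<HashMap<u8, BusSender>>,
}

impl ConnectionRegistry {
    /// Borrow the sender in place instead of cloning it: Ref::filter_map
    /// narrows the whole-map borrow down to one entry, so the fast path
    /// performs no atomic RMW (no Arc clone per send).
    fn borrow_sender(&self, replica_id: u8) -> Option<Ref<'_, BusSender>> {
        Ref::filter_map(self.senders.borrow(), |m| m.get(&replica_id)).ok()
    }
}
```

The returned `Ref` holds the RefCell borrow for exactly as long as the caller uses the sender, which is safe because try_send takes `&self`.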
Security + robustness:
- dup_fd uses fcntl(F_DUPFD_CLOEXEC) so no future fork+exec child
can inherit replica or client sockets. close_fd logs non-EINTR
failures that previously went silent.
- SendError::DupFailed payload is i32 (errno), not String. The
only heap-allocating variant sat on a resource-exhaustion path
and is now gone.
- replica_listener::handshake carries a TODO(IGGY-112) marker:
current check validates cluster_id and directional id bound
only; no shared secret / mTLS / version negotiation.
Acceptable on the assumed trusted network; hardening tracked
under IGGY-112.
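The close-on-exec duplication can be illustrated with std alone, since `BorrowedFd::try_clone_to_owned` performs `fcntl(F_DUPFD_CLOEXEC)` on Unix; `dup_for_shard` below is a hypothetical stand-in for the crate's dup_fd:

```rust
use std::os::fd::{AsFd, OwnedFd};

/// Hypothetical stand-in for dup_fd: duplicate a connection's fd with
/// close-on-exec already set, so a later fork+exec child cannot inherit
/// the socket. On Unix, std implements this via fcntl(F_DUPFD_CLOEXEC).
fn dup_for_shard(fd: impl AsFd) -> std::io::Result<OwnedFd> {
    fd.as_fd().try_clone_to_owned()
}
```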
Tests:
- Integration tests across tcp_client_roundtrip, replica_roundtrip,
directional_connection, reconnect, reconnect_skip_connected,
shard_zero_gating, head_of_line, vectored_batch, backpressure,
connection_lost_notify, duplicate_client_id, graceful_shutdown.
- Unit tests for framing, connection_registry drain budgets,
coordinator RR with the four skip-flag combinations,
fd_transfer FD_CLOEXEC assertion, and ShardZeroCoordinator
sender-ordering panic.
cargo build, cargo fmt --all, cargo clippy --workspace --all-targets,
and cargo nextest run --workspace (2284 passed, 96 skipped) all green.
The previous cache/connection.rs was a single-stream blob that
held a RefCell across .await, awaited kernel write
completion in the send path, and serialized fan-out through one
shared lock. Under VSR pipelining a slow peer stalled sends to
every other peer in the same dispatch round, killing parallel
quorum collection. Reentrant sends to the same peer would also
panic on BorrowMutError.
Rebuilt the crate around a per-connection writer task that
drains a bounded async-channel mpsc and submits batches via a
single write_vectored_all syscall, with the dup(2)-split read
half handled by an independent reader task. send_to_* is now
sync try_send under the async fn shell: zero awaits, returns
SendError::Backpressure on full so VSR can recover via WAL
retransmission or view-change timeouts.
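Sketched with std's bounded channel (the crate uses a bounded async-channel; the SendError variant names here follow the description, with PeerGone as an assumed name for the disconnected case):

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

#[derive(Debug, PartialEq)]
enum SendError {
    Backpressure,
    PeerGone, // assumed variant name for a closed peer queue
}

/// send_to_* sketch: a non-blocking try_send under the async fn shell.
/// A full queue surfaces as Backpressure rather than awaiting; VSR's WAL
/// retransmission or view-change timeouts recover the drop.
fn send_to_peer(tx: &SyncSender<Vec<u8>>, msg: Vec<u8>) -> Result<(), SendError> {
    tx.try_send(msg).map_err(|e| match e {
        TrySendError::Full(_) => SendError::Backpressure,
        TrySendError::Disconnected(_) => SendError::PeerGone,
    })
}
```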
The lifecycle module owns a root Shutdown / ShutdownToken plus
a ConnectionRegistry that tracks the per-peer Sender and
both task handles for graceful drain. The directional rule
(lower id dials, higher id accepts) eliminates the
dialed-both-ways race without a tiebreaker. TCP_NODELAY is set on
every socket, and Message::into_frozen() removes the per-send
memcpy. Three new integration tests prove the architectural
properties: backpressure, vectored batching, and head-of-line
freedom under a saturated peer.
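The directional rule is small enough to state as code (a sketch; the real id type is whatever the cluster config uses):

```rust
/// Lower id dials, higher id accepts: for any distinct pair exactly one
/// side initiates, so the dialed-both-ways race cannot occur and no
/// tiebreaker is needed.
fn should_dial(my_id: u64, peer_id: u64) -> bool {
    my_id < peer_id
}
```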
track_background early-returns when the shutdown token is
already triggered: shutdown drains the background-task vec
exactly once, so a handle pushed after the drain would be
leaked. Tasks spawned post-shutdown have already observed the
cancellation by the time they reach the tracker, so dropping
the handle is the right thing.
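A reduced model of that guard, with the JoinHandle replaced by a unit stand-in (field names are illustrative):

```rust
/// Once shutdown has drained `handles` (which happens exactly once),
/// any later push would leak the handle, so track_background drops it
/// instead. The task pushed after shutdown has already observed the
/// cancellation, so dropping the handle loses nothing.
struct Tracker {
    shutdown_triggered: bool,
    handles: Vec<()>, // stand-in for JoinHandle
}

impl Tracker {
    fn track_background(&mut self, handle: ()) {
        if self.shutdown_triggered {
            return; // drain already ran; tracking now would leak
        }
        self.handles.push(handle);
    }
}
```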
The compio stream framing helpers live in message_bus::framing
(formerly message_bus::codec, renamed to avoid the collision
with binary_protocol::codec, which is the sans-IO WireEncode /
WireDecode trait module). MESSAGE_ALIGN is no longer duplicated:
it is promoted to pub in iggy_binary_protocol::consensus and the
trivial message_to_frozen wrapper is dropped in favour of calling
Message::into_frozen() at the writer-task call sites directly.
Consensus, metadata, partitions, shard, and simulator are
updated to the new MessageBus shape; core/server is left
untouched as legacy code. Cluster config gains the per-node
fields the bus needs to identify replicas (replica_id,
tcp_replica port).