From 4a8a402c0fc07b4fffe405263fa8387703482d26 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 26 Feb 2026 17:03:50 +0800 Subject: [PATCH 01/29] RATIS-2403. Support leader batch write to improve linearizable follower read throughput --- .../src/site/markdown/configurations.md | 37 ++++- .../ratis/server/RaftServerConfigKeys.java | 33 ++++- .../ratis/server/impl/LeaderStateImpl.java | 130 ++++++++++++++++-- .../ratis/server/impl/PendingRequests.java | 12 ++ .../apache/ratis/LinearizableReadTests.java | 5 +- ...stLinearizableLeaderLeaseReadWithGrpc.java | 5 +- ...adAppliedIndexLeaderLeaseReadWithGrpc.java | 6 +- ...tLinearizableReadAppliedIndexWithGrpc.java | 6 +- ...leReadRepliedIndexLeaderLeaseWithGrpc.java | 29 ++++ ...tLinearizableReadRepliedIndexWithGrpc.java | 29 ++++ .../grpc/TestLinearizableReadWithGrpc.java | 5 +- 11 files changed, 267 insertions(+), 30 deletions(-) create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md index 67e988348c..3d59c6b9f4 100644 --- a/ratis-docs/src/site/markdown/configurations.md +++ b/ratis-docs/src/site/markdown/configurations.md @@ -220,11 +220,38 @@ if it fails to receive any RPC responses from this peer within this specified ti ### Read Index - Configurations related to ReadIndex used in linearizable read -| **Property** | `raft.server.read.read-index.applied-index.enabled` | -|:----------------|:----------------------------------------------------------------------| -| **Description** | whether applied index (instead of commit index) is used for ReadIndex | -| **Type** | boolean | -| **Default** | false | +| **Property** | `raft.server.read.read-index.type` | +|:----------------|:-----------------------------------------------------------------------------| +| **Description** | type of read index returned | +| **Type** | enum `Read.ReadIndex.Type` [`COMMIT_INDEX`, `APPLIED_INDEX`, `REPLIED_INDEX` | +| **Default** | `Read.ReadIndex.Type.COMMIT_INDEX` | + +* `Read.ReadIndex.Type.COMMIT_INDEX` - Use leader's CommitIndex (see Raft Paper section 6.4) + * The safest type as it is specified in the Raft dissertation + * This ReadIndex type can be chosen if the base linearizable read from followers performance already meets expectations. + +* `Read.ReadIndex.Type.APPLIED_INDEX` - Use leader's AppliedIndex + * Allow leader to return AppliedIndex (instead of CommitIndex) as the ReadIndex + * This reduces the time follower applying logs up to ReadIndex since AppliedIndex ≤ CommitIndex + * This ReadIndex type can be chosen `Read.ReadIndex.Type.COMMIT_INDEX` read latency is too high. + +* `Read.ReadIndex.Type.REPLIED_INDEX` - Use leader's RepliedIndex + * RepliedIndex is defined as the AppliedIndex of the last write request replied by the leader. + * Leader delays replying write requests and only reply them every write batch boundary configurable by `raft.server.read.read-index.replied-index.batch-interval`. + * This allows the ReadIndex to advance in a coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives. + * This is most effective on read-heavy, follower-read workloads which prioritizes overall read throughput without consistency sacrifice. + * There is a trade-off in increased write latency (up to one `raft.server.read.read-index.replied-index.batch-interval`) per write. + * RepliedIndex still guarantees linearizability (no stale read) since by definition each ReadIndex returns the index of the last replied requests. + * If the RepliedIndex is set to 0, the behavior is identical to `Read.ReadIndex.Type.APPLIED_INDEX` + +Note that theoretically all the ReadIndex types still guarantee linearizability, +but there are tradeoffs (e.g. Write and Read performance) between different types. + +| **Property** | `raft.server.read.read-index.replied-index.batch-interval` | +|:----------------|:---------------------------------------------------------------------------------------------------------------------------------------------| +| **Description** | if `Read.ReadIndex.Type` is `REAPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced | +| **Type** | TimeDuration | +| **Default** | 10ms | | **Property** | `raft.server.read.leader.heartbeat-check.enabled` | |:----------------|:--------------------------------------------------| diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index ef16f67f67..2d55594782 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -280,15 +280,34 @@ static void setWriteIndexCacheExpiryTime(RaftProperties properties, TimeDuration interface ReadIndex { String PREFIX = Read.PREFIX + ".read-index"; - String APPLIED_INDEX_ENABLED_KEY = PREFIX + ".applied-index.enabled"; - boolean APPLIED_INDEX_ENABLED_DEFAULT = false; - static boolean appliedIndexEnabled(RaftProperties properties) { - return getBoolean(properties::getBoolean, APPLIED_INDEX_ENABLED_KEY, - APPLIED_INDEX_ENABLED_DEFAULT, getDefaultLog()); + enum Type { + /** ReadIndex returns leader's commitIndex (see Raft Paper section 6.4). */ + COMMIT_INDEX, + + /** ReadIndex returns leader's appliedIndex to reduce the ReadIndex latency. */ + APPLIED_INDEX, + + /** ReadIndex returns leader's repliedIndex, the index of the last replied request. */ + REPLIED_INDEX + } + + String TYPE_KEY = PREFIX + ".type"; + Type TYPE_DEFAULT = Type.COMMIT_INDEX; + static Type type(RaftProperties properties) { + return get(properties::getEnum, TYPE_KEY, TYPE_DEFAULT, getDefaultLog()); + } + static void setType(RaftProperties properties, Type type) { + set(properties::setEnum, TYPE_KEY, type); } - static void setAppliedIndexEnabled(RaftProperties properties, boolean enabled) { - setBoolean(properties::setBoolean, APPLIED_INDEX_ENABLED_KEY, enabled); + String REPLIED_INDEX_BATCH_INTERVAL_KEY = PREFIX + ".replied-index.batch-interval"; + TimeDuration REPLIED_INDEX_BATCH_INTERVAL_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS); + static TimeDuration repliedIndexBatchInterval(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(REPLIED_INDEX_BATCH_INTERVAL_DEFAULT.getUnit()), + REPLIED_INDEX_BATCH_INTERVAL_KEY, REPLIED_INDEX_BATCH_INTERVAL_DEFAULT, getDefaultLog()); + } + static void setRepliedIndexBatchInterval(RaftProperties properties, TimeDuration interval) { + setTimeDuration(properties::setTimeDuration, REPLIED_INDEX_BATCH_INTERVAL_KEY, interval); } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index ef0bb6b700..36291354d6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -39,6 +39,7 @@ import org.apache.ratis.protocol.exceptions.ReadIndexException; import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener; import org.apache.ratis.server.leader.FollowerInfo; import org.apache.ratis.server.leader.LeaderState; @@ -68,6 +69,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -80,8 +82,11 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -224,6 +229,19 @@ CompletableFuture stopAll() { } } + /** A write reply that has been built but not yet sent to the client */ + private static class HeldReply { + private final PendingRequest pending; + private final RaftClientReply reply; + private final long index; + + HeldReply(PendingRequest pending, RaftClientReply reply, long index) { + this.pending = pending; + this.reply = reply; + this.index = index; + } + } + /** For caching {@link FollowerInfo}s. This class is immutable. */ static class CurrentOldFollowerInfos { private final RaftConfigurationImpl conf; @@ -353,10 +371,21 @@ boolean isApplied() { private final PendingStepDown pendingStepDown; private final ReadIndexHeartbeats readIndexHeartbeats; - private final boolean readIndexAppliedIndexEnabled; + private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType; + private final Supplier readIndexSupplier; + private final MemoizedSupplier readIndexLogPrefixSupplier; private final boolean leaderHeartbeatCheckEnabled; private final LeaderLease lease; + /** The interval at which held write replies are flushed. */ + private final TimeDuration repliedIndexBatchInterval; + /** The highest log index for which a write reply has been flushed (sent to the client). */ + private final AtomicLong repliedIndex; + /** Buffer holding write replies waiting to be flushed. Guarded by itself. */ + private final AtomicReference> heldReplies; + /** Daemon thread that periodically flushes held replies. */ + private volatile Daemon replyFlusher; + LeaderStateImpl(RaftServerImpl server) { this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass()); this.server = server; @@ -391,8 +420,33 @@ boolean isApplied() { } else { this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests); } - this.readIndexAppliedIndexEnabled = RaftServerConfigKeys.Read.ReadIndex - .appliedIndexEnabled(properties); + this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties); + + this.repliedIndexBatchInterval = + RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties); + this.repliedIndex = new AtomicLong(state.getLastAppliedIndex()); + this.heldReplies = new AtomicReference<>(new LinkedList<>()); + + switch (readIndexType) { + case REPLIED_INDEX: + readIndexSupplier = repliedIndex::get; + readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "replied"); + final Daemon flusher = Daemon.newBuilder() + .setName(name + "-ReplyFlusher") + .setRunnable(this::runReplyFlusher) + .build(); + this.replyFlusher = flusher; + flusher.start(); + break; + case APPLIED_INDEX: + readIndexSupplier = () -> server.getState().getLastAppliedIndex(); + readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "applied"); + break; + case COMMIT_INDEX: + default: + readIndexSupplier = () -> server.getRaftLog().getLastCommittedIndex(); + readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "commit"); + } this.leaderHeartbeatCheckEnabled = RaftServerConfigKeys.Read .leaderHeartbeatCheckEnabled(properties); @@ -1140,14 +1194,13 @@ public boolean checkLeadership() { /** * Obtain the current readIndex for read only requests. See Raft paper section 6.4. * 1. Leader makes sure at least one log from current term is committed. - * 2. Leader record last committed index or applied index (depending on configuration) as readIndex. + * 2. Leader record last committed index or applied index or replied index (depending on configuration) as readIndex. * 3. Leader broadcast heartbeats to followers and waits for acknowledgements. * 4. If majority respond success, returns readIndex. * @return current readIndex. */ CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) { - final long index = readIndexAppliedIndexEnabled ? - server.getState().getLastAppliedIndex() : server.getRaftLog().getLastCommittedIndex(); + final long index = readIndexSupplier.get(); final long readIndex; if (readAfterWriteConsistentIndex != null && readAfterWriteConsistentIndex > index) { readIndex = readAfterWriteConsistentIndex; @@ -1155,7 +1208,7 @@ CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) { readIndex = index; } LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})", - readIndex, readIndexAppliedIndexEnabled ? "applied" : "commit", + readIndex, readIndexLogPrefixSupplier.get(), index, readAfterWriteConsistentIndex); // if group contains only one member, fast path @@ -1218,9 +1271,70 @@ private boolean checkLeaderLease() { } void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { - pendingRequests.replyPendingRequest(termIndex, reply); + if (readIndexType == Type.REPLIED_INDEX) { + // Remove from pending map but hold the reply for batch flushing. + final PendingRequest pending = pendingRequests.removePendingRequest(termIndex); + if (pending != null) { + holdReply(pending, reply, termIndex.getIndex()); + } + } else { + pendingRequests.replyPendingRequest(termIndex, reply); + } + } + + /** Hold a write reply for later batch flushing. */ + private void holdReply(PendingRequest pending, RaftClientReply reply, long index) { + heldReplies.getAndUpdate(prev -> { + prev.add(new HeldReply(pending, reply, index)); + return prev; + }); } + /** Flush all held replies and advance {@link #repliedIndex}. */ + private void flushReplies() { + if (heldReplies.get().isEmpty()) { + return; + } + final List toFlush = heldReplies.getAndSet(new LinkedList<>()); + + long maxIndex = repliedIndex.get(); + for (HeldReply held : toFlush) { + held.pending.setReply(held.reply); + maxIndex = Math.max(maxIndex, held.index); + } + repliedIndex.set(maxIndex); + LOG.debug("{}: flushed {} replies, repliedIndex={}", name, toFlush.size(), maxIndex); + } + + /** The reply flusher daemon loop. */ + private void runReplyFlusher() { + while (isRunning()) { + try { + Thread.sleep(repliedIndexBatchInterval.toLong(TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + flushReplies(); + } + // Flush remaining on exit. + flushReplies(); + } + + /** Stop the reply flusher daemon. */ + private void stopReplyFlusher() { + final Daemon flusher = this.replyFlusher; + if (flusher != null) { + flusher.interrupt(); + try { + flusher.join(repliedIndexBatchInterval.toLong(TimeUnit.MILLISECONDS) * 2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + TransactionContext getTransactionContext(TermIndex termIndex) { return pendingRequests.getTransactionContext(termIndex); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index c6a9dd2794..6eb059a0a9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -272,6 +272,18 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { } } + /** + * Remove the {@link PendingRequest} for the given {@link TermIndex} without sending a reply. + * @return the removed {@link PendingRequest}, or null if not found. + */ + PendingRequest removePendingRequest(TermIndex termIndex) { + final PendingRequest pending = pendingRequests.remove(termIndex); + if (pending != null) { + Preconditions.assertEquals(termIndex, pending.getTermIndex(), "termIndex"); + } + return pending; + } + /** * The leader state is stopped. Send NotLeaderException to all the pending * requests since they have not got applied to the state machine yet. diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index b15ae3067f..07529d1050 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -27,6 +27,7 @@ import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; @@ -60,7 +61,7 @@ public abstract class LinearizableReadTests public abstract boolean isLeaderLeaseEnabled(); - public abstract boolean readIndexAppliedIndexEnabled(); + public abstract Type readIndexType(); public abstract void assertRaftProperties(RaftProperties properties); @@ -77,7 +78,7 @@ public void setup() { CounterStateMachine.setProperties(p); RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE); RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled()); - RaftServerConfigKeys.Read.ReadIndex.setAppliedIndexEnabled(p, readIndexAppliedIndexEnabled()); + RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType()); } @Test diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java index d637498d73..f17686e109 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java @@ -20,6 +20,7 @@ import org.apache.ratis.LinearizableReadTests; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import static org.apache.ratis.ReadOnlyRequestTests.assertOption; import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; @@ -35,8 +36,8 @@ public boolean isLeaderLeaseEnabled() { } @Override - public boolean readIndexAppliedIndexEnabled() { - return false; + public Type readIndexType() { + return Type.COMMIT_INDEX; } @Override diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java index 9bf3e307be..3705fb3ffc 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java @@ -17,11 +17,13 @@ */ package org.apache.ratis.grpc; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; + public class TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc extends TestLinearizableLeaderLeaseReadWithGrpc { @Override - public boolean readIndexAppliedIndexEnabled() { - return true; + public Type readIndexType() { + return Type.APPLIED_INDEX; } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java index c019aac166..b119f32a6f 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java @@ -17,11 +17,13 @@ */ package org.apache.ratis.grpc; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; + public class TestLinearizableReadAppliedIndexWithGrpc extends TestLinearizableReadWithGrpc { @Override - public boolean readIndexAppliedIndexEnabled() { - return true; + public Type readIndexType() { + return Type.APPLIED_INDEX; } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java new file mode 100644 index 0000000000..92b158c56c --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; + +public class TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc + extends TestLinearizableLeaderLeaseReadWithGrpc { + + @Override + public Type readIndexType() { + return Type.REPLIED_INDEX; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java new file mode 100644 index 0000000000..f13ce82120 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; + +public class TestLinearizableReadRepliedIndexWithGrpc + extends TestLinearizableReadWithGrpc { + + @Override + public Type readIndexType() { + return Type.REPLIED_INDEX; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java index 3e8860dd19..ce12050b19 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java @@ -20,6 +20,7 @@ import org.apache.ratis.LinearizableReadTests; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import static org.apache.ratis.ReadOnlyRequestTests.assertOption; import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; @@ -35,8 +36,8 @@ public boolean isLeaderLeaseEnabled() { } @Override - public boolean readIndexAppliedIndexEnabled() { - return false; + public Type readIndexType() { + return Type.COMMIT_INDEX; } @Override From 534240e0631906944d7e69c6ccd739aca622f22e Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 26 Feb 2026 18:31:04 +0800 Subject: [PATCH 02/29] Move start daemon from constructor to start method to prevent race Generated-by: Cursor --- .../org/apache/ratis/server/impl/LeaderStateImpl.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 36291354d6..1b37435437 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -431,12 +431,10 @@ boolean isApplied() { case REPLIED_INDEX: readIndexSupplier = repliedIndex::get; readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "replied"); - final Daemon flusher = Daemon.newBuilder() + this.replyFlusher = Daemon.newBuilder() .setName(name + "-ReplyFlusher") .setRunnable(this::runReplyFlusher) .build(); - this.replyFlusher = flusher; - flusher.start(); break; case APPLIED_INDEX: readIndexSupplier = () -> server.getState().getLastAppliedIndex(); @@ -473,6 +471,10 @@ void start() { startupLogEntry.get(); processor.start(); senders.forEach(LogAppender::start); + + if (replyFlusher != null) { + replyFlusher.start(); + } } boolean isReady() { From 5898f390f0604cf26798cd15539b66b8b14f9aab Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 27 Feb 2026 16:10:35 +0800 Subject: [PATCH 03/29] Try to adapt test to the new REPLIED_INDEX guarantee. Generated-by: Cursor --- .../apache/ratis/LinearizableReadTests.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 07529d1050..63514ea602 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -144,10 +144,12 @@ static void runTestFollowerLinearizableRead(C cluste @Test public void testFollowerLinearizableReadParallel() throws Exception { - runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel); + final Type type = readIndexType(); + runWithNewCluster(cluster -> runTestFollowerReadOnlyParallel(type, cluster)); } - static void runTestFollowerReadOnlyParallel(C cluster) throws Exception { + static void runTestFollowerReadOnlyParallel(Type readIndexType, C cluster) + throws Exception { final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); @@ -170,8 +172,17 @@ static void runTestFollowerReadOnlyParallel(C cluste writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT))); Thread.sleep(100); - assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); - f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); + if (readIndexType == Type.REPLIED_INDEX) { + // With REPLIED_INDEX the read index only advances after the leader has applied the + // transaction and the reply batch is flushed. WAIT_AND_INCREMENT takes 500 ms in + // the state machine but we only waited 100 ms, so its reply has not been generated + // yet and the follower read may only see the preceding sync INCREMENT (count - 1). + assertReplyAtLeast(count - 1, f0Client.io().sendReadOnly(QUERY, f0)); + f1Replies.add(new Reply(count - 1, f1Client.async().sendReadOnly(QUERY, f1))); + } else { + assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); + f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); + } } for (int i = 0; i < n; i++) { From 7f58060eae4314ae9a47a4153e803ca7cb83584c Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 27 Feb 2026 16:49:17 +0800 Subject: [PATCH 04/29] Revert AtomicReference since it's not atomic and use synchronized list swap, add stopReplyFlusher to stop Generated-by: Cursor --- .../ratis/server/impl/LeaderStateImpl.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 1b37435437..211c0938fe 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -69,7 +69,6 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -83,7 +82,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; + import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.function.Supplier; @@ -381,8 +380,10 @@ boolean isApplied() { private final TimeDuration repliedIndexBatchInterval; /** The highest log index for which a write reply has been flushed (sent to the client). */ private final AtomicLong repliedIndex; - /** Buffer holding write replies waiting to be flushed. Guarded by itself. */ - private final AtomicReference> heldReplies; + /** Guards {@link #heldReplies}. */ + private final Object heldRepliesLock = new Object(); + /** Buffer holding write replies waiting to be flushed. Guarded by {@link #heldRepliesLock}. */ + private List heldReplies = new ArrayList<>(); /** Daemon thread that periodically flushes held replies. */ private volatile Daemon replyFlusher; @@ -425,7 +426,6 @@ boolean isApplied() { this.repliedIndexBatchInterval = RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties); this.repliedIndex = new AtomicLong(state.getLastAppliedIndex()); - this.heldReplies = new AtomicReference<>(new LinkedList<>()); switch (readIndexType) { case REPLIED_INDEX: @@ -509,6 +509,7 @@ CompletableFuture stop() { startupLogEntry.get().getAppliedIndexFuture().completeExceptionally( new ReadIndexException("failed to obtain read index since: ", nle)); server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId()); + stopReplyFlusher(); logAppenderMetrics.unregister(); raftServerMetrics.unregister(); pendingRequests.close(); @@ -1286,18 +1287,21 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { /** Hold a write reply for later batch flushing. */ private void holdReply(PendingRequest pending, RaftClientReply reply, long index) { - heldReplies.getAndUpdate(prev -> { - prev.add(new HeldReply(pending, reply, index)); - return prev; - }); + synchronized (heldRepliesLock) { + heldReplies.add(new HeldReply(pending, reply, index)); + } } /** Flush all held replies and advance {@link #repliedIndex}. */ private void flushReplies() { - if (heldReplies.get().isEmpty()) { - return; + final List toFlush; + synchronized (heldRepliesLock) { + if (heldReplies.isEmpty()) { + return; + } + toFlush = heldReplies; + heldReplies = new ArrayList<>(); } - final List toFlush = heldReplies.getAndSet(new LinkedList<>()); long maxIndex = repliedIndex.get(); for (HeldReply held : toFlush) { From 36f8903810de0be1d8f2bfeb8c67dbb8219d1c8f Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sun, 1 Mar 2026 20:31:55 +0800 Subject: [PATCH 05/29] Introduce ReplyFlusher and update log --- .../ratis/server/impl/LeaderStateImpl.java | 110 ++------------ .../ratis/server/impl/ReplyFlusher.java | 143 ++++++++++++++++++ 2 files changed, 154 insertions(+), 99 deletions(-) create mode 100644 ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 211c0938fe..8d3c07de8b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -81,7 +81,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -228,19 +227,6 @@ CompletableFuture stopAll() { } } - /** A write reply that has been built but not yet sent to the client */ - private static class HeldReply { - private final PendingRequest pending; - private final RaftClientReply reply; - private final long index; - - HeldReply(PendingRequest pending, RaftClientReply reply, long index) { - this.pending = pending; - this.reply = reply; - this.index = index; - } - } - /** For caching {@link FollowerInfo}s. This class is immutable. */ static class CurrentOldFollowerInfos { private final RaftConfigurationImpl conf; @@ -372,20 +358,10 @@ boolean isApplied() { private final ReadIndexHeartbeats readIndexHeartbeats; private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType; private final Supplier readIndexSupplier; - private final MemoizedSupplier readIndexLogPrefixSupplier; private final boolean leaderHeartbeatCheckEnabled; private final LeaderLease lease; - /** The interval at which held write replies are flushed. */ - private final TimeDuration repliedIndexBatchInterval; - /** The highest log index for which a write reply has been flushed (sent to the client). */ - private final AtomicLong repliedIndex; - /** Guards {@link #heldReplies}. */ - private final Object heldRepliesLock = new Object(); - /** Buffer holding write replies waiting to be flushed. Guarded by {@link #heldRepliesLock}. */ - private List heldReplies = new ArrayList<>(); - /** Daemon thread that periodically flushes held replies. */ - private volatile Daemon replyFlusher; + private ReplyFlusher replyFlusher; LeaderStateImpl(RaftServerImpl server) { this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass()); @@ -421,29 +397,20 @@ boolean isApplied() { } else { this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests); } - this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties); - - this.repliedIndexBatchInterval = - RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties); - this.repliedIndex = new AtomicLong(state.getLastAppliedIndex()); + this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties); switch (readIndexType) { case REPLIED_INDEX: - readIndexSupplier = repliedIndex::get; - readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "replied"); - this.replyFlusher = Daemon.newBuilder() - .setName(name + "-ReplyFlusher") - .setRunnable(this::runReplyFlusher) - .build(); + this.replyFlusher = new ReplyFlusher(name, state.getLastAppliedIndex(), + RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties)); + readIndexSupplier = replyFlusher::getRepliedIndex; break; case APPLIED_INDEX: readIndexSupplier = () -> server.getState().getLastAppliedIndex(); - readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "applied"); break; case COMMIT_INDEX: default: readIndexSupplier = () -> server.getRaftLog().getLastCommittedIndex(); - readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "commit"); } this.leaderHeartbeatCheckEnabled = RaftServerConfigKeys.Read .leaderHeartbeatCheckEnabled(properties); @@ -509,7 +476,9 @@ CompletableFuture stop() { startupLogEntry.get().getAppliedIndexFuture().completeExceptionally( new ReadIndexException("failed to obtain read index since: ", nle)); server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId()); - stopReplyFlusher(); + if (replyFlusher != null) { + replyFlusher.stop(); + } logAppenderMetrics.unregister(); raftServerMetrics.unregister(); pendingRequests.close(); @@ -1210,9 +1179,8 @@ CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) { } else { readIndex = index; } - LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})", - readIndex, readIndexLogPrefixSupplier.get(), - index, readAfterWriteConsistentIndex); + LOG.debug("readIndex={} ({}={}, readAfterWriteConsistentIndex={})", + readIndex, readIndexType, index, readAfterWriteConsistentIndex); // if group contains only one member, fast path if (server.getRaftConf().isSingleton()) { @@ -1278,69 +1246,13 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { // Remove from pending map but hold the reply for batch flushing. final PendingRequest pending = pendingRequests.removePendingRequest(termIndex); if (pending != null) { - holdReply(pending, reply, termIndex.getIndex()); + replyFlusher.hold(pending, reply, termIndex.getIndex()); } } else { pendingRequests.replyPendingRequest(termIndex, reply); } } - /** Hold a write reply for later batch flushing. */ - private void holdReply(PendingRequest pending, RaftClientReply reply, long index) { - synchronized (heldRepliesLock) { - heldReplies.add(new HeldReply(pending, reply, index)); - } - } - - /** Flush all held replies and advance {@link #repliedIndex}. */ - private void flushReplies() { - final List toFlush; - synchronized (heldRepliesLock) { - if (heldReplies.isEmpty()) { - return; - } - toFlush = heldReplies; - heldReplies = new ArrayList<>(); - } - - long maxIndex = repliedIndex.get(); - for (HeldReply held : toFlush) { - held.pending.setReply(held.reply); - maxIndex = Math.max(maxIndex, held.index); - } - repliedIndex.set(maxIndex); - LOG.debug("{}: flushed {} replies, repliedIndex={}", name, toFlush.size(), maxIndex); - } - - /** The reply flusher daemon loop. */ - private void runReplyFlusher() { - while (isRunning()) { - try { - Thread.sleep(repliedIndexBatchInterval.toLong(TimeUnit.MILLISECONDS)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - flushReplies(); - } - // Flush remaining on exit. - flushReplies(); - } - - /** Stop the reply flusher daemon. */ - private void stopReplyFlusher() { - final Daemon flusher = this.replyFlusher; - if (flusher != null) { - flusher.interrupt(); - try { - flusher.join(repliedIndexBatchInterval.toLong(TimeUnit.MILLISECONDS) * 2); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - TransactionContext getTransactionContext(TermIndex termIndex) { return pendingRequests.getTransactionContext(termIndex); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java new file mode 100644 index 0000000000..61a3429e45 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.server.raftlog.RaftLogIndex; +import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.concurrent.TimeUnit; + +/** + * Implements the reply flush logic as part of the leader batch write when RepliedIndex is used. + */ +public class ReplyFlusher { + static final Logger LOG = LoggerFactory.getLogger(ReplyFlusher.class); + + /** A write reply that has been built but not yet sent to the client */ + static class HeldReply { + private final PendingRequest pending; + private final RaftClientReply reply; + private final long index; + + HeldReply(PendingRequest pending, RaftClientReply reply, long index) { + this.pending = pending; + this.reply = reply; + this.index = index; + } + + long release() { + pending.setReply(reply); + return index; + } + } + + static class Replies { + private LinkedList list = new LinkedList<>(); + + synchronized void add(PendingRequest pending, RaftClientReply reply, long index) { + list.add(new HeldReply(pending, reply, index)); + } + + synchronized LinkedList getAndSetNewList() { + final LinkedList old = list; + list = new LinkedList<>(); + return old; + } + } + + private final String name; + private final LifeCycle lifeCycle; + private final Daemon daemon; + private Replies replies = new Replies(); + private final RaftLogIndex repliedIndex; + /** The interval at which held write replies are flushed. */ + private final TimeDuration batchInterval; + + ReplyFlusher(String name, long repliedIndex, TimeDuration batchInterval) { + this.name = name = "-ReplyFlusher"; + this.lifeCycle = new LifeCycle(this.name); + this.daemon = Daemon.newBuilder() + .setName(this.name) + .setRunnable(this::run) + .build(); + this.repliedIndex = new RaftLogIndex("repliedIndex", repliedIndex); + this.batchInterval = batchInterval; + } + + long getRepliedIndex() { + return repliedIndex.get(); + } + + /** Hold a write reply for later batch flushing */ + void hold(PendingRequest pending, RaftClientReply reply, long index) { + replies.add(pending, reply, index); + } + + void start() { + lifeCycle.startAndTransition(daemon::start); + } + + /** The reply flusher daemon loop. */ + private void run() { + try { + while (lifeCycle.getCurrentState() == LifeCycle.State.RUNNING) { + try { + Thread.sleep(batchInterval.toLong(TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + flush(); + } + } finally { + // Flush remaining on exit + flush(); + } + } + + /** Flush all held replies and advance {@link #repliedIndex}. */ + private void flush() { + final LinkedList toFlush = replies.getAndSetNewList(); + if (toFlush.isEmpty()) { + return; + } + long maxIndex = toFlush.removeLast().release(); + for (HeldReply held : toFlush) { + maxIndex = Math.max(maxIndex, held.release()); + } + repliedIndex.updateToMax(maxIndex, s -> + LOG.debug("{}: flushed {} replies, {}", name, toFlush.size(), s)); + } + + /** Stop the reply flusher daemon. */ + void stop() { + lifeCycle.checkStateAndClose(); + daemon.interrupt(); + try { + daemon.join(batchInterval.toLong(TimeUnit.MILLISECONDS )* 2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} From bfc51822dc0f2dd4f5b3b51e4d43a58d193a90e7 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sun, 1 Mar 2026 20:34:41 +0800 Subject: [PATCH 06/29] Remove unnecessary blank line --- ratis-docs/src/site/markdown/configurations.md | 2 +- .../main/java/org/apache/ratis/server/impl/LeaderStateImpl.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md index 3d59c6b9f4..31c66293fd 100644 --- a/ratis-docs/src/site/markdown/configurations.md +++ b/ratis-docs/src/site/markdown/configurations.md @@ -241,7 +241,7 @@ if it fails to receive any RPC responses from this peer within this specified ti * This allows the ReadIndex to advance in a coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives. * This is most effective on read-heavy, follower-read workloads which prioritizes overall read throughput without consistency sacrifice. * There is a trade-off in increased write latency (up to one `raft.server.read.read-index.replied-index.batch-interval`) per write. - * RepliedIndex still guarantees linearizability (no stale read) since by definition each ReadIndex returns the index of the last replied requests. + * RepliedIndex still guarantees linearizability (no stale read) since by definition each ReadIndex returns the index of the last replied request. * If the RepliedIndex is set to 0, the behavior is identical to `Read.ReadIndex.Type.APPLIED_INDEX` Note that theoretically all the ReadIndex types still guarantee linearizability, diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 8d3c07de8b..6a9d96bbc7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -81,7 +81,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.function.Supplier; From 589117b65243ed3ae3def4d1ef7916474faff513 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sun, 1 Mar 2026 20:38:18 +0800 Subject: [PATCH 07/29] Fix findbugs --- .../main/java/org/apache/ratis/server/impl/ReplyFlusher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java index 61a3429e45..6a6e922f89 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -75,7 +75,7 @@ synchronized LinkedList getAndSetNewList() { private final TimeDuration batchInterval; ReplyFlusher(String name, long repliedIndex, TimeDuration batchInterval) { - this.name = name = "-ReplyFlusher"; + this.name = name + "-ReplyFlusher"; this.lifeCycle = new LifeCycle(this.name); this.daemon = Daemon.newBuilder() .setName(this.name) From f1a2bae32dd2739177078f5ac0c32cebfaba9def Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 2 Mar 2026 12:05:20 +0800 Subject: [PATCH 08/29] Use appliedIndex during flush instead --- .../ratis/server/impl/LeaderStateImpl.java | 3 +- .../ratis/server/impl/ReplyFlusher.java | 36 +++++++++---------- .../apache/ratis/LinearizableReadTests.java | 19 +++------- 3 files changed, 23 insertions(+), 35 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 6a9d96bbc7..a3613f0796 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -401,6 +401,7 @@ boolean isApplied() { switch (readIndexType) { case REPLIED_INDEX: this.replyFlusher = new ReplyFlusher(name, state.getLastAppliedIndex(), + () -> server.getState().getLastAppliedIndex(), RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties)); readIndexSupplier = replyFlusher::getRepliedIndex; break; @@ -1245,7 +1246,7 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { // Remove from pending map but hold the reply for batch flushing. final PendingRequest pending = pendingRequests.removePendingRequest(termIndex); if (pending != null) { - replyFlusher.hold(pending, reply, termIndex.getIndex()); + replyFlusher.hold(pending, reply); } } else { pendingRequests.replyPendingRequest(termIndex, reply); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java index 6a6e922f89..b4c37dac9f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -27,6 +27,7 @@ import java.util.LinkedList; import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; /** * Implements the reply flush logic as part of the leader batch write when RepliedIndex is used. @@ -34,29 +35,26 @@ public class ReplyFlusher { static final Logger LOG = LoggerFactory.getLogger(ReplyFlusher.class); - /** A write reply that has been built but not yet sent to the client */ + /** A write reply that has been built but not yet sent to the client. */ static class HeldReply { private final PendingRequest pending; private final RaftClientReply reply; - private final long index; - HeldReply(PendingRequest pending, RaftClientReply reply, long index) { + HeldReply(PendingRequest pending, RaftClientReply reply) { this.pending = pending; this.reply = reply; - this.index = index; } - long release() { + void release() { pending.setReply(reply); - return index; } } static class Replies { private LinkedList list = new LinkedList<>(); - synchronized void add(PendingRequest pending, RaftClientReply reply, long index) { - list.add(new HeldReply(pending, reply, index)); + synchronized void add(PendingRequest pending, RaftClientReply reply) { + list.add(new HeldReply(pending, reply)); } synchronized LinkedList getAndSetNewList() { @@ -71,10 +69,12 @@ synchronized LinkedList getAndSetNewList() { private final Daemon daemon; private Replies replies = new Replies(); private final RaftLogIndex repliedIndex; + /** Supplies the last applied index from the state machine. */ + private final LongSupplier appliedIndexSupplier; /** The interval at which held write replies are flushed. */ private final TimeDuration batchInterval; - ReplyFlusher(String name, long repliedIndex, TimeDuration batchInterval) { + ReplyFlusher(String name, long repliedIndex, LongSupplier appliedIndexSupplier, TimeDuration batchInterval) { this.name = name + "-ReplyFlusher"; this.lifeCycle = new LifeCycle(this.name); this.daemon = Daemon.newBuilder() @@ -82,6 +82,7 @@ synchronized LinkedList getAndSetNewList() { .setRunnable(this::run) .build(); this.repliedIndex = new RaftLogIndex("repliedIndex", repliedIndex); + this.appliedIndexSupplier = appliedIndexSupplier; this.batchInterval = batchInterval; } @@ -89,9 +90,9 @@ long getRepliedIndex() { return repliedIndex.get(); } - /** Hold a write reply for later batch flushing */ - void hold(PendingRequest pending, RaftClientReply reply, long index) { - replies.add(pending, reply, index); + /** Hold a write reply for later batch flushing. */ + void hold(PendingRequest pending, RaftClientReply reply) { + replies.add(pending, reply); } void start() { @@ -116,17 +117,14 @@ private void run() { } } - /** Flush all held replies and advance {@link #repliedIndex}. */ + /** Flush all held replies and advance {@link #repliedIndex} to the applied index. */ private void flush() { final LinkedList toFlush = replies.getAndSetNewList(); - if (toFlush.isEmpty()) { - return; - } - long maxIndex = toFlush.removeLast().release(); for (HeldReply held : toFlush) { - maxIndex = Math.max(maxIndex, held.release()); + held.release(); } - repliedIndex.updateToMax(maxIndex, s -> + final long appliedIndex = appliedIndexSupplier.getAsLong(); + repliedIndex.updateToMax(appliedIndex, s -> LOG.debug("{}: flushed {} replies, {}", name, toFlush.size(), s)); } diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 63514ea602..07529d1050 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -144,12 +144,10 @@ static void runTestFollowerLinearizableRead(C cluste @Test public void testFollowerLinearizableReadParallel() throws Exception { - final Type type = readIndexType(); - runWithNewCluster(cluster -> runTestFollowerReadOnlyParallel(type, cluster)); + runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel); } - static void runTestFollowerReadOnlyParallel(Type readIndexType, C cluster) - throws Exception { + static void runTestFollowerReadOnlyParallel(C cluster) throws Exception { final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); @@ -172,17 +170,8 @@ static void runTestFollowerReadOnlyParallel(Type rea writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT))); Thread.sleep(100); - if (readIndexType == Type.REPLIED_INDEX) { - // With REPLIED_INDEX the read index only advances after the leader has applied the - // transaction and the reply batch is flushed. WAIT_AND_INCREMENT takes 500 ms in - // the state machine but we only waited 100 ms, so its reply has not been generated - // yet and the follower read may only see the preceding sync INCREMENT (count - 1). - assertReplyAtLeast(count - 1, f0Client.io().sendReadOnly(QUERY, f0)); - f1Replies.add(new Reply(count - 1, f1Client.async().sendReadOnly(QUERY, f1))); - } else { - assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); - f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); - } + assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); + f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); } for (int i = 0; i < n; i++) { From e595eab517d7bdb74c50c4d084b2fca89f90ea30 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 2 Mar 2026 12:56:23 +0800 Subject: [PATCH 09/29] Fix replyFlusher never run issue --- .../java/org/apache/ratis/server/impl/ReplyFlusher.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java index b4c37dac9f..c98987db7c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -96,7 +96,11 @@ void hold(PendingRequest pending, RaftClientReply reply) { } void start() { - lifeCycle.startAndTransition(daemon::start); + lifeCycle.transition(LifeCycle.State.STARTING); + // We need to transition to RUNNING first so that ReplyFlusher#run always + // see that the lifecycle state is in RUNNING state. + lifeCycle.transition(LifeCycle.State.RUNNING); + daemon.start(); } /** The reply flusher daemon loop. */ From 60380eea78eb3710cf761c7d931a5b37fded4274 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 2 Mar 2026 12:58:11 +0800 Subject: [PATCH 10/29] Update documentation --- ratis-docs/src/site/markdown/configurations.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md index 31c66293fd..f5189ed862 100644 --- a/ratis-docs/src/site/markdown/configurations.md +++ b/ratis-docs/src/site/markdown/configurations.md @@ -236,7 +236,7 @@ if it fails to receive any RPC responses from this peer within this specified ti * This ReadIndex type can be chosen `Read.ReadIndex.Type.COMMIT_INDEX` read latency is too high. * `Read.ReadIndex.Type.REPLIED_INDEX` - Use leader's RepliedIndex - * RepliedIndex is defined as the AppliedIndex of the last write request replied by the leader. + * RepliedIndex is defined as the last AppliedIndex of the leader when returning the last batch. * Leader delays replying write requests and only reply them every write batch boundary configurable by `raft.server.read.read-index.replied-index.batch-interval`. * This allows the ReadIndex to advance in a coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives. * This is most effective on read-heavy, follower-read workloads which prioritizes overall read throughput without consistency sacrifice. @@ -247,11 +247,11 @@ if it fails to receive any RPC responses from this peer within this specified ti Note that theoretically all the ReadIndex types still guarantee linearizability, but there are tradeoffs (e.g. Write and Read performance) between different types. -| **Property** | `raft.server.read.read-index.replied-index.batch-interval` | -|:----------------|:---------------------------------------------------------------------------------------------------------------------------------------------| -| **Description** | if `Read.ReadIndex.Type` is `REAPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced | -| **Type** | TimeDuration | -| **Default** | 10ms | +| **Property** | `raft.server.read.read-index.replied-index.batch-interval` | +|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------| +| **Description** | if `Read.ReadIndex.Type` is `REPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced | +| **Type** | TimeDuration | +| **Default** | 10ms | | **Property** | `raft.server.read.leader.heartbeat-check.enabled` | |:----------------|:--------------------------------------------------| From 51287166f3ffbd72fd9815794405884a7955b9ac Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 12 Mar 2026 15:49:20 +0800 Subject: [PATCH 11/29] Try to use read index as effective commit index --- .../main/java/org/apache/ratis/server/impl/LeaderStateImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index a3613f0796..42abd385d5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -646,7 +646,7 @@ public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follo final boolean initializing = !isCaughtUp(follower); final RaftPeerId targetId = follower.getId(); return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, getCurrentTerm(), entries, - ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()), + ServerImplUtils.effectiveCommitIndex(readIndexSupplier.get(), previous, entries.size()), initializing, previous, server.getCommitInfos(), callId); } From 19321e30a7008af1cc8602cf50a8e330944818c8 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 12 Mar 2026 15:51:16 +0800 Subject: [PATCH 12/29] Revert "Use appliedIndex during flush instead" This reverts commit f1a2bae32dd2739177078f5ac0c32cebfaba9def. --- .../ratis/server/impl/LeaderStateImpl.java | 3 +- .../ratis/server/impl/ReplyFlusher.java | 36 ++++++++++--------- .../apache/ratis/LinearizableReadTests.java | 19 +++++++--- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 42abd385d5..ed0ebbf1a7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -401,7 +401,6 @@ boolean isApplied() { switch (readIndexType) { case REPLIED_INDEX: this.replyFlusher = new ReplyFlusher(name, state.getLastAppliedIndex(), - () -> server.getState().getLastAppliedIndex(), RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties)); readIndexSupplier = replyFlusher::getRepliedIndex; break; @@ -1246,7 +1245,7 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { // Remove from pending map but hold the reply for batch flushing. final PendingRequest pending = pendingRequests.removePendingRequest(termIndex); if (pending != null) { - replyFlusher.hold(pending, reply); + replyFlusher.hold(pending, reply, termIndex.getIndex()); } } else { pendingRequests.replyPendingRequest(termIndex, reply); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java index c98987db7c..d4fe6dedc6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -27,7 +27,6 @@ import java.util.LinkedList; import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; /** * Implements the reply flush logic as part of the leader batch write when RepliedIndex is used. @@ -35,26 +34,29 @@ public class ReplyFlusher { static final Logger LOG = LoggerFactory.getLogger(ReplyFlusher.class); - /** A write reply that has been built but not yet sent to the client. */ + /** A write reply that has been built but not yet sent to the client */ static class HeldReply { private final PendingRequest pending; private final RaftClientReply reply; + private final long index; - HeldReply(PendingRequest pending, RaftClientReply reply) { + HeldReply(PendingRequest pending, RaftClientReply reply, long index) { this.pending = pending; this.reply = reply; + this.index = index; } - void release() { + long release() { pending.setReply(reply); + return index; } } static class Replies { private LinkedList list = new LinkedList<>(); - synchronized void add(PendingRequest pending, RaftClientReply reply) { - list.add(new HeldReply(pending, reply)); + synchronized void add(PendingRequest pending, RaftClientReply reply, long index) { + list.add(new HeldReply(pending, reply, index)); } synchronized LinkedList getAndSetNewList() { @@ -69,12 +71,10 @@ synchronized LinkedList getAndSetNewList() { private final Daemon daemon; private Replies replies = new Replies(); private final RaftLogIndex repliedIndex; - /** Supplies the last applied index from the state machine. */ - private final LongSupplier appliedIndexSupplier; /** The interval at which held write replies are flushed. */ private final TimeDuration batchInterval; - ReplyFlusher(String name, long repliedIndex, LongSupplier appliedIndexSupplier, TimeDuration batchInterval) { + ReplyFlusher(String name, long repliedIndex, TimeDuration batchInterval) { this.name = name + "-ReplyFlusher"; this.lifeCycle = new LifeCycle(this.name); this.daemon = Daemon.newBuilder() @@ -82,7 +82,6 @@ synchronized LinkedList getAndSetNewList() { .setRunnable(this::run) .build(); this.repliedIndex = new RaftLogIndex("repliedIndex", repliedIndex); - this.appliedIndexSupplier = appliedIndexSupplier; this.batchInterval = batchInterval; } @@ -90,9 +89,9 @@ long getRepliedIndex() { return repliedIndex.get(); } - /** Hold a write reply for later batch flushing. */ - void hold(PendingRequest pending, RaftClientReply reply) { - replies.add(pending, reply); + /** Hold a write reply for later batch flushing */ + void hold(PendingRequest pending, RaftClientReply reply, long index) { + replies.add(pending, reply, index); } void start() { @@ -121,14 +120,17 @@ private void run() { } } - /** Flush all held replies and advance {@link #repliedIndex} to the applied index. */ + /** Flush all held replies and advance {@link #repliedIndex}. */ private void flush() { final LinkedList toFlush = replies.getAndSetNewList(); + if (toFlush.isEmpty()) { + return; + } + long maxIndex = toFlush.removeLast().release(); for (HeldReply held : toFlush) { - held.release(); + maxIndex = Math.max(maxIndex, held.release()); } - final long appliedIndex = appliedIndexSupplier.getAsLong(); - repliedIndex.updateToMax(appliedIndex, s -> + repliedIndex.updateToMax(maxIndex, s -> LOG.debug("{}: flushed {} replies, {}", name, toFlush.size(), s)); } diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 07529d1050..63514ea602 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -144,10 +144,12 @@ static void runTestFollowerLinearizableRead(C cluste @Test public void testFollowerLinearizableReadParallel() throws Exception { - runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel); + final Type type = readIndexType(); + runWithNewCluster(cluster -> runTestFollowerReadOnlyParallel(type, cluster)); } - static void runTestFollowerReadOnlyParallel(C cluster) throws Exception { + static void runTestFollowerReadOnlyParallel(Type readIndexType, C cluster) + throws Exception { final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); @@ -170,8 +172,17 @@ static void runTestFollowerReadOnlyParallel(C cluste writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT))); Thread.sleep(100); - assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); - f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); + if (readIndexType == Type.REPLIED_INDEX) { + // With REPLIED_INDEX the read index only advances after the leader has applied the + // transaction and the reply batch is flushed. WAIT_AND_INCREMENT takes 500 ms in + // the state machine but we only waited 100 ms, so its reply has not been generated + // yet and the follower read may only see the preceding sync INCREMENT (count - 1). + assertReplyAtLeast(count - 1, f0Client.io().sendReadOnly(QUERY, f0)); + f1Replies.add(new Reply(count - 1, f1Client.async().sendReadOnly(QUERY, f1))); + } else { + assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); + f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); + } } for (int i = 0; i < n; i++) { From 511e7df988b2aab958b5914f33833a66ec7b6ba7 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 12 Mar 2026 15:52:49 +0800 Subject: [PATCH 13/29] Retain the original test --- .../org/apache/ratis/LinearizableReadTests.java | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 63514ea602..e765a82869 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -172,17 +172,8 @@ static void runTestFollowerReadOnlyParallel(Type rea writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT))); Thread.sleep(100); - if (readIndexType == Type.REPLIED_INDEX) { - // With REPLIED_INDEX the read index only advances after the leader has applied the - // transaction and the reply batch is flushed. WAIT_AND_INCREMENT takes 500 ms in - // the state machine but we only waited 100 ms, so its reply has not been generated - // yet and the follower read may only see the preceding sync INCREMENT (count - 1). - assertReplyAtLeast(count - 1, f0Client.io().sendReadOnly(QUERY, f0)); - f1Replies.add(new Reply(count - 1, f1Client.async().sendReadOnly(QUERY, f1))); - } else { - assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); - f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); - } + assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); + f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); } for (int i = 0; i < n; i++) { From 302b53ece55229aa93ad8d96723b7e787dc3a174 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 12 Mar 2026 17:26:31 +0800 Subject: [PATCH 14/29] Remove unnecessary parameter --- .../test/java/org/apache/ratis/LinearizableReadTests.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index e765a82869..f9e2daaa3b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine; @@ -144,12 +145,10 @@ static void runTestFollowerLinearizableRead(C cluste @Test public void testFollowerLinearizableReadParallel() throws Exception { - final Type type = readIndexType(); - runWithNewCluster(cluster -> runTestFollowerReadOnlyParallel(type, cluster)); + runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel); } - static void runTestFollowerReadOnlyParallel(Type readIndexType, C cluster) - throws Exception { + static void runTestFollowerReadOnlyParallel(C cluster) throws Exception { final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); From c69b2b544c5800f637072ffe504df17a0673e545 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 23 Mar 2026 21:25:09 +0900 Subject: [PATCH 15/29] Assert read index type sync with the config --- .../org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java index ce12050b19..c211466305 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java @@ -25,6 +25,7 @@ import static org.apache.ratis.ReadOnlyRequestTests.assertOption; import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; public class TestLinearizableReadWithGrpc extends LinearizableReadTests @@ -45,5 +46,6 @@ public void assertRaftProperties(RaftProperties p) { assertOption(LINEARIZABLE, p); assertFalse(RaftServerConfigKeys.Read.leaderLeaseEnabled(p)); assertFalse(isLeaderLeaseEnabled()); + assertSame(readIndexType(), RaftServerConfigKeys.Read.ReadIndex.type(p)); } } From fb11a137f0ad4d93b640fdaa58f8f40a8574272e Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sat, 28 Mar 2026 07:53:55 +0900 Subject: [PATCH 16/29] Address comments --- .../ratis/server/impl/LeaderStateImpl.java | 21 ++-- .../ratis/server/impl/PendingRequests.java | 5 +- .../ratis/server/impl/RaftServerImpl.java | 8 +- .../ratis/server/impl/ReplyFlusher.java | 61 +++++----- .../apache/ratis/LinearizableReadTests.java | 32 ++++-- .../apache/ratis/ReadOnlyRequestTests.java | 17 ++- ...stLinearizableLeaderLeaseReadWithGrpc.java | 7 -- ...tLinearizableReadRepliedIndexWithGrpc.java | 106 ++++++++++++++++++ .../grpc/TestLinearizableReadWithGrpc.java | 15 --- 9 files changed, 189 insertions(+), 83 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index ed0ebbf1a7..d2169ef820 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -1240,15 +1240,22 @@ private boolean checkLeaderLease() { && (server.getRaftConf().isSingleton() || lease.isValid()); } - void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { + void replyPendingRequest(TermIndex termIndex, RaftClientReply reply, RetryCacheImpl.CacheEntry cacheEntry) { + final PendingRequest pending = pendingRequests.remove(termIndex); + if (pending == null) { + return; + } + + final LongSupplier replyMethod = () -> { + cacheEntry.updateResult(reply); + pending.setReply(reply); + return termIndex.getIndex(); + }; + if (readIndexType == Type.REPLIED_INDEX) { - // Remove from pending map but hold the reply for batch flushing. - final PendingRequest pending = pendingRequests.removePendingRequest(termIndex); - if (pending != null) { - replyFlusher.hold(pending, reply, termIndex.getIndex()); - } + replyFlusher.hold(replyMethod); } else { - pendingRequests.replyPendingRequest(termIndex, reply); + replyMethod.getAsLong(); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index 6eb059a0a9..fd9a5b73b7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -264,12 +264,13 @@ TransactionContext getTransactionContext(TermIndex termIndex) { return pendingRequest != null ? pendingRequest.getEntry() : null; } - void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { + /** @return the removed the {@link PendingRequest} for the given {@link TermIndex}. */ + PendingRequest remove(TermIndex termIndex) { final PendingRequest pending = pendingRequests.remove(termIndex); if (pending != null) { Preconditions.assertEquals(termIndex, pending.getTermIndex(), "termIndex"); - pending.setReply(reply); } + return pending; } /** diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index d9dd09d966..12fd34d9ce 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1827,8 +1827,12 @@ private CompletableFuture replyPendingRequest( } // update pending request - role.getLeaderState().ifPresent(leader -> leader.replyPendingRequest(termIndex, r)); - cacheEntry.updateResult(r); + final LeaderStateImpl leader = role.getLeaderState().orElse(null); + if (leader != null) { + leader.replyPendingRequest(termIndex, r, cacheEntry); + } else { + cacheEntry.updateResult(r); + } }); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java index d4fe6dedc6..2b9674ba2b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -17,9 +17,10 @@ */ package org.apache.ratis.server.impl; -import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.server.raftlog.RaftLogIndex; +import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -27,6 +28,7 @@ import java.util.LinkedList; import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; /** * Implements the reply flush logic as part of the leader batch write when RepliedIndex is used. @@ -34,39 +36,25 @@ public class ReplyFlusher { static final Logger LOG = LoggerFactory.getLogger(ReplyFlusher.class); - /** A write reply that has been built but not yet sent to the client */ - static class HeldReply { - private final PendingRequest pending; - private final RaftClientReply reply; - private final long index; - - HeldReply(PendingRequest pending, RaftClientReply reply, long index) { - this.pending = pending; - this.reply = reply; - this.index = index; - } - - long release() { - pending.setReply(reply); - return index; - } - } + private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class); + public static final String FLUSH = CLASS_NAME + ".flush"; static class Replies { - private LinkedList list = new LinkedList<>(); + /** When a {@link LongSupplier} is invoked, it completes a write reply and return the log index. */ + private LinkedList list = new LinkedList<>(); - synchronized void add(PendingRequest pending, RaftClientReply reply, long index) { - list.add(new HeldReply(pending, reply, index)); + synchronized void add(LongSupplier replyMethod) { + list.add(replyMethod); } - synchronized LinkedList getAndSetNewList() { - final LinkedList old = list; + synchronized LinkedList getAndSetNewList() { + final LinkedList old = list; list = new LinkedList<>(); return old; } } - private final String name; + private final Object id; private final LifeCycle lifeCycle; private final Daemon daemon; private Replies replies = new Replies(); @@ -74,11 +62,12 @@ synchronized LinkedList getAndSetNewList() { /** The interval at which held write replies are flushed. */ private final TimeDuration batchInterval; - ReplyFlusher(String name, long repliedIndex, TimeDuration batchInterval) { - this.name = name + "-ReplyFlusher"; - this.lifeCycle = new LifeCycle(this.name); + ReplyFlusher(Object id, long repliedIndex, TimeDuration batchInterval) { + this.id = id; + final String name = id + "-ReplyFlusher"; + this.lifeCycle = new LifeCycle(name); this.daemon = Daemon.newBuilder() - .setName(this.name) + .setName(name) .setRunnable(this::run) .build(); this.repliedIndex = new RaftLogIndex("repliedIndex", repliedIndex); @@ -90,8 +79,8 @@ long getRepliedIndex() { } /** Hold a write reply for later batch flushing */ - void hold(PendingRequest pending, RaftClientReply reply, long index) { - replies.add(pending, reply, index); + void hold(LongSupplier replyMethod) { + replies.add(replyMethod); } void start() { @@ -122,16 +111,18 @@ private void run() { /** Flush all held replies and advance {@link #repliedIndex}. */ private void flush() { - final LinkedList toFlush = replies.getAndSetNewList(); + CodeInjectionForTesting.execute(FLUSH, id, null); + + final LinkedList toFlush = replies.getAndSetNewList(); if (toFlush.isEmpty()) { return; } - long maxIndex = toFlush.removeLast().release(); - for (HeldReply held : toFlush) { - maxIndex = Math.max(maxIndex, held.release()); + long maxIndex = toFlush.removeLast().getAsLong(); + for (LongSupplier held : toFlush) { + maxIndex = Math.max(maxIndex, held.getAsLong()); } repliedIndex.updateToMax(maxIndex, s -> - LOG.debug("{}: flushed {} replies, {}", name, toFlush.size(), s)); + LOG.debug("{}: flushed {} replies, {}", id, toFlush.size(), s)); } /** Stop the reply flusher daemon. */ diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index f9e2daaa3b..c01ca6f747 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -47,9 +47,12 @@ import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT; import static org.apache.ratis.ReadOnlyRequestTests.QUERY; import static org.apache.ratis.ReadOnlyRequestTests.WAIT_AND_INCREMENT; +import static org.apache.ratis.ReadOnlyRequestTests.assertOption; import static org.apache.ratis.ReadOnlyRequestTests.assertReplyAtLeast; import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact; import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; /** Test for the {@link RaftServerConfigKeys.Read.Option#LINEARIZABLE} feature. */ public abstract class LinearizableReadTests @@ -58,15 +61,20 @@ public abstract class LinearizableReadTests { Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG); + Slf4jUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } public abstract boolean isLeaderLeaseEnabled(); public abstract Type readIndexType(); - public abstract void assertRaftProperties(RaftProperties properties); + public final void assertRaftProperties(RaftProperties p) { + assertOption(LINEARIZABLE, p); + assertEquals(isLeaderLeaseEnabled(), RaftServerConfigKeys.Read.leaderLeaseEnabled(p)); + assertSame(readIndexType(), RaftServerConfigKeys.Read.ReadIndex.type(p)); + } - void runWithNewCluster(CheckedConsumer testCase) throws Exception { + protected void runWithNewCluster(CheckedConsumer testCase) throws Exception { runWithNewCluster(3, 0, true, cluster -> { assertRaftProperties(cluster.getProperties()); testCase.accept(cluster); @@ -97,20 +105,24 @@ public void testFollowerLinearizableRead() throws Exception { runWithNewCluster(LinearizableReadTests::runTestFollowerLinearizableRead); } - static class Reply { + public static class Reply { private final int count; private final CompletableFuture future; - Reply(int count, CompletableFuture future) { + public Reply(int count, CompletableFuture future) { this.count = count; this.future = future; } - void assertExact() { + public boolean isDone() { + return future.isDone(); + } + + public void assertExact() { assertReplyExact(count, future.join()); } - void assertAtLeast() { + public void assertAtLeast() { assertReplyAtLeast(count, future.join()); } } @@ -119,7 +131,7 @@ static void runTestFollowerLinearizableRead(C cluste final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); - Assertions.assertEquals(2, followers.size()); + assertEquals(2, followers.size()); final RaftPeerId f0 = followers.get(0).getId(); final RaftPeerId f1 = followers.get(1).getId(); @@ -152,7 +164,7 @@ static void runTestFollowerReadOnlyParallel(C cluste final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); - Assertions.assertEquals(2, followers.size()); + assertEquals(2, followers.size()); final RaftPeerId f0 = followers.get(0).getId(); final RaftPeerId f1 = followers.get(1).getId(); @@ -169,8 +181,10 @@ static void runTestFollowerReadOnlyParallel(C cluste count++; writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT))); + // sleep to let the commitIndex/appliedIndex get updated Thread.sleep(100); + // WAIT_AND_INCREMENT will delay 500ms to update the count, the read must wait for it. assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); } @@ -191,7 +205,7 @@ static void runTestLinearizableReadFailWhenLeaderDow final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); - Assertions.assertEquals(2, followers.size()); + assertEquals(2, followers.size()); final RaftPeerId f0 = followers.get(0).getId(); try (RaftClient leaderClient = cluster.createClient(leaderId); diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java index aa77ee5c77..94e9433b15 100644 --- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java @@ -58,9 +58,9 @@ public abstract class ReadOnlyRequestTests static final String WAIT_AND_INCREMENT_STRING = "WAIT_AND_INCREMENT"; static final String QUERY_STRING = "QUERY"; - static final Message INCREMENT = new RaftTestUtil.SimpleMessage(INCREMENT_STRING); - static final Message WAIT_AND_INCREMENT = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT_STRING); - static final Message QUERY = new RaftTestUtil.SimpleMessage(QUERY_STRING); + public static final Message INCREMENT = new RaftTestUtil.SimpleMessage(INCREMENT_STRING); + public static final Message WAIT_AND_INCREMENT = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT_STRING); + public static final Message QUERY = new RaftTestUtil.SimpleMessage(QUERY_STRING); @BeforeEach public void setup() { @@ -144,7 +144,7 @@ static int retrieve(RaftClientReply reply) { return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8)); } - static void assertReplyExact(int expectedCount, RaftClientReply reply) { + public static void assertReplyExact(int expectedCount, RaftClientReply reply) { Assertions.assertTrue(reply.isSuccess()); final int retrieved = retrieve(reply); Assertions.assertEquals(expectedCount, retrieved, () -> "reply=" + reply); @@ -163,7 +163,7 @@ static void assertReplyAtLeast(int minCount, RaftClientReply reply) { * 2. get * 3. waitAndIncrement */ - static class CounterStateMachine extends BaseStateMachine { + public static class CounterStateMachine extends BaseStateMachine { static void setProperties(RaftProperties properties) { properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, CounterStateMachine.class, StateMachine.class); } @@ -193,6 +193,10 @@ private void sleepQuietly(int millis) { } } + public long getCount() { + return counter.get(); + } + private long increment() { return counter.incrementAndGet(); } @@ -213,6 +217,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { final LogEntryProto logEntry = trx.getLogEntry(); final TermIndex ti = TermIndex.valueOf(logEntry); updateLastAppliedTermIndex(ti); + LOG.info("{}: updateLastAppliedTermIndex {}", getId(), ti); final String command = logEntry.getStateMachineLogEntry().getLogData().toString(StandardCharsets.UTF_8); @@ -224,7 +229,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { } else { updatedCount = timeoutIncrement(); } - LOG.info("Applied {} command {}, updatedCount={}", ti, command, updatedCount); + LOG.info("{}: Applied {} command {}, updatedCount={}", getId(), ti, command, updatedCount); return toMessageFuture(updatedCount); } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java index f17686e109..4772c668f7 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java @@ -39,11 +39,4 @@ public boolean isLeaderLeaseEnabled() { public Type readIndexType() { return Type.COMMIT_INDEX; } - - @Override - public void assertRaftProperties(RaftProperties p) { - assertOption(LINEARIZABLE, p); - assertTrue(RaftServerConfigKeys.Read.leaderLeaseEnabled(p)); - assertTrue(isLeaderLeaseEnabled()); - } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java index f13ce82120..591bb10ec3 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -17,7 +17,28 @@ */ package org.apache.ratis.grpc; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServer.Division; import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; +import org.apache.ratis.server.impl.MiniRaftCluster; +import org.apache.ratis.server.impl.ReplyFlusher; +import org.apache.ratis.util.CodeInjectionForTesting; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT; +import static org.apache.ratis.ReadOnlyRequestTests.QUERY; +import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; public class TestLinearizableReadRepliedIndexWithGrpc extends TestLinearizableReadWithGrpc { @@ -26,4 +47,89 @@ public class TestLinearizableReadRepliedIndexWithGrpc public Type readIndexType() { return Type.REPLIED_INDEX; } + + @Test + @Override + public void testFollowerLinearizableReadParallel() throws Exception { + runWithNewCluster(TestLinearizableReadRepliedIndexWithGrpc::runTestFollowerReadOnlyParallelRepliedIndex); + } + + static void runTestFollowerReadOnlyParallelRepliedIndex(C cluster) throws Exception { + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + final CounterStateMachine leaderStateMachine = (CounterStateMachine) leader.getStateMachine(); + + final List followers = cluster.getFollowers(); + assertEquals(2, followers.size()); + final RaftPeerId f0 = followers.get(0).getId(); + final RaftPeerId f1 = followers.get(1).getId(); + + final BlockingCode blockingReplyFlusher = new BlockingCode(); + CodeInjectionForTesting.put(ReplyFlusher.FLUSH, blockingReplyFlusher); + + try (RaftClient leaderClient = cluster.createClient(leader.getId()); + RaftClient f0Client = cluster.createClient(f0); + RaftClient f1Client = cluster.createClient(f1)) { + final int n = 10; + final List writeReplies = new ArrayList<>(n); + final List f0Replies = new ArrayList<>(n); + final List f1Replies = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + final int count = i + 1; + writeReplies.add(new Reply(count, leaderClient.async().send(INCREMENT))); + + // Because of read-after-write consistency, the reads must wait for all the writes + // Therefore, the expected count is n + f0Replies.add(new Reply(n, f0Client.async().sendReadOnly(QUERY, f0))); + f1Replies.add(new Reply(n, f1Client.async().sendReadOnly(QUERY, f1))); + + // sleep in order to make sure + // (1) the count is incremented, and + // (2) the reads will wait for the repliedIndex. + Thread.sleep(100); + assertEquals(count, leaderStateMachine.getCount()); + } + + // All replies should not yet complete since ReplyFlusher remains blocked + for (int i = 0; i < n; i++) { + assertFalse(writeReplies.get(i).isDone()); + assertFalse(f0Replies.get(i).isDone()); + assertFalse(f1Replies.get(i).isDone()); + } + + // unblock ReplyFlusher + blockingReplyFlusher.complete(); + assertReplyExact(n, f0Client.io().sendReadOnly(QUERY, f0)); + assertReplyExact(n, f1Client.io().sendReadOnly(QUERY, f0)); + + for (int i = 0; i < n; i++) { + // write reply should get the exact count at the write time + writeReplies.get(i).assertExact(); + // read reply should be delayed and get the count at the unblocked time + f0Replies.get(i).assertExact(); + f1Replies.get(i).assertExact(); + } + + } + } + + static class BlockingCode implements CodeInjectionForTesting.Code { + private final CompletableFuture future = new CompletableFuture<>(); + + void complete() { + future.complete(null); + } + + @Override + public boolean execute(Object localId, Object remoteId, Object... args) { + final boolean blocked = !future.isDone(); + if (blocked) { + LOG.info("{}: ReplyFlusher is blocked", localId, new Throwable()); + } + future.join(); + if (blocked) { + LOG.info("{}: ReplyFlusher is unblocked", localId); + } + return true; + } + } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java index c211466305..77593ff85e 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java @@ -18,15 +18,8 @@ package org.apache.ratis.grpc; import org.apache.ratis.LinearizableReadTests; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; -import static org.apache.ratis.ReadOnlyRequestTests.assertOption; -import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertSame; - public class TestLinearizableReadWithGrpc extends LinearizableReadTests implements MiniRaftClusterWithGrpc.FactoryGet { @@ -40,12 +33,4 @@ public boolean isLeaderLeaseEnabled() { public Type readIndexType() { return Type.COMMIT_INDEX; } - - @Override - public void assertRaftProperties(RaftProperties p) { - assertOption(LINEARIZABLE, p); - assertFalse(RaftServerConfigKeys.Read.leaderLeaseEnabled(p)); - assertFalse(isLeaderLeaseEnabled()); - assertSame(readIndexType(), RaftServerConfigKeys.Read.ReadIndex.type(p)); - } } From 57a7a64cbe599c94f8f6b9a3db5b759db8a7ceed Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sat, 28 Mar 2026 08:29:12 +0900 Subject: [PATCH 17/29] Remove unused imports and fix tests --- .../test/java/org/apache/ratis/LinearizableReadTests.java | 1 - .../grpc/TestLinearizableLeaderLeaseReadWithGrpc.java | 6 ------ ...stLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java | 8 +++----- .../grpc/TestLinearizableReadRepliedIndexWithGrpc.java | 1 - 4 files changed, 3 insertions(+), 13 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index c01ca6f747..7abfc8773f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -40,7 +40,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine; diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java index 4772c668f7..120cce48cc 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java @@ -18,14 +18,8 @@ package org.apache.ratis.grpc; import org.apache.ratis.LinearizableReadTests; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; -import static org.apache.ratis.ReadOnlyRequestTests.assertOption; -import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; -import static org.junit.jupiter.api.Assertions.assertTrue; - public class TestLinearizableLeaderLeaseReadWithGrpc extends LinearizableReadTests implements MiniRaftClusterWithGrpc.FactoryGet { diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java index 92b158c56c..bb50eafbfc 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java @@ -17,13 +17,11 @@ */ package org.apache.ratis.grpc; -import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; - public class TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc - extends TestLinearizableLeaderLeaseReadWithGrpc { + extends TestLinearizableReadRepliedIndexWithGrpc { @Override - public Type readIndexType() { - return Type.REPLIED_INDEX; + public boolean isLeaderLeaseEnabled() { + return true; } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java index 591bb10ec3..f62f4851d7 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -27,7 +27,6 @@ import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.server.impl.ReplyFlusher; import org.apache.ratis.util.CodeInjectionForTesting; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.ArrayList; From 17d20ec69d29eeb82e6f6ffced20679bf321c8a4 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Tue, 31 Mar 2026 09:45:03 +0800 Subject: [PATCH 18/29] Update based on patch diff --- .../ratis/server/impl/LeaderStateImpl.java | 2 +- .../ratis/server/impl/PendingRequests.java | 12 ---------- .../ratis/server/impl/ReplyFlusher.java | 12 ++++------ .../apache/ratis/LinearizableReadTests.java | 7 +++--- ...tLinearizableReadRepliedIndexWithGrpc.java | 24 ++++++++++--------- 5 files changed, 22 insertions(+), 35 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index d2169ef820..b0f6c24298 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -400,7 +400,7 @@ boolean isApplied() { this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties); switch (readIndexType) { case REPLIED_INDEX: - this.replyFlusher = new ReplyFlusher(name, state.getLastAppliedIndex(), + this.replyFlusher = new ReplyFlusher(server.getId(), state.getLastAppliedIndex(), RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties)); readIndexSupplier = replyFlusher::getRepliedIndex; break; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index fd9a5b73b7..f89d354e6a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -273,18 +273,6 @@ PendingRequest remove(TermIndex termIndex) { return pending; } - /** - * Remove the {@link PendingRequest} for the given {@link TermIndex} without sending a reply. - * @return the removed {@link PendingRequest}, or null if not found. - */ - PendingRequest removePendingRequest(TermIndex termIndex) { - final PendingRequest pending = pendingRequests.remove(termIndex); - if (pending != null) { - Preconditions.assertEquals(termIndex, pending.getTermIndex(), "termIndex"); - } - return pending; - } - /** * The leader state is stopped. Send NotLeaderException to all the pending * requests since they have not got applied to the state machine yet. diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java index 2b9674ba2b..2f1975cfe0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -57,7 +57,7 @@ synchronized LinkedList getAndSetNewList() { private final Object id; private final LifeCycle lifeCycle; private final Daemon daemon; - private Replies replies = new Replies(); + private final Replies replies = new Replies(); private final RaftLogIndex repliedIndex; /** The interval at which held write replies are flushed. */ private final TimeDuration batchInterval; @@ -95,14 +95,12 @@ void start() { private void run() { try { while (lifeCycle.getCurrentState() == LifeCycle.State.RUNNING) { - try { - Thread.sleep(batchInterval.toLong(TimeUnit.MILLISECONDS)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } + batchInterval.sleep(); flush(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("{}: Interrupted ", daemon.getName(), e); } finally { // Flush remaining on exit flush(); diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 7abfc8773f..6a60ba9154 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -130,7 +130,7 @@ static void runTestFollowerLinearizableRead(C cluste final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); - assertEquals(2, followers.size()); + Assertions.assertEquals(2, followers.size()); final RaftPeerId f0 = followers.get(0).getId(); final RaftPeerId f1 = followers.get(1).getId(); @@ -163,7 +163,7 @@ static void runTestFollowerReadOnlyParallel(C cluste final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); - assertEquals(2, followers.size()); + Assertions.assertEquals(2, followers.size()); final RaftPeerId f0 = followers.get(0).getId(); final RaftPeerId f1 = followers.get(1).getId(); @@ -180,9 +180,8 @@ static void runTestFollowerReadOnlyParallel(C cluste count++; writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT))); - // sleep to let the commitIndex/appliedIndex get updated + // sleep to let the commitIndex/appliedIndex get updated. Thread.sleep(100); - // WAIT_AND_INCREMENT will delay 500ms to update the count, the read must wait for it. assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java index f62f4851d7..f58eb94518 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -22,11 +22,11 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.RaftServer.Division; import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.server.impl.ReplyFlusher; import org.apache.ratis.util.CodeInjectionForTesting; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -53,12 +53,13 @@ public void testFollowerLinearizableReadParallel() throws Exception { runWithNewCluster(TestLinearizableReadRepliedIndexWithGrpc::runTestFollowerReadOnlyParallelRepliedIndex); } - static void runTestFollowerReadOnlyParallelRepliedIndex(C cluster) throws Exception { + static void runTestFollowerReadOnlyParallelRepliedIndex(C cluster) + throws Exception { final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); - final CounterStateMachine leaderStateMachine = (CounterStateMachine) leader.getStateMachine(); + final CounterStateMachine leaderStateMachine = (CounterStateMachine)leader.getStateMachine(); - final List followers = cluster.getFollowers(); - assertEquals(2, followers.size()); + final List followers = cluster.getFollowers(); + Assertions.assertEquals(2, followers.size()); final RaftPeerId f0 = followers.get(0).getId(); final RaftPeerId f1 = followers.get(1).getId(); @@ -76,8 +77,8 @@ static void runTestFollowerReadOnlyParallelRepliedIn final int count = i + 1; writeReplies.add(new Reply(count, leaderClient.async().send(INCREMENT))); - // Because of read-after-write consistency, the reads must wait for all the writes - // Therefore, the expected count is n + // Because of read-after-write consistency, the reads must wait for all the writes. + // Therefore, the expected count is n. f0Replies.add(new Reply(n, f0Client.async().sendReadOnly(QUERY, f0))); f1Replies.add(new Reply(n, f1Client.async().sendReadOnly(QUERY, f1))); @@ -88,7 +89,7 @@ static void runTestFollowerReadOnlyParallelRepliedIn assertEquals(count, leaderStateMachine.getCount()); } - // All replies should not yet complete since ReplyFlusher remains blocked + // All replies should not yet complete since ReplyFlusher remains blocked. for (int i = 0; i < n; i++) { assertFalse(writeReplies.get(i).isDone()); assertFalse(f0Replies.get(i).isDone()); @@ -101,13 +102,12 @@ static void runTestFollowerReadOnlyParallelRepliedIn assertReplyExact(n, f1Client.io().sendReadOnly(QUERY, f0)); for (int i = 0; i < n; i++) { - // write reply should get the exact count at the write time + //write reply should get the exact count at the write time writeReplies.get(i).assertExact(); - // read reply should be delayed and get the count at the unblocked time + //read reply should be delayed and get the count at the unblocked time f0Replies.get(i).assertExact(); f1Replies.get(i).assertExact(); } - } } @@ -131,4 +131,6 @@ public boolean execute(Object localId, Object remoteId, Object... args) { return true; } } + + } From 676c52b1997ab3fe2e1959bdfd9cfa33ea4c218b Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Tue, 31 Mar 2026 11:04:59 +0800 Subject: [PATCH 19/29] Add latch assertions to prevent test NPE --- .../src/test/java/org/apache/ratis/OutputStreamBaseTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java index a17cdb0d58..9821126ce6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java @@ -300,8 +300,10 @@ private void runTestKillLeader(CLUSTER cluster) throws Exception { Thread.sleep(500); running.set(false); - latch.await(5, TimeUnit.SECONDS); + final boolean latchCompleted = latch.await(5, TimeUnit.SECONDS); + Assertions.assertTrue(latchCompleted, "Writer thread did not finish within the timeout"); LOG.info("Writer success? " + success.get()); + Assertions.assertNotNull(success.get(), "Writer thread completed but success was not set"); Assertions.assertTrue(success.get()); // total number of tx should be >= result + 2, where 2 means two NoOp from // leaders. It may be larger than result+2 because the client may resend From cea054915a53e657902a32208d8ef280c9b8083e Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Tue, 31 Mar 2026 17:23:59 +0800 Subject: [PATCH 20/29] Update repliedIndex to startupLogEntry if needed --- .../java/org/apache/ratis/server/impl/LeaderStateImpl.java | 3 +++ .../main/java/org/apache/ratis/server/impl/ReplyFlusher.java | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index b0f6c24298..3197bfe34b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -450,6 +450,9 @@ boolean isReady() { void checkReady(LogEntryProto entry) { if (entry.getTerm() == server.getState().getCurrentTerm() && startupLogEntry.get().checkStartIndex(entry)) { server.getStateMachine().leaderEvent().notifyLeaderReady(); + if (replyFlusher != null) { + replyFlusher.updateRepliedIndexToMax(entry.getIndex()); + } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java index 2f1975cfe0..2e210a8734 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -78,6 +78,11 @@ long getRepliedIndex() { return repliedIndex.get(); } + /** Update the replied index to at least the given value. */ + void updateRepliedIndexToMax(long newIndex) { + repliedIndex.updateToMax(newIndex, s -> LOG.debug("{}: {}", id, s)); + } + /** Hold a write reply for later batch flushing */ void hold(LongSupplier replyMethod) { replies.add(replyMethod); From 2cf81b831e469ba89c2960985d88be7229c1586c Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Tue, 31 Mar 2026 17:32:26 +0800 Subject: [PATCH 21/29] Update replyFlusher first to not blocked by notifyLeaderReady implementation --- .../main/java/org/apache/ratis/server/impl/LeaderStateImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 3197bfe34b..4e9b45073a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -449,10 +449,10 @@ boolean isReady() { void checkReady(LogEntryProto entry) { if (entry.getTerm() == server.getState().getCurrentTerm() && startupLogEntry.get().checkStartIndex(entry)) { - server.getStateMachine().leaderEvent().notifyLeaderReady(); if (replyFlusher != null) { replyFlusher.updateRepliedIndexToMax(entry.getIndex()); } + server.getStateMachine().leaderEvent().notifyLeaderReady(); } } From 85dd858a5f09f618fa3b87e0a08e9336a47c9029 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Tue, 31 Mar 2026 17:47:11 +0800 Subject: [PATCH 22/29] Initialize startupLogEntry once at ReplyFlusher start --- .../org/apache/ratis/server/impl/LeaderStateImpl.java | 8 +++----- .../java/org/apache/ratis/server/impl/ReplyFlusher.java | 8 ++------ 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 4e9b45073a..70d571236f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -436,11 +436,12 @@ void start() { // Initialize startup log entry and append it to the RaftLog startupLogEntry.get(); processor.start(); - senders.forEach(LogAppender::start); if (replyFlusher != null) { - replyFlusher.start(); + replyFlusher.start(startupLogEntry.get().startIndex); } + + senders.forEach(LogAppender::start); } boolean isReady() { @@ -449,9 +450,6 @@ boolean isReady() { void checkReady(LogEntryProto entry) { if (entry.getTerm() == server.getState().getCurrentTerm() && startupLogEntry.get().checkStartIndex(entry)) { - if (replyFlusher != null) { - replyFlusher.updateRepliedIndexToMax(entry.getIndex()); - } server.getStateMachine().leaderEvent().notifyLeaderReady(); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java index 2e210a8734..47e9967c11 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -78,17 +78,13 @@ long getRepliedIndex() { return repliedIndex.get(); } - /** Update the replied index to at least the given value. */ - void updateRepliedIndexToMax(long newIndex) { - repliedIndex.updateToMax(newIndex, s -> LOG.debug("{}: {}", id, s)); - } - /** Hold a write reply for later batch flushing */ void hold(LongSupplier replyMethod) { replies.add(replyMethod); } - void start() { + void start(long startIndex) { + repliedIndex.updateToMax(startIndex, s -> LOG.debug("{}: {}", id, s)); lifeCycle.transition(LifeCycle.State.STARTING); // We need to transition to RUNNING first so that ReplyFlusher#run always // see that the lifecycle state is in RUNNING state. From bcec68e70306a456409bb734de1fa6a31dc2c5e3 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 1 Apr 2026 14:30:28 +0800 Subject: [PATCH 23/29] Add some assertion logs --- .../grpc/TestLinearizableReadRepliedIndexWithGrpc.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java index f58eb94518..429ba406f8 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -91,9 +91,9 @@ static void runTestFollowerReadOnlyParallelRepliedIn // All replies should not yet complete since ReplyFlusher remains blocked. for (int i = 0; i < n; i++) { - assertFalse(writeReplies.get(i).isDone()); - assertFalse(f0Replies.get(i).isDone()); - assertFalse(f1Replies.get(i).isDone()); + assertFalse(writeReplies.get(i).isDone(), "Received unexpected Write reply " + writeReplies.get(i)); + assertFalse(f0Replies.get(i).isDone(), "Received unexpected Read reply " + f0Replies.get(i)); + assertFalse(f1Replies.get(i).isDone(), "Received unexpected Read reply " + f1Replies.get(i)); } // unblock ReplyFlusher From e9fe28b9e5afeacee8f83570ebcdc0405550cfb6 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 1 Apr 2026 14:30:55 +0800 Subject: [PATCH 24/29] Disable dummy request to not cause follower client to failover to leader --- .../src/test/java/org/apache/ratis/LinearizableReadTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 6a60ba9154..6cb7442d42 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -18,6 +18,7 @@ package org.apache.ratis; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; @@ -87,6 +88,7 @@ public void setup() { RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE); RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled()); RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType()); + RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, false); } @Test From ece8ab6b9cde60e36c6159bb1862f3667cb56c0a Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 1 Apr 2026 14:31:23 +0800 Subject: [PATCH 25/29] Fix NoSuchElementException when dummy request is disabled --- .../main/java/org/apache/ratis/client/impl/OrderedAsync.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java index fbeb4b992a..ecf4db3dce 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java @@ -198,7 +198,7 @@ private void sendRequestWithRetry(PendingOrderedRequest pending) { return; } - if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) { + if (getSlidingWindow(request).isFirst(pending.getSeqNum())) { pending.setFirstRequest(); } LOG.debug("{}: send* {}", client.getId(), request); From 632e4555a3c3ef138d945723add9376722429829 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 1 Apr 2026 15:18:31 +0800 Subject: [PATCH 26/29] Implement Reply toString --- .../test/java/org/apache/ratis/LinearizableReadTests.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 6cb7442d42..a87af767e1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -126,6 +126,14 @@ public void assertExact() { public void assertAtLeast() { assertReplyAtLeast(count, future.join()); } + + @Override + public String toString() { + return "Reply{" + + "count=" + count + + ", reply=" + (isDone() ? future.join() : "pending") + + '}'; + } } static void runTestFollowerLinearizableRead(C cluster) throws Exception { From 78db6989b728bb1aff9845fed9d69ae50558375b Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 1 Apr 2026 15:26:22 +0800 Subject: [PATCH 27/29] Add logic to warm up clients to send first request --- .../grpc/TestLinearizableReadRepliedIndexWithGrpc.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java index 429ba406f8..40caa17143 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -64,11 +64,17 @@ static void runTestFollowerReadOnlyParallelRepliedIn final RaftPeerId f1 = followers.get(1).getId(); final BlockingCode blockingReplyFlusher = new BlockingCode(); - CodeInjectionForTesting.put(ReplyFlusher.FLUSH, blockingReplyFlusher); try (RaftClient leaderClient = cluster.createClient(leader.getId()); RaftClient f0Client = cluster.createClient(f0); RaftClient f1Client = cluster.createClient(f1)) { + // Warm up the clients first + assertReplyExact(0, leaderClient.async().sendReadOnly(QUERY).get()); + assertReplyExact(0, f0Client.async().sendReadOnly(QUERY, f0).get()); + assertReplyExact(0, f1Client.async().sendReadOnly(QUERY, f1).get()); + + CodeInjectionForTesting.put(ReplyFlusher.FLUSH, blockingReplyFlusher); + final int n = 10; final List writeReplies = new ArrayList<>(n); final List f0Replies = new ArrayList<>(n); From d48bcc9d54af6c96d85d9e005a497e7cd97a00c5 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 1 Apr 2026 15:38:28 +0800 Subject: [PATCH 28/29] Fix test --- ...tLinearizableReadRepliedIndexWithGrpc.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java index 40caa17143..da8816b588 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -38,6 +38,7 @@ import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestLinearizableReadRepliedIndexWithGrpc extends TestLinearizableReadWithGrpc { @@ -83,10 +84,10 @@ static void runTestFollowerReadOnlyParallelRepliedIn final int count = i + 1; writeReplies.add(new Reply(count, leaderClient.async().send(INCREMENT))); - // Because of read-after-write consistency, the reads must wait for all the writes. - // Therefore, the expected count is n. - f0Replies.add(new Reply(n, f0Client.async().sendReadOnly(QUERY, f0))); - f1Replies.add(new Reply(n, f1Client.async().sendReadOnly(QUERY, f1))); + // Read reply returns immediately, but they all should return 0 since the repliedIndex has not been updated + // and write operations should not been applied by the followers + f0Replies.add(new Reply(0, f0Client.async().sendReadOnly(QUERY, f0))); + f1Replies.add(new Reply(0, f1Client.async().sendReadOnly(QUERY, f1))); // sleep in order to make sure // (1) the count is incremented, and @@ -95,11 +96,16 @@ static void runTestFollowerReadOnlyParallelRepliedIn assertEquals(count, leaderStateMachine.getCount()); } - // All replies should not yet complete since ReplyFlusher remains blocked. for (int i = 0; i < n; i++) { + // Write reply should not yet complete since ReplyFlusher remains blocked. assertFalse(writeReplies.get(i).isDone(), "Received unexpected Write reply " + writeReplies.get(i)); - assertFalse(f0Replies.get(i).isDone(), "Received unexpected Read reply " + f0Replies.get(i)); - assertFalse(f1Replies.get(i).isDone(), "Received unexpected Read reply " + f1Replies.get(i)); + + // Follower reads should be immediately served, but the read value should return the value before the + // replyFlusher is blocked + assertTrue(f0Replies.get(i).isDone(), "Follower read should return immediately"); + f0Replies.get(i).assertExact(); + assertTrue(f1Replies.get(i).isDone(), "Follower read should return immediately"); + f1Replies.get(i).assertExact(); } // unblock ReplyFlusher @@ -110,9 +116,6 @@ static void runTestFollowerReadOnlyParallelRepliedIn for (int i = 0; i < n; i++) { //write reply should get the exact count at the write time writeReplies.get(i).assertExact(); - //read reply should be delayed and get the count at the unblocked time - f0Replies.get(i).assertExact(); - f1Replies.get(i).assertExact(); } } } From a078432c4493f6a4aed1829a9a838cc7ed170e18 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 1 Apr 2026 15:44:54 +0800 Subject: [PATCH 29/29] Update comments --- .../src/test/java/org/apache/ratis/LinearizableReadTests.java | 3 +++ .../ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index a87af767e1..dd536508ce 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -88,6 +88,9 @@ public void setup() { RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE); RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled()); RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType()); + // Disable dummy request since currently the request is implemented as a watch request + // which can cause follower client to trigger failover to leader which will cause the + // all reads to be sent to the leader, making the follower read moot. RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, false); } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java index da8816b588..f08346fc02 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -69,7 +69,7 @@ static void runTestFollowerReadOnlyParallelRepliedIn try (RaftClient leaderClient = cluster.createClient(leader.getId()); RaftClient f0Client = cluster.createClient(f0); RaftClient f1Client = cluster.createClient(f1)) { - // Warm up the clients first + // Warm up the clients first before blocking the reply flusher assertReplyExact(0, leaderClient.async().sendReadOnly(QUERY).get()); assertReplyExact(0, f0Client.async().sendReadOnly(QUERY, f0).get()); assertReplyExact(0, f1Client.async().sendReadOnly(QUERY, f1).get());