diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java index fbeb4b992a..ecf4db3dce 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java @@ -198,7 +198,7 @@ private void sendRequestWithRetry(PendingOrderedRequest pending) { return; } - if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) { + if (getSlidingWindow(request).isFirst(pending.getSeqNum())) { pending.setFirstRequest(); } LOG.debug("{}: send* {}", client.getId(), request); diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md index 67e988348c..f5189ed862 100644 --- a/ratis-docs/src/site/markdown/configurations.md +++ b/ratis-docs/src/site/markdown/configurations.md @@ -220,11 +220,38 @@ if it fails to receive any RPC responses from this peer within this specified ti ### Read Index - Configurations related to ReadIndex used in linearizable read -| **Property** | `raft.server.read.read-index.applied-index.enabled` | -|:----------------|:----------------------------------------------------------------------| -| **Description** | whether applied index (instead of commit index) is used for ReadIndex | -| **Type** | boolean | -| **Default** | false | +| **Property** | `raft.server.read.read-index.type` | +|:----------------|:-----------------------------------------------------------------------------| +| **Description** | type of read index returned | +| **Type** | enum `Read.ReadIndex.Type` [`COMMIT_INDEX`, `APPLIED_INDEX`, `REPLIED_INDEX`] | +| **Default** | `Read.ReadIndex.Type.COMMIT_INDEX` | + +* `Read.ReadIndex.Type.COMMIT_INDEX` - Use leader's CommitIndex (see Raft Paper section 6.4) + * The safest type as it is specified in the Raft dissertation + * This ReadIndex type can be chosen if the base linearizable read
performance from followers already meets expectations. + +* `Read.ReadIndex.Type.APPLIED_INDEX` - Use leader's AppliedIndex + * Allow leader to return AppliedIndex (instead of CommitIndex) as the ReadIndex + * This reduces the time a follower spends applying logs up to the ReadIndex since AppliedIndex ≤ CommitIndex + * This ReadIndex type can be chosen if the `Read.ReadIndex.Type.COMMIT_INDEX` read latency is too high. + +* `Read.ReadIndex.Type.REPLIED_INDEX` - Use leader's RepliedIndex + * RepliedIndex is defined as the last AppliedIndex of the leader when returning the last batch. + * The leader delays replying to write requests and only replies to them at each write batch boundary, configurable by `raft.server.read.read-index.replied-index.batch-interval`. + * This allows the ReadIndex to advance in coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives. + * This is most effective on read-heavy, follower-read workloads which prioritize overall read throughput without sacrificing consistency. + * There is a trade-off in increased write latency (up to one `raft.server.read.read-index.replied-index.batch-interval`) per write. + * RepliedIndex still guarantees linearizability (no stale read) since by definition each ReadIndex returns the index of the last replied request. + * If the RepliedIndex is set to 0, the behavior is identical to `Read.ReadIndex.Type.APPLIED_INDEX` + +Note that theoretically all the ReadIndex types still guarantee linearizability, +but there are tradeoffs (e.g. Write and Read performance) between different types.
+ +| **Property** | `raft.server.read.read-index.replied-index.batch-interval` | +|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------| +| **Description** | if `Read.ReadIndex.Type` is `REPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced | +| **Type** | TimeDuration | +| **Default** | 10ms | | **Property** | `raft.server.read.leader.heartbeat-check.enabled` | |:----------------|:--------------------------------------------------| diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index ef16f67f67..2d55594782 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -280,15 +280,34 @@ static void setWriteIndexCacheExpiryTime(RaftProperties properties, TimeDuration interface ReadIndex { String PREFIX = Read.PREFIX + ".read-index"; - String APPLIED_INDEX_ENABLED_KEY = PREFIX + ".applied-index.enabled"; - boolean APPLIED_INDEX_ENABLED_DEFAULT = false; - static boolean appliedIndexEnabled(RaftProperties properties) { - return getBoolean(properties::getBoolean, APPLIED_INDEX_ENABLED_KEY, - APPLIED_INDEX_ENABLED_DEFAULT, getDefaultLog()); + enum Type { + /** ReadIndex returns leader's commitIndex (see Raft Paper section 6.4). */ + COMMIT_INDEX, + + /** ReadIndex returns leader's appliedIndex to reduce the ReadIndex latency. */ + APPLIED_INDEX, + + /** ReadIndex returns leader's repliedIndex, the index of the last replied request. 
*/ + REPLIED_INDEX + } + + String TYPE_KEY = PREFIX + ".type"; + Type TYPE_DEFAULT = Type.COMMIT_INDEX; + static Type type(RaftProperties properties) { + return get(properties::getEnum, TYPE_KEY, TYPE_DEFAULT, getDefaultLog()); + } + static void setType(RaftProperties properties, Type type) { + set(properties::setEnum, TYPE_KEY, type); } - static void setAppliedIndexEnabled(RaftProperties properties, boolean enabled) { - setBoolean(properties::setBoolean, APPLIED_INDEX_ENABLED_KEY, enabled); + String REPLIED_INDEX_BATCH_INTERVAL_KEY = PREFIX + ".replied-index.batch-interval"; + TimeDuration REPLIED_INDEX_BATCH_INTERVAL_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS); + static TimeDuration repliedIndexBatchInterval(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(REPLIED_INDEX_BATCH_INTERVAL_DEFAULT.getUnit()), + REPLIED_INDEX_BATCH_INTERVAL_KEY, REPLIED_INDEX_BATCH_INTERVAL_DEFAULT, getDefaultLog()); + } + static void setRepliedIndexBatchInterval(RaftProperties properties, TimeDuration interval) { + setTimeDuration(properties::setTimeDuration, REPLIED_INDEX_BATCH_INTERVAL_KEY, interval); } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index ef0bb6b700..70d571236f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -39,6 +39,7 @@ import org.apache.ratis.protocol.exceptions.ReadIndexException; import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener; import org.apache.ratis.server.leader.FollowerInfo; import org.apache.ratis.server.leader.LeaderState; @@ 
-82,6 +83,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -353,10 +355,13 @@ boolean isApplied() { private final PendingStepDown pendingStepDown; private final ReadIndexHeartbeats readIndexHeartbeats; - private final boolean readIndexAppliedIndexEnabled; + private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType; + private final Supplier readIndexSupplier; private final boolean leaderHeartbeatCheckEnabled; private final LeaderLease lease; + private ReplyFlusher replyFlusher; + LeaderStateImpl(RaftServerImpl server) { this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass()); this.server = server; @@ -391,8 +396,21 @@ boolean isApplied() { } else { this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests); } - this.readIndexAppliedIndexEnabled = RaftServerConfigKeys.Read.ReadIndex - .appliedIndexEnabled(properties); + + this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties); + switch (readIndexType) { + case REPLIED_INDEX: + this.replyFlusher = new ReplyFlusher(server.getId(), state.getLastAppliedIndex(), + RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties)); + readIndexSupplier = replyFlusher::getRepliedIndex; + break; + case APPLIED_INDEX: + readIndexSupplier = () -> server.getState().getLastAppliedIndex(); + break; + case COMMIT_INDEX: + default: + readIndexSupplier = () -> server.getRaftLog().getLastCommittedIndex(); + } this.leaderHeartbeatCheckEnabled = RaftServerConfigKeys.Read .leaderHeartbeatCheckEnabled(properties); @@ -418,6 +436,11 @@ void start() { // Initialize startup log entry and append it to the RaftLog startupLogEntry.get(); processor.start(); + + if (replyFlusher != null) { + 
replyFlusher.start(startupLogEntry.get().startIndex); + } + senders.forEach(LogAppender::start); } @@ -453,6 +476,9 @@ CompletableFuture stop() { startupLogEntry.get().getAppliedIndexFuture().completeExceptionally( new ReadIndexException("failed to obtain read index since: ", nle)); server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId()); + if (replyFlusher != null) { + replyFlusher.stop(); + } logAppenderMetrics.unregister(); raftServerMetrics.unregister(); pendingRequests.close(); @@ -620,7 +646,7 @@ public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follo final boolean initializing = !isCaughtUp(follower); final RaftPeerId targetId = follower.getId(); return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, getCurrentTerm(), entries, - ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()), + ServerImplUtils.effectiveCommitIndex(readIndexSupplier.get(), previous, entries.size()), initializing, previous, server.getCommitInfos(), callId); } @@ -1140,23 +1166,21 @@ public boolean checkLeadership() { /** * Obtain the current readIndex for read only requests. See Raft paper section 6.4. * 1. Leader makes sure at least one log from current term is committed. - * 2. Leader record last committed index or applied index (depending on configuration) as readIndex. + * 2. Leader record last committed index or applied index or replied index (depending on configuration) as readIndex. * 3. Leader broadcast heartbeats to followers and waits for acknowledgements. * 4. If majority respond success, returns readIndex. * @return current readIndex. */ CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) { - final long index = readIndexAppliedIndexEnabled ? 
- server.getState().getLastAppliedIndex() : server.getRaftLog().getLastCommittedIndex(); + final long index = readIndexSupplier.get(); final long readIndex; if (readAfterWriteConsistentIndex != null && readAfterWriteConsistentIndex > index) { readIndex = readAfterWriteConsistentIndex; } else { readIndex = index; } - LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})", - readIndex, readIndexAppliedIndexEnabled ? "applied" : "commit", - index, readAfterWriteConsistentIndex); + LOG.debug("readIndex={} ({}={}, readAfterWriteConsistentIndex={})", + readIndex, readIndexType, index, readAfterWriteConsistentIndex); // if group contains only one member, fast path if (server.getRaftConf().isSingleton()) { @@ -1217,8 +1241,23 @@ private boolean checkLeaderLease() { && (server.getRaftConf().isSingleton() || lease.isValid()); } - void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { - pendingRequests.replyPendingRequest(termIndex, reply); + void replyPendingRequest(TermIndex termIndex, RaftClientReply reply, RetryCacheImpl.CacheEntry cacheEntry) { + final PendingRequest pending = pendingRequests.remove(termIndex); + if (pending == null) { + return; + } + + final LongSupplier replyMethod = () -> { + cacheEntry.updateResult(reply); + pending.setReply(reply); + return termIndex.getIndex(); + }; + + if (readIndexType == Type.REPLIED_INDEX) { + replyFlusher.hold(replyMethod); + } else { + replyMethod.getAsLong(); + } } TransactionContext getTransactionContext(TermIndex termIndex) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index c6a9dd2794..f89d354e6a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -264,12 +264,13 @@ TransactionContext getTransactionContext(TermIndex termIndex) { return 
pendingRequest != null ? pendingRequest.getEntry() : null; } - void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { + /** @return the removed the {@link PendingRequest} for the given {@link TermIndex}. */ + PendingRequest remove(TermIndex termIndex) { final PendingRequest pending = pendingRequests.remove(termIndex); if (pending != null) { Preconditions.assertEquals(termIndex, pending.getTermIndex(), "termIndex"); - pending.setReply(reply); } + return pending; } /** diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 958da846d2..d4c6f164e3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1837,8 +1837,12 @@ private CompletableFuture replyPendingRequest( } // update pending request - role.getLeaderState().ifPresent(leader -> leader.replyPendingRequest(termIndex, r)); - cacheEntry.updateResult(r); + final LeaderStateImpl leader = role.getLeaderState().orElse(null); + if (leader != null) { + leader.replyPendingRequest(termIndex, r, cacheEntry); + } else { + cacheEntry.updateResult(r); + } }); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java new file mode 100644 index 0000000000..47e9967c11 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.server.raftlog.RaftLogIndex; +import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +/** + * Implements the reply flush logic as part of the leader batch write when RepliedIndex is used. + */ +public class ReplyFlusher { + static final Logger LOG = LoggerFactory.getLogger(ReplyFlusher.class); + + private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class); + public static final String FLUSH = CLASS_NAME + ".flush"; + + static class Replies { + /** When a {@link LongSupplier} is invoked, it completes a write reply and return the log index. */ + private LinkedList list = new LinkedList<>(); + + synchronized void add(LongSupplier replyMethod) { + list.add(replyMethod); + } + + synchronized LinkedList getAndSetNewList() { + final LinkedList old = list; + list = new LinkedList<>(); + return old; + } + } + + private final Object id; + private final LifeCycle lifeCycle; + private final Daemon daemon; + private final Replies replies = new Replies(); + private final RaftLogIndex repliedIndex; + /** The interval at which held write replies are flushed. 
*/ + private final TimeDuration batchInterval; + + ReplyFlusher(Object id, long repliedIndex, TimeDuration batchInterval) { + this.id = id; + final String name = id + "-ReplyFlusher"; + this.lifeCycle = new LifeCycle(name); + this.daemon = Daemon.newBuilder() + .setName(name) + .setRunnable(this::run) + .build(); + this.repliedIndex = new RaftLogIndex("repliedIndex", repliedIndex); + this.batchInterval = batchInterval; + } + + long getRepliedIndex() { + return repliedIndex.get(); + } + + /** Hold a write reply for later batch flushing */ + void hold(LongSupplier replyMethod) { + replies.add(replyMethod); + } + + void start(long startIndex) { + repliedIndex.updateToMax(startIndex, s -> LOG.debug("{}: {}", id, s)); + lifeCycle.transition(LifeCycle.State.STARTING); + // We need to transition to RUNNING first so that ReplyFlusher#run always + // see that the lifecycle state is in RUNNING state. + lifeCycle.transition(LifeCycle.State.RUNNING); + daemon.start(); + } + + /** The reply flusher daemon loop. */ + private void run() { + try { + while (lifeCycle.getCurrentState() == LifeCycle.State.RUNNING) { + batchInterval.sleep(); + flush(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("{}: Interrupted ", daemon.getName(), e); + } finally { + // Flush remaining on exit + flush(); + } + } + + /** Flush all held replies and advance {@link #repliedIndex}. */ + private void flush() { + CodeInjectionForTesting.execute(FLUSH, id, null); + + final LinkedList toFlush = replies.getAndSetNewList(); + if (toFlush.isEmpty()) { + return; + } + long maxIndex = toFlush.removeLast().getAsLong(); + for (LongSupplier held : toFlush) { + maxIndex = Math.max(maxIndex, held.getAsLong()); + } + repliedIndex.updateToMax(maxIndex, s -> + LOG.debug("{}: flushed {} replies, {}", id, toFlush.size(), s)); + } + + /** Stop the reply flusher daemon. 
*/ + void stop() { + lifeCycle.checkStateAndClose(); + daemon.interrupt(); + try { + daemon.join(batchInterval.toLong(TimeUnit.MILLISECONDS )* 2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index b15ae3067f..dd536508ce 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -18,6 +18,7 @@ package org.apache.ratis; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; @@ -27,6 +28,7 @@ import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; @@ -45,9 +47,12 @@ import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT; import static org.apache.ratis.ReadOnlyRequestTests.QUERY; import static org.apache.ratis.ReadOnlyRequestTests.WAIT_AND_INCREMENT; +import static org.apache.ratis.ReadOnlyRequestTests.assertOption; import static org.apache.ratis.ReadOnlyRequestTests.assertReplyAtLeast; import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact; import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; /** Test for the {@link RaftServerConfigKeys.Read.Option#LINEARIZABLE} feature. 
*/ public abstract class LinearizableReadTests @@ -56,15 +61,20 @@ public abstract class LinearizableReadTests { Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG); + Slf4jUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } public abstract boolean isLeaderLeaseEnabled(); - public abstract boolean readIndexAppliedIndexEnabled(); + public abstract Type readIndexType(); - public abstract void assertRaftProperties(RaftProperties properties); + public final void assertRaftProperties(RaftProperties p) { + assertOption(LINEARIZABLE, p); + assertEquals(isLeaderLeaseEnabled(), RaftServerConfigKeys.Read.leaderLeaseEnabled(p)); + assertSame(readIndexType(), RaftServerConfigKeys.Read.ReadIndex.type(p)); + } - void runWithNewCluster(CheckedConsumer testCase) throws Exception { + protected void runWithNewCluster(CheckedConsumer testCase) throws Exception { runWithNewCluster(3, 0, true, cluster -> { assertRaftProperties(cluster.getProperties()); testCase.accept(cluster); @@ -77,7 +87,11 @@ public void setup() { CounterStateMachine.setProperties(p); RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE); RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled()); - RaftServerConfigKeys.Read.ReadIndex.setAppliedIndexEnabled(p, readIndexAppliedIndexEnabled()); + RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType()); + // Disable dummy request since currently the request is implemented as a watch request + // which can cause follower client to trigger failover to leader which will cause the + // all reads to be sent to the leader, making the follower read moot. 
+ RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, false); } @Test @@ -95,22 +109,34 @@ public void testFollowerLinearizableRead() throws Exception { runWithNewCluster(LinearizableReadTests::runTestFollowerLinearizableRead); } - static class Reply { + public static class Reply { private final int count; private final CompletableFuture future; - Reply(int count, CompletableFuture future) { + public Reply(int count, CompletableFuture future) { this.count = count; this.future = future; } - void assertExact() { + public boolean isDone() { + return future.isDone(); + } + + public void assertExact() { assertReplyExact(count, future.join()); } - void assertAtLeast() { + public void assertAtLeast() { assertReplyAtLeast(count, future.join()); } + + @Override + public String toString() { + return "Reply{" + + "count=" + count + + ", reply=" + (isDone() ? future.join() : "pending") + + '}'; + } } static void runTestFollowerLinearizableRead(C cluster) throws Exception { @@ -167,8 +193,9 @@ static void runTestFollowerReadOnlyParallel(C cluste count++; writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT))); + // sleep to let the commitIndex/appliedIndex get updated. Thread.sleep(100); - + // WAIT_AND_INCREMENT will delay 500ms to update the count, the read must wait for it. 
assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); } @@ -189,7 +216,7 @@ static void runTestLinearizableReadFailWhenLeaderDow final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); - Assertions.assertEquals(2, followers.size()); + assertEquals(2, followers.size()); final RaftPeerId f0 = followers.get(0).getId(); try (RaftClient leaderClient = cluster.createClient(leaderId); diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java index a17cdb0d58..9821126ce6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java @@ -300,8 +300,10 @@ private void runTestKillLeader(CLUSTER cluster) throws Exception { Thread.sleep(500); running.set(false); - latch.await(5, TimeUnit.SECONDS); + final boolean latchCompleted = latch.await(5, TimeUnit.SECONDS); + Assertions.assertTrue(latchCompleted, "Writer thread did not finish within the timeout"); LOG.info("Writer success? " + success.get()); + Assertions.assertNotNull(success.get(), "Writer thread completed but success was not set"); Assertions.assertTrue(success.get()); // total number of tx should be >= result + 2, where 2 means two NoOp from // leaders. 
It may be larger than result+2 because the client may resend diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java index aa77ee5c77..94e9433b15 100644 --- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java @@ -58,9 +58,9 @@ public abstract class ReadOnlyRequestTests static final String WAIT_AND_INCREMENT_STRING = "WAIT_AND_INCREMENT"; static final String QUERY_STRING = "QUERY"; - static final Message INCREMENT = new RaftTestUtil.SimpleMessage(INCREMENT_STRING); - static final Message WAIT_AND_INCREMENT = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT_STRING); - static final Message QUERY = new RaftTestUtil.SimpleMessage(QUERY_STRING); + public static final Message INCREMENT = new RaftTestUtil.SimpleMessage(INCREMENT_STRING); + public static final Message WAIT_AND_INCREMENT = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT_STRING); + public static final Message QUERY = new RaftTestUtil.SimpleMessage(QUERY_STRING); @BeforeEach public void setup() { @@ -144,7 +144,7 @@ static int retrieve(RaftClientReply reply) { return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8)); } - static void assertReplyExact(int expectedCount, RaftClientReply reply) { + public static void assertReplyExact(int expectedCount, RaftClientReply reply) { Assertions.assertTrue(reply.isSuccess()); final int retrieved = retrieve(reply); Assertions.assertEquals(expectedCount, retrieved, () -> "reply=" + reply); @@ -163,7 +163,7 @@ static void assertReplyAtLeast(int minCount, RaftClientReply reply) { * 2. get * 3. 
waitAndIncrement */ - static class CounterStateMachine extends BaseStateMachine { + public static class CounterStateMachine extends BaseStateMachine { static void setProperties(RaftProperties properties) { properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, CounterStateMachine.class, StateMachine.class); } @@ -193,6 +193,10 @@ private void sleepQuietly(int millis) { } } + public long getCount() { + return counter.get(); + } + private long increment() { return counter.incrementAndGet(); } @@ -213,6 +217,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { final LogEntryProto logEntry = trx.getLogEntry(); final TermIndex ti = TermIndex.valueOf(logEntry); updateLastAppliedTermIndex(ti); + LOG.info("{}: updateLastAppliedTermIndex {}", getId(), ti); final String command = logEntry.getStateMachineLogEntry().getLogData().toString(StandardCharsets.UTF_8); @@ -224,7 +229,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { } else { updatedCount = timeoutIncrement(); } - LOG.info("Applied {} command {}, updatedCount={}", ti, command, updatedCount); + LOG.info("{}: Applied {} command {}, updatedCount={}", getId(), ti, command, updatedCount); return toMessageFuture(updatedCount); } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java index d637498d73..120cce48cc 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java @@ -18,12 +18,7 @@ package org.apache.ratis.grpc; import org.apache.ratis.LinearizableReadTests; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.server.RaftServerConfigKeys; - -import static org.apache.ratis.ReadOnlyRequestTests.assertOption; -import static 
org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; -import static org.junit.jupiter.api.Assertions.assertTrue; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; public class TestLinearizableLeaderLeaseReadWithGrpc extends LinearizableReadTests @@ -35,14 +30,7 @@ public boolean isLeaderLeaseEnabled() { } @Override - public boolean readIndexAppliedIndexEnabled() { - return false; - } - - @Override - public void assertRaftProperties(RaftProperties p) { - assertOption(LINEARIZABLE, p); - assertTrue(RaftServerConfigKeys.Read.leaderLeaseEnabled(p)); - assertTrue(isLeaderLeaseEnabled()); + public Type readIndexType() { + return Type.COMMIT_INDEX; } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java index 9bf3e307be..3705fb3ffc 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java @@ -17,11 +17,13 @@ */ package org.apache.ratis.grpc; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; + public class TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc extends TestLinearizableLeaderLeaseReadWithGrpc { @Override - public boolean readIndexAppliedIndexEnabled() { - return true; + public Type readIndexType() { + return Type.APPLIED_INDEX; } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java index c019aac166..b119f32a6f 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java +++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java @@ -17,11 +17,13 @@ */ package org.apache.ratis.grpc; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; + public class TestLinearizableReadAppliedIndexWithGrpc extends TestLinearizableReadWithGrpc { @Override - public boolean readIndexAppliedIndexEnabled() { - return true; + public Type readIndexType() { + return Type.APPLIED_INDEX; } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java new file mode 100644 index 0000000000..bb50eafbfc --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.grpc; + +public class TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc + extends TestLinearizableReadRepliedIndexWithGrpc { + + @Override + public boolean isLeaderLeaseEnabled() { + return true; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java new file mode 100644 index 0000000000..f08346fc02 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.grpc; + +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; +import org.apache.ratis.server.impl.MiniRaftCluster; +import org.apache.ratis.server.impl.ReplyFlusher; +import org.apache.ratis.util.CodeInjectionForTesting; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT; +import static org.apache.ratis.ReadOnlyRequestTests.QUERY; +import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestLinearizableReadRepliedIndexWithGrpc + extends TestLinearizableReadWithGrpc { + + @Override + public Type readIndexType() { + return Type.REPLIED_INDEX; + } + + @Test + @Override + public void testFollowerLinearizableReadParallel() throws Exception { + runWithNewCluster(TestLinearizableReadRepliedIndexWithGrpc::runTestFollowerReadOnlyParallelRepliedIndex); + } + + static void runTestFollowerReadOnlyParallelRepliedIndex(C cluster) + throws Exception { + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + final CounterStateMachine leaderStateMachine = (CounterStateMachine)leader.getStateMachine(); + + final List followers = cluster.getFollowers(); + Assertions.assertEquals(2, followers.size()); + final RaftPeerId f0 = followers.get(0).getId(); + final RaftPeerId f1 = followers.get(1).getId(); + + final BlockingCode blockingReplyFlusher = new BlockingCode(); + + try 
(RaftClient leaderClient = cluster.createClient(leader.getId()); + RaftClient f0Client = cluster.createClient(f0); + RaftClient f1Client = cluster.createClient(f1)) { + // Warm up the clients first before blocking the reply flusher + assertReplyExact(0, leaderClient.async().sendReadOnly(QUERY).get()); + assertReplyExact(0, f0Client.async().sendReadOnly(QUERY, f0).get()); + assertReplyExact(0, f1Client.async().sendReadOnly(QUERY, f1).get()); + + CodeInjectionForTesting.put(ReplyFlusher.FLUSH, blockingReplyFlusher); + + final int n = 10; + final List writeReplies = new ArrayList<>(n); + final List f0Replies = new ArrayList<>(n); + final List f1Replies = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + final int count = i + 1; + writeReplies.add(new Reply(count, leaderClient.async().send(INCREMENT))); + + // Read replies return immediately, but they should all return 0 since the repliedIndex has not been updated + // and the write operations should not have been applied by the followers + f0Replies.add(new Reply(0, f0Client.async().sendReadOnly(QUERY, f0))); + f1Replies.add(new Reply(0, f1Client.async().sendReadOnly(QUERY, f1))); + + // sleep in order to make sure + // (1) the count is incremented, and + // (2) the reads will wait for the repliedIndex. + Thread.sleep(100); + assertEquals(count, leaderStateMachine.getCount()); + } + + for (int i = 0; i < n; i++) { + // Write reply should not yet complete since ReplyFlusher remains blocked.
+ assertFalse(writeReplies.get(i).isDone(), "Received unexpected Write reply " + writeReplies.get(i)); + + // Follower reads should be immediately served, but the read value should return the value before the + // replyFlusher is blocked + assertTrue(f0Replies.get(i).isDone(), "Follower read should return immediately"); + f0Replies.get(i).assertExact(); + assertTrue(f1Replies.get(i).isDone(), "Follower read should return immediately"); + f1Replies.get(i).assertExact(); + } + + // unblock ReplyFlusher; each follower (f0 and f1) must then serve the final count n + blockingReplyFlusher.complete(); + assertReplyExact(n, f0Client.io().sendReadOnly(QUERY, f0)); + assertReplyExact(n, f1Client.io().sendReadOnly(QUERY, f1)); + + for (int i = 0; i < n; i++) { + //write reply should get the exact count at the write time + writeReplies.get(i).assertExact(); + } + } + } + + static class BlockingCode implements CodeInjectionForTesting.Code { + private final CompletableFuture future = new CompletableFuture<>(); + + void complete() { + future.complete(null); + } + + @Override + public boolean execute(Object localId, Object remoteId, Object...
args) { + final boolean blocked = !future.isDone(); + if (blocked) { + LOG.info("{}: ReplyFlusher is blocked", localId, new Throwable()); + } + future.join(); + if (blocked) { + LOG.info("{}: ReplyFlusher is unblocked", localId); + } + return true; + } + } + + +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java index 3e8860dd19..77593ff85e 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java @@ -18,12 +18,7 @@ package org.apache.ratis.grpc; import org.apache.ratis.LinearizableReadTests; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.server.RaftServerConfigKeys; - -import static org.apache.ratis.ReadOnlyRequestTests.assertOption; -import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; -import static org.junit.jupiter.api.Assertions.assertFalse; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; public class TestLinearizableReadWithGrpc extends LinearizableReadTests @@ -35,14 +30,7 @@ public boolean isLeaderLeaseEnabled() { } @Override - public boolean readIndexAppliedIndexEnabled() { - return false; - } - - @Override - public void assertRaftProperties(RaftProperties p) { - assertOption(LINEARIZABLE, p); - assertFalse(RaftServerConfigKeys.Read.leaderLeaseEnabled(p)); - assertFalse(isLeaderLeaseEnabled()); + public Type readIndexType() { + return Type.COMMIT_INDEX; } }