From 0cf682814cdacfa6fb3d19c69b41a762d7512fa5 Mon Sep 17 00:00:00 2001 From: Rishabh Patel Date: Fri, 27 Mar 2026 11:31:12 -0700 Subject: [PATCH 1/6] HDDS-14793. Add test to reproduce intermittent NPE in `XceiverClientGrpc#sendCommandAsync`. --- ...estXceiverClientGrpcConcurrentConnect.java | 335 ++++++++++++++++++ 1 file changed, 335 insertions(+) create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpcConcurrentConnect.java diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpcConcurrentConnect.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpcConcurrentConnect.java new file mode 100644 index 000000000000..abbb6e5464f8 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpcConcurrentConnect.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.DefaultReplicationConfig; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.BucketArgs; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test reproducing HDDS-14793: intermittent NPE in + * {@link XceiverClientGrpc#sendCommandAsync} caused by a race condition + * between the two non-atomic map updates in + * {@link XceiverClientGrpc#connectToDatanode}. + * + *

Root cause: {@code channels} and {@code asyncStubs} are two separate + * {@link java.util.concurrent.ConcurrentHashMap}s updated in sequence. There + * is a window between + *

+ *   channels.computeIfAbsent(dnId, createChannel)  // DN becomes "visible"
+ *   asyncStubs.computeIfAbsent(dnId, newStub)       // stub added afterwards
+ * 
+ * during which a second thread can observe a live channel in {@code channels} + * (and therefore skip reconnect in {@code checkOpen}), then call + * {@code asyncStubs.get(dnId)} and receive {@code null}, causing an NPE. + * + *

How the race is triggered naturally here: + *

    + *
  1. An EC-3-2 pipeline has 5 DataNodes. {@link XceiverClientGrpc#connect} + * pre-connects only {@code pipeline.getFirstNode()} (DN-0).
  2. + *
  3. {@link #NUM_READ_THREADS} concurrent threads all begin reading files + * at the same instant (enforced by a {@link CyclicBarrier}), sharing + * the same {@link OzoneClient} and therefore the same cached + * {@link ECXceiverClientGrpc} instance.
  4. + *
  5. Every file spans at least two EC data chunks, so each read must + * contact DN-1 (second data node) in addition to the pre-connected + * DN-0. All threads arrive at their first attempt to connect to DN-1 + * at roughly the same time.
  6. + *
  7. Thread-A wins {@code channels.computeIfAbsent(DN-1, ...)} and + * inserts the channel. Before Thread-A can run + * {@code asyncStubs.computeIfAbsent(DN-1, ...)}, Thread-B calls + * {@code checkOpen(DN-1)}, observes the channel already present, + * skips reconnect, and then executes + * {@code asyncStubs.get(DN-1)} — which returns {@code null} — causing + * the NPE.
  8. + *
+ * + *

This mirrors the robot test failure seen in production: + * {@code hadoop-ozone/dist/src/main/smoketest/freon/metadata-generate.robot} + * "[Read] File in FILE_SYSTEM_OPTIMIZED Bucket". + * + *

Note: because the race window between the two map operations is very + * small (nanoseconds of CPU time), a single run may not always trigger the + * NPE. However, when the bug is present, this test will fail on a significant + * fraction of runs. With the fix applied (the two maps are collapsed into a + * single {@code ConcurrentHashMap} of a compound value holding both the + * channel and the stub), this test should pass deterministically. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class TestXceiverClientGrpcConcurrentConnect { + + private static final Logger LOG = + LoggerFactory.getLogger(TestXceiverClientGrpcConcurrentConnect.class); + + /** + * EC-3-2 needs at least 5 DataNodes. Having all 5 active means DN-1 through + * DN-4 are NOT pre-connected after {@link XceiverClientGrpc#connect()}, + * giving the race more opportunities to manifest. + */ + private static final int NUM_DATANODES = 5; + + /** + * 1 MB EC chunk size. Each data file spans {@code EC_CHUNK_SIZE + 1} bytes + * so that it occupies a full chunk on DN-0 and a non-empty chunk on DN-1. + * This guarantees that every read must contact the non-pre-connected DN-1. + */ + private static final int EC_CHUNK_SIZE = 1024 * 1024; + + /** + * File size is chosen so that the EC stripe uses both DN-0 (full chunk) and + * DN-1 (partial chunk). DN-0 is pre-connected; DN-1 is not. + * All {@link #NUM_READ_THREADS} threads racing to connect to DN-1 for the + * first time is what triggers the NPE. + */ + private static final int FILE_SIZE = EC_CHUNK_SIZE + 1; + + /** + * Number of pre-written files. Threads are assigned files round-robin, so some + * files are shared between threads; all threads still need DN-1 simultaneously — maximising the race window. + */ + private static final int NUM_FILES = 20; + + /** + * Number of concurrent reading threads. 
A value significantly larger than + * the number of unconnected DNs (4) increases the probability that at + * least two threads are scheduled within the nanosecond race window between + * {@code channels.computeIfAbsent} and {@code asyncStubs.computeIfAbsent}. + */ + private static final int NUM_READ_THREADS = 32; + + private static final ECReplicationConfig EC_CONFIG = + new ECReplicationConfig(3, 2, + ECReplicationConfig.EcCodec.RS, EC_CHUNK_SIZE); + + private static final String VOLUME_NAME = "vol-concurrent-read-test"; + private static final String BUCKET_NAME = "bucket-ec-fso-concurrent"; + + private MiniOzoneCluster cluster; + + private static void readFully(OzoneBucket bucket, String fileName) + throws Exception { + try (OzoneInputStream in = bucket.readFile(fileName)) { + byte[] buf = new byte[8192]; + //noinspection StatementWithEmptyBody + while (in.read(buf) != -1) { + // drain the stream + } + } + } + + /** + * Blocks at the barrier, converting checked exceptions to unchecked. + */ + private static void awaitBarrier(CyclicBarrier barrier) { + try { + barrier.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (BrokenBarrierException e) { + throw new RuntimeException(e); + } + } + + /** + * Runs {@code task}, catching any {@link Throwable} and returning it. + * Returns {@code null} on success. 
+ */ + private static Throwable captureThrowable(ThrowingRunnable task) { + try { + task.run(); + return null; + } catch (Throwable t) { + LOG.error("Thread failed during concurrent read", t); + return t; + } + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + private static List collectFailures( + List> futures) throws Exception { + List failures = new ArrayList<>(); + for (Future future : futures) { + Throwable t = future.get(); + if (t != null) { + failures.add(t.toString()); + } + } + return failures; + } + + private static String fileName(int index) { + return String.format("file-%04d", index); + } + + @BeforeAll + void startClusterAndWriteFiles() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + + // Allow the EC-3-2-1024k replication config used in this test. + conf.set("ozone.replication.allowed-configs", + "(^((STANDALONE|RATIS)/(ONE|THREE))|(EC/(3-2|6-3|10-4)-(512|1024|2048|4096|1)k)$)"); + conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10); + + // Fast heartbeat so the cluster reaches ready state quickly. + conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, + 500, TimeUnit.MILLISECONDS); + conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, + 1, TimeUnit.SECONDS); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(NUM_DATANODES) + .build(); + cluster.waitForClusterToBeReady(); + // Wait for a Ratis THREE pipeline to be ready (needed for the DN pool). + cluster.waitForPipelineTobeReady( + HddsProtos.ReplicationFactor.THREE, 120_000); + + // Write files using a dedicated client that is closed before reads. + // This ensures the reading client starts with zero pre-existing connections. 
+ try (OzoneClient writeClient = cluster.newClient()) { + OzoneVolume volume = prepareVolume(writeClient); + OzoneBucket bucket = prepareBucket(volume); + writeTestFiles(bucket); + } + } + + @AfterAll + void shutdownCluster() { + IOUtils.closeQuietly(cluster); + } + + /** + * Verifies that concurrent reads from an EC-3-2 FILE_SYSTEM_OPTIMIZED bucket + * do not produce a NullPointerException due to the race condition in + * {@link XceiverClientGrpc#connectToDatanode}. + * + *

All {@link #NUM_READ_THREADS} threads share a single {@link OzoneClient} + * (and therefore the same cached {@link ECXceiverClientGrpc} instance). + * A {@link CyclicBarrier} forces all threads to start their first read at + * exactly the same instant, maximising the probability that multiple threads + * simultaneously attempt the first connection to DN-1. + */ + @Test + public void testConcurrentFirstConnectionToDatanodeDoesNotCauseNpe() + throws Exception { + + // A fresh client has a fresh XceiverClientManager. + // ECXceiverClientGrpc.connect() will pre-connect only to DN-0. + // All reads also need DN-1 (FILE_SIZE > EC_CHUNK_SIZE), so all threads + // race on the first connection to DN-1. + try (OzoneClient readClient = cluster.newClient()) { + OzoneBucket bucket = readClient.getObjectStore() + .getVolume(VOLUME_NAME) + .getBucket(BUCKET_NAME); + + CyclicBarrier barrier = new CyclicBarrier(NUM_READ_THREADS); + ExecutorService executor = Executors.newFixedThreadPool(NUM_READ_THREADS); + List> futures = new ArrayList<>(NUM_READ_THREADS); + + for (int t = 0; t < NUM_READ_THREADS; t++) { + final String fileName = fileName(t % NUM_FILES); + futures.add(executor.submit(() -> captureThrowable(() -> { + // All threads cross the barrier together so their first reads — + // and therefore their first connection attempts to DN-1 — are + // concurrent, hitting the race window between + // channels.computeIfAbsent and asyncStubs.computeIfAbsent. + awaitBarrier(barrier); + readFully(bucket, fileName); + }))); + } + + executor.shutdown(); + boolean terminated = executor.awaitTermination(120, TimeUnit.SECONDS); + assertThat(terminated).as("Executor timed out").isTrue(); + + List failures = collectFailures(futures); + assertThat(failures) + .as("Concurrent reads produced failures. 
Without the fix, " + + "asyncStubs.get(dnId) returns null when channels already has " + + "an entry for dnId but asyncStubs has not yet been updated " + + "(race between channels.computeIfAbsent and " + + "asyncStubs.computeIfAbsent in XceiverClientGrpc). " + + "Failures: " + failures) + .isEmpty(); + } + } + + private OzoneVolume prepareVolume(OzoneClient client) throws Exception { + client.getObjectStore().createVolume(VOLUME_NAME); + return client.getObjectStore().getVolume(VOLUME_NAME); + } + + private OzoneBucket prepareBucket(OzoneVolume volume) throws Exception { + volume.createBucket(BUCKET_NAME, BucketArgs.newBuilder() + .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED) + .setDefaultReplicationConfig(new DefaultReplicationConfig(EC_CONFIG)) + .build()); + return volume.getBucket(BUCKET_NAME); + } + + private void writeTestFiles(OzoneBucket bucket) throws Exception { + byte[] content = new byte[FILE_SIZE]; + new Random().nextBytes(content); + for (int i = 0; i < NUM_FILES; i++) { + try (OzoneOutputStream out = + bucket.createFile(fileName(i), FILE_SIZE, EC_CONFIG, + true, false)) { + out.write(content); + } + } + LOG.info("Wrote {} test files of {} bytes each to EC-3-2 bucket {}", + NUM_FILES, FILE_SIZE, BUCKET_NAME); + } + + @FunctionalInterface + private interface ThrowingRunnable { + void run() throws Exception; + } +} From bfdcec7d3695055385fb5f91b5bfb7ed18cec468 Mon Sep 17 00:00:00 2001 From: Rishabh Patel Date: Sat, 28 Mar 2026 23:49:19 -0700 Subject: [PATCH 2/6] HDDS-14793. Fix race condition in `XceiverClientGrpc#connectToDatanode` causing intermittent NPEs. 
--- .../hadoop/hdds/scm/XceiverClientGrpc.java | 141 ++++---- ...estXceiverClientGrpcConcurrentConnect.java | 335 ------------------ 2 files changed, 75 insertions(+), 401 deletions(-) delete mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpcConcurrentConnect.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 7c639e766eb6..d3492c288da4 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -91,14 +91,13 @@ public class XceiverClientGrpc extends XceiverClientSpi { private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5; private final Pipeline pipeline; private final ConfigurationSource config; - private final Map asyncStubs; private final XceiverClientMetrics metrics; - private final Map channels; private final Semaphore semaphore; private long timeout; private final SecurityConfig secConfig; private final boolean topologyAwareRead; private final ClientTrustManager trustManager; + private final Map dnChannelInfoMap; // Cache the DN which returned the GetBlock command so that the ReadChunk // command can be sent to the same DN. 
private final Map getBlockDNcache; @@ -126,8 +125,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config, this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); - this.channels = new ConcurrentHashMap<>(); - this.asyncStubs = new ConcurrentHashMap<>(); + this.dnChannelInfoMap = new ConcurrentHashMap<>(); this.topologyAwareRead = config.getBoolean( OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT); @@ -161,8 +159,7 @@ public void connect() throws Exception { connectToDatanode(dn); } - private void connectToDatanode(DatanodeDetails dn) - throws IOException { + private void connectToDatanode(DatanodeDetails dn) throws IOException { if (isClosed.get()) { throw new IOException("Client is closed."); } @@ -170,40 +167,47 @@ private void connectToDatanode(DatanodeDetails dn) if (isConnected(dn)) { return; } - // read port from the data node, on failure use default configured port - int port = dn.getStandalonePort().getValue(); - if (port == 0) { - port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT, - OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT); - } - final int finalPort = port; - LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn, pipeline.getNodes()); + LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn, pipeline.getNodes()); - channels.computeIfPresent(dn.getID(), (dnId, channel) -> { - if (channel.isTerminated() || channel.isShutdown()) { - asyncStubs.remove(dnId); - return null; // removes from channels map - } + removeStaleChannel(dn); + generateNewChannel(dn); + } - return channel; - }); + /** + * Checks if the client has a live connection channel to the specified Datanode. + * + * @return True if the connection is alive, false otherwise. 
+ */ + @VisibleForTesting + public boolean isConnected(DatanodeDetails details) { + // Look the entry up exactly once: a containsKey()/get() pair can race with a concurrent + // removal or clear() and then dereference null. + ChannelInfo channelInfo = details == null ? null : dnChannelInfoMap.get(details.getID()); - ManagedChannel channel; - try { - channel = channels.computeIfAbsent(dn.getID(), dnId -> { - try { - return createChannel(dn, finalPort).build(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } catch (RuntimeException e) { - LOG.error("Failed to create channel to datanode {}", dn, e); - throw new IOException(e.getCause()); + ManagedChannel channel = channelInfo == null ? null : channelInfo.getChannel(); + return channel != null + && !channel.isTerminated() + && !channel.isShutdown(); + } + + private void removeStaleChannel(DatanodeDetails dn) { + dnChannelInfoMap.computeIfPresent(dn.getID(), (dnId, info) -> // atomic per key + info.getChannel().isTerminated() || info.getChannel().isShutdown() ? null : info); + } + + private void generateNewChannel(DatanodeDetails dn) throws IOException { + // read port from the data node, on failure use default configured port + int port = dn.getStandalonePort().getValue(); + if (port == 0) { + port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT, OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT); } - asyncStubs.computeIfAbsent(dn.getID(), dnId -> XceiverClientProtocolServiceGrpc.newStub(channel)); + ManagedChannel channel = createChannel(dn, port).build(); + ChannelInfo created = new ChannelInfo(channel, XceiverClientProtocolServiceGrpc.newStub(channel)); + if (dnChannelInfoMap.putIfAbsent(dn.getID(), created) != null) { + channel.shutdown(); // lost the connect race: keep the published entry, release ours + } } protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port) @@ -240,21 +244,6 @@ private boolean datanodeUseHostName() { HddsConfigKeys.HDDS_DATANODE_USE_DN_HOSTNAME_DEFAULT); } - /** - * Checks if the client has a live connection channel to the specified - * Datanode. - * - * @return True if the connection is alive, false otherwise. 
- */ - @VisibleForTesting - public boolean isConnected(DatanodeDetails details) { - return isConnected(channels.get(details.getID())); - } - - private boolean isConnected(ManagedChannel channel) { - return channel != null && !channel.isTerminated() && !channel.isShutdown(); - } - /** * Closes all the communication channels of the client one-by-one. * When a channel is closed, no further requests can be sent via the channel, @@ -267,13 +256,16 @@ public void close() { return; } - for (ManagedChannel channel : channels.values()) { - channel.shutdown(); + for (ChannelInfo channelInfo : dnChannelInfoMap.values()) { + channelInfo.getChannel().shutdown(); } final long maxWaitNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS); long deadline = System.nanoTime() + maxWaitNanos; - List nonTerminatedChannels = new ArrayList<>(channels.values()); + List nonTerminatedChannels = dnChannelInfoMap.values() + .stream() + .map(ChannelInfo::getChannel) + .collect(Collectors.toList()); while (!nonTerminatedChannels.isEmpty() && System.nanoTime() < deadline) { nonTerminatedChannels.removeIf(ManagedChannel::isTerminated); @@ -286,16 +278,17 @@ public void close() { } } - List failedChannels = channels.entrySet().stream() - .filter(e -> !e.getValue().isTerminated()) + List failedChannels = dnChannelInfoMap.entrySet() + .stream() + .filter(e -> !e.getValue().getChannel().isTerminated()) .map(Map.Entry::getKey) .collect(Collectors.toList()); + if (!failedChannels.isEmpty()) { LOG.warn("Channels {} did not terminate within timeout.", failedChannels); } - channels.clear(); - asyncStubs.clear(); + dnChannelInfoMap.clear(); } @Override @@ -581,7 +574,7 @@ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) t try { checkOpen(dn); semaphore.acquire(); - XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID()); + XceiverClientProtocolServiceStub stub = dnChannelInfoMap.get(dn.getID()).getStub(); if (stub == null) { throw new IOException("Failed to get 
gRPC stub for DataNode: " + dn); } @@ -698,7 +691,7 @@ public XceiverClientReply sendCommandAsync( // create a new grpc message stream pair for each call. final StreamObserver requestObserver = - asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS) + dnChannelInfoMap.get(dnId).getStub().withDeadlineAfter(timeout, TimeUnit.SECONDS) .send(new StreamObserver() { @Override public void onNext(ContainerCommandResponseProto value) { @@ -743,26 +736,21 @@ private void checkOpen(DatanodeDetails dn) throw new IOException("This channel is not connected."); } - ManagedChannel channel = channels.get(dn.getID()); - // If the channel doesn't exist for this specific datanode or the channel - // is closed, just reconnect - if (!isConnected(channel)) { + // If the channel doesn't exist for this specific datanode or the channel is closed, just reconnect + if (!isConnected(dn)) { reconnect(dn); } - } private void reconnect(DatanodeDetails dn) throws IOException { - ManagedChannel channel; try { connectToDatanode(dn); - channel = channels.get(dn.getID()); } catch (Exception e) { throw new IOException("Error while connecting", e); } - if (!isConnected(channel)) { + if (!isConnected(dn)) { throw new IOException("This channel is not connected."); } } @@ -784,4 +772,25 @@ public ConfigurationSource getConfig() { public void setTimeout(long timeout) { this.timeout = timeout; } + + /** + * Group the channel and stub so that they are published together. 
+ */ + private static final class ChannelInfo { + private final ManagedChannel channel; + private final XceiverClientProtocolServiceStub stub; + + ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub) { + this.channel = channel; + this.stub = stub; + } + + public ManagedChannel getChannel() { + return channel; + } + + public XceiverClientProtocolServiceStub getStub() { + return stub; + } + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpcConcurrentConnect.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpcConcurrentConnect.java deleted file mode 100644 index abbb6e5464f8..000000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpcConcurrentConnect.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hdds.scm; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.client.DefaultReplicationConfig; -import org.apache.hadoop.hdds.client.ECReplicationConfig; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.utils.IOUtils; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.client.BucketArgs; -import org.apache.hadoop.ozone.client.OzoneBucket; -import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.OzoneVolume; -import org.apache.hadoop.ozone.client.io.OzoneInputStream; -import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.om.helpers.BucketLayout; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Integration test reproducing HDDS-14793: intermittent NPE in - * {@link XceiverClientGrpc#sendCommandAsync} caused by a race condition - * between the two non-atomic map updates in - * {@link XceiverClientGrpc#connectToDatanode}. - * - *

Root cause: {@code channels} and {@code asyncStubs} are two separate - * {@link java.util.concurrent.ConcurrentHashMap}s updated in sequence. There - * is a window between - *

- *   channels.computeIfAbsent(dnId, createChannel)  // DN becomes "visible"
- *   asyncStubs.computeIfAbsent(dnId, newStub)       // stub added afterwards
- * 
- * during which a second thread can observe a live channel in {@code channels} - * (and therefore skip reconnect in {@code checkOpen}), then call - * {@code asyncStubs.get(dnId)} and receive {@code null}, causing an NPE. - * - *

How the race is triggered naturally here: - *

    - *
  1. An EC-3-2 pipeline has 5 DataNodes. {@link XceiverClientGrpc#connect} - * pre-connects only {@code pipeline.getFirstNode()} (DN-0).
  2. - *
  3. {@link #NUM_READ_THREADS} concurrent threads all begin reading files - * at the same instant (enforced by a {@link CyclicBarrier}), sharing - * the same {@link OzoneClient} and therefore the same cached - * {@link ECXceiverClientGrpc} instance.
  4. - *
  5. Every file spans at least two EC data chunks, so each read must - * contact DN-1 (second data node) in addition to the pre-connected - * DN-0. All threads arrive at their first attempt to connect to DN-1 - * at roughly the same time.
  6. - *
  7. Thread-A wins {@code channels.computeIfAbsent(DN-1, ...)} and - * inserts the channel. Before Thread-A can run - * {@code asyncStubs.computeIfAbsent(DN-1, ...)}, Thread-B calls - * {@code checkOpen(DN-1)}, observes the channel already present, - * skips reconnect, and then executes - * {@code asyncStubs.get(DN-1)} — which returns {@code null} — causing - * the NPE.
  8. - *
- * - *

This mirrors the robot test failure seen in production: - * {@code hadoop-ozone/dist/src/main/smoketest/freon/metadata-generate.robot} - * "[Read] File in FILE_SYSTEM_OPTIMIZED Bucket". - * - *

Note: because the race window between the two map operations is very - * small (nanoseconds of CPU time), a single run may not always trigger the - * NPE. However, when the bug is present, this test will fail on a significant - * fraction of runs. With the fix applied (the two maps are collapsed into a - * single {@code ConcurrentHashMap} of a compound value holding both the - * channel and the stub), this test should pass deterministically. - */ -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class TestXceiverClientGrpcConcurrentConnect { - - private static final Logger LOG = - LoggerFactory.getLogger(TestXceiverClientGrpcConcurrentConnect.class); - - /** - * EC-3-2 needs at least 5 DataNodes. Having all 5 active means DN-1 through - * DN-4 are NOT pre-connected after {@link XceiverClientGrpc#connect()}, - * giving the race more opportunities to manifest. - */ - private static final int NUM_DATANODES = 5; - - /** - * 1 MB EC chunk size. Each data file spans {@code EC_CHUNK_SIZE + 1} bytes - * so that it occupies a full chunk on DN-0 and a non-empty chunk on DN-1. - * This guarantees that every read must contact the non-pre-connected DN-1. - */ - private static final int EC_CHUNK_SIZE = 1024 * 1024; - - /** - * File size is chosen so that the EC stripe uses both DN-0 (full chunk) and - * DN-1 (partial chunk). DN-0 is pre-connected; DN-1 is not. - * All {@link #NUM_READ_THREADS} threads racing to connect to DN-1 for the - * first time is what triggers the NPE. - */ - private static final int FILE_SIZE = EC_CHUNK_SIZE + 1; - - /** - * Number of pre-written files. Threads are assigned files round-robin, so some - * files are shared between threads; all threads still need DN-1 simultaneously — maximising the race window. - */ - private static final int NUM_FILES = 20; - - /** - * Number of concurrent reading threads. 
A value significantly larger than - * the number of unconnected DNs (4) increases the probability that at - * least two threads are scheduled within the nanosecond race window between - * {@code channels.computeIfAbsent} and {@code asyncStubs.computeIfAbsent}. - */ - private static final int NUM_READ_THREADS = 32; - - private static final ECReplicationConfig EC_CONFIG = - new ECReplicationConfig(3, 2, - ECReplicationConfig.EcCodec.RS, EC_CHUNK_SIZE); - - private static final String VOLUME_NAME = "vol-concurrent-read-test"; - private static final String BUCKET_NAME = "bucket-ec-fso-concurrent"; - - private MiniOzoneCluster cluster; - - private static void readFully(OzoneBucket bucket, String fileName) - throws Exception { - try (OzoneInputStream in = bucket.readFile(fileName)) { - byte[] buf = new byte[8192]; - //noinspection StatementWithEmptyBody - while (in.read(buf) != -1) { - // drain the stream - } - } - } - - /** - * Blocks at the barrier, converting checked exceptions to unchecked. - */ - private static void awaitBarrier(CyclicBarrier barrier) { - try { - barrier.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (BrokenBarrierException e) { - throw new RuntimeException(e); - } - } - - /** - * Runs {@code task}, catching any {@link Throwable} and returning it. - * Returns {@code null} on success. 
- */ - private static Throwable captureThrowable(ThrowingRunnable task) { - try { - task.run(); - return null; - } catch (Throwable t) { - LOG.error("Thread failed during concurrent read", t); - return t; - } - } - - // --------------------------------------------------------------------------- - // Helpers - // --------------------------------------------------------------------------- - - private static List collectFailures( - List> futures) throws Exception { - List failures = new ArrayList<>(); - for (Future future : futures) { - Throwable t = future.get(); - if (t != null) { - failures.add(t.toString()); - } - } - return failures; - } - - private static String fileName(int index) { - return String.format("file-%04d", index); - } - - @BeforeAll - void startClusterAndWriteFiles() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - - // Allow the EC-3-2-1024k replication config used in this test. - conf.set("ozone.replication.allowed-configs", - "(^((STANDALONE|RATIS)/(ONE|THREE))|(EC/(3-2|6-3|10-4)-(512|1024|2048|4096|1)k)$)"); - conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10); - - // Fast heartbeat so the cluster reaches ready state quickly. - conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, - 500, TimeUnit.MILLISECONDS); - conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, - 1, TimeUnit.SECONDS); - - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(NUM_DATANODES) - .build(); - cluster.waitForClusterToBeReady(); - // Wait for a Ratis THREE pipeline to be ready (needed for the DN pool). - cluster.waitForPipelineTobeReady( - HddsProtos.ReplicationFactor.THREE, 120_000); - - // Write files using a dedicated client that is closed before reads. - // This ensures the reading client starts with zero pre-existing connections. 
- try (OzoneClient writeClient = cluster.newClient()) { - OzoneVolume volume = prepareVolume(writeClient); - OzoneBucket bucket = prepareBucket(volume); - writeTestFiles(bucket); - } - } - - @AfterAll - void shutdownCluster() { - IOUtils.closeQuietly(cluster); - } - - /** - * Verifies that concurrent reads from an EC-3-2 FILE_SYSTEM_OPTIMIZED bucket - * do not produce a NullPointerException due to the race condition in - * {@link XceiverClientGrpc#connectToDatanode}. - * - *

All {@link #NUM_READ_THREADS} threads share a single {@link OzoneClient} - * (and therefore the same cached {@link ECXceiverClientGrpc} instance). - * A {@link CyclicBarrier} forces all threads to start their first read at - * exactly the same instant, maximising the probability that multiple threads - * simultaneously attempt the first connection to DN-1. - */ - @Test - public void testConcurrentFirstConnectionToDatanodeDoesNotCauseNpe() - throws Exception { - - // A fresh client has a fresh XceiverClientManager. - // ECXceiverClientGrpc.connect() will pre-connect only to DN-0. - // All reads also need DN-1 (FILE_SIZE > EC_CHUNK_SIZE), so all threads - // race on the first connection to DN-1. - try (OzoneClient readClient = cluster.newClient()) { - OzoneBucket bucket = readClient.getObjectStore() - .getVolume(VOLUME_NAME) - .getBucket(BUCKET_NAME); - - CyclicBarrier barrier = new CyclicBarrier(NUM_READ_THREADS); - ExecutorService executor = Executors.newFixedThreadPool(NUM_READ_THREADS); - List> futures = new ArrayList<>(NUM_READ_THREADS); - - for (int t = 0; t < NUM_READ_THREADS; t++) { - final String fileName = fileName(t % NUM_FILES); - futures.add(executor.submit(() -> captureThrowable(() -> { - // All threads cross the barrier together so their first reads — - // and therefore their first connection attempts to DN-1 — are - // concurrent, hitting the race window between - // channels.computeIfAbsent and asyncStubs.computeIfAbsent. - awaitBarrier(barrier); - readFully(bucket, fileName); - }))); - } - - executor.shutdown(); - boolean terminated = executor.awaitTermination(120, TimeUnit.SECONDS); - assertThat(terminated).as("Executor timed out").isTrue(); - - List failures = collectFailures(futures); - assertThat(failures) - .as("Concurrent reads produced failures. 
Without the fix, " - + "asyncStubs.get(dnId) returns null when channels already has " - + "an entry for dnId but asyncStubs has not yet been updated " - + "(race between channels.computeIfAbsent and " - + "asyncStubs.computeIfAbsent in XceiverClientGrpc). " - + "Failures: " + failures) - .isEmpty(); - } - } - - private OzoneVolume prepareVolume(OzoneClient client) throws Exception { - client.getObjectStore().createVolume(VOLUME_NAME); - return client.getObjectStore().getVolume(VOLUME_NAME); - } - - private OzoneBucket prepareBucket(OzoneVolume volume) throws Exception { - volume.createBucket(BUCKET_NAME, BucketArgs.newBuilder() - .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED) - .setDefaultReplicationConfig(new DefaultReplicationConfig(EC_CONFIG)) - .build()); - return volume.getBucket(BUCKET_NAME); - } - - private void writeTestFiles(OzoneBucket bucket) throws Exception { - byte[] content = new byte[FILE_SIZE]; - new Random().nextBytes(content); - for (int i = 0; i < NUM_FILES; i++) { - try (OzoneOutputStream out = - bucket.createFile(fileName(i), FILE_SIZE, EC_CONFIG, - true, false)) { - out.write(content); - } - } - LOG.info("Wrote {} test files of {} bytes each to EC-3-2 bucket {}", - NUM_FILES, FILE_SIZE, BUCKET_NAME); - } - - @FunctionalInterface - private interface ThrowingRunnable { - void run() throws Exception; - } -} From cfc88c5b705cbf576457e9cf6de2d2105e4b2c2f Mon Sep 17 00:00:00 2001 From: Rishabh Patel Date: Sun, 29 Mar 2026 00:31:15 -0700 Subject: [PATCH 3/6] HDDS-14793. 
Make ChannelInfo static to fix findbugs --- .../main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index d3492c288da4..ff2e937bc4ae 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -776,7 +776,7 @@ public void setTimeout(long timeout) { /** * Group the channel and stub so that they are published together. */ - private class ChannelInfo { + private static class ChannelInfo { private ManagedChannel channel; private XceiverClientProtocolServiceStub stub; From 30942d75c9a151fd6829a3d396445c4e1151d08a Mon Sep 17 00:00:00 2001 From: Rishabh Patel Date: Sun, 29 Mar 2026 01:32:25 -0700 Subject: [PATCH 4/6] HDDS-14793. Replace Map with ConcurrentMap --- .../java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index ff2e937bc4ae..01239e0e0e38 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -31,6 +31,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -97,7 +98,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { private final SecurityConfig secConfig; private final boolean 
topologyAwareRead; private final ClientTrustManager trustManager; - private final Map dnChannelInfoMap; + private final ConcurrentMap dnChannelInfoMap; // Cache the DN which returned the GetBlock command so that the ReadChunk // command can be sent to the same DN. private final Map getBlockDNcache; From e3d07105912652871e0dcaac4036b6ef1ad3740b Mon Sep 17 00:00:00 2001 From: Rishabh Patel Date: Sun, 29 Mar 2026 16:00:36 -0700 Subject: [PATCH 5/6] HDDS-14793. Fix review comments --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 74 ++++++++++--------- 1 file changed, 39 insertions(+), 35 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 01239e0e0e38..c0c711c89440 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -165,40 +166,23 @@ private void connectToDatanode(DatanodeDetails dn) throws IOException { throw new IOException("Client is closed."); } - if (isConnected(dn)) { - return; - } - - LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn, pipeline.getNodes()); - - removeStaleChannel(dn); - generateNewChannel(dn); - } - - /** - * Checks if the client has a live connection channel to the specified Datanode. - * - * @return True if the connection is alive, false otherwise. 
- */ - @VisibleForTesting - public boolean isConnected(DatanodeDetails details) { - if (details == null || !dnChannelInfoMap.containsKey(details.getID())) { - return false; - } - - ManagedChannel channel = dnChannelInfoMap.get(details.getID()).getChannel(); - return channel != null - && !channel.isTerminated() - && !channel.isShutdown(); - } + dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> { + // channel is absent or stale + if (channelInfo == null || channelInfo.isChannelInactive()) { + LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn, pipeline.getNodes()); + try { + return generateNewChannel(dn); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } - private void removeStaleChannel(DatanodeDetails dn) { - if (!isConnected(dn)) { - dnChannelInfoMap.remove(dn.getID()); - } + // channel is present and active + return channelInfo; + }); } - private void generateNewChannel(DatanodeDetails dn) throws IOException { + private ChannelInfo generateNewChannel(DatanodeDetails dn) throws IOException { // read port from the data node, on failure use default configured port int port = dn.getStandalonePort().getValue(); if (port == 0) { @@ -207,8 +191,7 @@ private void generateNewChannel(DatanodeDetails dn) throws IOException { ManagedChannel channel = createChannel(dn, port).build(); XceiverClientProtocolServiceStub stub = XceiverClientProtocolServiceGrpc.newStub(channel); - ChannelInfo channelInfo = new ChannelInfo(channel, stub); - dnChannelInfoMap.put(dn.getID(), channelInfo); + return new ChannelInfo(channel, stub); } protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port) @@ -245,6 +228,21 @@ private boolean datanodeUseHostName() { HddsConfigKeys.HDDS_DATANODE_USE_DN_HOSTNAME_DEFAULT); } + /** + * Checks if the client has a live connection channel to the specified + * Datanode. + * + * @return True if the connection is alive, false otherwise. 
+ */ + @VisibleForTesting + public boolean isConnected(DatanodeDetails details) { + if (details == null || !dnChannelInfoMap.containsKey(details.getID())) { + return false; + } + + return !dnChannelInfoMap.get(details.getID()).isChannelInactive(); + } + /** * Closes all the communication channels of the client one-by-one. * When a channel is closed, no further requests can be sent via the channel, @@ -778,8 +776,8 @@ public void setTimeout(long timeout) { * Group the channel and stub so that they are published together. */ private static class ChannelInfo { - private ManagedChannel channel; - private XceiverClientProtocolServiceStub stub; + private final ManagedChannel channel; + private final XceiverClientProtocolServiceStub stub; ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub) { this.channel = channel; @@ -793,5 +791,11 @@ public ManagedChannel getChannel() { public XceiverClientProtocolServiceStub getStub() { return stub; } + + public boolean isChannelInactive() { + return channel == null + || channel.isTerminated() + || channel.isShutdown(); + } } } From 3b137a7bba3e0826e93fc5d2f4fe8f30e6f49647 Mon Sep 17 00:00:00 2001 From: Rishabh Patel Date: Thu, 2 Apr 2026 17:27:49 -0700 Subject: [PATCH 6/6] HDDS-14793. Improve error handling and null checks in XceiverClientGrpc to prevent runtime exceptions. 
--- .../hadoop/hdds/scm/XceiverClientGrpc.java | 47 +++++++++---------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index c0c711c89440..a9dbcb9456c3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -166,20 +166,25 @@ private void connectToDatanode(DatanodeDetails dn) throws IOException { throw new IOException("Client is closed."); } - dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> { - // channel is absent or stale - if (channelInfo == null || channelInfo.isChannelInactive()) { - LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn, pipeline.getNodes()); - try { - return generateNewChannel(dn); - } catch (IOException e) { - throw new UncheckedIOException(e); + try { + dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> { + // channel is absent or stale + if (channelInfo == null || channelInfo.isChannelInactive()) { + LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn, pipeline.getNodes()); + try { + return generateNewChannel(dn); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } - } - // channel is present and active - return channelInfo; - }); + // channel is present and active + return channelInfo; + }); + } catch (UncheckedIOException e) { + LOG.error("Failed to create channel to datanode {}", dn, e); + throw e.getCause(); + } } private ChannelInfo generateNewChannel(DatanodeDetails dn) throws IOException { @@ -236,11 +241,12 @@ private boolean datanodeUseHostName() { */ @VisibleForTesting public boolean isConnected(DatanodeDetails details) { - if (details == null || !dnChannelInfoMap.containsKey(details.getID())) { + if (details == null) { return false; } - return 
!dnChannelInfoMap.get(details.getID()).isChannelInactive(); + ChannelInfo channelInfo = dnChannelInfoMap.get(details.getID()); + return channelInfo != null && !channelInfo.isChannelInactive(); } /** @@ -264,6 +270,7 @@ public void close() { List nonTerminatedChannels = dnChannelInfoMap.values() .stream() .map(ChannelInfo::getChannel) + .filter(Objects::nonNull) .collect(Collectors.toList()); while (!nonTerminatedChannels.isEmpty() && System.nanoTime() < deadline) { @@ -731,18 +738,6 @@ private void decreasePendingMetricsAndReleaseSemaphore() { private void checkOpen(DatanodeDetails dn) throws IOException { - if (isClosed.get()) { - throw new IOException("This channel is not connected."); - } - - // If the channel doesn't exist for this specific datanode or the channel is closed, just reconnect - if (!isConnected(dn)) { - reconnect(dn); - } - } - - private void reconnect(DatanodeDetails dn) - throws IOException { try { connectToDatanode(dn); } catch (Exception e) {