diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 7c639e766eb6..a9dbcb9456c3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -31,6 +32,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -91,14 +93,13 @@ public class XceiverClientGrpc extends XceiverClientSpi { private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5; private final Pipeline pipeline; private final ConfigurationSource config; - private final Map asyncStubs; private final XceiverClientMetrics metrics; - private final Map channels; private final Semaphore semaphore; private long timeout; private final SecurityConfig secConfig; private final boolean topologyAwareRead; private final ClientTrustManager trustManager; + private final ConcurrentMap dnChannelInfoMap; // Cache the DN which returned the GetBlock command so that the ReadChunk // command can be sent to the same DN. private final Map getBlockDNcache; @@ -126,8 +127,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config, this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); - this.channels = new ConcurrentHashMap<>(); - this.asyncStubs = new ConcurrentHashMap<>(); + this.dnChannelInfoMap = new ConcurrentHashMap<>(); this.topologyAwareRead = config.getBoolean( OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT); @@ -161,49 +161,42 @@ public void connect() throws Exception { connectToDatanode(dn); } - private void connectToDatanode(DatanodeDetails dn) - throws IOException { + private void connectToDatanode(DatanodeDetails dn) throws IOException { if (isClosed.get()) { throw new IOException("Client is closed."); } - if (isConnected(dn)) { - return; - } - // read port from the data node, on failure use default configured port - int port = dn.getStandalonePort().getValue(); - if (port == 0) { - port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT, - OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT); - } - final int finalPort = port; - - LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn, pipeline.getNodes()); - - channels.computeIfPresent(dn.getID(), (dnId, channel) -> { - if (channel.isTerminated() || channel.isShutdown()) { - asyncStubs.remove(dnId); - return null; // removes from channels map - } - - return channel; - }); - - ManagedChannel channel; try { - channel = channels.computeIfAbsent(dn.getID(), dnId -> { - try { - return createChannel(dn, finalPort).build(); - } catch (IOException e) { - throw new RuntimeException(e); + dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> { + // channel is absent or stale + if (channelInfo == null || channelInfo.isChannelInactive()) { + LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn, pipeline.getNodes()); + try { + return generateNewChannel(dn); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } + + // channel is present and active + return channelInfo; }); - } catch (RuntimeException e) { + } catch (UncheckedIOException e) { LOG.error("Failed to create channel to datanode {}", dn, e); - throw new IOException(e.getCause()); + throw e.getCause(); + } + } + + private ChannelInfo generateNewChannel(DatanodeDetails dn) throws IOException { + // read port from the data node, on failure use default configured port + int port = dn.getStandalonePort().getValue(); + if (port == 0) { + port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT, OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT); } - asyncStubs.computeIfAbsent(dn.getID(), dnId -> XceiverClientProtocolServiceGrpc.newStub(channel)); + ManagedChannel channel = createChannel(dn, port).build(); + XceiverClientProtocolServiceStub stub = XceiverClientProtocolServiceGrpc.newStub(channel); + return new ChannelInfo(channel, stub); } protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port) @@ -248,11 +241,12 @@ private boolean datanodeUseHostName() { */ @VisibleForTesting public boolean isConnected(DatanodeDetails details) { - return isConnected(channels.get(details.getID())); - } + if (details == null) { + return false; + } - private boolean isConnected(ManagedChannel channel) { - return channel != null && !channel.isTerminated() && !channel.isShutdown(); + ChannelInfo channelInfo = dnChannelInfoMap.get(details.getID()); + return channelInfo != null && !channelInfo.isChannelInactive(); } /** @@ -267,13 +261,17 @@ public void close() { return; } - for (ManagedChannel channel : channels.values()) { - channel.shutdown(); + for (ChannelInfo channelInfo : dnChannelInfoMap.values()) { + channelInfo.getChannel().shutdown(); } final long maxWaitNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS); long deadline = System.nanoTime() + maxWaitNanos; - List nonTerminatedChannels = new ArrayList<>(channels.values()); + List nonTerminatedChannels = dnChannelInfoMap.values() + .stream() + .map(ChannelInfo::getChannel) + .filter(Objects::nonNull) + .collect(Collectors.toList()); while (!nonTerminatedChannels.isEmpty() && System.nanoTime() < deadline) { nonTerminatedChannels.removeIf(ManagedChannel::isTerminated); @@ -286,16 +284,17 @@ public void close() { } } - List failedChannels = channels.entrySet().stream() - .filter(e -> !e.getValue().isTerminated()) + List failedChannels = dnChannelInfoMap.entrySet() + .stream() + .filter(e -> !e.getValue().getChannel().isTerminated()) .map(Map.Entry::getKey) .collect(Collectors.toList()); + if (!failedChannels.isEmpty()) { LOG.warn("Channels {} did not terminate within timeout.", failedChannels); } - channels.clear(); - asyncStubs.clear(); + dnChannelInfoMap.clear(); } @Override @@ -581,7 +580,7 @@ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) t try { checkOpen(dn); semaphore.acquire(); - XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID()); + XceiverClientProtocolServiceStub stub = dnChannelInfoMap.get(dn.getID()).getStub(); if (stub == null) { throw new IOException("Failed to get gRPC stub for DataNode: " + dn); } @@ -698,7 +697,7 @@ public XceiverClientReply sendCommandAsync( // create a new grpc message stream pair for each call. final StreamObserver requestObserver = - asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS) + dnChannelInfoMap.get(dnId).getStub().withDeadlineAfter(timeout, TimeUnit.SECONDS) .send(new StreamObserver() { @Override public void onNext(ContainerCommandResponseProto value) { @@ -739,30 +738,13 @@ private void decreasePendingMetricsAndReleaseSemaphore() { private void checkOpen(DatanodeDetails dn) throws IOException { - if (isClosed.get()) { - throw new IOException("This channel is not connected."); - } - - ManagedChannel channel = channels.get(dn.getID()); - // If the channel doesn't exist for this specific datanode or the channel - // is closed, just reconnect - if (!isConnected(channel)) { - reconnect(dn); - } - - } - - private void reconnect(DatanodeDetails dn) - throws IOException { - ManagedChannel channel; try { connectToDatanode(dn); - channel = channels.get(dn.getID()); } catch (Exception e) { throw new IOException("Error while connecting", e); } - if (!isConnected(channel)) { + if (!isConnected(dn)) { throw new IOException("This channel is not connected."); } } @@ -784,4 +766,31 @@ public ConfigurationSource getConfig() { public void setTimeout(long timeout) { this.timeout = timeout; } + + /** + * Group the channel and stub so that they are published together. + */ + private static class ChannelInfo { + private final ManagedChannel channel; + private final XceiverClientProtocolServiceStub stub; + + ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub) { + this.channel = channel; + this.stub = stub; + } + + public ManagedChannel getChannel() { + return channel; + } + + public XceiverClientProtocolServiceStub getStub() { + return stub; + } + + public boolean isChannelInactive() { + return channel == null + || channel.isTerminated() + || channel.isShutdown(); + } + } }