-
Notifications
You must be signed in to change notification settings - Fork 597
HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. #9997
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. #9997
Changes from all commits
0cf6828
bfdcec7
cfc88c5
30942d7
e3d0710
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |||||||||||
| import com.google.common.annotations.VisibleForTesting; | ||||||||||||
| import java.io.IOException; | ||||||||||||
| import java.io.InterruptedIOException; | ||||||||||||
| import java.io.UncheckedIOException; | ||||||||||||
| import java.util.ArrayList; | ||||||||||||
| import java.util.Collections; | ||||||||||||
| import java.util.Comparator; | ||||||||||||
|
|
@@ -31,6 +32,7 @@ | |||||||||||
| import java.util.Objects; | ||||||||||||
| import java.util.concurrent.CompletableFuture; | ||||||||||||
| import java.util.concurrent.ConcurrentHashMap; | ||||||||||||
| import java.util.concurrent.ConcurrentMap; | ||||||||||||
| import java.util.concurrent.ExecutionException; | ||||||||||||
| import java.util.concurrent.Semaphore; | ||||||||||||
| import java.util.concurrent.TimeUnit; | ||||||||||||
|
|
@@ -91,14 +93,13 @@ public class XceiverClientGrpc extends XceiverClientSpi { | |||||||||||
| private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5; | ||||||||||||
| private final Pipeline pipeline; | ||||||||||||
| private final ConfigurationSource config; | ||||||||||||
| private final Map<DatanodeID, XceiverClientProtocolServiceStub> asyncStubs; | ||||||||||||
| private final XceiverClientMetrics metrics; | ||||||||||||
| private final Map<DatanodeID, ManagedChannel> channels; | ||||||||||||
| private final Semaphore semaphore; | ||||||||||||
| private long timeout; | ||||||||||||
| private final SecurityConfig secConfig; | ||||||||||||
| private final boolean topologyAwareRead; | ||||||||||||
| private final ClientTrustManager trustManager; | ||||||||||||
| private final ConcurrentMap<DatanodeID, ChannelInfo> dnChannelInfoMap; | ||||||||||||
| // Cache the DN which returned the GetBlock command so that the ReadChunk | ||||||||||||
| // command can be sent to the same DN. | ||||||||||||
| private final Map<DatanodeBlockID, DatanodeDetails> getBlockDNcache; | ||||||||||||
|
|
@@ -126,8 +127,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config, | |||||||||||
| this.semaphore = | ||||||||||||
| new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); | ||||||||||||
| this.metrics = XceiverClientManager.getXceiverClientMetrics(); | ||||||||||||
| this.channels = new ConcurrentHashMap<>(); | ||||||||||||
| this.asyncStubs = new ConcurrentHashMap<>(); | ||||||||||||
| this.dnChannelInfoMap = new ConcurrentHashMap<>(); | ||||||||||||
| this.topologyAwareRead = config.getBoolean( | ||||||||||||
| OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, | ||||||||||||
| OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT); | ||||||||||||
|
|
@@ -161,49 +161,37 @@ public void connect() throws Exception { | |||||||||||
| connectToDatanode(dn); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private void connectToDatanode(DatanodeDetails dn) | ||||||||||||
| throws IOException { | ||||||||||||
| private void connectToDatanode(DatanodeDetails dn) throws IOException { | ||||||||||||
| if (isClosed.get()) { | ||||||||||||
| throw new IOException("Client is closed."); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| if (isConnected(dn)) { | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
| // read port from the data node, on failure use default configured port | ||||||||||||
| int port = dn.getStandalonePort().getValue(); | ||||||||||||
| if (port == 0) { | ||||||||||||
| port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT, | ||||||||||||
| OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT); | ||||||||||||
| } | ||||||||||||
| final int finalPort = port; | ||||||||||||
|
|
||||||||||||
| LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn, pipeline.getNodes()); | ||||||||||||
|
|
||||||||||||
| channels.computeIfPresent(dn.getID(), (dnId, channel) -> { | ||||||||||||
| if (channel.isTerminated() || channel.isShutdown()) { | ||||||||||||
| asyncStubs.remove(dnId); | ||||||||||||
| return null; // removes from channels map | ||||||||||||
| dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> { | ||||||||||||
| // channel is absent or stale | ||||||||||||
| if (channelInfo == null || channelInfo.isChannelInactive()) { | ||||||||||||
| LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn, pipeline.getNodes()); | ||||||||||||
| try { | ||||||||||||
| return generateNewChannel(dn); | ||||||||||||
| } catch (IOException e) { | ||||||||||||
| throw new UncheckedIOException(e); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| return channel; | ||||||||||||
| // channel is present and active | ||||||||||||
| return channelInfo; | ||||||||||||
| }); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| ManagedChannel channel; | ||||||||||||
| try { | ||||||||||||
| channel = channels.computeIfAbsent(dn.getID(), dnId -> { | ||||||||||||
| try { | ||||||||||||
| return createChannel(dn, finalPort).build(); | ||||||||||||
| } catch (IOException e) { | ||||||||||||
| throw new RuntimeException(e); | ||||||||||||
| } | ||||||||||||
| }); | ||||||||||||
| } catch (RuntimeException e) { | ||||||||||||
| LOG.error("Failed to create channel to datanode {}", dn, e); | ||||||||||||
| throw new IOException(e.getCause()); | ||||||||||||
|
Comment on lines
-201
to
-203
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to keep the translation back from unchecked exception to try {
dnChannelInfoMap.compute(...);
} catch (UncheckedIOException e) {
LOG.error("Failed to create channel to datanode {}", dn, e);
throw e.getCause();
} |
||||||||||||
| private ChannelInfo generateNewChannel(DatanodeDetails dn) throws IOException { | ||||||||||||
| // read port from the data node, on failure use default configured port | ||||||||||||
| int port = dn.getStandalonePort().getValue(); | ||||||||||||
| if (port == 0) { | ||||||||||||
| port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT, OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| asyncStubs.computeIfAbsent(dn.getID(), dnId -> XceiverClientProtocolServiceGrpc.newStub(channel)); | ||||||||||||
| ManagedChannel channel = createChannel(dn, port).build(); | ||||||||||||
| XceiverClientProtocolServiceStub stub = XceiverClientProtocolServiceGrpc.newStub(channel); | ||||||||||||
| return new ChannelInfo(channel, stub); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port) | ||||||||||||
|
|
@@ -248,11 +236,11 @@ private boolean datanodeUseHostName() { | |||||||||||
| */ | ||||||||||||
| @VisibleForTesting | ||||||||||||
| public boolean isConnected(DatanodeDetails details) { | ||||||||||||
| return isConnected(channels.get(details.getID())); | ||||||||||||
| } | ||||||||||||
| if (details == null || !dnChannelInfoMap.containsKey(details.getID())) { | ||||||||||||
| return false; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private boolean isConnected(ManagedChannel channel) { | ||||||||||||
| return channel != null && !channel.isTerminated() && !channel.isShutdown(); | ||||||||||||
| return !dnChannelInfoMap.get(details.getID()).isChannelInactive(); | ||||||||||||
|
Comment on lines
+239
to
+243
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid separate if (details == null) {
return false;
}
ChannelInfo info = dnChannelInfoMap.get(details.getID());
return info != null && !info.isChannelInactive(); |
||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
|
|
@@ -267,13 +255,16 @@ public void close() { | |||||||||||
| return; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| for (ManagedChannel channel : channels.values()) { | ||||||||||||
| channel.shutdown(); | ||||||||||||
| for (ChannelInfo channelInfo : dnChannelInfoMap.values()) { | ||||||||||||
| channelInfo.getChannel().shutdown(); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| final long maxWaitNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS); | ||||||||||||
| long deadline = System.nanoTime() + maxWaitNanos; | ||||||||||||
| List<ManagedChannel> nonTerminatedChannels = new ArrayList<>(channels.values()); | ||||||||||||
| List<ManagedChannel> nonTerminatedChannels = dnChannelInfoMap.values() | ||||||||||||
| .stream() | ||||||||||||
| .map(ChannelInfo::getChannel) | ||||||||||||
| .collect(Collectors.toList()); | ||||||||||||
|
Comment on lines
+266
to
+267
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
|
|
||||||||||||
| while (!nonTerminatedChannels.isEmpty() && System.nanoTime() < deadline) { | ||||||||||||
| nonTerminatedChannels.removeIf(ManagedChannel::isTerminated); | ||||||||||||
|
|
@@ -286,16 +277,17 @@ public void close() { | |||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| List<DatanodeID> failedChannels = channels.entrySet().stream() | ||||||||||||
| .filter(e -> !e.getValue().isTerminated()) | ||||||||||||
| List<DatanodeID> failedChannels = dnChannelInfoMap.entrySet() | ||||||||||||
| .stream() | ||||||||||||
| .filter(e -> !e.getValue().getChannel().isTerminated()) | ||||||||||||
| .map(Map.Entry::getKey) | ||||||||||||
| .collect(Collectors.toList()); | ||||||||||||
|
|
||||||||||||
| if (!failedChannels.isEmpty()) { | ||||||||||||
| LOG.warn("Channels {} did not terminate within timeout.", failedChannels); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| channels.clear(); | ||||||||||||
| asyncStubs.clear(); | ||||||||||||
| dnChannelInfoMap.clear(); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| @Override | ||||||||||||
|
|
@@ -581,7 +573,7 @@ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) t | |||||||||||
| try { | ||||||||||||
| checkOpen(dn); | ||||||||||||
| semaphore.acquire(); | ||||||||||||
| XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID()); | ||||||||||||
| XceiverClientProtocolServiceStub stub = dnChannelInfoMap.get(dn.getID()).getStub(); | ||||||||||||
| if (stub == null) { | ||||||||||||
| throw new IOException("Failed to get gRPC stub for DataNode: " + dn); | ||||||||||||
| } | ||||||||||||
|
|
@@ -698,7 +690,7 @@ public XceiverClientReply sendCommandAsync( | |||||||||||
|
|
||||||||||||
| // create a new grpc message stream pair for each call. | ||||||||||||
| final StreamObserver<ContainerCommandRequestProto> requestObserver = | ||||||||||||
| asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS) | ||||||||||||
| dnChannelInfoMap.get(dnId).getStub().withDeadlineAfter(timeout, TimeUnit.SECONDS) | ||||||||||||
| .send(new StreamObserver<ContainerCommandResponseProto>() { | ||||||||||||
| @Override | ||||||||||||
| public void onNext(ContainerCommandResponseProto value) { | ||||||||||||
|
|
@@ -743,26 +735,21 @@ private void checkOpen(DatanodeDetails dn) | |||||||||||
| throw new IOException("This channel is not connected."); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| ManagedChannel channel = channels.get(dn.getID()); | ||||||||||||
| // If the channel doesn't exist for this specific datanode or the channel | ||||||||||||
| // is closed, just reconnect | ||||||||||||
| if (!isConnected(channel)) { | ||||||||||||
| // If the channel doesn't exist for this specific datanode or the channel is closed, just reconnect | ||||||||||||
| if (!isConnected(dn)) { | ||||||||||||
| reconnect(dn); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private void reconnect(DatanodeDetails dn) | ||||||||||||
| throws IOException { | ||||||||||||
| ManagedChannel channel; | ||||||||||||
| try { | ||||||||||||
| connectToDatanode(dn); | ||||||||||||
| channel = channels.get(dn.getID()); | ||||||||||||
| } catch (Exception e) { | ||||||||||||
| throw new IOException("Error while connecting", e); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| if (!isConnected(channel)) { | ||||||||||||
| if (!isConnected(dn)) { | ||||||||||||
| throw new IOException("This channel is not connected."); | ||||||||||||
| } | ||||||||||||
|
Comment on lines
735
to
754
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that connectToDatanode(dn);
if (!isConnected(dn)) {
throw new IOException("This channel is not connected.");
}and |
||||||||||||
| } | ||||||||||||
|
|
@@ -784,4 +771,31 @@ public ConfigurationSource getConfig() { | |||||||||||
| public void setTimeout(long timeout) { | ||||||||||||
| this.timeout = timeout; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * Group the channel and stub so that they are published together. | ||||||||||||
| */ | ||||||||||||
| private static class ChannelInfo { | ||||||||||||
| private final ManagedChannel channel; | ||||||||||||
| private final XceiverClientProtocolServiceStub stub; | ||||||||||||
|
|
||||||||||||
| ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub) { | ||||||||||||
| this.channel = channel; | ||||||||||||
| this.stub = stub; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| public ManagedChannel getChannel() { | ||||||||||||
| return channel; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| public XceiverClientProtocolServiceStub getStub() { | ||||||||||||
| return stub; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| public boolean isChannelInactive() { | ||||||||||||
| return channel == null | ||||||||||||
| || channel.isTerminated() | ||||||||||||
| || channel.isShutdown(); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to close inactive channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the channel is inactive, it is already either terminated or shutdown. In both cases the underlying resources have already been released.