HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. #9997
…rpc#sendCommandAsync`.
…e` causing intermittent NPEs.
adoroszlai left a comment
Thanks @ptlrs for working on this.
| /** | ||
| * Checks if the client has a live connection channel to the specified Datanode. | ||
| * | ||
| * @return True if the connection is alive, false otherwise. | ||
| */ | ||
| @VisibleForTesting | ||
| public boolean isConnected(DatanodeDetails details) { |
nit: please don't move isConnected unnecessarily, it inflates the diff
| private void connectToDatanode(DatanodeDetails dn) throws IOException { | ||
| if (isClosed.get()) { | ||
| throw new IOException("Client is closed."); | ||
| } | ||
|
|
||
| if (isConnected(dn)) { | ||
| return; | ||
| } | ||
| // read port from the data node, on failure use default configured port | ||
| int port = dn.getStandalonePort().getValue(); | ||
| if (port == 0) { | ||
| port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT, | ||
| OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT); | ||
| } | ||
| final int finalPort = port; | ||
|
|
||
| LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn, pipeline.getNodes()); | ||
| LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn, pipeline.getNodes()); | ||
|
|
||
| channels.computeIfPresent(dn.getID(), (dnId, channel) -> { | ||
| if (channel.isTerminated() || channel.isShutdown()) { | ||
| asyncStubs.remove(dnId); | ||
| return null; // removes from channels map | ||
| } | ||
| removeStaleChannel(dn); | ||
| generateNewChannel(dn); | ||
| } |
connectToDatanode still performs multiple map operations in a non-atomic way:
- calls isConnected twice, once directly, once via removeStaleChannel
- isConnected calls containsKey and get separately
- removeStaleChannel calls isConnected and remove separately
- generateNewChannel calls unconditional put
Use only one compute operation, and distinguish between the three cases in that call:
- absent
- present but stale
- present and active
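A minimal sketch of the three-way distinction inside one `compute` call, assuming a simplified stand-in for the real classes. `SingleComputeSketch`, the `String` key, the `generation` counter, and the `shutdown()` helper are hypothetical and exist only for illustration; only `dnChannelInfoMap`, `ChannelInfo`, and `isChannelInactive` are names taken from the patch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class SingleComputeSketch {

  /** Hypothetical stand-in for a channel and its stub. */
  static final class ChannelInfo {
    private final int generation;
    private volatile boolean inactive;

    ChannelInfo(int generation) {
      this.generation = generation;
    }

    boolean isChannelInactive() {
      return inactive;
    }

    // Simulates the channel being shut down or terminated.
    void shutdown() {
      inactive = true;
    }

    int generation() {
      return generation;
    }
  }

  private final ConcurrentMap<String, ChannelInfo> dnChannelInfoMap =
      new ConcurrentHashMap<>();
  private final AtomicInteger created = new AtomicInteger();

  /** Returns a usable entry for the datanode, creating one only if needed. */
  ChannelInfo connect(String dnId) {
    // The whole check-and-replace runs atomically for this key.
    return dnChannelInfoMap.compute(dnId, (id, info) -> {
      if (info == null || info.isChannelInactive()) {
        // Absent, or present but stale: install a fresh entry.
        return new ChannelInfo(created.incrementAndGet());
      }
      // Present and active: keep the existing entry.
      return info;
    });
  }
}
```

Because the mapping function is the only place where an entry is read, judged, and replaced, there is no window in which another thread can observe a half-updated state for the same key.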
| private ManagedChannel channel; | ||
| private XceiverClientProtocolServiceStub stub; |
nit: these can be final
Thanks for the review @adoroszlai, I have pushed the fixes.
| return null; // removes from channels map | ||
| dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> { | ||
| // channel is absent or stale | ||
| if (channelInfo == null || channelInfo.isChannelInactive()) { |
do we need to close inactive channel?
If the channel is inactive, it is already either terminated or shut down. In both cases the underlying resources have already been released.
adoroszlai left a comment
Thanks @ptlrs for updating the patch. compute logic looks good now.
| if (details == null || !dnChannelInfoMap.containsKey(details.getID())) { | ||
| return false; | ||
| } | ||
|
|
||
| private boolean isConnected(ManagedChannel channel) { | ||
| return channel != null && !channel.isTerminated() && !channel.isShutdown(); | ||
| return !dnChannelInfoMap.get(details.getID()).isChannelInactive(); |
Avoid separate containsKey and get.
if (details == null) {
return false;
}
ChannelInfo info = dnChannelInfoMap.get(details.getID());
return info != null && !info.isChannelInactive();
| .map(ChannelInfo::getChannel) | ||
| .collect(Collectors.toList()); |
ChannelInfo.channel may be null (or at least there are checks for that elsewhere).
| .map(ChannelInfo::getChannel) | |
| .collect(Collectors.toList()); | |
| .map(ChannelInfo::getChannel) | |
| .filter(Objects::nonNull) | |
| .collect(Collectors.toList()); |
| throw new IOException("This channel is not connected."); | ||
| } | ||
|
|
||
| ManagedChannel channel = channels.get(dn.getID()); | ||
| // If the channel doesn't exist for this specific datanode or the channel | ||
| // is closed, just reconnect | ||
| if (!isConnected(channel)) { | ||
| // If the channel doesn't exist for this specific datanode or the channel is closed, just reconnect | ||
| if (!isConnected(dn)) { | ||
| reconnect(dn); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| private void reconnect(DatanodeDetails dn) | ||
| throws IOException { | ||
| ManagedChannel channel; | ||
| try { | ||
| connectToDatanode(dn); | ||
| channel = channels.get(dn.getID()); | ||
| } catch (Exception e) { | ||
| throw new IOException("Error while connecting", e); | ||
| } | ||
|
|
||
| if (!isConnected(channel)) { | ||
| if (!isConnected(dn)) { | ||
| throw new IOException("This channel is not connected."); | ||
| } |
Given that connectToDatanode handles all cases ("closed", "already connected", "needs new connection"), checkOpen can be simplified:
connectToDatanode(dn);
if (!isConnected(dn)) {
throw new IOException("This channel is not connected.");
}
and reconnect can be removed.
| } catch (RuntimeException e) { | ||
| LOG.error("Failed to create channel to datanode {}", dn, e); | ||
| throw new IOException(e.getCause()); |
We need to keep the translation back from unchecked exception to IOException, because callers may not handle the former.
try {
dnChannelInfoMap.compute(...);
} catch (UncheckedIOException e) {
LOG.error("Failed to create channel to datanode {}", dn, e);
throw e.getCause();
}
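A self-contained sketch of that translation, assuming a hypothetical `openChannel` factory and plain `String` values in place of the real channel types. The checked `IOException` is wrapped inside the mapping function, because the lambda cannot throw checked exceptions, and unwrapped again outside `compute`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ExceptionTranslationSketch {

  private final ConcurrentMap<String, String> channelMap =
      new ConcurrentHashMap<>();

  // Hypothetical factory that can fail with a checked exception.
  private static String openChannel(String dnId) throws IOException {
    if (dnId.isEmpty()) {
      throw new IOException("empty datanode id");
    }
    return "channel-" + dnId;
  }

  String connect(String dnId) throws IOException {
    try {
      return channelMap.compute(dnId, (id, existing) -> {
        if (existing != null) {
          return existing;
        }
        try {
          return openChannel(id);
        } catch (IOException e) {
          // Carry the checked exception out of the lambda.
          throw new UncheckedIOException(e);
        }
      });
    } catch (UncheckedIOException e) {
      // getCause() on UncheckedIOException is typed as IOException,
      // so callers see the original checked exception.
      throw e.getCause();
    }
  }
}
```

When the mapping function throws, `compute` rethrows the exception and leaves the current mapping unchanged, so a failed attempt does not leave a broken entry behind.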
What changes were proposed in this pull request?
The XceiverClientGrpc#connectToDatanode intermittently fails with an NPE.
The problem is that for a given datanode, there is a race condition between creating a channel and creating a stub.
When a new channel is created for a DN, it is put into the channels map. However, presence of a channel in the map does not imply that the corresponding stub for the same DN also exists in the asyncStubs map.
If the stub is accessed after creating a channel but before the creation of stub, we can get an NPE.
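The window can be illustrated with a deterministic sketch that replays the interleaving in a single thread. Plain strings stand in for the gRPC channel and stub, and the class and method names are hypothetical. For contrast, the second method stores one immutable value that carries both parts, so a reader gets either both or neither.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TwoMapsVersusOneSketch {

  /** Hypothetical value type that groups a channel with its stub. */
  static final class ChannelInfo {
    final String channel;
    final String stub;

    ChannelInfo(String channel, String stub) {
      this.channel = channel;
      this.stub = stub;
    }
  }

  /** Returns what a reader would see between the two independent puts. */
  static String stubSeenBetweenPuts() {
    Map<String, String> channels = new ConcurrentHashMap<>();
    Map<String, String> asyncStubs = new ConcurrentHashMap<>();

    channels.put("dn1", "channel-dn1");
    // A concurrent reader scheduled here finds the channel, assumes the
    // datanode is connected, and looks up a stub that is not there yet.
    String seen = channels.containsKey("dn1") ? asyncStubs.get("dn1") : "absent";
    asyncStubs.put("dn1", "stub-dn1");
    return seen;
  }

  /** With one map and one grouped value there is no in-between state. */
  static ChannelInfo grouped() {
    Map<String, ChannelInfo> infoMap = new ConcurrentHashMap<>();
    infoMap.put("dn1", new ChannelInfo("channel-dn1", "stub-dn1"));
    return infoMap.get("dn1");
  }
}
```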
This PR fixes the problem by:
- using a single dnChannelInfoMap for both the channels and stubs instead of two independent maps
- adding a ChannelInfo class to group the channel and stub
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-14793
How was this patch tested?
CI: https://github.com/ptlrs/ozone/actions/runs/23703558972
Flaky test runner: https://github.com/ptlrs/ozone/actions/runs/23823575690