Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -31,6 +32,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -91,14 +93,13 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5;
private final Pipeline pipeline;
private final ConfigurationSource config;
private final Map<DatanodeID, XceiverClientProtocolServiceStub> asyncStubs;
private final XceiverClientMetrics metrics;
private final Map<DatanodeID, ManagedChannel> channels;
private final Semaphore semaphore;
private long timeout;
private final SecurityConfig secConfig;
private final boolean topologyAwareRead;
private final ClientTrustManager trustManager;
private final ConcurrentMap<DatanodeID, ChannelInfo> dnChannelInfoMap;
// Cache the DN which returned the GetBlock command so that the ReadChunk
// command can be sent to the same DN.
private final Map<DatanodeBlockID, DatanodeDetails> getBlockDNcache;
Expand Down Expand Up @@ -126,8 +127,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
this.semaphore =
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
this.metrics = XceiverClientManager.getXceiverClientMetrics();
this.channels = new ConcurrentHashMap<>();
this.asyncStubs = new ConcurrentHashMap<>();
this.dnChannelInfoMap = new ConcurrentHashMap<>();
this.topologyAwareRead = config.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
Expand Down Expand Up @@ -161,49 +161,37 @@ public void connect() throws Exception {
connectToDatanode(dn);
}

private void connectToDatanode(DatanodeDetails dn)
throws IOException {
private void connectToDatanode(DatanodeDetails dn) throws IOException {
if (isClosed.get()) {
throw new IOException("Client is closed.");
}

if (isConnected(dn)) {
return;
}
// read port from the data node, on failure use default configured port
int port = dn.getStandalonePort().getValue();
if (port == 0) {
port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
}
final int finalPort = port;

LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn, pipeline.getNodes());

channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
if (channel.isTerminated() || channel.isShutdown()) {
asyncStubs.remove(dnId);
return null; // removes from channels map
dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> {
// channel is absent or stale
if (channelInfo == null || channelInfo.isChannelInactive()) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to close inactive channel?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the channel is inactive, it is already either terminated or shutdown. In both cases the underlying resources have already been released.

LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn, pipeline.getNodes());
try {
return generateNewChannel(dn);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

return channel;
// channel is present and active
return channelInfo;
});
}

ManagedChannel channel;
try {
channel = channels.computeIfAbsent(dn.getID(), dnId -> {
try {
return createChannel(dn, finalPort).build();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
} catch (RuntimeException e) {
LOG.error("Failed to create channel to datanode {}", dn, e);
throw new IOException(e.getCause());
Comment on lines -201 to -203
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to keep the translation back from unchecked exception to IOException, because callers may not handle the former.

try {
  dnChannelInfoMap.compute(...);
} catch (UncheckedIOException e) {
  LOG.error("Failed to create channel to datanode {}", dn, e);
  throw e.getCause();
}

private ChannelInfo generateNewChannel(DatanodeDetails dn) throws IOException {
// read port from the data node, on failure use default configured port
int port = dn.getStandalonePort().getValue();
if (port == 0) {
port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT, OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
}

asyncStubs.computeIfAbsent(dn.getID(), dnId -> XceiverClientProtocolServiceGrpc.newStub(channel));
ManagedChannel channel = createChannel(dn, port).build();
XceiverClientProtocolServiceStub stub = XceiverClientProtocolServiceGrpc.newStub(channel);
return new ChannelInfo(channel, stub);
}

protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port)
Expand Down Expand Up @@ -248,11 +236,11 @@ private boolean datanodeUseHostName() {
*/
@VisibleForTesting
public boolean isConnected(DatanodeDetails details) {
return isConnected(channels.get(details.getID()));
}
if (details == null || !dnChannelInfoMap.containsKey(details.getID())) {
return false;
}

private boolean isConnected(ManagedChannel channel) {
return channel != null && !channel.isTerminated() && !channel.isShutdown();
return !dnChannelInfoMap.get(details.getID()).isChannelInactive();
Comment on lines +239 to +243
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid separate containsKey and get.

if (details == null) {
  return false;
}
ChannelInfo info = dnChannelInfoMap.get(details.getID());
return info != null && !info.isChannelInactive();

}

/**
Expand All @@ -267,13 +255,16 @@ public void close() {
return;
}

for (ManagedChannel channel : channels.values()) {
channel.shutdown();
for (ChannelInfo channelInfo : dnChannelInfoMap.values()) {
channelInfo.getChannel().shutdown();
}

final long maxWaitNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS);
long deadline = System.nanoTime() + maxWaitNanos;
List<ManagedChannel> nonTerminatedChannels = new ArrayList<>(channels.values());
List<ManagedChannel> nonTerminatedChannels = dnChannelInfoMap.values()
.stream()
.map(ChannelInfo::getChannel)
.collect(Collectors.toList());
Comment on lines +266 to +267
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChannelInfo.channel may be null (or at least there are checks for that elsewhere).

Suggested change
.map(ChannelInfo::getChannel)
.collect(Collectors.toList());
.map(ChannelInfo::getChannel)
.filter(Objects::nonNull)
.collect(Collectors.toList());


while (!nonTerminatedChannels.isEmpty() && System.nanoTime() < deadline) {
nonTerminatedChannels.removeIf(ManagedChannel::isTerminated);
Expand All @@ -286,16 +277,17 @@ public void close() {
}
}

List<DatanodeID> failedChannels = channels.entrySet().stream()
.filter(e -> !e.getValue().isTerminated())
List<DatanodeID> failedChannels = dnChannelInfoMap.entrySet()
.stream()
.filter(e -> !e.getValue().getChannel().isTerminated())
.map(Map.Entry::getKey)
.collect(Collectors.toList());

if (!failedChannels.isEmpty()) {
LOG.warn("Channels {} did not terminate within timeout.", failedChannels);
}

channels.clear();
asyncStubs.clear();
dnChannelInfoMap.clear();
}

@Override
Expand Down Expand Up @@ -581,7 +573,7 @@ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) t
try {
checkOpen(dn);
semaphore.acquire();
XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
XceiverClientProtocolServiceStub stub = dnChannelInfoMap.get(dn.getID()).getStub();
if (stub == null) {
throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
}
Expand Down Expand Up @@ -698,7 +690,7 @@ public XceiverClientReply sendCommandAsync(

// create a new grpc message stream pair for each call.
final StreamObserver<ContainerCommandRequestProto> requestObserver =
asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
dnChannelInfoMap.get(dnId).getStub().withDeadlineAfter(timeout, TimeUnit.SECONDS)
.send(new StreamObserver<ContainerCommandResponseProto>() {
@Override
public void onNext(ContainerCommandResponseProto value) {
Expand Down Expand Up @@ -743,26 +735,21 @@ private void checkOpen(DatanodeDetails dn)
throw new IOException("This channel is not connected.");
}

ManagedChannel channel = channels.get(dn.getID());
// If the channel doesn't exist for this specific datanode or the channel
// is closed, just reconnect
if (!isConnected(channel)) {
// If the channel doesn't exist for this specific datanode or the channel is closed, just reconnect
if (!isConnected(dn)) {
reconnect(dn);
}

}

private void reconnect(DatanodeDetails dn)
throws IOException {
ManagedChannel channel;
try {
connectToDatanode(dn);
channel = channels.get(dn.getID());
} catch (Exception e) {
throw new IOException("Error while connecting", e);
}

if (!isConnected(channel)) {
if (!isConnected(dn)) {
throw new IOException("This channel is not connected.");
}
Comment on lines 735 to 754
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that connectToDatanode handles all cases ("closed", "already connected", "needs new connection"), checkOpen can be simplified:

    connectToDatanode(dn);

    if (!isConnected(dn)) {
      throw new IOException("This channel is not connected.");
    }

and reconnect can be removed.

}
Expand All @@ -784,4 +771,31 @@ public ConfigurationSource getConfig() {
public void setTimeout(long timeout) {
this.timeout = timeout;
}

/**
* Group the channel and stub so that they are published together.
*/
private static class ChannelInfo {
private final ManagedChannel channel;
private final XceiverClientProtocolServiceStub stub;

ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub) {
this.channel = channel;
this.stub = stub;
}

public ManagedChannel getChannel() {
return channel;
}

public XceiverClientProtocolServiceStub getStub() {
return stub;
}

public boolean isChannelInactive() {
return channel == null
|| channel.isTerminated()
|| channel.isShutdown();
}
}
}