From d3697337a21ae0efe2144774c950387d806b27ae Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Tue, 10 Mar 2026 00:06:10 +0000 Subject: [PATCH 1/5] reuse gRPC xDS channel by ref-counting --- .../io/grpc/xds/GrpcXdsTransportFactory.java | 49 +++++++++++++-- .../grpc/xds/GrpcXdsTransportFactoryTest.java | 62 +++++++++++++++++++ 2 files changed, 106 insertions(+), 5 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java index 0da51bf47f7..238e8e9d4c9 100644 --- a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java +++ b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java @@ -31,11 +31,17 @@ import io.grpc.Status; import io.grpc.xds.client.Bootstrapper; import io.grpc.xds.client.XdsTransportFactory; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; final class GrpcXdsTransportFactory implements XdsTransportFactory { private final CallCredentials callCredentials; + // The map of xDS server info to its corresponding gRPC xDS transport. + // This enables reusing and sharing the same underlying gRPC channel. + private static final Map xdsServerInfoToTransportMap = + new ConcurrentHashMap<>(); GrpcXdsTransportFactory(CallCredentials callCredentials) { this.callCredentials = callCredentials; @@ -43,12 +49,25 @@ final class GrpcXdsTransportFactory implements XdsTransportFactory { @Override public XdsTransport create(Bootstrapper.ServerInfo serverInfo) { - return new GrpcXdsTransport(serverInfo, callCredentials); + return xdsServerInfoToTransportMap.compute( + serverInfo, + (info, transport) -> { + if (transport == null) { + transport = new GrpcXdsTransport(serverInfo, callCredentials); + } + ++transport.refCount; + return transport; + }); } @VisibleForTesting public XdsTransport createForTest(ManagedChannel channel) { - return new GrpcXdsTransport(channel, callCredentials); + return new GrpcXdsTransport(channel, callCredentials, null); + } + + @VisibleForTesting + static boolean hasTransport(Bootstrapper.ServerInfo serverInfo) { + return xdsServerInfoToTransportMap.containsKey(serverInfo); } @VisibleForTesting @@ -56,6 +75,9 @@ static class GrpcXdsTransport implements XdsTransport { private final ManagedChannel channel; private final CallCredentials callCredentials; + private final Bootstrapper.ServerInfo serverInfo; + // Must only be accessed within the provided atomic methods of ConcurrentHashMap. + private int refCount = 0; public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) { this(serverInfo, null); @@ -63,7 +85,7 @@ public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) { @VisibleForTesting public GrpcXdsTransport(ManagedChannel channel) { - this(channel, null); + this(channel, null, null); } public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, CallCredentials callCredentials) { @@ -73,12 +95,17 @@ public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, CallCredentials call .keepAliveTime(5, TimeUnit.MINUTES) .build(); this.callCredentials = callCredentials; + this.serverInfo = serverInfo; } @VisibleForTesting - public GrpcXdsTransport(ManagedChannel channel, CallCredentials callCredentials) { + public GrpcXdsTransport( + ManagedChannel channel, + CallCredentials callCredentials, + Bootstrapper.ServerInfo serverInfo) { this.channel = checkNotNull(channel, "channel"); this.callCredentials = callCredentials; + this.serverInfo = serverInfo; } @Override @@ -98,7 +125,19 @@ public StreamingCall createStreamingCall( @Override public void shutdown() { - channel.shutdown(); + if (serverInfo == null) { + channel.shutdown(); + return; + } + xdsServerInfoToTransportMap.computeIfPresent( + serverInfo, + (info, transport) -> { + if (--transport.refCount == 0) { // Prefix decrement and return the updated value. + transport.channel.shutdown(); + return null; // Remove mapping. + } + return transport; + }); } private class XdsStreamingCall implements diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java b/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java index 66e0d4b3198..e261624b6a4 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java @@ -118,6 +118,68 @@ public void callApis() throws Exception { xdsTransport.shutdown(); } + @Test + public void refCountedXdsTransport_sameXdsServerAddress_returnsExistingTransport() { + Bootstrapper.ServerInfo xdsServerInfo = + Bootstrapper.ServerInfo.create( + "localhost:" + server.getPort(), InsecureChannelCredentials.create()); + GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null); + // Verify calling create() for the first time creates a new GrpcXdsTransport instance. + // The ref count was previously 0 and now is 1. + XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo); + assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo)).isTrue(); + // Verify calling create() for the second time to the same xDS server address returns the same + // GrpcXdsTransport instance. The ref count was previously 1 and now is 2. + XdsTransportFactory.XdsTransport transport2 = xdsTransportFactory.create(xdsServerInfo); + assertThat(transport1).isSameInstanceAs(transport2); + assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo)).isTrue(); + // Verify calling shutdown() for the first time does not shut down the GrpcXdsTransport + // instance. The ref count was previously 2 and now is 1. + transport1.shutdown(); + assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo)).isTrue(); + // Verify calling shutdown() for the second time shuts down and cleans up the + // GrpcXdsTransport instance. The ref count was previously 1 and now is 0. + transport2.shutdown(); + assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo)).isFalse(); + } + + @Test + public void refCountedXdsTransport_differentXdsServerAddress_returnsDifferentTransport() + throws Exception { + // Create and start a second xDS serverĀ on a different port. + Server server2 = + Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create()) + .addService(echoAdsService()) + .build() + .start(); + Bootstrapper.ServerInfo xdsServerInfo1 = + Bootstrapper.ServerInfo.create( + "localhost:" + server.getPort(), InsecureChannelCredentials.create()); + Bootstrapper.ServerInfo xdsServerInfo2 = + Bootstrapper.ServerInfo.create( + "localhost:" + server2.getPort(), InsecureChannelCredentials.create()); + GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null); + // Verify calling create() to the first xDS server creates a new GrpcXdsTransport instance. + // The ref count was previously 0 and now is 1. + XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo1); + assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo1)).isTrue(); + // Verify calling create() to the second xDS server creates a different GrpcXdsTransport + // instance. The ref count was previously 0 and now is 1. + XdsTransportFactory.XdsTransport transport2 = xdsTransportFactory.create(xdsServerInfo2); + assertThat(transport1).isNotSameInstanceAs(transport2); + assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo2)).isTrue(); + // Verify calling shutdown() shuts down and cleans up the GrpcXdsTransport instance for + // the first xDS server. The ref count was previously 1 and now is 0. + transport1.shutdown(); + assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo1)).isFalse(); + // Verify calling shutdown() shuts down and cleans up the GrpcXdsTransport instance for + // the second xDS server. The ref count was previously 1 and now is 0. + transport2.shutdown(); + assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo2)).isFalse(); + // Clean up the second xDS server. + server2.shutdown(); + } + private static class FakeEventHandler implements XdsTransportFactory.EventHandler { private final BlockingQueue respQ = new LinkedBlockingQueue<>(); From ba7054a7362c35558a32e45dfb8172085029352d Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 12 Mar 2026 00:11:49 +0000 Subject: [PATCH 2/5] update comments for ConcurrentHashMap, add warning for ServerInfo as key --- .../java/io/grpc/xds/GrpcXdsTransportFactory.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java index 238e8e9d4c9..41449a22ef1 100644 --- a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java +++ b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java @@ -40,6 +40,18 @@ final class GrpcXdsTransportFactory implements XdsTransportFactory { private final CallCredentials callCredentials; // The map of xDS server info to its corresponding gRPC xDS transport. // This enables reusing and sharing the same underlying gRPC channel. + // + // NOTE: ConcurrentHashMap is used as a per-entry lock and all reads and writes must be a mutation + // via the ConcurrentHashMap APIs to acquire the per-entry lock in order to ensure thread safety + // for reference counting of each GrpcXdsTransport instance. + // + // WARNING: ServerInfo includes ChannelCredentials, which is compared by reference equality. + // This means every BootstrapInfo would have non-equal copies of ServerInfo, even if they all + // represent the same xDS server configuration. For gRPC name resolution with the `xds` and + // `google-c2p` scheme, this transport sharing works as expected as it internally reuses a single + // BootstrapInfo instance. Otherwise, new transports would be created for each ServerInfo despite + // them possibly representing the same xDS server configuration and defeating the purpose of + // transport sharing. private static final Map xdsServerInfoToTransportMap = new ConcurrentHashMap<>(); @@ -76,7 +88,7 @@ static class GrpcXdsTransport implements XdsTransport { private final ManagedChannel channel; private final CallCredentials callCredentials; private final Bootstrapper.ServerInfo serverInfo; - // Must only be accessed within the provided atomic methods of ConcurrentHashMap. + // Must only be accessed via the ConcurrentHashMap APIs which act as the locking methods. private int refCount = 0; public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) { From 56d96bdf9a48d1b8c1d60eb3d50e10c286c92622 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 12 Mar 2026 00:43:32 +0000 Subject: [PATCH 3/5] test via public methods/results, remove hasTransport() test only method --- .../io/grpc/xds/GrpcXdsTransportFactory.java | 5 --- .../grpc/xds/GrpcXdsTransportFactoryTest.java | 34 +++++++------------ 2 files changed, 13 insertions(+), 26 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java index 41449a22ef1..e95914ad5cc 100644 --- a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java +++ b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java @@ -77,11 +77,6 @@ public XdsTransport createForTest(ManagedChannel channel) { return new GrpcXdsTransport(channel, callCredentials, null); } - @VisibleForTesting - static boolean hasTransport(Bootstrapper.ServerInfo serverInfo) { - return xdsServerInfoToTransportMap.containsKey(serverInfo); - } - @VisibleForTesting static class GrpcXdsTransport implements XdsTransport { diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java b/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java index e261624b6a4..547ec4a309c 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java @@ -124,23 +124,19 @@ public void refCountedXdsTransport_sameXdsServerAddress_returnsExistingTransport Bootstrapper.ServerInfo.create( "localhost:" + server.getPort(), InsecureChannelCredentials.create()); GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null); - // Verify calling create() for the first time creates a new GrpcXdsTransport instance. + // Calling create() for the first time creates a new GrpcXdsTransport instance. // The ref count was previously 0 and now is 1. XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo); - assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo)).isTrue(); - // Verify calling create() for the second time to the same xDS server address returns the same + // Calling create() for the second time to the same xDS server address returns the same // GrpcXdsTransport instance. The ref count was previously 1 and now is 2. XdsTransportFactory.XdsTransport transport2 = xdsTransportFactory.create(xdsServerInfo); assertThat(transport1).isSameInstanceAs(transport2); - assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo)).isTrue(); - // Verify calling shutdown() for the first time does not shut down the GrpcXdsTransport - // instance. The ref count was previously 2 and now is 1. + // Calling shutdown() for the first time does not shut down the GrpcXdsTransport instance. + // The ref count was previously 2 and now is 1. transport1.shutdown(); - assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo)).isTrue(); - // Verify calling shutdown() for the second time shuts down and cleans up the - // GrpcXdsTransport instance. The ref count was previously 1 and now is 0. + // Calling shutdown() for the second time shuts down the GrpcXdsTransport instance. + // The ref count was previously 1 and now is 0. transport2.shutdown(); - assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo)).isFalse(); } @Test @@ -159,23 +155,19 @@ public void refCountedXdsTransport_differentXdsServerAddress_returnsDifferentTra Bootstrapper.ServerInfo.create( "localhost:" + server2.getPort(), InsecureChannelCredentials.create()); GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null); - // Verify calling create() to the first xDS server creates a new GrpcXdsTransport instance. + // Calling create() to the first xDS server creates a new GrpcXdsTransport instance. // The ref count was previously 0 and now is 1. XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo1); - assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo1)).isTrue(); - // Verify calling create() to the second xDS server creates a different GrpcXdsTransport - // instance. The ref count was previously 0 and now is 1. + // Calling create() to the second xDS server creates a different GrpcXdsTransport instance. + // The ref count was previously 0 and now is 1. XdsTransportFactory.XdsTransport transport2 = xdsTransportFactory.create(xdsServerInfo2); assertThat(transport1).isNotSameInstanceAs(transport2); - assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo2)).isTrue(); - // Verify calling shutdown() shuts down and cleans up the GrpcXdsTransport instance for - // the first xDS server. The ref count was previously 1 and now is 0. + // Calling shutdown() shuts down the GrpcXdsTransport instance for the first xDS server. + // The ref count was previously 1 and now is 0. transport1.shutdown(); - assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo1)).isFalse(); - // Verify calling shutdown() shuts down and cleans up the GrpcXdsTransport instance for - // the second xDS server. The ref count was previously 1 and now is 0. + // Calling shutdown() shuts down the GrpcXdsTransport instance for the second xDS server. + // The ref count was previously 1 and now is 0. transport2.shutdown(); - assertThat(GrpcXdsTransportFactory.hasTransport(xdsServerInfo2)).isFalse(); // Clean up the second xDS server. server2.shutdown(); } From bf8e597797d9036000586a86f5407844ceb906a2 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 12 Mar 2026 01:33:59 +0000 Subject: [PATCH 4/5] use GrpcCleanupRule to register new server in test --- .../io/grpc/xds/GrpcXdsTransportFactoryTest.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java b/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java index 547ec4a309c..9c606a962f6 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java @@ -30,6 +30,7 @@ import io.grpc.Server; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.client.Bootstrapper; import io.grpc.xds.client.XdsTransportFactory; import java.util.concurrent.BlockingQueue; @@ -37,6 +38,7 @@ import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -44,6 +46,8 @@ @RunWith(JUnit4.class) public class GrpcXdsTransportFactoryTest { + @Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); + private Server server; @Before @@ -144,10 +148,11 @@ public void refCountedXdsTransport_differentXdsServerAddress_returnsDifferentTra throws Exception { // Create and start a second xDS serverĀ on a different port. Server server2 = - Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create()) - .addService(echoAdsService()) - .build() - .start(); + grpcCleanupRule.register( + Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create()) + .addService(echoAdsService()) + .build() + .start()); Bootstrapper.ServerInfo xdsServerInfo1 = Bootstrapper.ServerInfo.create( "localhost:" + server.getPort(), InsecureChannelCredentials.create()); @@ -168,8 +173,6 @@ public void refCountedXdsTransport_differentXdsServerAddress_returnsDifferentTra // Calling shutdown() shuts down the GrpcXdsTransport instance for the second xDS server. // The ref count was previously 1 and now is 0. transport2.shutdown(); - // Clean up the second xDS server. - server2.shutdown(); } private static class FakeEventHandler implements From 44da596e0f2b1b4574da3e8fc38a8c74117ccca5 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 12 Mar 2026 22:43:40 +0000 Subject: [PATCH 5/5] move/update warning for ServerInfo as key to class-level Javadoc --- .../io/grpc/xds/GrpcXdsTransportFactory.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java index e95914ad5cc..5100537aea2 100644 --- a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java +++ b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java @@ -35,6 +35,20 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +/** + * A factory for creating gRPC-based transports for xDS communication. + * + *

WARNING: This class reuses channels when possible, based on the provided {@link + * Bootstrapper.ServerInfo} with important considerations. The {@link Bootstrapper.ServerInfo} + * includes {@link ChannelCredentials}, which is compared by reference equality. This means every + * {@link Bootstrapper.BootstrapInfo} would have non-equal copies of {@link + * Bootstrapper.ServerInfo}, even if they all represent the same xDS server configuration. For gRPC + * name resolution with the {@code xds} and {@code google-c2p} scheme, this transport sharing works + * as expected as it internally reuses a single {@link Bootstrapper.BootstrapInfo} instance. + * Otherwise, new transports would be created for each {@link Bootstrapper.ServerInfo} despite them + * possibly representing the same xDS server configuration and defeating the purpose of transport + * sharing. + */ final class GrpcXdsTransportFactory implements XdsTransportFactory { private final CallCredentials callCredentials; @@ -44,14 +58,6 @@ final class GrpcXdsTransportFactory implements XdsTransportFactory { // NOTE: ConcurrentHashMap is used as a per-entry lock and all reads and writes must be a mutation // via the ConcurrentHashMap APIs to acquire the per-entry lock in order to ensure thread safety // for reference counting of each GrpcXdsTransport instance. - // - // WARNING: ServerInfo includes ChannelCredentials, which is compared by reference equality. - // This means every BootstrapInfo would have non-equal copies of ServerInfo, even if they all - // represent the same xDS server configuration. For gRPC name resolution with the `xds` and - // `google-c2p` scheme, this transport sharing works as expected as it internally reuses a single - // BootstrapInfo instance. Otherwise, new transports would be created for each ServerInfo despite - // them possibly representing the same xDS server configuration and defeating the purpose of - // transport sharing. private static final Map xdsServerInfoToTransportMap = new ConcurrentHashMap<>();