diff --git a/api/src/main/java/io/grpc/ForwardingServerBuilder.java b/api/src/main/java/io/grpc/ForwardingServerBuilder.java
index 9cef7cfa331..d1f183dd824 100644
--- a/api/src/main/java/io/grpc/ForwardingServerBuilder.java
+++ b/api/src/main/java/io/grpc/ForwardingServerBuilder.java
@@ -201,6 +201,12 @@ public Server build() {
return delegate().build();
}
+ @Override
+ public T addMetricSink(MetricSink metricSink) {
+ delegate().addMetricSink(metricSink);
+ return thisT();
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
diff --git a/api/src/main/java/io/grpc/InternalTcpMetrics.java b/api/src/main/java/io/grpc/InternalTcpMetrics.java
new file mode 100644
index 00000000000..3dd89b6f76c
--- /dev/null
+++ b/api/src/main/java/io/grpc/InternalTcpMetrics.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2026 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * TCP Metrics defined to be shared across transport implementations.
+ * These metrics and their definitions are specified in
+ * gRFC
+ * A80.
+ */
+@Internal
+public final class InternalTcpMetrics {
+
+ private InternalTcpMetrics() {
+ }
+
+ private static final List OPTIONAL_LABELS = Arrays.asList(
+ "network.local.address",
+ "network.local.port",
+ "network.peer.address",
+ "network.peer.port");
+
+ public static final DoubleHistogramMetricInstrument MIN_RTT_INSTRUMENT =
+ MetricInstrumentRegistry.getDefaultRegistry()
+ .registerDoubleHistogram(
+ "grpc.tcp.min_rtt",
+ "Minimum round-trip time of a TCP connection",
+ "s",
+ Collections.emptyList(),
+ Collections.emptyList(),
+ OPTIONAL_LABELS,
+ false);
+
+ public static final LongCounterMetricInstrument CONNECTIONS_CREATED_INSTRUMENT =
+ MetricInstrumentRegistry
+ .getDefaultRegistry()
+ .registerLongCounter(
+ "grpc.tcp.connections_created",
+ "The total number of TCP connections established.",
+ "{connection}",
+ Collections.emptyList(),
+ OPTIONAL_LABELS,
+ false);
+
+ public static final LongUpDownCounterMetricInstrument CONNECTION_COUNT_INSTRUMENT =
+ MetricInstrumentRegistry
+ .getDefaultRegistry()
+ .registerLongUpDownCounter(
+ "grpc.tcp.connection_count",
+ "The current number of active TCP connections.",
+ "{connection}",
+ Collections.emptyList(),
+ OPTIONAL_LABELS,
+ false);
+
+ public static final LongCounterMetricInstrument PACKETS_RETRANSMITTED_INSTRUMENT =
+ MetricInstrumentRegistry
+ .getDefaultRegistry()
+ .registerLongCounter(
+ "grpc.tcp.packets_retransmitted",
+ "The total number of packets retransmitted for all TCP connections.",
+ "{packet}",
+ Collections.emptyList(),
+ OPTIONAL_LABELS,
+ false);
+
+ public static final LongCounterMetricInstrument RECURRING_RETRANSMITS_INSTRUMENT =
+ MetricInstrumentRegistry
+ .getDefaultRegistry()
+ .registerLongCounter(
+ "grpc.tcp.recurring_retransmits",
+ "The total number of times the retransmit timer "
+ + "popped for all TCP connections.",
+ "{timeout}",
+ Collections.emptyList(),
+ OPTIONAL_LABELS,
+ false);
+
+}
diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java
index 53dbc5d6888..b47bd93332b 100644
--- a/api/src/main/java/io/grpc/NameResolver.java
+++ b/api/src/main/java/io/grpc/NameResolver.java
@@ -355,7 +355,7 @@ public static final class Args {
@Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor;
@Nullable private final String overrideAuthority;
- @Nullable private final MetricRecorder metricRecorder;
+ private final MetricRecorder metricRecorder;
@Nullable private final NameResolverRegistry nameResolverRegistry;
@Nullable private final IdentityHashMap, Object> customArgs;
@@ -369,7 +369,8 @@ private Args(Builder builder) {
this.channelLogger = builder.channelLogger;
this.executor = builder.executor;
this.overrideAuthority = builder.overrideAuthority;
- this.metricRecorder = builder.metricRecorder;
+ this.metricRecorder = builder.metricRecorder != null ? builder.metricRecorder
+ : new MetricRecorder() {};
this.nameResolverRegistry = builder.nameResolverRegistry;
this.customArgs = cloneCustomArgs(builder.customArgs);
}
@@ -497,7 +498,6 @@ public String getOverrideAuthority() {
/**
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
*/
- @Nullable
public MetricRecorder getMetricRecorder() {
return metricRecorder;
}
@@ -680,7 +680,7 @@ public Builder setArg(Key key, T value) {
* See {@link Args#getMetricRecorder()}. This is an optional field.
*/
public Builder setMetricRecorder(MetricRecorder metricRecorder) {
- this.metricRecorder = metricRecorder;
+ this.metricRecorder = checkNotNull(metricRecorder);
return this;
}
diff --git a/api/src/main/java/io/grpc/ServerBuilder.java b/api/src/main/java/io/grpc/ServerBuilder.java
index cd1cddbb93f..3effe593e57 100644
--- a/api/src/main/java/io/grpc/ServerBuilder.java
+++ b/api/src/main/java/io/grpc/ServerBuilder.java
@@ -435,6 +435,17 @@ public T setBinaryLog(BinaryLog binaryLog) {
*/
public abstract Server build();
+ /**
+ * Adds a metric sink to the server.
+ *
+ * @param metricSink the metric sink to add.
+ * @return this
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/12693")
+ public T addMetricSink(MetricSink metricSink) {
+ return thisT();
+ }
+
/**
* Returns the correctly typed version of the builder.
*/
diff --git a/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java b/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java
index c926c853472..5f0885883a5 100644
--- a/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java
+++ b/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java
@@ -68,7 +68,7 @@ private BinderServerBuilder(
serverImplBuilder =
new ServerImplBuilder(
- streamTracerFactories -> {
+ (streamTracerFactories, metricRecorder) -> {
internalBuilder.setStreamTracerFactories(streamTracerFactories);
BinderServer server = internalBuilder.build();
BinderInternal.setIBinder(binderReceiver, server.getHostBinder());
diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
index 6c10ced4652..6023fb14aa9 100644
--- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
+++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
@@ -24,6 +24,7 @@
import io.grpc.ChannelCredentials;
import io.grpc.ChannelLogger;
import io.grpc.HttpConnectProxiedSocketAddress;
+import io.grpc.MetricRecorder;
import java.io.Closeable;
import java.net.SocketAddress;
import java.util.Collection;
@@ -91,6 +92,8 @@ final class ClientTransportOptions {
private Attributes eagAttributes = Attributes.EMPTY;
@Nullable private String userAgent;
@Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr;
+ private MetricRecorder metricRecorder = new MetricRecorder() {
+ };
public ChannelLogger getChannelLogger() {
return channelLogger;
@@ -101,6 +104,15 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) {
return this;
}
+ public MetricRecorder getMetricRecorder() {
+ return metricRecorder;
+ }
+
+ public ClientTransportOptions setMetricRecorder(MetricRecorder metricRecorder) {
+ this.metricRecorder = Preconditions.checkNotNull(metricRecorder, "metricRecorder");
+ return this;
+ }
+
public String getAuthority() {
return authority;
}
diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java
index 7a48bf642fe..ce31921e316 100644
--- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java
+++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java
@@ -80,6 +80,7 @@ final class InternalSubchannel implements InternalInstrumented, Tr
private final InternalChannelz channelz;
private final CallTracer callsTracer;
private final ChannelTracer channelTracer;
+ private final MetricRecorder metricRecorder;
private final ChannelLogger channelLogger;
private final boolean reconnectDisabled;
@@ -191,6 +192,7 @@ protected void handleNotInUse() {
this.scheduledExecutor = scheduledExecutor;
this.connectingTimer = stopwatchSupplier.get();
this.syncContext = syncContext;
+ this.metricRecorder = metricRecorder;
this.callback = callback;
this.channelz = channelz;
this.callsTracer = callsTracer;
@@ -265,6 +267,7 @@ private void startNewTransport() {
.setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
.setEagAttributes(currentEagAttributes)
.setUserAgent(userAgent)
+ .setMetricRecorder(metricRecorder)
.setHttpConnectProxiedSocketAddress(proxiedAddr);
TransportLogger transportLogger = new TransportLogger();
// In case the transport logs in the constructor, use the subchannel logId
diff --git a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java
index f6566e067db..62a0e66f314 100644
--- a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java
@@ -31,6 +31,9 @@
import io.grpc.HandlerRegistry;
import io.grpc.InternalChannelz;
import io.grpc.InternalConfiguratorRegistry;
+import io.grpc.MetricInstrumentRegistry;
+import io.grpc.MetricRecorder;
+import io.grpc.MetricSink;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCallExecutorSupplier;
@@ -80,6 +83,7 @@ public static ServerBuilder> forPort(int port) {
final List transportFilters = new ArrayList<>();
final List interceptors = new ArrayList<>();
private final List streamTracerFactories = new ArrayList<>();
+ final List metricSinks = new ArrayList<>();
private final ClientTransportServersBuilder clientTransportServersBuilder;
HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY;
ObjectPool extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;
@@ -104,7 +108,8 @@ public static ServerBuilder> forPort(int port) {
*/
public interface ClientTransportServersBuilder {
InternalServer buildClientTransportServers(
- List extends ServerStreamTracer.Factory> streamTracerFactories);
+ List extends ServerStreamTracer.Factory> streamTracerFactories,
+ MetricRecorder metricRecorder);
}
/**
@@ -157,6 +162,15 @@ public ServerImplBuilder intercept(ServerInterceptor interceptor) {
return this;
}
+ /**
+ * Adds a MetricSink to the server.
+ */
+ @Override
+ public ServerImplBuilder addMetricSink(MetricSink metricSink) {
+ metricSinks.add(checkNotNull(metricSink, "metricSink"));
+ return this;
+ }
+
@Override
public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) {
streamTracerFactories.add(checkNotNull(factory, "factory"));
@@ -241,8 +255,11 @@ public void setDeadlineTicker(Deadline.Ticker ticker) {
@Override
public Server build() {
+ MetricRecorder metricRecorder = new MetricRecorderImpl(metricSinks,
+ MetricInstrumentRegistry.getDefaultRegistry());
return new ServerImpl(this,
- clientTransportServersBuilder.buildClientTransportServers(getTracerFactories()),
+ clientTransportServersBuilder.buildClientTransportServers(
+ getTracerFactories(), metricRecorder),
Context.ROOT);
}
diff --git a/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java
index 7ad7f15f358..54c2d6ef8b1 100644
--- a/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java
@@ -18,10 +18,13 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
import io.grpc.InternalConfigurator;
import io.grpc.InternalConfiguratorRegistry;
import io.grpc.Metadata;
+import io.grpc.MetricRecorder;
+import io.grpc.MetricSink;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
@@ -73,7 +76,8 @@ public void setUp() throws Exception {
new ClientTransportServersBuilder() {
@Override
public InternalServer buildClientTransportServers(
- List extends ServerStreamTracer.Factory> streamTracerFactories) {
+ List extends ServerStreamTracer.Factory> streamTracerFactories,
+ MetricRecorder metricRecorder) {
throw new UnsupportedOperationException();
}
});
@@ -128,6 +132,13 @@ public void getTracerFactories_disableBoth() {
assertThat(factories).containsExactly(DUMMY_USER_TRACER);
}
+ @Test
+ public void addMetricSink_addsToSinks() {
+ MetricSink mockSink = mock(MetricSink.class);
+ builder.addMetricSink(mockSink);
+ assertThat(builder.metricSinks).containsExactly(mockSink);
+ }
+
@Test
public void getTracerFactories_callsGet() throws Exception {
Class> runnable = classLoader.loadClass(StaticTestingClassLoaderCallsGet.class.getName());
@@ -139,7 +150,7 @@ public static final class StaticTestingClassLoaderCallsGet implements Runnable {
public void run() {
ServerImplBuilder builder =
new ServerImplBuilder(
- streamTracerFactories -> {
+ (streamTracerFactories, metricRecorder) -> {
throw new UnsupportedOperationException();
});
assertThat(builder.getTracerFactories()).hasSize(2);
@@ -169,7 +180,7 @@ public void configureServerBuilder(ServerBuilder> builder) {
}));
ServerImplBuilder builder =
new ServerImplBuilder(
- streamTracerFactories -> {
+ (streamTracerFactories, metricRecorder) -> {
throw new UnsupportedOperationException();
});
assertThat(builder.getTracerFactories()).containsExactly(DUMMY_USER_TRACER);
@@ -192,7 +203,7 @@ public void run() {
InternalConfiguratorRegistry.setConfigurators(Collections.emptyList());
ServerImplBuilder builder =
new ServerImplBuilder(
- streamTracerFactories -> {
+ (streamTracerFactories, metricRecorder) -> {
throw new UnsupportedOperationException();
});
assertThat(builder.getTracerFactories()).isEmpty();
diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java
index 0f18efe078c..3405cb9bb0c 100644
--- a/core/src/test/java/io/grpc/internal/ServerImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java
@@ -65,6 +65,7 @@
import io.grpc.InternalServerInterceptors;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
+import io.grpc.MetricRecorder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallExecutorSupplier;
@@ -206,7 +207,8 @@ public void startUp() throws IOException {
new ClientTransportServersBuilder() {
@Override
public InternalServer buildClientTransportServers(
- List extends ServerStreamTracer.Factory> streamTracerFactories) {
+ List extends ServerStreamTracer.Factory> streamTracerFactories,
+ MetricRecorder metricRecorder) {
throw new UnsupportedOperationException();
}
});
diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
index 190f67603c3..b2004426aae 100644
--- a/inprocess/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
+++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
@@ -24,6 +24,7 @@
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerBuilder;
import io.grpc.Internal;
+import io.grpc.MetricRecorder;
import io.grpc.ServerBuilder;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.FixedObjectPool;
@@ -120,7 +121,8 @@ private InProcessServerBuilder(SocketAddress listenAddress) {
final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder {
@Override
public InternalServer buildClientTransportServers(
- List extends ServerStreamTracer.Factory> streamTracerFactories) {
+ List extends ServerStreamTracer.Factory> streamTracerFactories,
+ MetricRecorder metricRecorder) {
return buildTransportServers(streamTracerFactories);
}
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index 258aa15b005..e64f1065681 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -856,6 +856,7 @@ public void run() {
localSocketPicker,
channelLogger,
useGetForSafeMethods,
+ options.getMetricRecorder(),
Ticker.systemTicker());
return transport;
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index 8ebf89842ad..5615f5ed75a 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -30,6 +30,7 @@
import io.grpc.InternalChannelz;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
+import io.grpc.MetricRecorder;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.internal.ClientStreamListener.RpcProgress;
@@ -123,6 +124,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private final Supplier stopwatchFactory;
private final TransportTracer transportTracer;
private final Attributes eagAttributes;
+ private final TcpMetrics.Tracker tcpMetrics;
private final String authority;
private final InUseStateAggregator inUseState =
new InUseStateAggregator() {
@@ -164,7 +166,8 @@ static NettyClientHandler newHandler(
Attributes eagAttributes,
String authority,
ChannelLogger negotiationLogger,
- Ticker ticker) {
+ Ticker ticker,
+ MetricRecorder metricRecorder) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
@@ -194,7 +197,8 @@ static NettyClientHandler newHandler(
eagAttributes,
authority,
negotiationLogger,
- ticker);
+ ticker,
+ metricRecorder);
}
@VisibleForTesting
@@ -214,7 +218,8 @@ static NettyClientHandler newHandler(
Attributes eagAttributes,
String authority,
ChannelLogger negotiationLogger,
- Ticker ticker) {
+ Ticker ticker,
+ MetricRecorder metricRecorder) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
@@ -269,7 +274,8 @@ static NettyClientHandler newHandler(
pingCounter,
ticker,
maxHeaderListSize,
- softLimitHeaderListSize);
+ softLimitHeaderListSize,
+ metricRecorder);
}
private NettyClientHandler(
@@ -288,7 +294,8 @@ private NettyClientHandler(
PingLimiter pingLimiter,
Ticker ticker,
int maxHeaderListSize,
- int softLimitHeaderListSize) {
+ int softLimitHeaderListSize,
+ MetricRecorder metricRecorder) {
super(
/* channelUnused= */ null,
decoder,
@@ -350,6 +357,7 @@ public void onStreamClosed(Http2Stream stream) {
}
}
});
+ this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder);
}
/**
@@ -478,6 +486,7 @@ private void onRstStreamRead(int streamId, long errorCode) {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ tcpMetrics.recordTcpInfo(ctx.channel());
logger.fine("Network channel being closed by the application.");
if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
lifecycleManager.notifyShutdown(
@@ -490,10 +499,17 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
/**
* Handler for the Channel shutting down.
*/
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ tcpMetrics.channelActive(ctx.channel());
+ super.channelActive(ctx);
+ }
+
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
logger.fine("Network channel is closed");
+ tcpMetrics.channelInactive(ctx.channel());
Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
lifecycleManager.notifyShutdown(status, SimpleDisconnectError.UNKNOWN);
final Status streamStatus;
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 53914b3c877..6585df42df3 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -34,6 +34,7 @@
import io.grpc.InternalLogId;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
+import io.grpc.MetricRecorder;
import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ConnectionClientTransport;
@@ -108,6 +109,7 @@ class NettyClientTransport implements ConnectionClientTransport,
private final ChannelLogger channelLogger;
private final boolean useGetForSafeMethods;
private final Ticker ticker;
+ private final MetricRecorder metricRecorder;
NettyClientTransport(
@@ -132,6 +134,7 @@ class NettyClientTransport implements ConnectionClientTransport,
LocalSocketPicker localSocketPicker,
ChannelLogger channelLogger,
boolean useGetForSafeMethods,
+ MetricRecorder metricRecorder,
Ticker ticker) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
@@ -159,6 +162,7 @@ class NettyClientTransport implements ConnectionClientTransport,
this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.useGetForSafeMethods = useGetForSafeMethods;
+ this.metricRecorder = metricRecorder;
this.ticker = Preconditions.checkNotNull(ticker, "ticker");
}
@@ -251,7 +255,8 @@ public Runnable start(Listener transportListener) {
eagAttributes,
authorityString,
channelLogger,
- ticker);
+ ticker,
+ metricRecorder);
ChannelHandler negotiationHandler = negotiator.newHandler(handler);
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index 1cf67ea25ca..2bb6b2c5921 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -31,6 +31,7 @@
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
+import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ObjectPool;
@@ -93,6 +94,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final int maxMessageSize;
private final int maxHeaderListSize;
private final int softLimitHeaderListSize;
+ private MetricRecorder metricRecorder;
private final long keepAliveTimeInNanos;
private final long keepAliveTimeoutInNanos;
private final long maxConnectionIdleInNanos;
@@ -136,8 +138,10 @@ class NettyServer implements InternalServer, InternalWithLogId {
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
int maxRstCount, long maxRstPeriodNanos,
- Attributes eagAttributes, InternalChannelz channelz) {
+ Attributes eagAttributes, InternalChannelz channelz,
+ MetricRecorder metricRecorder) {
this.addresses = checkNotNull(addresses, "addresses");
+ this.metricRecorder = metricRecorder;
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
checkNotNull(channelOptions, "channelOptions");
this.channelOptions = new HashMap, Object>(channelOptions);
@@ -174,6 +178,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
String.valueOf(addresses));
}
+
@Override
public SocketAddress getListenSocketAddress() {
Iterator it = channelGroup.iterator();
@@ -272,7 +277,8 @@ public void initChannel(Channel ch) {
permitKeepAliveTimeInNanos,
maxRstCount,
maxRstPeriodNanos,
- eagAttributes);
+ eagAttributes,
+ metricRecorder);
ServerTransportListener transportListener;
// This is to order callbacks on the listener, not to guard access to channel.
synchronized (NettyServer.this) {
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index eb3a6d9b538..21e1a063700 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -32,6 +32,7 @@
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerBuilder;
import io.grpc.Internal;
+import io.grpc.MetricSink;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.ServerStreamTracer;
@@ -164,8 +165,9 @@ public static NettyServerBuilder forAddress(SocketAddress address, ServerCredent
private final class NettyClientTransportServersBuilder implements ClientTransportServersBuilder {
@Override
public InternalServer buildClientTransportServers(
- List extends ServerStreamTracer.Factory> streamTracerFactories) {
- return buildTransportServers(streamTracerFactories);
+ List extends ServerStreamTracer.Factory> streamTracerFactories,
+ io.grpc.MetricRecorder metricRecorder) {
+ return buildTransportServers(streamTracerFactories, metricRecorder);
}
}
@@ -703,8 +705,10 @@ void eagAttributes(Attributes eagAttributes) {
this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
}
+ @VisibleForTesting
NettyServer buildTransportServers(
- List extends ServerStreamTracer.Factory> streamTracerFactories) {
+ List extends ServerStreamTracer.Factory> streamTracerFactories,
+ io.grpc.MetricRecorder metricRecorder) {
assertEventLoopsAndChannelType();
ProtocolNegotiator negotiator = protocolNegotiatorFactory.newNegotiator(
@@ -737,7 +741,8 @@ NettyServer buildTransportServers(
maxRstCount,
maxRstPeriodNanos,
eagAttributes,
- this.serverImplBuilder.getChannelz());
+ this.serverImplBuilder.getChannelz(),
+ metricRecorder);
}
@VisibleForTesting
@@ -760,6 +765,13 @@ NettyServerBuilder setTransportTracerFactory(TransportTracer.Factory transportTr
return this;
}
+ @CanIgnoreReturnValue
+ @Override
+ public NettyServerBuilder addMetricSink(MetricSink metricSink) {
+ serverImplBuilder.addMetricSink(metricSink);
+ return this;
+ }
+
@CanIgnoreReturnValue
@Override
public NettyServerBuilder useTransportSecurity(File certChain, File privateKey) {
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index 036fde55e2c..53b0f3e0dfd 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -42,6 +42,7 @@
import io.grpc.InternalMetadata;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
+import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
@@ -127,6 +128,7 @@ class NettyServerHandler extends AbstractNettyHandler {
private final Http2Connection.PropertyKey streamKey;
private final ServerTransportListener transportListener;
private final int maxMessageSize;
+ private final TcpMetrics.Tracker tcpMetrics;
private final long keepAliveTimeInNanos;
private final long keepAliveTimeoutInNanos;
private final long maxConnectionAgeInNanos;
@@ -174,7 +176,8 @@ static NettyServerHandler newHandler(
long permitKeepAliveTimeInNanos,
int maxRstCount,
long maxRstPeriodNanos,
- Attributes eagAttributes) {
+ Attributes eagAttributes,
+ MetricRecorder metricRecorder) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
maxHeaderListSize);
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
@@ -208,7 +211,8 @@ static NettyServerHandler newHandler(
maxRstCount,
maxRstPeriodNanos,
eagAttributes,
- Ticker.systemTicker());
+ Ticker.systemTicker(),
+ metricRecorder);
}
static NettyServerHandler newHandler(
@@ -234,7 +238,8 @@ static NettyServerHandler newHandler(
int maxRstCount,
long maxRstPeriodNanos,
Attributes eagAttributes,
- Ticker ticker) {
+ Ticker ticker,
+ MetricRecorder metricRecorder) {
Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
flowControlWindow);
@@ -294,7 +299,8 @@ static NettyServerHandler newHandler(
keepAliveEnforcer,
autoFlowControl,
rstStreamCounter,
- eagAttributes, ticker);
+ eagAttributes, ticker,
+ metricRecorder);
}
private NettyServerHandler(
@@ -318,7 +324,8 @@ private NettyServerHandler(
boolean autoFlowControl,
RstStreamCounter rstStreamCounter,
Attributes eagAttributes,
- Ticker ticker) {
+ Ticker ticker,
+ MetricRecorder metricRecorder) {
super(
channelUnused,
decoder,
@@ -362,6 +369,7 @@ public void onStreamClosed(Http2Stream stream) {
checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
this.maxMessageSize = maxMessageSize;
+ this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder);
this.keepAliveTimeInNanos = keepAliveTimeInNanos;
this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
this.maxConnectionIdleManager = maxConnectionIdleManager;
@@ -661,8 +669,15 @@ void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
/**
* Handler for the Channel shutting down.
*/
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ tcpMetrics.channelActive(ctx.channel());
+ super.channelActive(ctx);
+ }
+
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ tcpMetrics.channelInactive(ctx.channel());
try {
if (keepAliveManager != null) {
keepAliveManager.onTransportTermination();
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
index 758ffeee5b1..c0e52b75876 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
@@ -25,6 +25,7 @@
import io.grpc.Attributes;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalLogId;
+import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.internal.ServerTransport;
@@ -81,6 +82,7 @@ class NettyServerTransport implements ServerTransport {
private final int maxRstCount;
private final long maxRstPeriodNanos;
private final Attributes eagAttributes;
+ private final MetricRecorder metricRecorder;
private final List extends ServerStreamTracer.Factory> streamTracerFactories;
private final TransportTracer transportTracer;
@@ -105,7 +107,8 @@ class NettyServerTransport implements ServerTransport {
long permitKeepAliveTimeInNanos,
int maxRstCount,
long maxRstPeriodNanos,
- Attributes eagAttributes) {
+ Attributes eagAttributes,
+ MetricRecorder metricRecorder) {
this.channel = Preconditions.checkNotNull(channel, "channel");
this.channelUnused = channelUnused;
this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
@@ -128,6 +131,7 @@ class NettyServerTransport implements ServerTransport {
this.maxRstCount = maxRstCount;
this.maxRstPeriodNanos = maxRstPeriodNanos;
this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
+ this.metricRecorder = metricRecorder;
SocketAddress remote = channel.remoteAddress();
this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null);
}
@@ -289,6 +293,7 @@ private NettyServerHandler createHandler(
permitKeepAliveTimeInNanos,
maxRstCount,
maxRstPeriodNanos,
- eagAttributes);
+ eagAttributes,
+ metricRecorder);
}
}
diff --git a/netty/src/main/java/io/grpc/netty/TcpMetrics.java b/netty/src/main/java/io/grpc/netty/TcpMetrics.java
new file mode 100644
index 00000000000..0123c774771
--- /dev/null
+++ b/netty/src/main/java/io/grpc/netty/TcpMetrics.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2026 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.netty;
+
+import io.grpc.InternalTcpMetrics;
+import io.grpc.MetricRecorder;
+import io.netty.channel.Channel;
+import io.netty.util.concurrent.ScheduledFuture;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Utility for collecting TCP metrics from Netty channels.
+ */
+final class TcpMetrics {
+ private static final Logger log = Logger.getLogger(TcpMetrics.class.getName());
+
+ static EpollInfo epollInfo = loadEpollInfo();
+
+ static final class EpollInfo {
+ final Class> channelClass;
+ final Class> infoClass;
+ final java.lang.reflect.Constructor> infoConstructor;
+ final Method tcpInfo;
+ final Method totalRetrans;
+ final Method retransmits;
+ final Method rtt;
+
+ EpollInfo(
+ Class> channelClass,
+ Class> infoClass,
+ java.lang.reflect.Constructor> infoConstructor,
+ Method tcpInfo,
+ Method totalRetrans,
+ Method retransmits,
+ Method rtt) {
+ this.channelClass = channelClass;
+ this.infoClass = infoClass;
+ this.infoConstructor = infoConstructor;
+ this.tcpInfo = tcpInfo;
+ this.totalRetrans = totalRetrans;
+ this.retransmits = retransmits;
+ this.rtt = rtt;
+ }
+ }
+
+ private static EpollInfo loadEpollInfo() {
+ boolean epollAvailable = false;
+ try {
+ Class> epollClass = Class.forName("io.netty.channel.epoll.Epoll");
+ Method isAvailableMethod = epollClass.getDeclaredMethod("isAvailable");
+ epollAvailable = (Boolean) isAvailableMethod.invoke(null);
+ if (epollAvailable) {
+ Class> channelClass = Class.forName("io.netty.channel.epoll.EpollSocketChannel");
+ Class> infoClass = Class.forName("io.netty.channel.epoll.EpollTcpInfo");
+ return new EpollInfo(
+ channelClass,
+ infoClass,
+ infoClass.getDeclaredConstructor(),
+ channelClass.getMethod("tcpInfo", infoClass),
+ infoClass.getMethod("totalRetrans"),
+ infoClass.getMethod("retrans"),
+ infoClass.getMethod("rtt"));
+ }
+ } catch (ReflectiveOperationException | RuntimeException e) {
+ log.log(Level.FINE, "Failed to initialize Epoll tcp_info reflection", e);
+ } catch (Error e) {
+ log.log(Level.FINE, "Failed to load native Epoll library", e);
+ } finally {
+ log.log(Level.INFO, "Epoll available during static init of TcpMetrics:"
+ + "{0}", epollAvailable);
+ }
+ return null;
+ }
+
+ static final class Tracker {
+ private final MetricRecorder metricRecorder;
+ private final Object tcpInfo;
+
+ private long lastTotalRetrans = 0;
+
+ Tracker(MetricRecorder metricRecorder) {
+ this.metricRecorder = metricRecorder;
+
+ Object tcpInfo = null;
+ if (epollInfo != null) {
+ try {
+ tcpInfo = epollInfo.infoConstructor.newInstance();
+ } catch (ReflectiveOperationException e) {
+ log.log(Level.FINE, "Failed to instantiate EpollTcpInfo", e);
+ }
+ }
+ this.tcpInfo = tcpInfo;
+ }
+
+ private static final long RECORD_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(5);
+ private ScheduledFuture> reportTimer;
+
+ void channelActive(Channel channel) {
+ List labelValues = getLabelValues(channel);
+ metricRecorder.addLongCounter(InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT, 1,
+ Collections.emptyList(), labelValues);
+ metricRecorder.addLongUpDownCounter(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT, 1,
+ Collections.emptyList(), labelValues);
+ scheduleNextReport(channel, true);
+ }
+
+ private void scheduleNextReport(final Channel channel, boolean isInitial) {
+ if (!channel.isActive()) {
+ return;
+ }
+
+ double jitter = isInitial
+ ? 0.1 + ThreadLocalRandom.current().nextDouble() // 10% to 110%
+ : 0.9 + ThreadLocalRandom.current().nextDouble() * 0.2; // 90% to 110%
+ long rearmingDelay = (long) (RECORD_INTERVAL_MILLIS * jitter);
+
+ reportTimer = channel.eventLoop().schedule(() -> {
+ if (channel.isActive()) {
+ Tracker.this.recordTcpInfo(channel, false);
+ scheduleNextReport(channel, false); // Re-arm
+ }
+ }, rearmingDelay, TimeUnit.MILLISECONDS);
+ }
+
+ void channelInactive(Channel channel) {
+ if (reportTimer != null) {
+ reportTimer.cancel(false);
+ }
+ List labelValues = getLabelValues(channel);
+ metricRecorder.addLongUpDownCounter(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT, -1,
+ Collections.emptyList(), labelValues);
+ // Final collection on close
+ recordTcpInfo(channel, true);
+ }
+
+ void recordTcpInfo(Channel channel) {
+ recordTcpInfo(channel, false);
+ }
+
+ private void recordTcpInfo(Channel channel, boolean isClose) {
+ if (epollInfo == null) {
+ log.log(Level.FINE, "Skipping recordTcpInfo because"
+ + "epollInfo is null");
+ return;
+ }
+ if (!epollInfo.channelClass.isInstance(channel)) {
+ log.log(Level.FINE, "Skipping recordTcpInfo because channel is not an"
+ + "instance of epollSocketChannelClass: {0}",
+ channel.getClass()
+ .getName());
+ return;
+ }
+ List labelValues = getLabelValues(channel);
+ long totalRetrans;
+ long retransmits;
+ long rtt;
+ try {
+ epollInfo.tcpInfo.invoke(channel, tcpInfo);
+ totalRetrans = (Long) epollInfo.totalRetrans.invoke(tcpInfo);
+ retransmits = (Long) epollInfo.retransmits.invoke(tcpInfo);
+ rtt = (Long) epollInfo.rtt.invoke(tcpInfo);
+ } catch (ReflectiveOperationException e) {
+ log.log(Level.FINE, "Error computing TCP metrics", e);
+ return;
+ }
+
+ long deltaTotal = totalRetrans - lastTotalRetrans;
+ if (deltaTotal > 0) {
+ metricRecorder.addLongCounter(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT,
+ deltaTotal, Collections.emptyList(), labelValues);
+ lastTotalRetrans = totalRetrans;
+ }
+ if (isClose && retransmits > 0) {
+ metricRecorder.addLongCounter(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT,
+ retransmits, Collections.emptyList(), labelValues);
+ }
+ metricRecorder.recordDoubleHistogram(InternalTcpMetrics.MIN_RTT_INSTRUMENT,
+ rtt / 1000000.0, // Convert microseconds to seconds
+ Collections.emptyList(), labelValues);
+ }
+ }
+
+ private static List getLabelValues(Channel channel) {
+ String localAddress = "";
+ String localPort = "";
+ String peerAddress = "";
+ String peerPort = "";
+
+ SocketAddress local = channel.localAddress();
+ if (local instanceof InetSocketAddress) {
+ InetSocketAddress inetLocal = (InetSocketAddress) local;
+ localAddress = inetLocal.getAddress().getHostAddress();
+ localPort = String.valueOf(inetLocal.getPort());
+ }
+
+ SocketAddress remote = channel.remoteAddress();
+ if (remote instanceof InetSocketAddress) {
+ InetSocketAddress inetRemote = (InetSocketAddress) remote;
+ peerAddress = inetRemote.getAddress().getHostAddress();
+ peerPort = String.valueOf(inetRemote.getPort());
+ }
+
+ return Arrays.asList(localAddress, localPort, peerAddress, peerPort);
+ }
+
+ private TcpMetrics() {
+ }
+}
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
index 53598727efd..9f6be9a2f3e 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
@@ -57,6 +57,7 @@
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Metadata;
+import io.grpc.MetricRecorder;
import io.grpc.Status;
import io.grpc.internal.AbstractStream;
import io.grpc.internal.ClientStreamListener;
@@ -1165,7 +1166,8 @@ public Stopwatch get() {
Attributes.EMPTY,
"someauthority",
null,
- fakeClock().getTicker());
+ fakeClock().getTicker(),
+ new MetricRecorder() {});
}
@Override
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index db44c8f50fd..7023acc947c 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -56,6 +56,7 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
+import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.Status.Code;
@@ -228,30 +229,31 @@ public void setSoLingerChannelOption() throws IOException, GeneralSecurityExcept
// set SO_LINGER option
int soLinger = 123;
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
- NettyClientTransport transport =
- new NettyClientTransport(
- address,
- new ReflectiveChannelFactory<>(NioSocketChannel.class),
- channelOptions,
- group,
- newNegotiator(),
- false,
- DEFAULT_WINDOW_SIZE,
- DEFAULT_MAX_MESSAGE_SIZE,
- GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
- GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
- KEEPALIVE_TIME_NANOS_DISABLED,
- 1L,
- false,
- authority,
- null /* user agent */,
- tooManyPingsRunnable,
- new TransportTracer(),
- Attributes.EMPTY,
- new SocketPicker(),
- new FakeChannelLogger(),
- false,
- Ticker.systemTicker());
+ NettyClientTransport transport = new NettyClientTransport(
+ address,
+ new ReflectiveChannelFactory<>(NioSocketChannel.class),
+ channelOptions,
+ group,
+ newNegotiator(),
+ false,
+ DEFAULT_WINDOW_SIZE,
+ DEFAULT_MAX_MESSAGE_SIZE,
+ GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
+ GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
+ KEEPALIVE_TIME_NANOS_DISABLED,
+ 1L,
+ false,
+ authority,
+ null /* user agent */,
+ tooManyPingsRunnable,
+ new TransportTracer(),
+ Attributes.EMPTY,
+ new SocketPicker(),
+ new FakeChannelLogger(),
+ false,
+ new MetricRecorder() {
+ },
+ Ticker.systemTicker());
transports.add(transport);
callMeMaybe(transport.start(clientTransportListener));
@@ -503,30 +505,31 @@ private static class CantConstructChannelError extends Error {}
public void failingToConstructChannelShouldFailGracefully() throws Exception {
address = TestUtils.testServerAddress(new InetSocketAddress(12345));
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
- NettyClientTransport transport =
- new NettyClientTransport(
- address,
- new ReflectiveChannelFactory<>(CantConstructChannel.class),
- new HashMap, Object>(),
- group,
- newNegotiator(),
- false,
- DEFAULT_WINDOW_SIZE,
- DEFAULT_MAX_MESSAGE_SIZE,
- GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
- GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
- KEEPALIVE_TIME_NANOS_DISABLED,
- 1,
- false,
- authority,
- null,
- tooManyPingsRunnable,
- new TransportTracer(),
- Attributes.EMPTY,
- new SocketPicker(),
- new FakeChannelLogger(),
- false,
- Ticker.systemTicker());
+ NettyClientTransport transport = new NettyClientTransport(
+ address,
+ new ReflectiveChannelFactory<>(CantConstructChannel.class),
+ new HashMap, Object>(),
+ group,
+ newNegotiator(),
+ false,
+ DEFAULT_WINDOW_SIZE,
+ DEFAULT_MAX_MESSAGE_SIZE,
+ GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
+ GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
+ KEEPALIVE_TIME_NANOS_DISABLED,
+ 1,
+ false,
+ authority,
+ null,
+ tooManyPingsRunnable,
+ new TransportTracer(),
+ Attributes.EMPTY,
+ new SocketPicker(),
+ new FakeChannelLogger(),
+ false,
+ new MetricRecorder() {
+ },
+ Ticker.systemTicker());
transports.add(transport);
// Should not throw
@@ -989,7 +992,7 @@ public void authorityOverrideInCallOptions_matchesServerPeerHost_newStreamCreati
new Rpc(transport, new Metadata(), "foo.test.google.fr").waitForResponse();
} finally {
- NettyClientHandler.enablePerRpcAuthorityCheck = false;;
+ NettyClientHandler.enablePerRpcAuthorityCheck = false;
}
}
@@ -1012,7 +1015,7 @@ public void authorityOverrideInCallOptions_portNumberInAuthority_isStrippedForPe
new Rpc(transport, new Metadata(), "foo.test.google.fr:12345").waitForResponse();
} finally {
- NettyClientHandler.enablePerRpcAuthorityCheck = false;;
+ NettyClientHandler.enablePerRpcAuthorityCheck = false;
}
}
@@ -1046,7 +1049,7 @@ public void authorityOverrideInCallOptions_portNumberAndIpv6_isStrippedForPeerVe
"No subject alternative names matching IP address 2001:db8:3333:4444:5555:6666:1.2.3.4 "
+ "found");
} finally {
- NettyClientHandler.enablePerRpcAuthorityCheck = false;;
+ NettyClientHandler.enablePerRpcAuthorityCheck = false;
}
}
@@ -1125,30 +1128,31 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max
if (!enableKeepAlive) {
keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
}
- NettyClientTransport transport =
- new NettyClientTransport(
- address,
- channelFactory,
- new HashMap, Object>(),
- group,
- negotiator,
- false,
- DEFAULT_WINDOW_SIZE,
- maxMsgSize,
- maxHeaderListSize,
- maxHeaderListSize,
- keepAliveTimeNano,
- keepAliveTimeoutNano,
- false,
- authority,
- userAgent,
- tooManyPingsRunnable,
- new TransportTracer(),
- eagAttributes,
- new SocketPicker(),
- new FakeChannelLogger(),
- false,
- Ticker.systemTicker());
+ NettyClientTransport transport = new NettyClientTransport(
+ address,
+ channelFactory,
+ new HashMap, Object>(),
+ group,
+ negotiator,
+ false,
+ DEFAULT_WINDOW_SIZE,
+ maxMsgSize,
+ maxHeaderListSize,
+ maxHeaderListSize,
+ keepAliveTimeNano,
+ keepAliveTimeoutNano,
+ false,
+ authority,
+ userAgent,
+ tooManyPingsRunnable,
+ new TransportTracer(),
+ eagAttributes,
+ new SocketPicker(),
+ new FakeChannelLogger(),
+ false,
+ new MetricRecorder() {
+ },
+ Ticker.systemTicker());
transports.add(transport);
return transport;
}
@@ -1167,35 +1171,35 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize,
ServerListener serverListener) throws IOException {
- server =
- new NettyServer(
- TestUtils.testServerAddresses(new InetSocketAddress(0)),
- new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
- new HashMap, Object>(),
- new HashMap, Object>(),
- new FixedObjectPool<>(group),
- new FixedObjectPool<>(group),
- false,
- negotiator,
- Collections.emptyList(),
- TransportTracer.getDefaultFactory(),
- maxStreamsPerConnection,
- false,
- DEFAULT_WINDOW_SIZE,
- DEFAULT_MAX_MESSAGE_SIZE,
- maxHeaderListSize,
- maxHeaderListSize,
- DEFAULT_SERVER_KEEPALIVE_TIME_NANOS,
- DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
- MAX_CONNECTION_IDLE_NANOS_DISABLED,
- MAX_CONNECTION_AGE_NANOS_DISABLED,
- MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE,
- true,
- 0,
- MAX_RST_COUNT_DISABLED,
- 0,
- Attributes.EMPTY,
- channelz);
+ server = new NettyServer(
+ TestUtils.testServerAddresses(new InetSocketAddress(0)),
+ new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
+ new HashMap, Object>(),
+ new HashMap, Object>(),
+ new FixedObjectPool<>(group),
+ new FixedObjectPool<>(group),
+ false,
+ negotiator,
+ Collections.emptyList(),
+ TransportTracer.getDefaultFactory(),
+ maxStreamsPerConnection,
+ false,
+ DEFAULT_WINDOW_SIZE,
+ DEFAULT_MAX_MESSAGE_SIZE,
+ maxHeaderListSize,
+ maxHeaderListSize,
+ DEFAULT_SERVER_KEEPALIVE_TIME_NANOS,
+ DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
+ MAX_CONNECTION_IDLE_NANOS_DISABLED,
+ MAX_CONNECTION_AGE_NANOS_DISABLED,
+ MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE,
+ true,
+ 0,
+ MAX_RST_COUNT_DISABLED,
+ 0,
+ Attributes.EMPTY,
+ channelz,
+ new MetricRecorder() {});
server.start(serverListener);
address = TestUtils.testServerAddress((InetSocketAddress) server.getListenSocketAddress());
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java
index 797cfa95c0e..f3b73a515b5 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java
@@ -22,7 +22,7 @@
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
-import io.grpc.ServerStreamTracer;
+import io.grpc.MetricRecorder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.ssl.SslContext;
@@ -43,8 +43,9 @@ public class NettyServerBuilderTest {
@Test
public void addMultipleListenAddresses() {
builder.addListenAddress(new InetSocketAddress(8081));
- NettyServer server =
- builder.buildTransportServers(ImmutableList.of());
+ NettyServer server = builder.buildTransportServers(
+ ImmutableList.of(),
+ new MetricRecorder() {});
assertThat(server.getListenSocketAddresses()).hasSize(2);
}
@@ -189,4 +190,5 @@ public void useNioTransport_shouldNotThrow() {
builder.assertEventLoopsAndChannelType();
}
+
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
index 0d5a9bab176..2c0ab21cb56 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
@@ -59,6 +59,7 @@
import io.grpc.Attributes;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
+import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.Status.Code;
@@ -129,6 +130,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase streamListenerMessageQueue = new LinkedList<>();
@@ -205,6 +207,20 @@ protected void manualSetUp() throws Exception {
channel().releaseOutbound();
}
+ @Test
+ public void tcpMetrics_recorded() throws Exception {
+ manualSetUp();
+ handler().channelActive(ctx());
+ // Verify that channelActive triggered TcpMetrics
+ ArgumentCaptor countCaptor = ArgumentCaptor.forClass(Long.class);
+ verify(metricRecorder, atLeastOnce()).addLongCounter(
+ eq(io.grpc.InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT),
+ countCaptor.capture(),
+ any(),
+ any());
+ assertEquals(1L, countCaptor.getValue().longValue());
+ }
+
@Test
public void transportReadyDelayedUntilConnectionPreface() throws Exception {
initChannel(new GrpcHttp2ServerHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE));
@@ -1416,7 +1432,8 @@ protected NettyServerHandler newHandler() {
maxRstCount,
maxRstPeriodNanos,
Attributes.EMPTY,
- fakeClock().getTicker());
+ fakeClock().getTicker(),
+ metricRecorder);
}
@Override
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index f9bda4c5af1..61c3f9e219e 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -37,6 +37,7 @@
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented;
import io.grpc.Metadata;
+import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.ServerListener;
@@ -161,7 +162,7 @@ class NoHandlerProtocolNegotiator implements ProtocolNegotiator {
0,
0, // ignore
Attributes.EMPTY,
- channelz);
+ channelz, mock(MetricRecorder.class));
final SettableFuture serverShutdownCalled = SettableFuture.create();
ns.start(new ServerListener() {
@Override
@@ -218,7 +219,7 @@ public void multiPortStartStopGet() throws Exception {
0,
0, // ignore
Attributes.EMPTY,
- channelz);
+ channelz, mock(MetricRecorder.class));
final SettableFuture shutdownCompleted = SettableFuture.create();
ns.start(new ServerListener() {
@Override
@@ -298,7 +299,7 @@ public void multiPortConnections() throws Exception {
0,
0, // ignore
Attributes.EMPTY,
- channelz);
+ channelz, mock(MetricRecorder.class));
final SettableFuture shutdownCompleted = SettableFuture.create();
ns.start(new ServerListener() {
@Override
@@ -366,7 +367,7 @@ public void getPort_notStarted() {
0,
0, // ignore
Attributes.EMPTY,
- channelz);
+ channelz, mock(MetricRecorder.class));
assertThat(ns.getListenSocketAddress()).isEqualTo(addr);
assertThat(ns.getListenSocketAddresses()).isEqualTo(addresses);
@@ -447,7 +448,7 @@ class TestProtocolNegotiator implements ProtocolNegotiator {
0,
0, // ignore
eagAttributes,
- channelz);
+ channelz, mock(MetricRecorder.class));
ns.start(new ServerListener() {
@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
@@ -501,7 +502,7 @@ public void channelzListenSocket() throws Exception {
0,
0, // ignore
Attributes.EMPTY,
- channelz);
+ channelz, mock(MetricRecorder.class));
final SettableFuture shutdownCompleted = SettableFuture.create();
ns.start(new ServerListener() {
@Override
@@ -649,7 +650,7 @@ private NettyServer getServer(List addr, EventLoopGroup ev) {
0,
0, // ignore
Attributes.EMPTY,
- channelz);
+ channelz, mock(MetricRecorder.class));
}
private static class NoopServerTransportListener implements ServerTransportListener {
diff --git a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java
index b779dfbe980..22758a8b727 100644
--- a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java
@@ -22,6 +22,7 @@
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
+import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.internal.AbstractTransportTest;
@@ -71,7 +72,7 @@ protected InternalServer newServer(
.forAddress(new InetSocketAddress("localhost", 0))
.flowControlWindow(AbstractTransportTest.TEST_FLOW_CONTROL_WINDOW)
.setTransportTracerFactory(fakeClockTransportTracer)
- .buildTransportServers(streamTracerFactories);
+ .buildTransportServers(streamTracerFactories, new MetricRecorder() {});
}
@Override
@@ -81,7 +82,7 @@ protected InternalServer newServer(
.forAddress(new InetSocketAddress("localhost", port))
.flowControlWindow(AbstractTransportTest.TEST_FLOW_CONTROL_WINDOW)
.setTransportTracerFactory(fakeClockTransportTracer)
- .buildTransportServers(streamTracerFactories);
+ .buildTransportServers(streamTracerFactories, new MetricRecorder() {});
}
@Override
diff --git a/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java b/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java
index 80438532172..403b1b64329 100644
--- a/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java
+++ b/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java
@@ -46,6 +46,7 @@
import io.grpc.InternalChannelz;
import io.grpc.InternalChannelz.Security;
import io.grpc.Metadata;
+import io.grpc.MetricRecorder;
import io.grpc.SecurityLevel;
import io.grpc.ServerCredentials;
import io.grpc.ServerStreamTracer;
@@ -389,7 +390,9 @@ private Object expectHandshake(
.buildTransportFactory();
InternalServer server = NettyServerBuilder
.forPort(0, serverCreds)
- .buildTransportServers(Collections.emptyList());
+ .buildTransportServers(
+ Collections.emptyList(),
+ new MetricRecorder() {});
server.start(serverListener);
ManagedClientTransport.Listener clientTransportListener =
diff --git a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java
new file mode 100644
index 00000000000..874025369ee
--- /dev/null
+++ b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java
@@ -0,0 +1,579 @@
+/*
+ * Copyright 2026 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.netty;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import io.grpc.InternalTcpMetrics;
+import io.grpc.MetricRecorder;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoop;
+import io.netty.util.concurrent.ScheduledFuture;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(JUnit4.class)
+public class TcpMetricsTest {
+
+ @Rule
+ public final MockitoRule mocks = MockitoJUnit.rule();
+
+ @Mock
+ private MetricRecorder metricRecorder;
+ @Mock
+ private Channel channel;
+ @Mock
+ private EventLoop eventLoop;
+ @Mock
+ private ScheduledFuture> scheduledFuture;
+
+ private TcpMetrics.Tracker metrics;
+
+ @Before
+ public void setUp() throws Exception {
+ when(channel.eventLoop()).thenReturn(eventLoop);
+ when(eventLoop.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
+ .thenAnswer(invocation -> scheduledFuture);
+ metrics = new TcpMetrics.Tracker(metricRecorder);
+ }
+
+ @Test
+ public void metricsInitialization() throws Exception {
+
+ org.junit.Assert.assertNotNull(InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT);
+ org.junit.Assert.assertNotNull(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT);
+ org.junit.Assert.assertNotNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT);
+ org.junit.Assert.assertNotNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT);
+ org.junit.Assert.assertNotNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT);
+ }
+
+ public static class FakeEpollTcpInfo {
+ long totalRetrans;
+ long retransmits;
+ long rtt;
+
+ public void setValues(long totalRetrans, long retransmits, long rtt) {
+ this.totalRetrans = totalRetrans;
+ this.retransmits = retransmits;
+ this.rtt = rtt;
+ }
+
+ @SuppressWarnings("unused")
+ public long totalRetrans() {
+ return totalRetrans;
+ }
+
+ @SuppressWarnings("unused")
+ public long retrans() {
+ return retransmits;
+ }
+
+ @SuppressWarnings("unused")
+ public long rtt() {
+ return rtt;
+ }
+ }
+
+ @Test
+ public void tracker_recordTcpInfo_reflectionSuccess() throws Exception {
+ MetricRecorder recorder = mock(MetricRecorder.class);
+ TcpMetrics.epollInfo = new TcpMetrics.EpollInfo(
+ ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class,
+ FakeEpollTcpInfo.class.getConstructor(),
+ ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class),
+ FakeEpollTcpInfo.class.getMethod("totalRetrans"),
+ FakeEpollTcpInfo.class.getMethod("retrans"),
+ FakeEpollTcpInfo.class.getMethod("rtt"));
+ TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder);
+
+ FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo();
+ infoSource.setValues(123, 4, 5000);
+ ConfigurableFakeWithTcpInfo channel = new ConfigurableFakeWithTcpInfo(infoSource);
+ channel.writeInbound("dummy");
+
+ tracker.channelInactive(channel);
+
+ verify(recorder).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ eq(123L), any(), any());
+ verify(recorder).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)),
+ eq(4L), any(), any());
+ verify(recorder).recordDoubleHistogram(
+ eq(Objects.requireNonNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT)),
+ eq(0.005), any(), any());
+ }
+
+ @Test
+ public void tracker_periodicRecord_doesNotRecordRecurringRetransmits() throws Exception {
+ MetricRecorder recorder = mock(MetricRecorder.class);
+ TcpMetrics.epollInfo = new TcpMetrics.EpollInfo(
+ ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class,
+ FakeEpollTcpInfo.class.getConstructor(),
+ ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class),
+ FakeEpollTcpInfo.class.getMethod("totalRetrans"),
+ FakeEpollTcpInfo.class.getMethod("retrans"),
+ FakeEpollTcpInfo.class.getMethod("rtt"));
+ TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder);
+
+ FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo();
+ infoSource.setValues(123, 4, 5000);
+ ConfigurableFakeWithTcpInfo channel = org.mockito.Mockito.spy(
+ new ConfigurableFakeWithTcpInfo(infoSource));
+ when(channel.eventLoop()).thenReturn(eventLoop);
+ when(channel.isActive()).thenReturn(true);
+
+ tracker.channelActive(channel);
+
+ ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(eventLoop).schedule(runnableCaptor.capture(), anyLong(), eq(TimeUnit.MILLISECONDS));
+ Runnable periodicTask = runnableCaptor.getValue();
+
+ org.mockito.Mockito.clearInvocations(recorder);
+ periodicTask.run();
+
+ verify(recorder).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ eq(123L), any(), any());
+ verify(recorder).recordDoubleHistogram(
+ eq(Objects.requireNonNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT)),
+ eq(0.005), any(), any());
+ // Should NOT record recurring retransmits during periodic polling
+ verify(recorder, org.mockito.Mockito.never())
+ .addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)),
+ anyLong(), any(), any());
+ }
+
+ @Test
+ public void tracker_channelInactive_recordsRecurringRetransmits_raw_notDelta() throws Exception {
+ MetricRecorder recorder = mock(MetricRecorder.class);
+ TcpMetrics.epollInfo = new TcpMetrics.EpollInfo(
+ ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class,
+ FakeEpollTcpInfo.class.getConstructor(),
+ ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class),
+ FakeEpollTcpInfo.class.getMethod("totalRetrans"),
+ FakeEpollTcpInfo.class.getMethod("retrans"),
+ FakeEpollTcpInfo.class.getMethod("rtt"));
+ TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder);
+
+ FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo();
+ infoSource.setValues(123, 4, 5000);
+ ConfigurableFakeWithTcpInfo channel = org.mockito.Mockito.spy(
+ new ConfigurableFakeWithTcpInfo(infoSource));
+ when(channel.eventLoop()).thenReturn(eventLoop);
+ when(channel.isActive()).thenReturn(true);
+
+ // Mimic the periodic schedule invocation
+ tracker.channelActive(channel);
+ ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(eventLoop).schedule(runnableCaptor.capture(), anyLong(), eq(TimeUnit.MILLISECONDS));
+
+ // Fire periodic task once. TotalRetrans=123, retransmits=4.
+ runnableCaptor.getValue().run();
+
+ org.mockito.Mockito.clearInvocations(recorder);
+
+ // Let's just create a new channel instance where tcpInfo sets retrans=5.
+ FakeEpollTcpInfo infoSource2 = new FakeEpollTcpInfo();
+ infoSource2.setValues(130, 5, 5000);
+ ConfigurableFakeWithTcpInfo channel2 = org.mockito.Mockito.spy(
+ new ConfigurableFakeWithTcpInfo(infoSource2));
+ when(channel2.eventLoop()).thenReturn(eventLoop);
+
+ tracker.channelInactive(channel2);
+
+ // It should record delta for totalRetrans (130 - 123 = 7)
+ verify(recorder).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ eq(7L), any(), any());
+ // But for recurringRetransmits it MUST record the raw value 5, not the delta!
+ verify(recorder).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)),
+ eq(5L), any(), any());
+ }
+
+ @Test
+ public void tracker_periodicRecord_reportsDeltaForTotalRetrans() throws Exception {
+ MetricRecorder recorder = mock(MetricRecorder.class);
+ TcpMetrics.epollInfo = new TcpMetrics.EpollInfo(
+ ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class,
+ FakeEpollTcpInfo.class.getConstructor(),
+ ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class),
+ FakeEpollTcpInfo.class.getMethod("totalRetrans"),
+ FakeEpollTcpInfo.class.getMethod("retrans"),
+ FakeEpollTcpInfo.class.getMethod("rtt"));
+ TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder);
+
+ FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo();
+ infoSource.setValues(123, 4, 5000);
+ ConfigurableFakeWithTcpInfo channel = org.mockito.Mockito.spy(
+ new ConfigurableFakeWithTcpInfo(infoSource));
+ when(channel.eventLoop()).thenReturn(eventLoop);
+ when(channel.isActive()).thenReturn(true);
+
+ // Initial Active Trigger
+ tracker.channelActive(channel);
+ ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(eventLoop).schedule(runnableCaptor.capture(), anyLong(), eq(TimeUnit.MILLISECONDS));
+ Runnable periodicTask = runnableCaptor.getValue();
+
+ // First periodic record
+ org.mockito.Mockito.clearInvocations(recorder);
+ periodicTask.run();
+ verify(recorder).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ eq(123L), any(), any());
+
+ // Change tcpInfo for second periodic record
+ org.mockito.Mockito.doAnswer(invocation -> {
+ FakeEpollTcpInfo info = invocation.getArgument(0);
+ info.totalRetrans = 150;
+ info.retransmits = 2; // Should not be recorded
+ info.rtt = 6000;
+ return null;
+ }).when(channel).tcpInfo(any(FakeEpollTcpInfo.class));
+
+ org.mockito.Mockito.clearInvocations(recorder);
+ periodicTask.run();
+
+ // Only the delta (150 - 123 = 27) should be recorded
+ verify(recorder).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ eq(27L), any(), any());
+ verify(recorder).recordDoubleHistogram(
+ eq(Objects.requireNonNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT)),
+ eq(0.006), any(), any());
+ verify(recorder, org.mockito.Mockito.never())
+ .addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)),
+ anyLong(), any(), any());
+ }
+
+ @Test
+ public void tracker_periodicRecord_doesNotReportZeroDeltaForTotalRetrans() throws Exception {
+ MetricRecorder recorder = mock(MetricRecorder.class);
+ TcpMetrics.epollInfo = new TcpMetrics.EpollInfo(
+ ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class,
+ FakeEpollTcpInfo.class.getConstructor(),
+ ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class),
+ FakeEpollTcpInfo.class.getMethod("totalRetrans"),
+ FakeEpollTcpInfo.class.getMethod("retrans"),
+ FakeEpollTcpInfo.class.getMethod("rtt"));
+ TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder);
+
+ FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo();
+ infoSource.setValues(123, 4, 5000);
+ ConfigurableFakeWithTcpInfo channel = org.mockito.Mockito.spy(
+ new ConfigurableFakeWithTcpInfo(infoSource));
+ when(channel.eventLoop()).thenReturn(eventLoop);
+ when(channel.isActive()).thenReturn(true);
+
+ // Initial Active Trigger
+ tracker.channelActive(channel);
+ ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(eventLoop).schedule(runnableCaptor.capture(), anyLong(), eq(TimeUnit.MILLISECONDS));
+ Runnable periodicTask = runnableCaptor.getValue();
+
+ // First periodic record
+ periodicTask.run();
+ org.mockito.Mockito.clearInvocations(recorder);
+
+ // Keep tcpInfo the same for second periodic record
+ periodicTask.run();
+
+ // NO delta (123 - 123 = 0), so it should not be recorded
+ verify(recorder, org.mockito.Mockito.never())
+ .addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ anyLong(), any(), any());
+ verify(recorder).recordDoubleHistogram(
+ eq(Objects.requireNonNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT)),
+ eq(0.005), any(), any());
+ }
+
+ public static class ConfigurableFakeWithTcpInfo extends
+ io.netty.channel.embedded.EmbeddedChannel {
+ private final FakeEpollTcpInfo infoToCopy;
+
+ public ConfigurableFakeWithTcpInfo(FakeEpollTcpInfo infoToCopy) {
+ this.infoToCopy = infoToCopy;
+ }
+
+ public void tcpInfo(FakeEpollTcpInfo info) {
+ info.totalRetrans = infoToCopy.totalRetrans;
+ info.retransmits = infoToCopy.retransmits;
+ info.rtt = infoToCopy.rtt;
+ }
+ }
+
+ @Test
+ public void tracker_reportsDeltas_correctly() throws Exception {
+ MetricRecorder recorder = mock(MetricRecorder.class);
+
+ TcpMetrics.epollInfo = new TcpMetrics.EpollInfo(
+ ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class,
+ FakeEpollTcpInfo.class.getConstructor(),
+ ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class),
+ FakeEpollTcpInfo.class.getMethod("totalRetrans"),
+ FakeEpollTcpInfo.class.getMethod("retrans"),
+ FakeEpollTcpInfo.class.getMethod("rtt"));
+ TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder);
+
+ FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo();
+ ConfigurableFakeWithTcpInfo channel = new ConfigurableFakeWithTcpInfo(infoSource);
+
+ // 10 retransmits total
+ infoSource.setValues(10, 2, 1000);
+ tracker.recordTcpInfo(channel);
+
+ verify(recorder).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ eq(10L), any(), any());
+
+ // 15 retransmits total (delta 5)
+ infoSource.setValues(15, 0, 1000);
+ tracker.recordTcpInfo(channel);
+
+ verify(recorder).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ eq(5L), any(), any());
+
+ // 15 retransmits total (delta 0) - should NOT report
+ // also set retransmits to 1
+ infoSource.setValues(15, 1, 1000);
+ tracker.recordTcpInfo(channel);
+ // Verify no new interactions with this specific metric and value
+ // We can't easily verify "no interaction" for specific value without capturing.
+ verify(recorder, org.mockito.Mockito.times(1)).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ eq(10L), any(), any());
+ verify(recorder, org.mockito.Mockito.times(1)).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ eq(5L), any(), any());
+ // Total interactions for packetsRetransmitted should be 2
+ verify(recorder, org.mockito.Mockito.times(2)).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)),
+ anyLong(), any(), any());
+
+ // recurringRetransmits should NOT have been reported yet (periodic calls)
+ verify(recorder, org.mockito.Mockito.times(0)).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)),
+ anyLong(), any(), any());
+
+ // Close channel - should report recurringRetransmits
+ tracker.channelInactive(channel);
+ verify(recorder, org.mockito.Mockito.times(1)).addLongCounter(
+ eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)),
+ eq(1L), // From last infoSource setValues(15, 1, 1000)
+ any(), any());
+ }
+
+ @Test
+ public void tracker_recordTcpInfo_reflectionFailure() throws Exception {
+ MetricRecorder recorder = mock(MetricRecorder.class);
+
+ TcpMetrics.epollInfo = null;
+ TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder);
+
+ Channel channel = org.mockito.Mockito.mock(Channel.class);
+ when(channel.isActive()).thenReturn(true);
+
+ // Should catch exception and ignore
+ tracker.channelInactive(channel);
+ }
+
+ @Test
+ public void registeredMetrics_haveCorrectOptionalLabels() throws Exception {
+ List expectedOptionalLabels = Arrays.asList(
+ "network.local.address",
+ "network.local.port",
+ "network.peer.address",
+ "network.peer.port");
+
+ org.junit.Assert.assertEquals(
+ expectedOptionalLabels,
+ InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT.getOptionalLabelKeys());
+ org.junit.Assert.assertEquals(
+ expectedOptionalLabels,
+ InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT.getOptionalLabelKeys());
+
+ if (InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT != null) {
+ org.junit.Assert.assertEquals(
+ expectedOptionalLabels,
+ Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)
+ .getOptionalLabelKeys());
+ org.junit.Assert.assertEquals(
+ expectedOptionalLabels,
+ Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)
+ .getOptionalLabelKeys());
+ org.junit.Assert.assertEquals(
+ expectedOptionalLabels,
+ Objects.requireNonNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT).getOptionalLabelKeys());
+ }
+ }
+
+ @Test
+ public void channelActive_extractsLabels_ipv4() throws Exception {
+
+ InetAddress localInet = InetAddress.getByAddress(new byte[] { 127, 0, 0, 1 });
+ InetAddress remoteInet = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 });
+ when(channel.localAddress()).thenReturn(new InetSocketAddress(localInet, 8080));
+ when(channel.remoteAddress()).thenReturn(new InetSocketAddress(remoteInet, 443));
+
+ metrics.channelActive(channel);
+
+ verify(metricRecorder).addLongCounter(
+ eq(InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT), eq(1L),
+ eq(Collections.emptyList()),
+ eq(Arrays.asList(
+ localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443")));
+ verify(metricRecorder).addLongUpDownCounter(
+ eq(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT), eq(1L),
+ eq(Collections.emptyList()),
+ eq(Arrays.asList(
+ localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443")));
+ verifyNoMoreInteractions(metricRecorder);
+ }
+
+ @Test
+ public void channelInactive_extractsLabels_ipv6() throws Exception {
+
+ InetAddress localInet = InetAddress.getByAddress(
+ new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 });
+ InetAddress remoteInet = InetAddress.getByAddress(
+ new byte[] { 32, 1, 13, -72, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 });
+ when(channel.localAddress()).thenReturn(new InetSocketAddress(localInet, 8080));
+ when(channel.remoteAddress()).thenReturn(new InetSocketAddress(remoteInet, 443));
+
+ metrics.channelInactive(channel);
+
+ verify(metricRecorder).addLongUpDownCounter(
+ eq(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT), eq(-1L),
+ eq(Collections.emptyList()),
+ eq(Arrays.asList(
+ localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443")));
+ verifyNoMoreInteractions(metricRecorder);
+ }
+
+ @Test
+ public void channelActive_extractsLabels_nonInetAddress() throws Exception {
+ SocketAddress dummyAddress = new SocketAddress() {
+ };
+ when(channel.localAddress()).thenReturn(dummyAddress);
+ when(channel.remoteAddress()).thenReturn(dummyAddress);
+
+ metrics.channelActive(channel);
+
+ verify(metricRecorder).addLongCounter(
+ eq(InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT), eq(1L),
+ eq(Collections.emptyList()),
+ eq(Arrays.asList("", "", "", "")));
+ verify(metricRecorder).addLongUpDownCounter(
+ eq(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT), eq(1L),
+ eq(Collections.emptyList()),
+ eq(Arrays.asList("", "", "", "")));
+ verifyNoMoreInteractions(metricRecorder);
+ }
+
+ @Test
+ public void channelActive_incrementsCounts() throws Exception {
+ metrics.channelActive(channel);
+ verify(metricRecorder).addLongCounter(
+ eq(InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT), eq(1L),
+ eq(Collections.emptyList()),
+ eq(Arrays.asList("", "", "", "")));
+ verify(metricRecorder).addLongUpDownCounter(
+ eq(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT), eq(1L),
+ eq(Collections.emptyList()),
+ eq(Arrays.asList("", "", "", "")));
+ verifyNoMoreInteractions(metricRecorder);
+ }
+
+ @Test
+ public void channelInactive_decrementsCount_noEpoll_noError() throws Exception {
+ metrics.channelInactive(channel);
+ verify(metricRecorder).addLongUpDownCounter(
+ eq(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT), eq(-1L),
+ eq(Collections.emptyList()),
+ eq(Arrays.asList("", "", "", "")));
+ verifyNoMoreInteractions(metricRecorder);
+ }
+
+ @Test
+ public void channelActive_schedulesReportTimer() throws Exception {
+ when(channel.isActive()).thenReturn(true);
+ metrics.channelActive(channel);
+
+ ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class);
+ verify(eventLoop).schedule(
+ runnableCaptor.capture(), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS));
+
+ Runnable task = runnableCaptor.getValue();
+ long delay = delayCaptor.getValue();
+
+ // Default RECORD_INTERVAL_MILLIS is 5 minutes (300,000 ms)
+ // Initial jitter is 10% to 110%, so 30,000 ms to 330,000 ms
+ org.junit.Assert.assertTrue("Delay should be >= 30000 but was " + delay, delay >= 30_000);
+ org.junit.Assert.assertTrue("Delay should be <= 330000 but was " + delay, delay <= 330_000);
+
+ // Run the task to verify rescheduling
+ task.run();
+
+ verify(eventLoop, org.mockito.Mockito.times(2))
+ .schedule(any(Runnable.class), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS));
+
+ // Re-arming jitter is 90% to 110%, so 270,000 ms to 330,000 ms
+ long rearmDelay = delayCaptor.getValue();
+ org.junit.Assert.assertTrue(
+ "Delay should be >= 270000 but was " + rearmDelay, rearmDelay >= 270_000);
+ org.junit.Assert.assertTrue(
+ "Delay should be <= 330000 but was " + rearmDelay, rearmDelay <= 330_000);
+ }
+
+ @Test
+ public void channelInactive_cancelsReportTimer() throws Exception {
+ when(channel.isActive()).thenReturn(true);
+ metrics.channelActive(channel);
+
+ metrics.channelInactive(channel);
+
+ verify(scheduledFuture).cancel(false);
+ }
+}
diff --git a/okhttp/src/main/java/io/grpc/okhttp/InternalOkHttpServerBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/InternalOkHttpServerBuilder.java
index 78a409a3f85..0032972756d 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/InternalOkHttpServerBuilder.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/InternalOkHttpServerBuilder.java
@@ -17,6 +17,7 @@
package io.grpc.okhttp;
import io.grpc.Internal;
+import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.InternalServer;
import io.grpc.internal.TransportTracer;
@@ -29,7 +30,8 @@
@Internal
public final class InternalOkHttpServerBuilder {
public static InternalServer buildTransportServers(OkHttpServerBuilder builder,
- List extends ServerStreamTracer.Factory> streamTracerFactories) {
+ List extends ServerStreamTracer.Factory> streamTracerFactories,
+ MetricRecorder metricRecorder) {
return builder.buildTransportServers(streamTracerFactories);
}
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java
index 8daeed42a8c..163d2023b1c 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java
@@ -27,6 +27,7 @@
import io.grpc.ForwardingServerBuilder;
import io.grpc.InsecureServerCredentials;
import io.grpc.Internal;
+import io.grpc.MetricRecorder;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.ServerStreamTracer;
@@ -111,7 +112,15 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia
return new OkHttpServerBuilder(address, result.factory);
}
- final ServerImplBuilder serverImplBuilder = new ServerImplBuilder(this::buildTransportServers);
+ final ServerImplBuilder serverImplBuilder = new ServerImplBuilder(
+ new ServerImplBuilder.ClientTransportServersBuilder() {
+ @Override
+ public InternalServer buildClientTransportServers(
+ List extends ServerStreamTracer.Factory> streamTracerFactories,
+ MetricRecorder metricRecorder) {
+ return buildTransportServers(streamTracerFactories);
+ }
+ });
final SocketAddress listenAddress;
final HandshakerSocketFactory handshakerSocketFactory;
TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory();
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
index 076eea3349a..9317ca96639 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
@@ -17,6 +17,7 @@
package io.grpc.okhttp;
import io.grpc.InsecureServerCredentials;
+import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractTransportTest;
import io.grpc.internal.ClientTransportFactory;
@@ -58,11 +59,12 @@ protected InternalServer newServer(
@Override
protected InternalServer newServer(
int port, List streamTracerFactories) {
- return OkHttpServerBuilder
+ OkHttpServerBuilder builder = OkHttpServerBuilder
.forPort(port, InsecureServerCredentials.create())
.flowControlWindow(AbstractTransportTest.TEST_FLOW_CONTROL_WINDOW)
- .setTransportTracerFactory(fakeClockTransportTracer)
- .buildTransportServers(streamTracerFactories);
+ .setTransportTracerFactory(fakeClockTransportTracer);
+ return InternalOkHttpServerBuilder
+ .buildTransportServers(builder, streamTracerFactories, new MetricRecorder() {});
}
@Override
diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java
index 6904340ac74..87ad61c9f27 100644
--- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java
+++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java
@@ -196,6 +196,7 @@ public void configureServerBuilder(ServerBuilder> serverBuilder) {
serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor());
}
serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory());
+ serverBuilder.addMetricSink(sink);
}
@VisibleForTesting
diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java
index 16cb02c61c2..f0bd6f93098 100644
--- a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java
+++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java
@@ -26,7 +26,6 @@
import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannelBuilder;
-import io.grpc.MetricSink;
import io.grpc.ServerBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter;
@@ -98,6 +97,7 @@ public void buildTracer() {
grpcOpenTelemetry.configureServerBuilder(mockServerBuiler);
verify(mockServerBuiler, times(2)).addStreamTracerFactory(any());
verify(mockServerBuiler).intercept(any());
+ verify(mockServerBuiler).addMetricSink(any());
verifyNoMoreInteractions(mockServerBuiler);
ManagedChannelBuilder> mockChannelBuilder = mock(ManagedChannelBuilder.class);
@@ -121,7 +121,6 @@ public void builderDefaults() {
.build());
assertThat(module.getEnableMetrics()).isEmpty();
assertThat(module.getOptionalLabels()).isEmpty();
- assertThat(module.getSink()).isInstanceOf(MetricSink.class);
assertThat(module.getTracer()).isSameInstanceAs(noopOpenTelemetry
.getTracerProvider()
diff --git a/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java b/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java
index 58143a8516c..59936cdd485 100644
--- a/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java
+++ b/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java
@@ -59,7 +59,7 @@ public class JettyTransportTest extends AbstractTransportTest {
protected InternalServer newServer(List streamTracerFactories) {
return new InternalServer() {
final InternalServer delegate =
- new ServletServerBuilder().buildTransportServers(streamTracerFactories);
+ new ServletServerBuilder().buildTransportServers(streamTracerFactories);
@Override
public void start(ServerListener listener) throws IOException {
diff --git a/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java b/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java
index aee25de01ad..5bea4c6e03b 100644
--- a/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java
+++ b/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java
@@ -78,7 +78,9 @@ public final class ServletServerBuilder extends ForwardingServerBuilder
+ buildTransportServers(streamTracerFactories));
}
/**
diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
index 196d51fb5a6..06bd66008f4 100644
--- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
+++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
@@ -1052,18 +1052,18 @@ private static final class BootstrappingXdsClientPool implements XdsClientPool {
private final XdsClientPoolFactory xdsClientPoolFactory;
private final String target;
private final @Nullable Map bootstrapOverride;
- private final @Nullable MetricRecorder metricRecorder;
+ private final MetricRecorder metricRecorder;
private ObjectPool xdsClientPool;
BootstrappingXdsClientPool(
XdsClientPoolFactory xdsClientPoolFactory,
String target,
@Nullable Map bootstrapOverride,
- @Nullable MetricRecorder metricRecorder) {
+ MetricRecorder metricRecorder) {
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.target = checkNotNull(target, "target");
this.bootstrapOverride = bootstrapOverride;
- this.metricRecorder = metricRecorder;
+ this.metricRecorder = checkNotNull(metricRecorder, "metricRecorder");
}
@Override