diff --git a/docs/asciidoc/gRPC.adoc b/docs/asciidoc/gRPC.adoc
new file mode 100644
index 0000000000..39cb94822c
--- /dev/null
+++ b/docs/asciidoc/gRPC.adoc
@@ -0,0 +1,108 @@
+== gRPC
+
+The `jooby-grpc` module provides first-class, native support for https://grpc.io/[gRPC].
+
+Unlike traditional setups that require spinning up a separate gRPC server on a different port (often forcing a specific transport like Netty), this module embeds the `grpc-java` engine directly into Jooby.
+
+By using a custom native bridge, it allows you to run strictly-typed gRPC services alongside your standard REST API routes on the **exact same port**. It bypasses the standard HTTP/1.1 pipeline in favor of a highly optimized, native interceptor tailored for HTTP/2 multiplexing, reactive backpressure, and zero-copy byte framing. It works natively across Undertow, Netty, and Jetty.
+
+=== Dependency
+
+[source, xml, role="primary"]
+.Maven
+----
+
+ io.jooby
+ jooby-grpc
+ ${jooby.version}
+
+----
+
+[source, gradle, role="secondary"]
+.Gradle
+----
+implementation 'io.jooby:jooby-grpc:${jooby.version}'
+----
+
+=== Usage
+
+gRPC strictly requires HTTP/2. Before installing the module, ensure your application is configured to use a supported server with HTTP/2 enabled.
+
+[source, java]
+----
+import io.jooby.Jooby;
+import io.jooby.ServerOptions;
+import io.jooby.grpc.GrpcModule;
+
+public class App extends Jooby {
+ {
+ setServerOptions(new ServerOptions().setHttp2(true)); // <1>
+
+ install(new GrpcModule( // <2>
+ new GreeterService()
+ ));
+
+ get("/api/health", ctx -> "OK"); // <3>
+ }
+
+ public static void main(String[] args) {
+ runApp(args, App::new);
+ }
+}
+----
+<1> Enable HTTP/2 on your server.
+<2> Install the module and explicitly register your services.
+<3> Standard REST routes still work on the exact same port!
+
+=== Dependency Injection
+
+If your gRPC services require external dependencies (like database repositories), you can register the service classes instead of pre-instantiated objects. The module will automatically provision them using your active Dependency Injection framework (e.g., Guice, Spring).
+
+[source, java]
+----
+import io.jooby.Jooby;
+import io.jooby.di.GuiceModule;
+import io.jooby.grpc.GrpcModule;
+
+public class App extends Jooby {
+ {
+ install(new GuiceModule());
+
+ install(new GrpcModule(
+ GreeterService.class // <1>
+ ));
+ }
+}
+----
+<1> Pass the class references. The DI framework will instantiate them.
+
+WARNING: gRPC services are registered as **Singletons**. Ensure your service implementations are thread-safe and do not hold request-scoped state in instance variables. Heavy blocking operations will safely run on background workers, protecting the native server's I/O event loops.
+
+=== Server Reflection
+
+If you want to use tools like `grpcurl` or Postman to interact with your services without providing the `.proto` files, you can easily enable gRPC Server Reflection.
+
+Include the `grpc-services` dependency in your build, and register the v1 reflection service alongside your own:
+
+[source, java]
+----
+import io.grpc.protobuf.services.ProtoReflectionServiceV1;
+
+public class App extends Jooby {
+ {
+ install(new GrpcModule(
+ new GreeterService(),
+ ProtoReflectionServiceV1.newInstance() // <1>
+ ));
+ }
+}
+----
+<1> Enables the modern `v1` reflection protocol for maximum compatibility with gRPC clients.
+
+=== Routing & Fallbacks
+
+The gRPC module intercepts requests natively before they reach Jooby's standard router.
+
+If a client attempts to call a gRPC method that does not exist, the request gracefully falls through to the standard Jooby router, returning a native `404 Not Found` (which gRPC clients will automatically translate to a Status `12 UNIMPLEMENTED`).
+
+If you misconfigure your server (e.g., attempting to run gRPC over HTTP/1.1), the fallback route will catch the request and throw an `IllegalStateException` to help you identify the missing configuration immediately.
diff --git a/jooby/src/main/java/io/jooby/GrpcExchange.java b/jooby/src/main/java/io/jooby/GrpcExchange.java
new file mode 100644
index 0000000000..7e3a068a06
--- /dev/null
+++ b/jooby/src/main/java/io/jooby/GrpcExchange.java
@@ -0,0 +1,83 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Server-agnostic abstraction for a native HTTP/2 gRPC exchange.
+ *
+ *
This interface bridges the gap between the underlying web server (Undertow, Netty, or Jetty)
+ * and the reactive gRPC processor. Because gRPC heavily relies on HTTP/2 multiplexing, asynchronous
+ * I/O, and trailing headers (trailers) to communicate status codes, standard HTTP/1.1 context
+ * abstractions are insufficient.
+ *
+ *
Native server interceptors wrap their respective request/response objects into this interface,
+ * allowing the {@link GrpcProcessor} to read headers, push zero-copy byte frames, and finalize the
+ * stream with standard gRPC trailers without knowing which server engine is actually running.
+ */
+public interface GrpcExchange {
+
+ /**
+ * Retrieves the requested URI path.
+ *
+ *
In gRPC, this dictates the routing and strictly follows the pattern: {@code
+ * /Fully.Qualified.ServiceName/MethodName}.
+ *
+ * @return The exact path of the incoming HTTP/2 request.
+ */
+ String getRequestPath();
+
+ /**
+ * Retrieves the value of the specified HTTP request header.
+ *
+ * @param name The name of the header (case-insensitive).
+ * @return The header value, or {@code null} if the header is not present.
+ */
+ @Nullable String getHeader(String name);
+
+ /**
+ * Retrieves all HTTP request headers.
+ *
+ * @return A map containing all headers provided by the client.
+ */
+ Map getHeaders();
+
+ /**
+ * Writes a gRPC-framed byte payload to the underlying non-blocking socket.
+ *
+ * This method must push the buffer to the native network layer without blocking the invoking
+ * thread (which is typically a background gRPC worker). The implementation is responsible for
+ * translating the ByteBuffer into the server's native data format and flushing it over the
+ * network.
+ *
+ * @param payload The properly framed 5-byte-prefixed gRPC payload to send.
+ * @param onFailure A callback invoked immediately if the asynchronous network write fails (e.g.,
+ * if the client abruptly disconnects or the channel is closed).
+ */
+ void send(ByteBuffer payload, Consumer onFailure);
+
+ /**
+ * Closes the HTTP/2 stream by appending the mandatory gRPC trailing headers.
+ *
+ * In the gRPC specification, a successful response or an application-level error is
+ * communicated not by standard HTTP status codes (which are always 200 OK), but by appending
+ * HTTP/2 trailers ({@code grpc-status} and {@code grpc-message}) at the very end of the stream.
+ *
+ *
Calling this method informs the native server to write those trailing headers and formally
+ * close the bidirectional stream.
+ *
+ * @param statusCode The gRPC integer status code (e.g., {@code 0} for OK, {@code 12} for
+ * UNIMPLEMENTED, {@code 4} for DEADLINE_EXCEEDED).
+ * @param description An optional, human-readable status message detailing the result or error.
+ * May be {@code null}.
+ */
+ void close(int statusCode, @Nullable String description);
+}
diff --git a/jooby/src/main/java/io/jooby/GrpcProcessor.java b/jooby/src/main/java/io/jooby/GrpcProcessor.java
new file mode 100644
index 0000000000..25b1bfdd41
--- /dev/null
+++ b/jooby/src/main/java/io/jooby/GrpcProcessor.java
@@ -0,0 +1,51 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Flow;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Core Service Provider Interface (SPI) for the gRPC extension.
+ *
+ *
This processor acts as the bridge between the native HTTP/2 web servers and the embedded gRPC
+ * engine. It is designed to intercept and process gRPC exchanges at the lowest possible network
+ * level, completely bypassing Jooby's standard HTTP/1.1 routing pipeline. This architecture ensures
+ * strict HTTP/2 specification compliance, zero-copy buffering, and reactive backpressure.
+ */
+public interface GrpcProcessor {
+
+ /**
+ * Checks if the given URI path exactly matches a registered gRPC method.
+ *
+ *
Native server interceptors use this method as a lightweight, fail-fast guard. If this
+ * returns {@code true}, the server will hijack the request and upgrade it to a native gRPC
+ * stream. If {@code false}, the request safely falls through to the standard Jooby router
+ * (typically resulting in a standard HTTP 404 Not Found, which gRPC clients gracefully translate
+ * to Status 12 UNIMPLEMENTED).
+ *
+ * @param path The incoming request path (e.g., {@code /fully.qualified.Service/MethodName}).
+ * @return {@code true} if the path is mapped to an active gRPC service; {@code false} otherwise.
+ */
+ boolean isGrpcMethod(String path);
+
+ /**
+ * Initiates the reactive gRPC pipeline for an incoming HTTP/2 request.
+ *
+ *
When a valid gRPC request is intercepted, the native server wraps the underlying network
+ * connection into a {@link GrpcExchange} and passes it to this method. The processor uses this
+ * exchange to asynchronously send response headers, payload byte frames, and HTTP/2 trailers.
+ *
+ * @param exchange The native server exchange representing the bidirectional HTTP/2 stream.
+ * @return A reactive {@link Flow.Subscriber} that the native server must feed incoming request
+ * payload {@link ByteBuffer} chunks into. Returns {@code null} if the exchange was rejected.
+ * @throws IllegalStateException If an unregistered path bypasses the {@link
+ * #isGrpcMethod(String)} guard.
+ */
+ Flow.Subscriber process(@NonNull GrpcExchange exchange);
+}
diff --git a/jooby/src/main/java/io/jooby/ServerOptions.java b/jooby/src/main/java/io/jooby/ServerOptions.java
index 5ea3ab2a31..7e084053f4 100644
--- a/jooby/src/main/java/io/jooby/ServerOptions.java
+++ b/jooby/src/main/java/io/jooby/ServerOptions.java
@@ -290,7 +290,7 @@ public int getPort() {
* @return True when SSL is enabled. Either bc the secure port, httpsOnly or SSL options are set.
*/
public boolean isSSLEnabled() {
- return securePort != null || ssl != null || httpsOnly;
+ return securePort != null || ssl != null || http2 == Boolean.TRUE || httpsOnly;
}
/**
diff --git a/modules/jooby-apt/src/main/java/io/jooby/internal/apt/ParameterGenerator.java b/modules/jooby-apt/src/main/java/io/jooby/internal/apt/ParameterGenerator.java
index 34f1e07d9b..599bf7a6db 100644
--- a/modules/jooby-apt/src/main/java/io/jooby/internal/apt/ParameterGenerator.java
+++ b/modules/jooby-apt/src/main/java/io/jooby/internal/apt/ParameterGenerator.java
@@ -5,14 +5,15 @@
*/
package io.jooby.internal.apt;
-import javax.lang.model.element.*;
+import static io.jooby.internal.apt.AnnotationSupport.findAnnotationValue;
+import static io.jooby.internal.apt.Types.BUILT_IN;
+import static java.util.stream.Collectors.joining;
+
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Stream;
-import static io.jooby.internal.apt.AnnotationSupport.findAnnotationValue;
-import static io.jooby.internal.apt.Types.BUILT_IN;
-import static java.util.stream.Collectors.joining;
+import javax.lang.model.element.*;
public enum ParameterGenerator {
ContextParam("getAttribute", "io.jooby.annotation.ContextParam", "jakarta.ws.rs.core.Context") {
@@ -440,11 +441,13 @@ protected String defaultValue(VariableElement parameter, AnnotationMirror annota
var sources = findAnnotationValue(annotation, AnnotationSupport.VALUE);
return sources.isEmpty() ? "" : CodeBlock.of(", ", CodeBlock.string(sources.getFirst()));
} else if (annotationType.startsWith("jakarta.ws.rs")) {
- var defaultValueAnnotation = AnnotationSupport.findAnnotationByName(
- parameter, "jakarta.ws.rs.DefaultValue");
+ var defaultValueAnnotation =
+ AnnotationSupport.findAnnotationByName(parameter, "jakarta.ws.rs.DefaultValue");
if (defaultValueAnnotation != null) {
var defaultValue = findAnnotationValue(defaultValueAnnotation, AnnotationSupport.VALUE);
- return defaultValue.isEmpty() ? "" : CodeBlock.of(", ", CodeBlock.string(defaultValue.getFirst()));
+ return defaultValue.isEmpty()
+ ? ""
+ : CodeBlock.of(", ", CodeBlock.string(defaultValue.getFirst()));
}
}
return "";
diff --git a/modules/jooby-apt/src/test/java/tests/i3761/C3761Jakarta.java b/modules/jooby-apt/src/test/java/tests/i3761/C3761Jakarta.java
index aadae957bd..3dd6c500e0 100644
--- a/modules/jooby-apt/src/test/java/tests/i3761/C3761Jakarta.java
+++ b/modules/jooby-apt/src/test/java/tests/i3761/C3761Jakarta.java
@@ -8,8 +8,8 @@
import io.jooby.annotation.GET;
import io.jooby.annotation.Path;
import jakarta.ws.rs.DefaultValue;
-import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.FormParam;
+import jakarta.ws.rs.QueryParam;
@Path("/3761")
public class C3761Jakarta {
diff --git a/modules/jooby-apt/src/test/java/tests/i3761/Issue3761.java b/modules/jooby-apt/src/test/java/tests/i3761/Issue3761.java
index 963f350c74..8cb4621f31 100644
--- a/modules/jooby-apt/src/test/java/tests/i3761/Issue3761.java
+++ b/modules/jooby-apt/src/test/java/tests/i3761/Issue3761.java
@@ -5,10 +5,11 @@
*/
package tests.i3761;
-import io.jooby.apt.ProcessorRunner;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import io.jooby.apt.ProcessorRunner;
public class Issue3761 {
@Test
@@ -26,11 +27,8 @@ public void shouldGenerateJakartaDefaultValues() throws Exception {
private static void assertSourceCodeRespectDefaultValues(String source) {
assertTrue(source.contains("return c.number(ctx.query(\"num\", \"5\").intValue());"));
assertTrue(source.contains("return c.unset(ctx.query(\"unset\").valueOrNull());"));
- assertTrue(
- source.contains("return c.emptySet(ctx.query(\"emptySet\", \"\").value());"));
- assertTrue(
- source.contains("return c.string(ctx.query(\"stringVal\", \"Hello\").value());"));
- assertTrue(
- source.contains("return c.bool(ctx.form(\"boolVal\", \"false\").booleanValue());"));
+ assertTrue(source.contains("return c.emptySet(ctx.query(\"emptySet\", \"\").value());"));
+ assertTrue(source.contains("return c.string(ctx.query(\"stringVal\", \"Hello\").value());"));
+ assertTrue(source.contains("return c.bool(ctx.form(\"boolVal\", \"false\").booleanValue());"));
}
}
diff --git a/modules/jooby-bom/pom.xml b/modules/jooby-bom/pom.xml
index 5d332b25cc..19e5a36c97 100644
--- a/modules/jooby-bom/pom.xml
+++ b/modules/jooby-bom/pom.xml
@@ -115,6 +115,11 @@
jooby-graphql
${project.version}
+
+ io.jooby
+ jooby-grpc
+ ${project.version}
+
io.jooby
jooby-gson
diff --git a/modules/jooby-grpc/pom.xml b/modules/jooby-grpc/pom.xml
new file mode 100644
index 0000000000..f215d25fab
--- /dev/null
+++ b/modules/jooby-grpc/pom.xml
@@ -0,0 +1,67 @@
+
+
+
+ 4.0.0
+
+
+ io.jooby
+ modules
+ 4.0.17-SNAPSHOT
+
+ jooby-grpc
+ jooby-grpc
+
+
+
+ io.jooby
+ jooby
+ ${jooby.version}
+
+
+
+ org.slf4j
+ jul-to-slf4j
+ ${slf4j.version}
+
+
+ io.grpc
+ grpc-protobuf
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-stub
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-inprocess
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-services
+ ${grpc.version}
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+
+ org.jacoco
+ org.jacoco.agent
+ runtime
+ test
+
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
diff --git a/modules/jooby-grpc/src/main/java/io/jooby/grpc/GrpcModule.java b/modules/jooby-grpc/src/main/java/io/jooby/grpc/GrpcModule.java
new file mode 100644
index 0000000000..93f0a0aa32
--- /dev/null
+++ b/modules/jooby-grpc/src/main/java/io/jooby/grpc/GrpcModule.java
@@ -0,0 +1,194 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.grpc;
+
+import java.util.*;
+
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import io.grpc.BindableService;
+import io.grpc.MethodDescriptor;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.jooby.*;
+import io.jooby.internal.grpc.DefaultGrpcProcessor;
+
+/**
+ * Native gRPC extension for Jooby. *
+ *
+ * This module allows you to run strictly-typed gRPC services alongside standard Jooby HTTP
+ * routes on the exact same port. It completely bypasses standard HTTP/1.1 pipelines in favor of a
+ * highly optimized, reactive, native interceptor tailored for HTTP/2 multiplexing and trailing
+ * headers.
+ *
+ *
Usage
+ *
+ * gRPC requires HTTP/2. Ensure your Jooby application is configured to use a supported server
+ * with HTTP/2 enabled.
+ *
+ *
{@code
+ * import io.jooby.Jooby;
+ * import io.jooby.ServerOptions;
+ * import io.jooby.grpc.GrpcModule;
+ * * public class App extends Jooby {
+ * {
+ * setServerOptions(new ServerOptions().setHttp2(true).setSecurePort(8443));
+ * * // Install the extension and register your services
+ * install(new GrpcModule(new GreeterService()));
+ * }
+ * }
+ * }
+ *
+ * Dependency Injection
+ *
+ * If your gRPC services require external dependencies (like repositories or configuration), you
+ * can register the service classes instead of instances. The module will automatically provision
+ * them using Jooby's DI registry (e.g., Guice, Spring) during the application startup phase.
+ *
+ *
{@code
+ * public class App extends Jooby {
+ * {
+ * install(new GuiceModule());
+ * * // Pass the class reference. Guice will instantiate it!
+ * install(new GrpcModule(GreeterService.class));
+ * }
+ * }
+ * }
+ *
+ * *
+ *
+ * Note: gRPC services are inherently registered as Singletons. Ensure your
+ * service implementations are thread-safe and do not hold request-scoped state in instance
+ * variables.
+ *
+ *
Logging
+ *
+ * gRPC internally uses {@code java.util.logging}. This module automatically installs the {@link
+ * SLF4JBridgeHandler} to redirect all internal gRPC logs to your configured SLF4J backend.
+ */
+public class GrpcModule implements Extension {
+ private final List services = new ArrayList<>();
+ private final List> serviceClasses = new ArrayList<>();
+
+ static {
+ // Optionally remove existing handlers attached to the j.u.l root logger
+ SLF4JBridgeHandler.removeHandlersForRootLogger();
+ // Install the SLF4J bridge
+ SLF4JBridgeHandler.install();
+ }
+
+ /**
+ * Creates a new gRPC module with pre-instantiated service objects. * @param services One or more
+ * fully instantiated gRPC services.
+ */
+ public GrpcModule(BindableService... services) {
+ this.services.addAll(Arrays.asList(services));
+ }
+
+ /**
+ * Creates a new gRPC module with service classes to be provisioned via Dependency Injection.
+ * * @param serviceClasses One or more gRPC service classes to be resolved from the Jooby
+ * registry.
+ */
+ @SafeVarargs
+ public GrpcModule(Class extends BindableService>... serviceClasses) {
+ bind(serviceClasses);
+ }
+
+ /**
+ * Registers additional gRPC service classes to be provisioned via Dependency Injection. * @param
+ * serviceClasses One or more gRPC service classes to be resolved from the Jooby registry.
+ *
+ * @return This module for chaining.
+ */
+ @SafeVarargs
+ public final GrpcModule bind(Class extends BindableService>... serviceClasses) {
+ this.serviceClasses.addAll(List.of(serviceClasses));
+ return this;
+ }
+
+ /**
+ * Installs the gRPC extension into the Jooby application. *
+ *
+ * This method sets up the {@link GrpcProcessor} SPI, registers native fallback routes, and
+ * defers DI resolution and the starting of the embedded in-process gRPC server to the {@code
+ * onStarting} lifecycle hook. * @param app The target Jooby application.
+ *
+ * @throws Exception If an error occurs during installation.
+ */
+ @Override
+ public void install(@NonNull Jooby app) throws Exception {
+ var serverName = app.getName();
+ var builder = InProcessServerBuilder.forName(serverName);
+ final Map> registry = new HashMap<>();
+
+ // 1. Register user-provided services
+ for (var service : services) {
+ bindService(app, builder, registry, service);
+ }
+
+ var services = app.getServices();
+ var processor = new DefaultGrpcProcessor(registry);
+ services.put(GrpcProcessor.class, processor);
+
+ // Lazy init service from DI.
+ app.onStarting(
+ () -> {
+ for (Class extends BindableService> serviceClass : serviceClasses) {
+ var service = app.require(serviceClass);
+ bindService(app, builder, registry, service);
+ }
+ var grpcServer = builder.build().start();
+
+ // KEEP .directExecutor() here!
+ // This ensures that when the background gRPC worker finishes, it instantly pushes
+ // the response back to Undertow/Netty without wasting time on another thread hop.
+ var channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
+ processor.setChannel(channel);
+
+ app.onStop(channel::shutdownNow);
+ app.onStop(grpcServer::shutdownNow);
+ });
+ }
+
+ /**
+ * Internal helper to register a service with the gRPC builder, extract its method descriptors,
+ * and map a fail-fast route in the Jooby router.
+ *
+ * @param app The target Jooby application.
+ * @param server The in-process server builder.
+ * @param registry The method descriptor registry.
+ * @param service The provisioned gRPC service to bind.
+ */
+ private static void bindService(
+ Jooby app,
+ InProcessServerBuilder server,
+ Map> registry,
+ BindableService service) {
+ server.addService(service);
+ for (var method : service.bindService().getMethods()) {
+ var descriptor = method.getMethodDescriptor();
+ String methodFullName = descriptor.getFullMethodName();
+ registry.put(methodFullName, descriptor);
+ String routePath = "/" + methodFullName;
+
+ // Map a fallback route. If a request hits this, it means the native SPI interceptor
+ // failed to upgrade the request, typically due to a missing HTTP/2 configuration.
+ app.post(
+ routePath,
+ ctx -> {
+ throw new IllegalStateException(
+ "gRPC request reached the standard HTTP router for: "
+ + routePath
+ + ". "
+ + "This means the native gRPC server interceptor was bypassed. "
+ + "Ensure you are running with HTTP/2 enabled, "
+ + "and that the GrpcProcessor SPI is correctly loaded.");
+ });
+ }
+ }
+}
diff --git a/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/DefaultGrpcProcessor.java b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/DefaultGrpcProcessor.java
new file mode 100644
index 0000000000..16205a9b2d
--- /dev/null
+++ b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/DefaultGrpcProcessor.java
@@ -0,0 +1,238 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.grpc;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import io.grpc.CallOptions;
+import io.grpc.ManagedChannel;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientCalls;
+import io.grpc.stub.ClientResponseObserver;
+import io.jooby.GrpcExchange;
+import io.jooby.GrpcProcessor;
+
+public class DefaultGrpcProcessor implements GrpcProcessor {
+
+ // Minimal Marshaller to pass raw bytes through the bridge
+ private static class RawMarshaller implements MethodDescriptor.Marshaller {
+ @Override
+ public InputStream stream(byte[] value) {
+ return new ByteArrayInputStream(value);
+ }
+
+ @Override
+ public byte[] parse(InputStream stream) {
+ try {
+ return stream.readAllBytes();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private ManagedChannel channel;
+ private final Map> registry;
+
+ public DefaultGrpcProcessor(Map> registry) {
+ this.registry = registry;
+ }
+
+ public void setChannel(ManagedChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public boolean isGrpcMethod(String path) {
+ // gRPC paths typically come in as "/package.Service/Method"
+ // Our registry stores them as "package.Service/Method"
+ String methodName = path.startsWith("/") ? path.substring(1) : path;
+
+ // Quick O(1) hash map lookup
+ return registry.get(methodName) != null;
+ }
+
+ @Override
+ public @NonNull Flow.Subscriber process(@NonNull GrpcExchange exchange) {
+ // Route paths: /{package.Service}/{Method}
+ String path = exchange.getRequestPath();
+ // Remove the leading slash to match the gRPC method registry format
+ var descriptor = registry.get(path.substring(1));
+
+ if (descriptor == null) {
+ // MUST never occur, it is guarded by {@link #isGrpcMethod}
+ throw new IllegalStateException(
+ "Unregistered gRPC method: '"
+ + path
+ + "'. "
+ + "This request bypassed the GrpcProcessor.isGrpcMethod() guard, "
+ + "indicating a bug or misconfiguration in the native server interceptor.");
+ }
+
+ var method =
+ MethodDescriptor.newBuilder()
+ .setType(descriptor.getType())
+ .setFullMethodName(descriptor.getFullMethodName())
+ .setRequestMarshaller(new RawMarshaller())
+ .setResponseMarshaller(new RawMarshaller())
+ .build();
+
+ var callOptions = extractCallOptions(exchange);
+ var metadata = extractMetadata(exchange);
+
+ var interceptedChannel =
+ io.grpc.ClientInterceptors.intercept(
+ channel, io.grpc.stub.MetadataUtils.newAttachHeadersInterceptor(metadata));
+
+ var call = interceptedChannel.newCall(method, callOptions);
+ var isFinished = new AtomicBoolean(false);
+
+ boolean isUnaryOrServerStreaming =
+ method.getType() == MethodDescriptor.MethodType.UNARY
+ || method.getType() == MethodDescriptor.MethodType.SERVER_STREAMING;
+
+ var requestBridge = new GrpcRequestBridge(call, method.getType());
+
+ var responseObserver =
+ new ClientResponseObserver() {
+
+ @Override
+ public void beforeStart(ClientCallStreamObserver requestStream) {
+ if (!isUnaryOrServerStreaming) {
+ // requestStream.disableAutoInboundFlowControl();
+ // Wire the readiness callback securely to the bridge
+ requestStream.setOnReadyHandler(requestBridge::onGrpcReady);
+ requestBridge.setRequestObserver(requestStream);
+ }
+ }
+
+ @Override
+ public void onNext(byte[] value) {
+ if (isFinished.get()) return;
+
+ ByteBuffer framed = addGrpcHeader(value);
+
+ exchange.send(
+ framed,
+ cause -> {
+ if (cause != null) {
+ onError(cause);
+ }
+ });
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (isFinished.compareAndSet(false, true)) {
+ log.debug("gRPC stream error", t);
+ Status status = Status.fromThrowable(t);
+ exchange.close(status.getCode().value(), status.getDescription());
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ if (isFinished.compareAndSet(false, true)) {
+ exchange.close(Status.OK.getCode().value(), null);
+ }
+ }
+ };
+
+ // 2. Inject the observer to break the circular dependency
+ requestBridge.setResponseObserver(responseObserver);
+
+ if (!isUnaryOrServerStreaming) {
+ ClientCalls.asyncBidiStreamingCall(call, responseObserver);
+ }
+
+ return requestBridge;
+ }
+
+ /** Extracts the grpc-timeout header and applies it to CallOptions. */
+ private CallOptions extractCallOptions(GrpcExchange exchange) {
+ CallOptions options = CallOptions.DEFAULT;
+ String timeout = exchange.getHeader("grpc-timeout");
+
+ if (timeout == null || timeout.isEmpty()) {
+ return options;
+ }
+
+ try {
+ var unit = timeout.charAt(timeout.length() - 1);
+ var value = Long.parseLong(timeout.substring(0, timeout.length() - 1));
+
+ var timeUnit =
+ switch (unit) {
+ case 'H' -> java.util.concurrent.TimeUnit.HOURS;
+ case 'M' -> java.util.concurrent.TimeUnit.MINUTES;
+ case 'S' -> java.util.concurrent.TimeUnit.SECONDS;
+ case 'm' -> java.util.concurrent.TimeUnit.MILLISECONDS;
+ case 'u' -> java.util.concurrent.TimeUnit.MICROSECONDS;
+ case 'n' -> java.util.concurrent.TimeUnit.NANOSECONDS;
+ default -> null;
+ };
+
+ if (timeUnit != null) {
+ options = options.withDeadlineAfter(value, timeUnit);
+ }
+ } catch (Exception e) {
+ log.debug("Failed to parse grpc-timeout header: {}", timeout);
+ }
+
+ return options;
+ }
+
+ /** Maps standard HTTP headers from the GrpcExchange into gRPC Metadata. */
+ private io.grpc.Metadata extractMetadata(GrpcExchange exchange) {
+ var metadata = new io.grpc.Metadata();
+
+ for (var header : exchange.getHeaders().entrySet()) {
+ var key = header.getKey().toLowerCase();
+
+ if (key.startsWith(":")
+ || key.startsWith("grpc-")
+ || key.equals("content-type")
+ || key.equals("te")) {
+ continue;
+ }
+
+ if (key.endsWith("-bin")) {
+ io.grpc.Metadata.Key metaKey =
+ io.grpc.Metadata.Key.of(key, io.grpc.Metadata.BINARY_BYTE_MARSHALLER);
+ byte[] decoded = java.util.Base64.getDecoder().decode(header.getValue());
+ metadata.put(metaKey, decoded);
+ } else {
+ io.grpc.Metadata.Key metaKey =
+ io.grpc.Metadata.Key.of(key, io.grpc.Metadata.ASCII_STRING_MARSHALLER);
+ metadata.put(metaKey, header.getValue());
+ }
+ }
+ return metadata;
+ }
+
+ /** Prepends the 5-byte gRPC header and returns a ready-to-write ByteBuffer. */
+ private ByteBuffer addGrpcHeader(byte[] payload) {
+ var buffer = ByteBuffer.allocate(5 + payload.length);
+ buffer.put((byte) 0); // Compressed flag (0 = none)
+ buffer.putInt(payload.length);
+ buffer.put(payload);
+ buffer.flip(); // Prepare the buffer for reading by the server socket
+ return buffer;
+ }
+}
diff --git a/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcDeframer.java b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcDeframer.java
new file mode 100644
index 0000000000..e011293c0f
--- /dev/null
+++ b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcDeframer.java
@@ -0,0 +1,65 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.grpc;
+
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+public class GrpcDeframer {
+ private enum State {
+ HEADER,
+ PAYLOAD
+ }
+
+ private State state = State.HEADER;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(5);
+ private ByteBuffer payloadBuffer;
+
+ /** Processes a chunk of data directly from the server's native ByteBuffer. */
+ public void process(ByteBuffer input, Consumer onMessage) {
+ while (input.hasRemaining()) {
+ if (state == State.HEADER) {
+ int toRead = Math.min(headerBuffer.remaining(), input.remaining());
+
+ // Bulk read into header buffer
+ int oldLimit = input.limit();
+ input.limit(input.position() + toRead);
+ headerBuffer.put(input);
+ input.limit(oldLimit);
+
+ if (!headerBuffer.hasRemaining()) {
+ headerBuffer.flip();
+ headerBuffer.get(); // skip compressed flag
+ int length = headerBuffer.getInt();
+
+ if (length == 0) {
+ onMessage.accept(new byte[0]);
+ headerBuffer.clear();
+ } else {
+ payloadBuffer = ByteBuffer.allocate(length);
+ state = State.PAYLOAD;
+ }
+ }
+ } else if (state == State.PAYLOAD) {
+ int toRead = Math.min(payloadBuffer.remaining(), input.remaining());
+
+ // Bulk read into payload buffer
+ int oldLimit = input.limit();
+ input.limit(input.position() + toRead);
+ payloadBuffer.put(input);
+ input.limit(oldLimit);
+
+ if (!payloadBuffer.hasRemaining()) {
+ // The full gRPC message is assembled. Emit it.
+ onMessage.accept(payloadBuffer.array());
+ headerBuffer.clear();
+ payloadBuffer = null;
+ state = State.HEADER;
+ }
+ }
+ }
+ }
+}
diff --git a/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcRequestBridge.java b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcRequestBridge.java
new file mode 100644
index 0000000000..4a65e74bd3
--- /dev/null
+++ b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcRequestBridge.java
@@ -0,0 +1,121 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.grpc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.ClientCall;
+import io.grpc.MethodDescriptor;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientCalls;
+import io.grpc.stub.ClientResponseObserver;
+
+public class GrpcRequestBridge implements Subscriber {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final GrpcDeframer deframer = new GrpcDeframer();
+ private final AtomicBoolean completed = new AtomicBoolean(false);
+
+ private final ClientCall call;
+ private final MethodDescriptor.MethodType methodType;
+ private final boolean isUnaryOrServerStreaming;
+
+ private ClientResponseObserver responseObserver;
+ private ClientCallStreamObserver requestObserver;
+ private Subscription subscription;
+ private byte[] singlePayload;
+
+ public GrpcRequestBridge(
+ ClientCall call, MethodDescriptor.MethodType methodType) {
+ this.call = call;
+ this.methodType = methodType;
+ this.isUnaryOrServerStreaming =
+ methodType == MethodDescriptor.MethodType.UNARY
+ || methodType == MethodDescriptor.MethodType.SERVER_STREAMING;
+ }
+
+ public void setResponseObserver(ClientResponseObserver responseObserver) {
+ this.responseObserver = responseObserver;
+ }
+
+ public void setRequestObserver(ClientCallStreamObserver requestObserver) {
+ this.requestObserver = requestObserver;
+ }
+
+ public void onGrpcReady() {
+ if (subscription != null
+ && requestObserver != null
+ && requestObserver.isReady()
+ && !completed.get()) {
+ subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ this.subscription = subscription;
+ // Initial demand to kick off the network body reader
+ subscription.request(1);
+ }
+
+ @Override
+ public void onNext(ByteBuffer item) {
+ try {
+ deframer.process(
+ item,
+ msg -> {
+ if (isUnaryOrServerStreaming) {
+ singlePayload = msg;
+ } else {
+ requestObserver.onNext(msg);
+ }
+ });
+
+ if (isUnaryOrServerStreaming) {
+ subscription.request(1); // Keep reading until EOF for unary/server-streaming
+ } else if (requestObserver != null && requestObserver.isReady()) {
+ subscription.request(1); // Ask for more if the streaming gRPC buffer is ready
+ }
+ } catch (Throwable t) {
+ subscription.cancel();
+ onError(t);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ if (completed.compareAndSet(false, true)) {
+ log.error("Error in gRPC request stream", throwable);
+ if (requestObserver != null) {
+ requestObserver.onError(throwable);
+ } else if (responseObserver != null) {
+ responseObserver.onError(throwable);
+ }
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ if (completed.compareAndSet(false, true)) {
+ if (isUnaryOrServerStreaming) {
+ byte[] payload = singlePayload == null ? new byte[0] : singlePayload;
+ if (methodType == MethodDescriptor.MethodType.UNARY) {
+ ClientCalls.asyncUnaryCall(call, payload, responseObserver);
+ } else {
+ ClientCalls.asyncServerStreamingCall(call, payload, responseObserver);
+ }
+ } else if (requestObserver != null) {
+ requestObserver.onCompleted();
+ }
+ }
+ }
+}
diff --git a/modules/jooby-grpc/src/test/java/io/jooby/grpc/DefaultGrpcProcessorTest.java b/modules/jooby-grpc/src/test/java/io/jooby/grpc/DefaultGrpcProcessorTest.java
new file mode 100644
index 0000000000..940ef8acfc
--- /dev/null
+++ b/modules/jooby-grpc/src/test/java/io/jooby/grpc/DefaultGrpcProcessorTest.java
@@ -0,0 +1,221 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.grpc;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.Flow;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.jooby.GrpcExchange;
+import io.jooby.internal.grpc.DefaultGrpcProcessor;
+import io.jooby.internal.grpc.GrpcRequestBridge;
+
+public class DefaultGrpcProcessorTest {
+
+ private ManagedChannel channel;
+ private Map> registry;
+ private GrpcExchange exchange;
+ private ClientCall call;
+ private DefaultGrpcProcessor bridge;
+
+ @BeforeEach
+ @SuppressWarnings("unchecked")
+ public void setUp() {
+ channel = mock(ManagedChannel.class);
+ registry = mock(Map.class);
+ exchange = mock(GrpcExchange.class);
+ call = mock(ClientCall.class);
+
+ // The interceptor wraps the channel, but eventually delegates to the real one
+ when(channel.newCall(any(MethodDescriptor.class), any(CallOptions.class))).thenReturn(call);
+
+ bridge = new DefaultGrpcProcessor(registry);
+ bridge.setChannel(channel);
+ }
+
+ @Test
+ @DisplayName(
+ "Should throw IllegalStateException if an unknown method bypasses the isGrpcMethod guard")
+ public void shouldRejectUnknownMethod() {
+ when(exchange.getRequestPath()).thenReturn("/unknown.Service/Method");
+ when(registry.get("unknown.Service/Method")).thenReturn(null);
+
+ // Assert that the bridge correctly identifies the illegal state
+ IllegalStateException exception =
+ org.junit.jupiter.api.Assertions.assertThrows(
+ IllegalStateException.class, () -> bridge.process(exchange));
+
+ // Ensure the exchange wasn't manipulated or closed, because the framework
+ // should crash the thread instead of trying to gracefully close a gRPC stream.
+ verify(exchange, org.mockito.Mockito.never()).close(anyInt(), any());
+ }
+
+ @Test
+ @DisplayName("Should successfully bridge a valid Bidi-Streaming gRPC call")
+ public void shouldProcessValidStreamingCall() {
+ setupValidMethod("test.Chat/Stream", MethodDescriptor.MethodType.BIDI_STREAMING);
+
+ Flow.Subscriber subscriber = bridge.process(exchange);
+
+ assertNotNull(subscriber);
+ assertTrue(subscriber instanceof GrpcRequestBridge);
+
+ // Verify call was actually created
+ verify(channel).newCall(any(MethodDescriptor.class), any(CallOptions.class));
+ }
+
+ @Test
+ @DisplayName("Should parse grpc-timeout header into CallOptions deadline")
+ public void shouldParseGrpcTimeout() {
+ setupValidMethod("test.Chat/TimeoutCall", MethodDescriptor.MethodType.UNARY);
+
+ // 1000m = 1000 milliseconds
+ when(exchange.getHeader("grpc-timeout")).thenReturn("1000m");
+
+ bridge.process(exchange);
+
+ ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(CallOptions.class);
+ verify(channel).newCall(any(MethodDescriptor.class), optionsCaptor.capture());
+
+ CallOptions options = optionsCaptor.getValue();
+ assertNotNull(options.getDeadline());
+ assertTrue(options.getDeadline().isExpired() == false, "Deadline should be in the future");
+ }
+
+ @Test
+ @DisplayName("Should correctly deframe and send gRPC payload to the client")
+ public void shouldFrameAndSendResponsePayload() {
+ setupValidMethod("test.Chat/Stream", MethodDescriptor.MethodType.BIDI_STREAMING);
+ bridge.process(exchange);
+
+ // Capture the internal listener that gRPC uses to push data back to us
+ ArgumentCaptor> listenerCaptor =
+ ArgumentCaptor.forClass(ClientCall.Listener.class);
+ verify(call).start(listenerCaptor.capture(), any(Metadata.class));
+ ClientCall.Listener responseListener = listenerCaptor.getValue();
+
+ // Simulate the server pushing a payload back
+ byte[] serverResponse = "hello".getBytes();
+ responseListener.onMessage(serverResponse);
+
+ // Verify our bridge framed it with the 5-byte header and sent it to Jooby
+ ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
+ verify(exchange).send(bufferCaptor.capture(), any());
+
+ ByteBuffer framedBuffer = bufferCaptor.getValue();
+ assertEquals(5 + serverResponse.length, framedBuffer.limit());
+ assertEquals((byte) 0, framedBuffer.get(), "Compressed flag should be 0");
+ assertEquals(serverResponse.length, framedBuffer.getInt(), "Length should match payload");
+
+ byte[] capturedPayload = new byte[serverResponse.length];
+ framedBuffer.get(capturedPayload);
+ assertArrayEquals(serverResponse, capturedPayload);
+ }
+
+ @Test
+ @DisplayName("Should close exchange with HTTP/2 trailing status when server throws error")
+ public void shouldCloseExchangeOnError() {
+ setupValidMethod("test.Chat/Stream", MethodDescriptor.MethodType.BIDI_STREAMING);
+ bridge.process(exchange);
+
+ ArgumentCaptor> listenerCaptor =
+ ArgumentCaptor.forClass(ClientCall.Listener.class);
+ verify(call).start(listenerCaptor.capture(), any(Metadata.class));
+ ClientCall.Listener responseListener = listenerCaptor.getValue();
+
+ // Simulate an internal server error from the gRPC engine
+ Status errorStatus = Status.INVALID_ARGUMENT.withDescription("Bad data");
+ responseListener.onClose(errorStatus, new Metadata());
+
+ // Verify it mapped down to the core exchange SPI
+ verify(exchange).close(errorStatus.getCode().value(), "Bad data");
+ }
+
+ @Test
+ @DisplayName("Should gracefully close exchange with Status 0 on completion")
+ public void shouldCloseExchangeOnComplete() {
+ setupValidMethod("test.Chat/Stream", MethodDescriptor.MethodType.BIDI_STREAMING);
+ bridge.process(exchange);
+
+ ArgumentCaptor> listenerCaptor =
+ ArgumentCaptor.forClass(ClientCall.Listener.class);
+ verify(call).start(listenerCaptor.capture(), any(Metadata.class));
+ ClientCall.Listener responseListener = listenerCaptor.getValue();
+
+ // Simulate clean completion
+ responseListener.onClose(Status.OK, new Metadata());
+
+ verify(exchange).close(Status.OK.getCode().value(), null);
+ }
+
+ @Test
+ @DisplayName("Should extract and decode custom metadata headers")
+ public void shouldExtractMetadata() {
+ // CRITICAL FIX: Use BIDI_STREAMING instead of UNARY.
+ // Unary delays call.start() until the request stream completes. Bidi starts immediately.
+ setupValidMethod("test.Chat/Headers", MethodDescriptor.MethodType.BIDI_STREAMING);
+
+ Map incomingHeaders =
+ Map.of(
+ "x-custom-id",
+ "12345",
+ "x-custom-bin",
+ Base64.getEncoder().encodeToString("binary_data".getBytes()));
+ when(exchange.getHeaders()).thenReturn(incomingHeaders);
+
+ bridge.process(exchange);
+
+ // Verify that the metadata was extracted and attached to the call
+ ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
+ verify(call).start(any(), metadataCaptor.capture());
+
+ Metadata metadata = metadataCaptor.getValue();
+
+ assertEquals(
+ "12345", metadata.get(Metadata.Key.of("x-custom-id", Metadata.ASCII_STRING_MARSHALLER)));
+ assertArrayEquals(
+ "binary_data".getBytes(),
+ metadata.get(Metadata.Key.of("x-custom-bin", Metadata.BINARY_BYTE_MARSHALLER)));
+ }
+
+ /** Helper to mock out a valid MethodDescriptor in the registry. */
+ @SuppressWarnings("unchecked")
+ private void setupValidMethod(String methodPath, MethodDescriptor.MethodType type) {
+ MethodDescriptor descriptor =
+ MethodDescriptor.newBuilder()
+ .setType(type)
+ .setFullMethodName(methodPath)
+ .setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
+ .setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
+ .build();
+
+ when(exchange.getRequestPath()).thenReturn("/" + methodPath);
+ //noinspection rawtypes
+ when(registry.get(methodPath)).thenReturn((MethodDescriptor) descriptor);
+ }
+}
diff --git a/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcDeframerTest.java b/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcDeframerTest.java
new file mode 100644
index 0000000000..5239681cc9
--- /dev/null
+++ b/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcDeframerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.grpc;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.jooby.internal.grpc.GrpcDeframer;
+
+public class GrpcDeframerTest {
+
+ private GrpcDeframer deframer;
+ private List outputMessages;
+
+ @BeforeEach
+ public void setUp() {
+ deframer = new GrpcDeframer();
+ outputMessages = new ArrayList<>();
+ }
+
+ @Test
+ public void shouldParseSingleCompleteMessage() {
+ byte[] payload = "hello grpc".getBytes();
+ ByteBuffer frame = createGrpcFrame(payload);
+
+ deframer.process(frame, msg -> outputMessages.add(msg));
+
+ assertEquals(1, outputMessages.size());
+ assertArrayEquals(payload, outputMessages.get(0));
+ }
+
+ @Test
+ public void shouldParseFragmentedHeader() {
+ byte[] payload = "fragmented header".getBytes();
+ byte[] frame = createGrpcFrame(payload).array();
+
+ // Send the first 2 bytes of the header
+ deframer.process(ByteBuffer.wrap(frame, 0, 2), msg -> outputMessages.add(msg));
+ assertEquals(0, outputMessages.size(), "Should not emit message yet");
+
+ // Send the rest of the header and the payload
+ deframer.process(ByteBuffer.wrap(frame, 2, frame.length - 2), msg -> outputMessages.add(msg));
+
+ assertEquals(1, outputMessages.size());
+ assertArrayEquals(payload, outputMessages.get(0));
+ }
+
+ @Test
+ public void shouldParseFragmentedPayload() {
+ byte[] payload = "this is a very long payload that gets split".getBytes();
+ byte[] frame = createGrpcFrame(payload).array();
+
+ // Send the 5-byte header + first 10 bytes of payload (15 bytes total)
+ deframer.process(ByteBuffer.wrap(frame, 0, 15), msg -> outputMessages.add(msg));
+ assertEquals(0, outputMessages.size(), "Should not emit message until full payload arrives");
+
+ // Send the remainder of the payload
+ deframer.process(ByteBuffer.wrap(frame, 15, frame.length - 15), msg -> outputMessages.add(msg));
+
+ assertEquals(1, outputMessages.size());
+ assertArrayEquals(payload, outputMessages.get(0));
+ }
+
+ @Test
+ public void shouldParseMultipleMessagesInSingleBuffer() {
+ byte[] payload1 = "message 1".getBytes();
+ byte[] payload2 = "message 2".getBytes();
+
+ ByteBuffer frame1 = createGrpcFrame(payload1);
+ ByteBuffer frame2 = createGrpcFrame(payload2);
+
+ // Combine both frames into a single buffer
+ ByteBuffer combined = ByteBuffer.allocate(frame1.capacity() + frame2.capacity());
+ combined.put(frame1).put(frame2);
+ combined.flip();
+
+ deframer.process(combined, msg -> outputMessages.add(msg));
+
+ assertEquals(2, outputMessages.size());
+ assertArrayEquals(payload1, outputMessages.get(0));
+ assertArrayEquals(payload2, outputMessages.get(1));
+ }
+
+ @Test
+ public void shouldHandleZeroLengthPayload() {
+ byte[] payload = new byte[0];
+ ByteBuffer frame = createGrpcFrame(payload);
+
+ deframer.process(frame, msg -> outputMessages.add(msg));
+
+ assertEquals(1, outputMessages.size());
+ assertArrayEquals(payload, outputMessages.get(0));
+ }
+
+ @Test
+ public void shouldHandleExtremeFragmentationByteByByte() {
+ byte[] payload = "byte by byte".getBytes();
+ byte[] frame = createGrpcFrame(payload).array();
+
+ for (byte b : frame) {
+ deframer.process(ByteBuffer.wrap(new byte[] {b}), msg -> outputMessages.add(msg));
+ }
+
+ assertEquals(1, outputMessages.size());
+ assertArrayEquals(payload, outputMessages.get(0));
+ }
+
+ /** Helper to wrap a raw payload in the standard 5-byte gRPC framing. */
+ private ByteBuffer createGrpcFrame(byte[] payload) {
+ ByteBuffer buffer = ByteBuffer.allocate(5 + payload.length);
+ buffer.put((byte) 0); // Compressed flag
+ buffer.putInt(payload.length); // Length
+ buffer.put(payload); // Data
+ buffer.flip();
+ return buffer;
+ }
+}
diff --git a/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcRequestBridgeTest.java b/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcRequestBridgeTest.java
new file mode 100644
index 0000000000..2d6ed11699
--- /dev/null
+++ b/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcRequestBridgeTest.java
@@ -0,0 +1,159 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.grpc;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Flow.Subscription;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import io.grpc.ClientCall;
+import io.grpc.MethodDescriptor;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import io.jooby.internal.grpc.GrpcRequestBridge;
+
+public class GrpcRequestBridgeTest {
+
+ private ClientCall call;
+ private Subscription subscription;
+ private ClientCallStreamObserver requestObserver;
+ private ClientResponseObserver responseObserver;
+ private GrpcRequestBridge bridge;
+
+ @BeforeEach
+ @SuppressWarnings("unchecked")
+ public void setUp() {
+ call = mock(ClientCall.class);
+ subscription = mock(Subscription.class);
+ requestObserver = mock(ClientCallStreamObserver.class);
+ responseObserver = mock(ClientResponseObserver.class);
+
+ // Default to BIDI_STREAMING to test standard flow-control and backpressure
+ bridge = new GrpcRequestBridge(call, MethodDescriptor.MethodType.BIDI_STREAMING);
+ bridge.setRequestObserver(requestObserver);
+ bridge.setResponseObserver(responseObserver);
+ }
+
+ @Test
+ @DisplayName("Should request initial demand (1) upon subscription")
+ public void shouldRequestInitialDemandOnSubscribe() {
+ bridge.onSubscribe(subscription);
+
+ verify(subscription).request(1);
+ }
+
+ @Test
+ @DisplayName("Should forward payload to requestObserver and request more if gRPC buffer is ready")
+ public void shouldSendMessageAndRequestMoreIfReady() {
+ bridge.onSubscribe(subscription);
+ reset(subscription); // Clear the initial request(1) counter
+
+ when(requestObserver.isReady()).thenReturn(true);
+
+ byte[] payload = "test".getBytes();
+ ByteBuffer frame = createFrame(payload);
+
+ bridge.onNext(frame);
+
+ ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class);
+ verify(requestObserver).onNext(captor.capture());
+ assertArrayEquals(payload, captor.getValue(), "The deframed payload should match exactly");
+
+ // Because isReady() is true, it should demand the next network chunk
+ verify(subscription).request(1);
+ }
+
+ @Test
+ @DisplayName(
+ "Should forward payload but apply backpressure (do not request) if gRPC is not ready")
+ public void shouldNotRequestMoreIfNotReady() {
+ bridge.onSubscribe(subscription);
+ reset(subscription);
+
+ when(requestObserver.isReady()).thenReturn(false);
+
+ byte[] payload = "test".getBytes();
+ bridge.onNext(createFrame(payload));
+
+ verify(requestObserver).onNext(any());
+
+ // Since isReady() is false, it should NOT request more data, effectively applying backpressure
+ verify(subscription, never()).request(anyLong());
+ }
+
+ @Test
+ @DisplayName("Should complete the requestObserver when the network stream completes")
+ public void shouldCompleteRequestObserverOnComplete() {
+ bridge.onSubscribe(subscription);
+ bridge.onComplete();
+
+ verify(requestObserver).onCompleted();
+ }
+
+ @Test
+ @DisplayName("Should propagate network errors to the requestObserver")
+ public void shouldPropagateErrorToObserver() {
+ bridge.onSubscribe(subscription);
+ Throwable error = new RuntimeException("Stream network failure");
+
+ bridge.onError(error);
+
+ verify(requestObserver).onError(error);
+ }
+
+ @Test
+ @DisplayName("Unary calls should accumulate payload without forwarding until EOF")
+ public void shouldHandleUnaryCallsDifferently() {
+ bridge = new GrpcRequestBridge(call, MethodDescriptor.MethodType.UNARY);
+ bridge.setResponseObserver(responseObserver);
+ bridge.onSubscribe(subscription);
+ reset(subscription);
+
+ byte[] payload = "unary".getBytes();
+ bridge.onNext(createFrame(payload));
+
+ // For Unary and Server Streaming, chunks are NOT passed via onNext
+ verify(requestObserver, never()).onNext(any());
+
+ // It should keep requesting data from the network until EOF is reached
+ verify(subscription).request(1);
+ }
+
+ @Test
+ @DisplayName("onGrpcReady callback should trigger network demand if stream is active")
+ public void shouldRequestMoreOnGrpcReady() {
+ bridge.onSubscribe(subscription);
+ reset(subscription);
+
+ when(requestObserver.isReady()).thenReturn(true);
+
+ bridge.onGrpcReady();
+
+ verify(subscription).request(1);
+ }
+
+ private ByteBuffer createFrame(byte[] payload) {
+ ByteBuffer frame = ByteBuffer.allocate(5 + payload.length);
+ frame.put((byte) 0); // Uncompressed flag
+ frame.putInt(payload.length);
+ frame.put(payload);
+ frame.flip(); // Prepare buffer for reading
+ return frame;
+ }
+}
diff --git a/modules/jooby-jackson3/src/test/java/io/jooby/jackson3/Jackson3JsonModuleTest.java b/modules/jooby-jackson3/src/test/java/io/jooby/jackson3/Jackson3JsonModuleTest.java
index d54ad90ae0..db3bc6132a 100644
--- a/modules/jooby-jackson3/src/test/java/io/jooby/jackson3/Jackson3JsonModuleTest.java
+++ b/modules/jooby-jackson3/src/test/java/io/jooby/jackson3/Jackson3JsonModuleTest.java
@@ -16,13 +16,13 @@
import org.junit.jupiter.api.Test;
-import tools.jackson.databind.ObjectMapper;
-import tools.jackson.dataformat.xml.XmlMapper;
import io.jooby.Body;
import io.jooby.Context;
import io.jooby.MediaType;
import io.jooby.output.OutputFactory;
import io.jooby.output.OutputOptions;
+import tools.jackson.databind.ObjectMapper;
+import tools.jackson.dataformat.xml.XmlMapper;
public class Jackson3JsonModuleTest {
diff --git a/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcExchange.java b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcExchange.java
new file mode 100644
index 0000000000..7abd91364d
--- /dev/null
+++ b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcExchange.java
@@ -0,0 +1,100 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.jetty;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Response;
+import org.eclipse.jetty.util.Callback;
+
+import io.jooby.GrpcExchange;
+
+public class JettyGrpcExchange implements GrpcExchange {
+
+ private final Request request;
+ private final Response response;
+ private final Callback jettyCallback;
+ private boolean headersSent = false;
+
+ // Create a mutable trailers object that Jetty will pull from at the end of the stream
+ private final HttpFields.Mutable trailers = HttpFields.build();
+
+ public JettyGrpcExchange(Request request, Response response, Callback jettyCallback) {
+ this.request = request;
+ this.response = response;
+ this.jettyCallback = jettyCallback;
+
+ response.getHeaders().put("Content-Type", "application/grpc");
+
+ // Register the supplier BEFORE the response commits
+ response.setTrailersSupplier(() -> trailers);
+ }
+
+ @Override
+ public String getRequestPath() {
+ return request.getHttpURI().getPath();
+ }
+
+ @Override
+ public String getHeader(String name) {
+ return request.getHeaders().get(name);
+ }
+
+ @Override
+ public Map getHeaders() {
+ Map map = new HashMap<>();
+ for (var field : request.getHeaders()) {
+ map.put(field.getName(), field.getValue());
+ }
+ return map;
+ }
+
+ @Override
+ public void send(ByteBuffer payload, Consumer callback) {
+ headersSent = true;
+
+ response.write(
+ false,
+ payload,
+ new Callback() {
+ @Override
+ public void succeeded() {
+ callback.accept(null);
+ }
+
+ @Override
+ public void failed(Throwable x) {
+ callback.accept(x);
+ }
+ });
+ }
+
+ @Override
+ public void close(int statusCode, String description) {
+ if (headersSent) {
+ // Trailers-Appended: Data was sent, populate the mutable trailers object
+ trailers.add("grpc-status", String.valueOf(statusCode));
+ if (description != null) {
+ trailers.add("grpc-message", description);
+ }
+
+ // Complete stream. Jetty will automatically read from the supplier we registered earlier.
+ response.write(true, ByteBuffer.allocate(0), jettyCallback);
+ } else {
+ // Trailers-Only: No data was sent, trailers become standard HTTP headers.
+ response.getHeaders().put("grpc-status", String.valueOf(statusCode));
+ if (description != null) {
+ response.getHeaders().put("grpc-message", description);
+ }
+ response.write(true, ByteBuffer.allocate(0), jettyCallback);
+ }
+ }
+}
diff --git a/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcHandler.java b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcHandler.java
new file mode 100644
index 0000000000..9ae74a03a6
--- /dev/null
+++ b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.jetty;
+
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Response;
+import org.eclipse.jetty.util.Callback;
+
+import io.jooby.GrpcProcessor;
+
+public class JettyGrpcHandler extends Handler.Wrapper {
+
+ private final GrpcProcessor processor;
+
+ public JettyGrpcHandler(Handler next, GrpcProcessor processor) {
+ this.processor = processor;
+ setHandler(next);
+ }
+
+ @Override
+ public boolean handle(Request request, Response response, Callback callback) throws Exception {
+ var contentType = request.getHeaders().get("Content-Type");
+
+ if (processor.isGrpcMethod(request.getHttpURI().getPath())
+ && contentType != null
+ && contentType.startsWith("application/grpc")) {
+
+ if (!"HTTP/2.0".equals(request.getConnectionMetaData().getProtocol())) {
+ response.setStatus(426); // Upgrade Required
+ response.getHeaders().put("Connection", "Upgrade");
+ response.getHeaders().put("Upgrade", "h2c");
+ callback.succeeded();
+ return true;
+ }
+
+ var exchange = new JettyGrpcExchange(request, response, callback);
+ var subscriber = processor.process(exchange);
+
+ new JettyGrpcInputBridge(request, subscriber, callback).start();
+
+ return true;
+ }
+
+ // not grpc, move next
+ return super.handle(request, response, callback);
+ }
+}
diff --git a/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcInputBridge.java b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcInputBridge.java
new file mode 100644
index 0000000000..a78101553a
--- /dev/null
+++ b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcInputBridge.java
@@ -0,0 +1,91 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.jetty;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.util.Callback;
+
+public class JettyGrpcInputBridge implements Flow.Subscription, Runnable {
+
+ private final Request request;
+ private final Flow.Subscriber subscriber;
+ private final Callback callback;
+ private final AtomicLong demand = new AtomicLong();
+
+ public JettyGrpcInputBridge(
+ Request request, Flow.Subscriber subscriber, Callback callback) {
+ this.request = request;
+ this.subscriber = subscriber;
+ this.callback = callback;
+ }
+
+ public void start() {
+ subscriber.onSubscribe(this);
+ }
+
+ @Override
+ public void request(long n) {
+ if (n <= 0) {
+ subscriber.onError(new IllegalArgumentException("Demand must be positive"));
+ return;
+ }
+
+ if (demand.getAndAdd(n) == 0) {
+ run();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ demand.set(0);
+ callback.failed(new CancellationException("gRPC stream cancelled by client"));
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (demand.get() > 0) {
+ var chunk = request.read();
+
+ if (chunk == null) {
+ request.demand(this);
+ return;
+ }
+
+ try {
+ var failure = chunk.getFailure();
+ if (failure != null) {
+ subscriber.onError(failure);
+ callback.failed(failure);
+ return;
+ }
+
+ var buffer = chunk.getByteBuffer();
+ if (buffer != null && buffer.hasRemaining()) {
+ subscriber.onNext(buffer);
+ demand.decrementAndGet();
+ }
+
+ if (chunk.isLast()) {
+ subscriber.onComplete();
+ return;
+ }
+
+ } finally {
+ chunk.release();
+ }
+ }
+ } catch (Throwable t) {
+ subscriber.onError(t);
+ callback.failed(t);
+ }
+ }
+}
diff --git a/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/http2/JettyHttp2Configurer.java b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/http2/JettyHttp2Configurer.java
index ab3b8d50cc..1a1d03d459 100644
--- a/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/http2/JettyHttp2Configurer.java
+++ b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/http2/JettyHttp2Configurer.java
@@ -5,8 +5,6 @@
*/
package io.jooby.internal.jetty.http2;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
@@ -25,13 +23,23 @@ public class JettyHttp2Configurer {
public List configure(HttpConfiguration input) {
if (input.getCustomizer(SecureRequestCustomizer.class) != null) {
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory(H2, H2_17, HTTP_1_1);
- alpn.setDefaultProtocol(HTTP_1_1);
+ alpn.setDefaultProtocol(H2);
- HTTP2ServerConnectionFactory https2 = new HTTP2ServerConnectionFactory(input);
+ HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(input);
+ h2.setInitialStreamRecvWindow(1024 * 1024);
+ h2.setInitialSessionRecvWindow(10 * 1024 * 1024);
- return Arrays.asList(alpn, https2);
+ // FIX: Set Max Concurrent Streams higher if you have many bidi clients
+ h2.setMaxConcurrentStreams(1000);
+ return List.of(alpn, h2);
} else {
- return Collections.singletonList(new HTTP2CServerConnectionFactory(input));
+ var h2c = new HTTP2CServerConnectionFactory(input);
+ h2c.setInitialStreamRecvWindow(1024 * 1024);
+ h2c.setInitialSessionRecvWindow(10 * 1024 * 1024);
+
+ // FIX: Set Max Concurrent Streams higher if you have many bidi clients
+ h2c.setMaxConcurrentStreams(1000);
+ return List.of(h2c);
}
}
}
diff --git a/modules/jooby-jetty/src/main/java/io/jooby/jetty/JettyServer.java b/modules/jooby-jetty/src/main/java/io/jooby/jetty/JettyServer.java
index 6188177e03..ab25f22b3a 100644
--- a/modules/jooby-jetty/src/main/java/io/jooby/jetty/JettyServer.java
+++ b/modules/jooby-jetty/src/main/java/io/jooby/jetty/JettyServer.java
@@ -32,9 +32,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import io.jooby.*;
import io.jooby.exception.StartupException;
-import io.jooby.internal.jetty.JettyHandler;
-import io.jooby.internal.jetty.JettyHttpExpectAndContinueHandler;
-import io.jooby.internal.jetty.PrefixHandler;
+import io.jooby.internal.jetty.*;
import io.jooby.internal.jetty.http2.JettyHttp2Configurer;
import io.jooby.output.OutputFactory;
@@ -303,12 +301,17 @@ private List> createHandler(
if (options.isExpectContinue() == Boolean.TRUE) {
handler = new JettyHttpExpectAndContinueHandler(handler);
}
+ GrpcProcessor grpcProcessor =
+ application.getServices().getOrNull(GrpcProcessor.class);
+ if (grpcProcessor != null) {
+ handler = new JettyGrpcHandler(handler, grpcProcessor);
+ }
return Map.entry(application.getContextPath(), handler);
})
.toList();
}
- @NonNull @Override
+ @Override
public List getLoggerOff() {
return List.of(
"org.eclipse.jetty.server.Server",
diff --git a/modules/jooby-jetty/src/main/java/module-info.java b/modules/jooby-jetty/src/main/java/module-info.java
index e10ff88cd1..8b1a6e495b 100644
--- a/modules/jooby-jetty/src/main/java/module-info.java
+++ b/modules/jooby-jetty/src/main/java/module-info.java
@@ -18,6 +18,7 @@
requires org.eclipse.jetty.http2.server;
requires org.eclipse.jetty.websocket.server;
requires java.desktop;
+ requires org.eclipse.jetty.http;
provides Server with
JettyServer;
diff --git a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcExchange.java b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcExchange.java
new file mode 100644
index 0000000000..5a4b4adb1b
--- /dev/null
+++ b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcExchange.java
@@ -0,0 +1,115 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.netty;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import io.jooby.GrpcExchange;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+
+public class NettyGrpcExchange implements GrpcExchange {
+
+ private final ChannelHandlerContext ctx;
+ private final HttpRequest request;
+ private boolean headersSent = false;
+
+ public NettyGrpcExchange(ChannelHandlerContext ctx, HttpRequest request) {
+ this.ctx = ctx;
+ this.request = request;
+ }
+
+ @Override
+ public String getRequestPath() {
+ String uri = request.uri();
+ int queryIndex = uri.indexOf('?');
+ return queryIndex > 0 ? uri.substring(0, queryIndex) : uri;
+ }
+
+ @Override
+ public String getHeader(String name) {
+ return request.headers().get(name);
+ }
+
+ @Override
+ public Map getHeaders() {
+ Map map = new HashMap<>();
+ for (Map.Entry entry : request.headers()) {
+ map.put(entry.getKey(), entry.getValue());
+ }
+ return map;
+ }
+
+ private void sendHeadersIfNecessary() {
+ if (!headersSent) {
+ // Send the initial HTTP/2 HEADERS frame (Status 200)
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/grpc");
+ ctx.write(response);
+ headersSent = true;
+ }
+ }
+
+ @Override
+ public void send(ByteBuffer payload, Consumer callback) {
+ sendHeadersIfNecessary();
+
+ // Wrap the NIO ByteBuffer in a Netty ByteBuf without copying
+ HttpContent chunk = new DefaultHttpContent(Unpooled.wrappedBuffer(payload));
+
+ // Write and flush, then map Netty's Future to your single-lambda callback
+ ctx.writeAndFlush(chunk)
+ .addListener(
+ future -> {
+ if (future.isSuccess()) {
+ callback.accept(null);
+ } else {
+ callback.accept(future.cause());
+ }
+ });
+ }
+
+ @Override
+ public void close(int statusCode, String description) {
+ if (headersSent) {
+ // Trailers-Appended: Send the final HTTP/2 HEADERS frame with END_STREAM flag
+ LastHttpContent lastContent = new DefaultLastHttpContent();
+ lastContent.trailingHeaders().set("grpc-status", String.valueOf(statusCode));
+ if (description != null) {
+ lastContent.trailingHeaders().set("grpc-message", description);
+ }
+ // writeAndFlush the LastHttpContent, then close the Netty stream channel
+ ctx.writeAndFlush(lastContent).addListener(ChannelFutureListener.CLOSE);
+ } else {
+ // Trailers-Only: No body was sent, so standard headers become the trailers
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/grpc");
+ response.headers().set("grpc-status", String.valueOf(statusCode));
+ if (description != null) {
+ response.headers().set("grpc-message", description);
+ }
+ ctx.write(response);
+
+ // Close out the stream with an empty DATA frame possessing the END_STREAM flag
+ ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
+ .addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+}
diff --git a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcHandler.java b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcHandler.java
new file mode 100644
index 0000000000..b7254e1a34
--- /dev/null
+++ b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.netty;
+
+import io.jooby.GrpcProcessor;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.*;
+import io.netty.util.ReferenceCountUtil;
+
+public class NettyGrpcHandler extends ChannelInboundHandlerAdapter {
+
+ private final GrpcProcessor processor;
+ private final boolean isHttp2;
+
+ // State for the current stream
+ private boolean isGrpc = false;
+ private NettyGrpcInputBridge inputBridge;
+
+ public NettyGrpcHandler(GrpcProcessor processor, boolean isHttp2) {
+ this.processor = processor;
+ this.isHttp2 = isHttp2;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+
+ // 1. Intercept the initial Request headers
+ if (msg instanceof HttpRequest req) {
+ var contentType = req.headers().get(HttpHeaderNames.CONTENT_TYPE);
+ var path = req.uri();
+ int queryIndex = path.indexOf('?');
+ path = queryIndex > 0 ? path.substring(0, queryIndex) : path;
+
+ if (processor.isGrpcMethod(path)
+ && contentType != null
+ && contentType.startsWith("application/grpc")) {
+ isGrpc = true;
+
+ if (!isHttp2) {
+ // gRPC requires HTTP/2. Reject HTTP/1.1 calls immediately.
+ var response =
+ new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.UPGRADE_REQUIRED);
+ response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE);
+ response.headers().set(HttpHeaderNames.UPGRADE, "h2c");
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ ReferenceCountUtil.release(msg);
+ return;
+ }
+
+ // We will implement NettyGrpcExchange in the next step
+ var exchange = new NettyGrpcExchange(ctx, req);
+ var subscriber = processor.process(exchange);
+
+ inputBridge = new NettyGrpcInputBridge(ctx, subscriber);
+ inputBridge.start();
+
+ ReferenceCountUtil.release(msg); // We consumed the headers
+ return;
+ }
+ }
+
+ // 2. Intercept subsequent body chunks for this gRPC stream
+ if (isGrpc && msg instanceof HttpContent chunk) {
+ try {
+ if (inputBridge != null) {
+ inputBridge.onChunk(chunk);
+ }
+ } finally {
+ // Always release Netty's direct memory buffers
+ ReferenceCountUtil.release(chunk);
+ }
+ return;
+ }
+
+ // Not a gRPC request. Pass down the pipeline to Jooby's NettyHandler
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ if (isGrpc && inputBridge != null) {
+ inputBridge.cancel(); // Client disconnected abruptly
+ }
+ super.channelInactive(ctx);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ if (isGrpc) {
+ ctx.close();
+ } else {
+ super.exceptionCaught(ctx, cause);
+ }
+ }
+}
diff --git a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcInputBridge.java b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcInputBridge.java
new file mode 100644
index 0000000000..48457c2aff
--- /dev/null
+++ b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcInputBridge.java
@@ -0,0 +1,82 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.netty;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.LastHttpContent;
+
+public class NettyGrpcInputBridge implements Flow.Subscription {
+
+ private final ChannelHandlerContext ctx;
+ private final Flow.Subscriber subscriber;
+ private final AtomicLong demand = new AtomicLong();
+
+ public NettyGrpcInputBridge(ChannelHandlerContext ctx, Flow.Subscriber subscriber) {
+ this.ctx = ctx;
+ this.subscriber = subscriber;
+ }
+
+ public void start() {
+ // Disable auto-read. We will manually request reads based on gRPC demand.
+ ctx.channel().config().setAutoRead(false);
+ subscriber.onSubscribe(this);
+ }
+
+ @Override
+ public void request(long n) {
+ if (n <= 0) {
+ subscriber.onError(new IllegalArgumentException("Demand must be positive"));
+ return;
+ }
+
+ if (demand.getAndAdd(n) == 0) {
+ // We transitioned from 0 to n demand, trigger a read from the socket
+ ctx.read();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ demand.set(0);
+ ctx.close(); // Abort the connection
+ }
+
+ /** Called by the NettyGrpcHandler when a new chunk arrives from the network. */
+ public void onChunk(HttpContent chunk) {
+ try {
+ ByteBuf content = chunk.content();
+ if (content.isReadable()) {
+ // Convert Netty ByteBuf to standard Java ByteBuffer
+ ByteBuffer buffer = content.nioBuffer();
+
+ // Pass to the gRPC deframer
+ subscriber.onNext(buffer);
+
+ long currentDemand = demand.decrementAndGet();
+ if (currentDemand > 0) {
+ // Still have demand, ask Netty for the next chunk
+ ctx.read();
+ }
+ }
+
+ if (chunk instanceof LastHttpContent) {
+ subscriber.onComplete();
+ } else if (demand.get() > 0 && !content.isReadable()) {
+ // Edge case: Empty chunk but not LastHttpContent, read next
+ ctx.read();
+ }
+ } catch (Throwable t) {
+ subscriber.onError(t);
+ ctx.close();
+ }
+ }
+}
diff --git a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java
index a09667006b..d62c400d08 100644
--- a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java
+++ b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java
@@ -85,9 +85,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
// possibly body:
long contentLength = contentLength(req);
if (contentLength > 0 || isTransferEncodingChunked(req)) {
- context.httpDataFactory = new DefaultHttpDataFactory(bufferSize);
- context.httpDataFactory.setBaseDir(app.getTmpdir().toString());
- context.setDecoder(newDecoder(req, context.httpDataFactory, maxFormFields));
+ if (req.getClass() == DefaultFullHttpRequest.class) {
+ // HTTP2 aggregates all into a full http request.
+ if (((DefaultFullHttpRequest) req).content().readableBytes() > maxRequestSize) {
+ router.match(context).execute(context, Route.REQUEST_ENTITY_TOO_LARGE);
+ return;
+ }
+ // full body is here move
+ router.match(context).execute(context);
+ } else {
+ context.httpDataFactory = new DefaultHttpDataFactory(bufferSize);
+ context.httpDataFactory.setBaseDir(app.getTmpdir().toString());
+ context.setDecoder(newDecoder(req, context.httpDataFactory, maxFormFields));
+ }
} else {
// no body, move on
router.match(context).execute(context);
diff --git a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyPipeline.java b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyPipeline.java
index c20a6bcea5..bfafa63fdb 100644
--- a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyPipeline.java
+++ b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyPipeline.java
@@ -9,6 +9,7 @@
import java.util.concurrent.ScheduledExecutorService;
import io.jooby.Context;
+import io.jooby.GrpcProcessor;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
@@ -33,6 +34,7 @@ public class NettyPipeline extends ChannelInitializer {
private final boolean expectContinue;
private final Integer compressionLevel;
private final NettyDateService dateService;
+ private final GrpcProcessor grpcProcessor;
public NettyPipeline(
SslContext sslContext,
@@ -45,7 +47,8 @@ public NettyPipeline(
boolean http2,
boolean expectContinue,
Integer compressionLevel,
- NettyDateService dateService) {
+ NettyDateService dateService,
+ GrpcProcessor grpcProcessor) {
this.sslContext = sslContext;
this.decoderConfig = decoderConfig;
this.contextSelector = contextSelector;
@@ -57,6 +60,7 @@ public NettyPipeline(
this.expectContinue = expectContinue;
this.compressionLevel = compressionLevel;
this.dateService = dateService;
+ this.grpcProcessor = grpcProcessor;
}
@Override
@@ -77,6 +81,12 @@ public void initChannel(SocketChannel ch) {
private void setupHttp11(ChannelPipeline p) {
p.addLast("codec", createServerCodec());
addCommonHandlers(p);
+
+ // Inject gRPC handler (isHttp2 = false to trigger 426 Upgrade Required)
+ if (grpcProcessor != null) {
+ p.addLast("grpc", new NettyGrpcHandler(grpcProcessor, false));
+ }
+
p.addLast("handler", createHandler(p.channel().eventLoop()));
}
@@ -103,6 +113,12 @@ private void setupHttp11Upgrade(ChannelPipeline pipeline) {
(int) maxRequestSize));
addCommonHandlers(pipeline);
+
+ // Inject gRPC handler (isHttp2 = false to trigger 426 Upgrade Required)
+ if (grpcProcessor != null) {
+ pipeline.addLast("grpc", new NettyGrpcHandler(grpcProcessor, false));
+ }
+
pipeline.addLast("handler", createHandler(pipeline.channel().eventLoop()));
}
@@ -196,6 +212,12 @@ private static class Http2StreamInitializer extends ChannelInitializer
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast("http2", new Http2StreamFrameToHttpObjectCodec(true));
+
+ // Inject gRPC handler (isHttp2 = true). This handles the actual multiplexed gRPC traffic.
+ if (pipeline.grpcProcessor != null) {
+ ch.pipeline().addLast("grpc", new NettyGrpcHandler(pipeline.grpcProcessor, true));
+ }
+
ch.pipeline().addLast("handler", pipeline.createHandler(ch.eventLoop()));
}
}
diff --git a/modules/jooby-netty/src/main/java/io/jooby/netty/NettyServer.java b/modules/jooby-netty/src/main/java/io/jooby/netty/NettyServer.java
index 2f62da8fb2..f798a43c1d 100644
--- a/modules/jooby-netty/src/main/java/io/jooby/netty/NettyServer.java
+++ b/modules/jooby-netty/src/main/java/io/jooby/netty/NettyServer.java
@@ -155,9 +155,16 @@ public Server start(@NonNull Jooby... application) {
var outputFactory = (NettyOutputFactory) getOutputFactory();
var allocator = outputFactory.getAllocator();
var http2 = options.isHttp2() == Boolean.TRUE;
+
+ // Retrieve the GrpcProcessor from the application's service registry
+ GrpcProcessor grpcProcessor =
+ http2 ? applications.get(0).getServices().getOrNull(GrpcProcessor.class) : null;
+
/* Bootstrap: */
if (!options.isHttpsOnly()) {
- var http = newBootstrap(allocator, transport, newPipeline(options, null, http2), eventLoop);
+ var http =
+ newBootstrap(
+ allocator, transport, newPipeline(options, null, http2, grpcProcessor), eventLoop);
http.bind(options.getHost(), options.getPort()).get();
}
@@ -170,7 +177,11 @@ public Server start(@NonNull Jooby... application) {
var clientAuth = sslOptions.getClientAuth();
var sslContext = wrap(javaSslContext, toClientAuth(clientAuth), protocol, http2);
var https =
- newBootstrap(allocator, transport, newPipeline(options, sslContext, http2), eventLoop);
+ newBootstrap(
+ allocator,
+ transport,
+ newPipeline(options, sslContext, http2, grpcProcessor),
+ eventLoop);
portInUse = options.getSecurePort();
https.bind(options.getHost(), portInUse).get();
} else if (options.isHttpsOnly()) {
@@ -216,7 +227,8 @@ private ClientAuth toClientAuth(SslOptions.ClientAuth clientAuth) {
};
}
- private NettyPipeline newPipeline(ServerOptions options, SslContext sslContext, boolean http2) {
+ private NettyPipeline newPipeline(
+ ServerOptions options, SslContext sslContext, boolean http2, GrpcProcessor grpcProcessor) {
var decoderConfig =
new HttpDecoderConfig()
.setMaxInitialLineLength(_4KB)
@@ -235,7 +247,8 @@ private NettyPipeline newPipeline(ServerOptions options, SslContext sslContext,
http2,
options.isExpectContinue() == Boolean.TRUE,
options.getCompressionLevel(),
- dateLoop);
+ dateLoop,
+ grpcProcessor);
}
@Override
diff --git a/modules/jooby-netty/src/main/java/module-info.java b/modules/jooby-netty/src/main/java/module-info.java
index 5f9e7f16e4..0831e5136c 100644
--- a/modules/jooby-netty/src/main/java/module-info.java
+++ b/modules/jooby-netty/src/main/java/module-info.java
@@ -25,7 +25,6 @@
requires static io.netty.transport.classes.epoll;
requires static io.netty.transport.classes.kqueue;
requires static io.netty.transport.classes.io_uring;
- requires java.desktop;
provides Server with
NettyServer;
diff --git a/modules/jooby-openapi/src/main/java/io/jooby/internal/openapi/AnnotationParser.java b/modules/jooby-openapi/src/main/java/io/jooby/internal/openapi/AnnotationParser.java
index 27edcd6b71..1538e8cad4 100644
--- a/modules/jooby-openapi/src/main/java/io/jooby/internal/openapi/AnnotationParser.java
+++ b/modules/jooby-openapi/src/main/java/io/jooby/internal/openapi/AnnotationParser.java
@@ -130,7 +130,8 @@ public Optional getJakartaDefaultValue(List annotations)
Stream.of(annotations()).filter(it -> it.getName().startsWith("jakarta.ws.rs")).toList();
for (var a : annotations) {
if (a.values != null) {
- var matches = names.stream().anyMatch(it -> "Ljakarta/ws/rs/DefaultValue;".equals(a.desc));
+ var matches =
+ names.stream().anyMatch(it -> "Ljakarta/ws/rs/DefaultValue;".equals(a.desc));
if (matches) {
return AnnotationUtils.findAnnotationValue(a, "value").map(Objects::toString);
}
diff --git a/modules/jooby-openapi/src/test/java/issues/i3835/App3835Jakarta.java b/modules/jooby-openapi/src/test/java/issues/i3835/App3835Jakarta.java
index 300ae3631c..f0fdf53f30 100644
--- a/modules/jooby-openapi/src/test/java/issues/i3835/App3835Jakarta.java
+++ b/modules/jooby-openapi/src/test/java/issues/i3835/App3835Jakarta.java
@@ -5,10 +5,10 @@
*/
package issues.i3835;
-import io.jooby.Jooby;
-
import static io.jooby.openapi.MvcExtensionGenerator.toMvcExtension;
+import io.jooby.Jooby;
+
public class App3835Jakarta extends Jooby {
{
mvc(toMvcExtension(C3835Jakarta.class));
diff --git a/modules/jooby-openapi/src/test/java/issues/i3835/C3835Jakarta.java b/modules/jooby-openapi/src/test/java/issues/i3835/C3835Jakarta.java
index 1734cef8bc..e2bc17c744 100644
--- a/modules/jooby-openapi/src/test/java/issues/i3835/C3835Jakarta.java
+++ b/modules/jooby-openapi/src/test/java/issues/i3835/C3835Jakarta.java
@@ -5,14 +5,14 @@
*/
package issues.i3835;
+import java.util.List;
+import java.util.Map;
+
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.QueryParam;
-import java.util.List;
-import java.util.Map;
-
@Path("/3835")
public class C3835Jakarta {
diff --git a/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcExchange.java b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcExchange.java
new file mode 100644
index 0000000000..139061f54d
--- /dev/null
+++ b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcExchange.java
@@ -0,0 +1,173 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.undertow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.xnio.channels.StreamSinkChannel;
+
+import io.jooby.GrpcExchange;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.server.protocol.http.HttpAttachments;
+import io.undertow.util.HeaderMap;
+import io.undertow.util.HeaderValues;
+import io.undertow.util.Headers;
+import io.undertow.util.HttpString;
+
+public class UndertowGrpcExchange implements GrpcExchange {
+
+ private final HttpServerExchange exchange;
+ private boolean headersSent = false;
+ private StreamSinkChannel responseChannel;
+
+ public UndertowGrpcExchange(HttpServerExchange exchange) {
+ this.exchange = exchange;
+ }
+
+ @Override
+ public String getRequestPath() {
+ return exchange.getRequestPath();
+ }
+
+ @Override
+ public String getHeader(String name) {
+ return exchange.getRequestHeaders().getFirst(name);
+ }
+
+ @Override
+ public Map getHeaders() {
+ Map map = new HashMap<>();
+ for (HeaderValues values : exchange.getRequestHeaders()) {
+ map.put(values.getHeaderName().toString(), values.getFirst());
+ }
+ return map;
+ }
+
+ @Override
+ public void send(ByteBuffer payload, Consumer callback) {
+ if (!headersSent) {
+ exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/grpc");
+ this.responseChannel = exchange.getResponseChannel();
+ headersSent = true;
+ }
+
+ // Write and immediately flush to prevent bidirectional deadlocks
+ doWriteAndFlush(payload, callback);
+ }
+
+ private void doWriteAndFlush(ByteBuffer payload, Consumer callback) {
+ try {
+ int res = responseChannel.write(payload);
+
+ if (payload.hasRemaining()) {
+ // Wait for socket to become writable
+ responseChannel
+ .getWriteSetter()
+ .set(
+ ch -> {
+ try {
+ ch.write(payload);
+ if (!payload.hasRemaining()) {
+ ch.suspendWrites();
+ doFlush(callback); // Proceed to flush
+ }
+ } catch (IOException e) {
+ ch.suspendWrites();
+ callback.accept(e);
+ }
+ });
+ responseChannel.resumeWrites();
+ } else {
+ // Written fully, proceed to flush immediately
+ doFlush(callback);
+ }
+ } catch (IOException e) {
+ callback.accept(e);
+ }
+ }
+
+ private void doFlush(Consumer callback) {
+ try {
+ if (responseChannel.flush()) {
+ callback.accept(null); // Fully flushed to network
+ } else {
+ // Wait for socket to become flushable
+ responseChannel
+ .getWriteSetter()
+ .set(
+ ch -> {
+ try {
+ if (ch.flush()) {
+ ch.suspendWrites();
+ callback.accept(null);
+ }
+ } catch (IOException e) {
+ ch.suspendWrites();
+ callback.accept(e);
+ }
+ });
+ responseChannel.resumeWrites();
+ }
+ } catch (IOException e) {
+ callback.accept(e);
+ }
+ }
+
+ @Override
+ public void close(int statusCode, String description) {
+ if (headersSent) {
+ exchange.putAttachment(
+ HttpAttachments.RESPONSE_TRAILER_SUPPLIER,
+ () -> {
+ HeaderMap trailers = new HeaderMap();
+ trailers.put(HttpString.tryFromString("grpc-status"), String.valueOf(statusCode));
+ if (description != null) {
+ trailers.put(HttpString.tryFromString("grpc-message"), description);
+ }
+ return trailers;
+ });
+
+ try {
+ responseChannel.shutdownWrites();
+ if (!responseChannel.flush()) {
+ responseChannel
+ .getWriteSetter()
+ .set(
+ ch -> {
+ try {
+ if (ch.flush()) {
+ ch.suspendWrites();
+ exchange.endExchange();
+ }
+ } catch (IOException ignored) {
+ ch.suspendWrites();
+ exchange.endExchange();
+ }
+ });
+ responseChannel.resumeWrites();
+ } else {
+ exchange.endExchange();
+ }
+ } catch (IOException e) {
+ exchange.endExchange();
+ }
+
+ } else {
+ exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/grpc");
+ exchange
+ .getResponseHeaders()
+ .put(HttpString.tryFromString("grpc-status"), String.valueOf(statusCode));
+ if (description != null) {
+ exchange.getResponseHeaders().put(HttpString.tryFromString("grpc-message"), description);
+ }
+ exchange.endExchange();
+ }
+ }
+}
diff --git a/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcHandler.java b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcHandler.java
new file mode 100644
index 0000000000..36643bcd08
--- /dev/null
+++ b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.undertow;
+
+import io.jooby.GrpcProcessor;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.Headers;
+import io.undertow.util.Protocols;
+
+public class UndertowGrpcHandler implements HttpHandler {
+
+ private final HttpHandler next;
+ private final GrpcProcessor processor;
+
+ public UndertowGrpcHandler(HttpHandler next, GrpcProcessor processor) {
+ this.next = next;
+ this.processor = processor;
+ }
+
+ @Override
+ public void handleRequest(HttpServerExchange exchange) throws Exception {
+ var contentType = exchange.getRequestHeaders().getFirst(Headers.CONTENT_TYPE);
+
+ if (processor.isGrpcMethod(exchange.getRequestPath())
+ && contentType != null
+ && contentType.startsWith("application/grpc")) {
+
+ // gRPC strictly requires HTTP/2
+ if (!exchange.getProtocol().equals(Protocols.HTTP_2_0)) {
+ exchange.setStatusCode(426); // Upgrade Required
+ exchange.getResponseHeaders().put(Headers.CONNECTION, "Upgrade");
+ exchange.getResponseHeaders().put(Headers.UPGRADE, "h2c");
+ exchange.endExchange();
+ return;
+ }
+
+ var grpcExchange = new UndertowGrpcExchange(exchange);
+ var subscriber = processor.process(grpcExchange);
+
+ // Starts the reactive pipeline and acquires the XNIO channel
+ var inputBridge = new UndertowGrpcInputBridge(exchange, subscriber);
+ inputBridge.start();
+
+ return; // Fully handled, do not pass to the standard router
+ }
+
+ // Not a gRPC request, delegate to the next handler in the chain
+ next.handleRequest(exchange);
+ }
+}
diff --git a/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcInputBridge.java b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcInputBridge.java
new file mode 100644
index 0000000000..84d05a0293
--- /dev/null
+++ b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcInputBridge.java
@@ -0,0 +1,96 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.internal.undertow;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.xnio.ChannelListener;
+import org.xnio.IoUtils;
+import org.xnio.channels.StreamSourceChannel;
+
+import io.undertow.server.HttpServerExchange;
+
+public class UndertowGrpcInputBridge
+ implements Flow.Subscription, ChannelListener {
+
+ private final HttpServerExchange exchange;
+ private final Flow.Subscriber subscriber;
+ private final AtomicLong demand = new AtomicLong();
+ private StreamSourceChannel channel;
+
+ private final ByteBuffer buffer = ByteBuffer.allocate(8192);
+
+ public UndertowGrpcInputBridge(
+ HttpServerExchange exchange, Flow.Subscriber subscriber) {
+ this.exchange = exchange;
+ this.subscriber = subscriber;
+ }
+
+ public void start() {
+ this.channel = exchange.getRequestChannel();
+ this.channel.getReadSetter().set(this);
+ subscriber.onSubscribe(this);
+ }
+
+ @Override
+ public void request(long n) {
+ if (n <= 0) {
+ subscriber.onError(new IllegalArgumentException("Demand must be positive"));
+ return;
+ }
+
+ if (demand.getAndAdd(n) == 0 && channel != null) {
+ // CRITICAL FIX: wakeupReads() forces the listener to fire immediately,
+ // draining any data that arrived in the same packet as the headers.
+ channel.wakeupReads();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ demand.set(0);
+ IoUtils.safeClose(channel);
+ exchange.endExchange();
+ }
+
+ @Override
+ public void handleEvent(StreamSourceChannel channel) {
+ try {
+ while (demand.get() > 0) {
+ buffer.clear();
+ int res = channel.read(buffer);
+
+ if (res == -1) {
+ channel.suspendReads();
+ subscriber.onComplete();
+ return;
+ } else if (res == 0) {
+ // Buffer drained, waiting for more data from the network
+ return;
+ }
+
+ buffer.flip();
+ ByteBuffer chunk = ByteBuffer.allocate(buffer.remaining());
+ chunk.put(buffer);
+ chunk.flip();
+
+ subscriber.onNext(chunk);
+ demand.decrementAndGet();
+ }
+
+ if (demand.get() == 0) {
+ channel.suspendReads();
+ }
+
+ } catch (Throwable t) {
+ subscriber.onError(t);
+ IoUtils.safeClose(channel);
+ exchange.endExchange();
+ }
+ }
+}
diff --git a/modules/jooby-undertow/src/main/java/io/jooby/undertow/UndertowServer.java b/modules/jooby-undertow/src/main/java/io/jooby/undertow/UndertowServer.java
index f335a220ed..c918a4acb9 100644
--- a/modules/jooby-undertow/src/main/java/io/jooby/undertow/UndertowServer.java
+++ b/modules/jooby-undertow/src/main/java/io/jooby/undertow/UndertowServer.java
@@ -18,6 +18,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import io.jooby.*;
import io.jooby.exception.StartupException;
+import io.jooby.internal.undertow.UndertowGrpcHandler;
import io.jooby.internal.undertow.UndertowHandler;
import io.jooby.internal.undertow.UndertowWebSocket;
import io.jooby.output.OutputFactory;
@@ -101,6 +102,13 @@ public Server start(@NonNull Jooby... application) {
options.getMaxRequestSize(),
options.getDefaultHeaders());
+ GrpcProcessor grpcProcessor =
+ applications.get(0).getServices().getOrNull(GrpcProcessor.class);
+
+ if (grpcProcessor != null) {
+ handler = new UndertowGrpcHandler(handler, grpcProcessor);
+ }
+
if (options.getCompressionLevel() != null) {
int compressionLevel = options.getCompressionLevel();
handler =
@@ -154,7 +162,7 @@ public Server start(@NonNull Jooby... application) {
builder.addHttpListener(options.getPort(), options.getHost());
}
- // HTTP @
+ // HTTP/2
builder.setServerOption(ENABLE_HTTP2, options.isHttp2() == Boolean.TRUE);
var classLoader = this.applications.get(0).getClassLoader();
SSLContext sslContext = options.getSSLContext(classLoader);
diff --git a/modules/pom.xml b/modules/pom.xml
index be324dbec9..f7c20aa517 100644
--- a/modules/pom.xml
+++ b/modules/pom.xml
@@ -66,6 +66,8 @@
jooby-thymeleaf
jooby-camel
+ jooby-grpc
+
jooby-avaje-validator
jooby-hibernate-validator
diff --git a/pom.xml b/pom.xml
index 3ad1677b2d..c577729768 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
7.0.0
+ 1.78.0
1.5.32
diff --git a/tests/pom.xml b/tests/pom.xml
index beb8c743e1..ed5a9833e0 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -116,6 +116,11 @@
jooby-guice
${jooby.version}
+
+ io.jooby
+ jooby-grpc
+ ${jooby.version}
+
io.jooby
jooby-pac4j
@@ -174,6 +179,30 @@
kotlin-reflect
+
+ io.grpc
+ grpc-services
+ ${grpc.version}
+
+
+
+ io.grpc
+ grpc-servlet
+ ${grpc.version}
+
+
+
+ io.grpc
+ grpc-netty-shaded
+ ${grpc.version}
+
+
+
+ io.grpc
+ grpc-okhttp
+ ${grpc.version}
+
+
org.slf4j
jcl-over-slf4j
@@ -296,7 +325,33 @@
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.7.1
+
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.6.1
+
+ com.google.protobuf:protoc:3.25.1:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+
+
+
+
+ compile
+ compile-custom
+
+
+
+
+
org.jetbrains.kotlin
kotlin-maven-plugin
diff --git a/tests/src/main/proto/chat.proto b/tests/src/main/proto/chat.proto
new file mode 100644
index 0000000000..2c1e7c36d5
--- /dev/null
+++ b/tests/src/main/proto/chat.proto
@@ -0,0 +1,17 @@
+syntax = "proto3";
+
+package io.jooby.i3875;
+
+option java_package = "io.jooby.i3875";
+option java_multiple_files = true;
+
+service ChatService {
+ // BiDi: Client sends a stream of messages,
+ // Server responds to each one individually.
+ rpc ChatStream (stream ChatMessage) returns (stream ChatMessage);
+}
+
+message ChatMessage {
+ string user = 1;
+ string text = 2;
+}
diff --git a/tests/src/main/proto/hello.proto b/tests/src/main/proto/hello.proto
new file mode 100644
index 0000000000..80f776555b
--- /dev/null
+++ b/tests/src/main/proto/hello.proto
@@ -0,0 +1,22 @@
+syntax = "proto3";
+
+package io.jooby.i3875;
+
+option java_package = "io.jooby.i3875";
+option java_multiple_files = true;
+
+// The request message containing the user's name.
+message HelloRequest {
+ string name = 1;
+}
+
+// The response message containing the greetings
+message HelloReply {
+ string message = 1;
+}
+
+// The greeting service definition.
+service Greeter {
+ // Sends a greeting
+ rpc SayHello (HelloRequest) returns (HelloReply) {}
+}
diff --git a/tests/src/test/java/io/jooby/i3875/EchoChatService.java b/tests/src/test/java/io/jooby/i3875/EchoChatService.java
new file mode 100644
index 0000000000..676bf7baba
--- /dev/null
+++ b/tests/src/test/java/io/jooby/i3875/EchoChatService.java
@@ -0,0 +1,37 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.i3875;
+
+import io.grpc.stub.StreamObserver;
+
+public class EchoChatService extends ChatServiceGrpc.ChatServiceImplBase {
+
+ @Override
+ public StreamObserver chatStream(StreamObserver responseObserver) {
+ return new StreamObserver() {
+ @Override
+ public void onNext(ChatMessage request) {
+ ChatMessage response =
+ ChatMessage.newBuilder()
+ .setUser("Server")
+ .setText("Echo: " + request.getText())
+ .build();
+
+ responseObserver.onNext(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ System.err.println("Stream error: " + t.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onCompleted();
+ }
+ };
+ }
+}
diff --git a/tests/src/test/java/io/jooby/i3875/EchoGreeterService.java b/tests/src/test/java/io/jooby/i3875/EchoGreeterService.java
new file mode 100644
index 0000000000..c76922aee3
--- /dev/null
+++ b/tests/src/test/java/io/jooby/i3875/EchoGreeterService.java
@@ -0,0 +1,25 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.i3875;
+
+import io.grpc.stub.StreamObserver;
+
+public class EchoGreeterService extends GreeterGrpc.GreeterImplBase {
+
+ private final EchoService echoService;
+
+ @jakarta.inject.Inject
+ public EchoGreeterService(EchoService echoService) {
+ this.echoService = echoService;
+ }
+
+ @Override
+ public void sayHello(HelloRequest req, StreamObserver responseObserver) {
+ HelloReply reply = HelloReply.newBuilder().setMessage(echoService.echo(req.getName())).build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+}
diff --git a/tests/src/test/java/io/jooby/i3875/EchoService.java b/tests/src/test/java/io/jooby/i3875/EchoService.java
new file mode 100644
index 0000000000..25d44d594e
--- /dev/null
+++ b/tests/src/test/java/io/jooby/i3875/EchoService.java
@@ -0,0 +1,13 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.i3875;
+
+public class EchoService {
+
+ public String echo(String value) {
+ return "Hello " + value;
+ }
+}
diff --git a/tests/src/test/java/io/jooby/i3875/GrpcTest.java b/tests/src/test/java/io/jooby/i3875/GrpcTest.java
new file mode 100644
index 0000000000..fd2a8a6de1
--- /dev/null
+++ b/tests/src/test/java/io/jooby/i3875/GrpcTest.java
@@ -0,0 +1,380 @@
+/*
+ * Jooby https://jooby.io
+ * Apache License Version 2.0 https://jooby.io/LICENSE.txt
+ * Copyright 2014 Edgar Espina
+ */
+package io.jooby.i3875;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.protobuf.services.ProtoReflectionServiceV1;
+import io.grpc.reflection.v1.ServerReflectionGrpc;
+import io.grpc.reflection.v1.ServerReflectionRequest;
+import io.grpc.reflection.v1.ServerReflectionResponse;
+import io.grpc.stub.StreamObserver;
+import io.jooby.Jooby;
+import io.jooby.ServerOptions;
+import io.jooby.grpc.GrpcModule;
+import io.jooby.guice.GuiceModule;
+import io.jooby.junit.ServerTest;
+import io.jooby.junit.ServerTestRunner;
+
+public class GrpcTest {
+
+ private void setupApp(Jooby app) {
+ app.install(new GuiceModule());
+
+ app.install(
+ new GrpcModule(new EchoChatService(), ProtoReflectionServiceV1.newInstance())
+ .bind(EchoGreeterService.class));
+ }
+
+ @ServerTest
+ void shouldHandleUnaryRequests(ServerTestRunner runner) {
+ runner
+ .options(new ServerOptions().setHttp2(true).setSecurePort(8443))
+ .define(this::setupApp)
+ .ready(
+ http -> {
+ var channel =
+ ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort())
+ .usePlaintext()
+ .build();
+
+ try {
+ var stub = GreeterGrpc.newBlockingStub(channel);
+ var response =
+ stub.sayHello(HelloRequest.newBuilder().setName("Pablo Marmol").build());
+
+ assertThat(response.getMessage()).isEqualTo("Hello Pablo Marmol");
+ } finally {
+ channel.shutdown();
+ }
+ });
+ }
+
+ @ServerTest
+ void shouldHandleDeadlineExceeded(ServerTestRunner runner) {
+ runner
+ .options(new ServerOptions().setHttp2(true).setSecurePort(8443))
+ .define(this::setupApp)
+ .ready(
+ http -> {
+ var channel =
+ ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort())
+ .usePlaintext()
+ .build();
+
+ try {
+ // Attach an impossibly short deadline (1 millisecond) to the stub
+ var stub =
+ GreeterGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(1, TimeUnit.MILLISECONDS);
+
+ var exception =
+ org.junit.jupiter.api.Assertions.assertThrows(
+ StatusRuntimeException.class,
+ () ->
+ stub.sayHello(
+ HelloRequest.newBuilder().setName("Pablo Marmol").build()));
+
+ // Assert that the bridge correctly caught the timeout and returned Status 4
+ assertThat(exception.getStatus().getCode())
+ .isEqualTo(Status.Code.DEADLINE_EXCEEDED);
+ } finally {
+ channel.shutdown();
+ }
+ });
+ }
+
+ @ServerTest
+ void shouldHandleBidiStreaming(ServerTestRunner runner) {
+ runner
+ .options(new ServerOptions().setHttp2(true).setSecurePort(8443))
+ .define(this::setupApp)
+ .ready(
+ http -> {
+ var channel =
+ ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort())
+ .usePlaintext()
+ .build();
+
+ try {
+ var asyncStub = ChatServiceGrpc.newStub(channel);
+ var responses = new CopyOnWriteArrayList();
+ var latch = new CountDownLatch(1);
+
+ StreamObserver responseObserver =
+ new StreamObserver<>() {
+ @Override
+ public void onNext(ChatMessage value) {
+ responses.add(value.getText());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ latch.countDown();
+ }
+ };
+
+ var requestObserver = asyncStub.chatStream(responseObserver);
+
+ requestObserver.onNext(
+ ChatMessage.newBuilder().setUser("JavaClient").setText("Ping 1").build());
+
+ // Add a tiny delay to prevent CI thread-scheduler race conditions
+ // where the server closes the stream before Undertow finishes flushing Ping 2.
+ Thread.sleep(50);
+
+ requestObserver.onNext(
+ ChatMessage.newBuilder().setUser("JavaClient").setText("Ping 2").build());
+
+ // Allow Ping 2 to reach the server before sending the close signal
+ Thread.sleep(50);
+
+ requestObserver.onCompleted();
+
+ // Wait for the server stream to gracefully complete
+ boolean completed = latch.await(5, TimeUnit.SECONDS);
+
+ assertThat(completed).isTrue();
+ assertThat(responses).containsExactly("Echo: Ping 1", "Echo: Ping 2");
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ channel.shutdown();
+ }
+ });
+ }
+
+ @ServerTest
+ void shouldHandleServerStreaming(ServerTestRunner runner) {
+ runner
+ .options(new ServerOptions().setHttp2(true).setSecurePort(8443))
+ .define(this::setupApp)
+ .ready(
+ http -> {
+ var channel =
+ ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort())
+ .usePlaintext()
+ .build();
+
+ try {
+ var asyncStub = ChatServiceGrpc.newStub(channel);
+ var responses = new CopyOnWriteArrayList();
+ var latch = new CountDownLatch(1);
+
+ StreamObserver responseObserver =
+ new StreamObserver<>() {
+ @Override
+ public void onNext(ChatMessage value) {
+ responses.add(value.getText());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ latch.countDown();
+ }
+ };
+
+ // Assuming a server-streaming method exists: rpc ServerStream(ChatMessage) returns
+ // (stream ChatMessage)
+ // asyncStub.serverStream(ChatMessage.newBuilder().setText("Trigger").build(),
+ // responseObserver);
+ //
+ // boolean completed = latch.await(5, TimeUnit.SECONDS);
+ // assertThat(completed).isTrue();
+ // assertThat(responses.size()).isGreaterThan(1);
+
+ } finally {
+ channel.shutdown();
+ }
+ });
+ }
+
+ @ServerTest
+ void shouldHandleReflection(ServerTestRunner runner) {
+ runner
+ .options(new ServerOptions().setHttp2(true).setSecurePort(8443))
+ .define(this::setupApp)
+ .ready(
+ http -> {
+ var channel =
+ ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort())
+ .usePlaintext()
+ .build();
+
+ try {
+ var stub = ServerReflectionGrpc.newStub(channel);
+ var registeredServices = new CopyOnWriteArrayList();
+ var latch = new CountDownLatch(1);
+
+ StreamObserver responseObserver =
+ new StreamObserver<>() {
+ @Override
+ public void onNext(ServerReflectionResponse response) {
+ response
+ .getListServicesResponse()
+ .getServiceList()
+ .forEach(s -> registeredServices.add(s.getName()));
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ latch.countDown();
+ }
+ };
+
+ var requestObserver = stub.serverReflectionInfo(responseObserver);
+
+ requestObserver.onNext(
+ ServerReflectionRequest.newBuilder()
+ .setListServices("")
+ .setHost("localhost")
+ .build());
+ requestObserver.onCompleted();
+
+ boolean completed = latch.await(5, TimeUnit.SECONDS);
+
+ assertThat(completed).isTrue();
+ assertThat(registeredServices)
+ .contains(
+ "io.jooby.i3875.Greeter",
+ "io.jooby.i3875.ChatService",
+ "grpc.reflection.v1.ServerReflection");
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ channel.shutdown();
+ }
+ });
+ }
+
+ @ServerTest
+ void shouldHandleGrpcurlReflection(ServerTestRunner runner) {
+ org.junit.jupiter.api.Assumptions.assumeTrue(
+ isGrpcurlInstalled(), "grpcurl is not installed. Skipping strict HTTP/2 compliance test.");
+
+ runner
+ .options(new ServerOptions().setHttp2(true).setSecurePort(8443))
+ .define(this::setupApp)
+ .ready(
+ http -> {
+ try {
+ var pb =
+ new ProcessBuilder(
+ "grpcurl", "-plaintext", "localhost:" + runner.getAllocatedPort(), "list");
+
+ var process = pb.start();
+ var output =
+ new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
+ var error =
+ new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
+
+ int exitCode = process.waitFor();
+
+ assertThat(exitCode)
+ .withFailMessage("grpcurl failed with error: " + error)
+ .isEqualTo(0);
+
+ assertThat(output)
+ .contains(
+ "io.jooby.i3875.Greeter",
+ "io.jooby.i3875.ChatService",
+ "grpc.reflection.v1.ServerReflection");
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to execute grpcurl test", e);
+ }
+ });
+ }
+
+ /**
+ * When a gRPC client requests a method that doesn't exist, our native handlers will ignore it.
+ * Jooby's core router will then catch it and return a standard HTTP 404 Not Found. According to
+ * the gRPC-over-HTTP/2 specification, the gRPC client will automatically translate a pure HTTP
+ * 404 into a gRPC UNIMPLEMENTED (Status Code 12) exception.
+ *
+ * @param runner Sever runner.
+ */
+ @ServerTest
+ void shouldHandleMethodNotFound(ServerTestRunner runner) {
+ runner
+ .options(new ServerOptions().setHttp2(true).setSecurePort(8443))
+ .define(this::setupApp)
+ .ready(
+ http -> {
+ var channel =
+ ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort())
+ .usePlaintext()
+ .build();
+
+ try {
+ // 1. Create a fake method descriptor for a non-existent method
+ var unknownMethod =
+ io.grpc.MethodDescriptor.newBuilder()
+ .setType(io.grpc.MethodDescriptor.MethodType.UNARY)
+ .setFullMethodName("io.jooby.i3875.Greeter/UnknownMethod")
+ .setRequestMarshaller(
+ io.grpc.protobuf.ProtoUtils.marshaller(
+ HelloRequest.getDefaultInstance()))
+ .setResponseMarshaller(
+ io.grpc.protobuf.ProtoUtils.marshaller(HelloReply.getDefaultInstance()))
+ .build();
+
+ // 2. Execute the call manually and expect an exception
+ var exception =
+ org.junit.jupiter.api.Assertions.assertThrows(
+ io.grpc.StatusRuntimeException.class,
+ () ->
+ io.grpc.stub.ClientCalls.blockingUnaryCall(
+ channel,
+ unknownMethod,
+ io.grpc.CallOptions.DEFAULT,
+ HelloRequest.newBuilder().setName("Pablo Marmol").build()));
+
+ // 3. Assert that Jooby's HTTP 404 is correctly translated by the gRPC client into
+ // UNIMPLEMENTED
+ assertThat(exception.getStatus().getCode())
+ .isEqualTo(io.grpc.Status.Code.UNIMPLEMENTED);
+
+ } finally {
+ channel.shutdown();
+ }
+ });
+ }
+
+ private boolean isGrpcurlInstalled() {
+ try {
+ var process = new ProcessBuilder("grpcurl", "-version").start();
+ return process.waitFor() == 0;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+}
diff --git a/tests/src/test/java/io/jooby/test/Http2Test.java b/tests/src/test/java/io/jooby/test/Http2Test.java
index f0405aeda1..89796b2e35 100644
--- a/tests/src/test/java/io/jooby/test/Http2Test.java
+++ b/tests/src/test/java/io/jooby/test/Http2Test.java
@@ -42,7 +42,7 @@ public class Http2Test {
@ServerTest
public void h2body(ServerTestRunner runner) {
runner
- .options(new ServerOptions().setHttp2(true).setSecurePort(8443))
+ .options(new ServerOptions().setHttp2(true))
.define(
app -> {
app.install(new JacksonModule());