diff --git a/docs/asciidoc/gRPC.adoc b/docs/asciidoc/gRPC.adoc new file mode 100644 index 0000000000..39cb94822c --- /dev/null +++ b/docs/asciidoc/gRPC.adoc @@ -0,0 +1,108 @@ +== gRPC + +The `jooby-grpc` module provides first-class, native support for https://grpc.io/[gRPC]. + +Unlike traditional setups that require spinning up a separate gRPC server on a different port (often forcing a specific transport like Netty), this module embeds the `grpc-java` engine directly into Jooby. + +By using a custom native bridge, it allows you to run strictly-typed gRPC services alongside your standard REST API routes on the **exact same port**. It bypasses the standard HTTP/1.1 pipeline in favor of a highly optimized, native interceptor tailored for HTTP/2 multiplexing, reactive backpressure, and zero-copy byte framing. It works natively across Undertow, Netty, and Jetty. + +=== Dependency + +[source, xml, role="primary"] +.Maven +---- + + io.jooby + jooby-grpc + ${jooby.version} + +---- + +[source, gradle, role="secondary"] +.Gradle +---- +implementation 'io.jooby:jooby-grpc:${jooby.version}' +---- + +=== Usage + +gRPC strictly requires HTTP/2. Before installing the module, ensure your application is configured to use a supported server with HTTP/2 enabled. + +[source, java] +---- +import io.jooby.Jooby; +import io.jooby.ServerOptions; +import io.jooby.grpc.GrpcModule; + +public class App extends Jooby { + { + setServerOptions(new ServerOptions().setHttp2(true)); // <1> + + install(new GrpcModule( // <2> + new GreeterService() + )); + + get("/api/health", ctx -> "OK"); // <3> + } + + public static void main(String[] args) { + runApp(args, App::new); + } +} +---- +<1> Enable HTTP/2 on your server. +<2> Install the module and explicitly register your services. +<3> Standard REST routes still work on the exact same port! + +=== Dependency Injection + +If your gRPC services require external dependencies (like database repositories), you can register the service classes instead of pre-instantiated objects. The module will automatically provision them using your active Dependency Injection framework (e.g., Guice, Spring). + +[source, java] +---- +import io.jooby.Jooby; +import io.jooby.di.GuiceModule; +import io.jooby.grpc.GrpcModule; + +public class App extends Jooby { + { + install(new GuiceModule()); + + install(new GrpcModule( + GreeterService.class // <1> + )); + } +} +---- +<1> Pass the class references. The DI framework will instantiate them. + +WARNING: gRPC services are registered as **Singletons**. Ensure your service implementations are thread-safe and do not hold request-scoped state in instance variables. Heavy blocking operations will safely run on background workers, protecting the native server's I/O event loops. + +=== Server Reflection + +If you want to use tools like `grpcurl` or Postman to interact with your services without providing the `.proto` files, you can easily enable gRPC Server Reflection. + +Include the `grpc-services` dependency in your build, and register the v1 reflection service alongside your own: + +[source, java] +---- +import io.grpc.protobuf.services.ProtoReflectionServiceV1; + +public class App extends Jooby { + { + install(new GrpcModule( + new GreeterService(), + ProtoReflectionServiceV1.newInstance() // <1> + )); + } +} +---- +<1> Enables the modern `v1` reflection protocol for maximum compatibility with gRPC clients. + +=== Routing & Fallbacks + +The gRPC module intercepts requests natively before they reach Jooby's standard router. + +If a client attempts to call a gRPC method that does not exist, the request gracefully falls through to the standard Jooby router, returning a native `404 Not Found` (which gRPC clients will automatically translate to a Status `12 UNIMPLEMENTED`). + +If you misconfigure your server (e.g., attempting to run gRPC over HTTP/1.1), the fallback route will catch the request and throw an `IllegalStateException` to help you identify the missing configuration immediately. diff --git a/jooby/src/main/java/io/jooby/GrpcExchange.java b/jooby/src/main/java/io/jooby/GrpcExchange.java new file mode 100644 index 0000000000..7e3a068a06 --- /dev/null +++ b/jooby/src/main/java/io/jooby/GrpcExchange.java @@ -0,0 +1,83 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.function.Consumer; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Server-agnostic abstraction for a native HTTP/2 gRPC exchange. + * + *

This interface bridges the gap between the underlying web server (Undertow, Netty, or Jetty) + * and the reactive gRPC processor. Because gRPC heavily relies on HTTP/2 multiplexing, asynchronous + * I/O, and trailing headers (trailers) to communicate status codes, standard HTTP/1.1 context + * abstractions are insufficient. + * + *

Native server interceptors wrap their respective request/response objects into this interface, + * allowing the {@link GrpcProcessor} to read headers, push zero-copy byte frames, and finalize the + * stream with standard gRPC trailers without knowing which server engine is actually running. + */ +public interface GrpcExchange { + + /** + * Retrieves the requested URI path. + * + *

In gRPC, this dictates the routing and strictly follows the pattern: {@code + * /Fully.Qualified.ServiceName/MethodName}. + * + * @return The exact path of the incoming HTTP/2 request. + */ + String getRequestPath(); + + /** + * Retrieves the value of the specified HTTP request header. + * + * @param name The name of the header (case-insensitive). + * @return The header value, or {@code null} if the header is not present. + */ + @Nullable String getHeader(String name); + + /** + * Retrieves all HTTP request headers. + * + * @return A map containing all headers provided by the client. + */ + Map getHeaders(); + + /** + * Writes a gRPC-framed byte payload to the underlying non-blocking socket. + * + *

This method must push the buffer to the native network layer without blocking the invoking + * thread (which is typically a background gRPC worker). The implementation is responsible for + * translating the ByteBuffer into the server's native data format and flushing it over the + * network. + * + * @param payload The properly framed 5-byte-prefixed gRPC payload to send. + * @param onFailure A callback invoked immediately if the asynchronous network write fails (e.g., + * if the client abruptly disconnects or the channel is closed). + */ + void send(ByteBuffer payload, Consumer onFailure); + + /** + * Closes the HTTP/2 stream by appending the mandatory gRPC trailing headers. + * + *

In the gRPC specification, a successful response or an application-level error is + * communicated not by standard HTTP status codes (which are always 200 OK), but by appending + * HTTP/2 trailers ({@code grpc-status} and {@code grpc-message}) at the very end of the stream. + * + *

Calling this method informs the native server to write those trailing headers and formally + * close the bidirectional stream. + * + * @param statusCode The gRPC integer status code (e.g., {@code 0} for OK, {@code 12} for + * UNIMPLEMENTED, {@code 4} for DEADLINE_EXCEEDED). + * @param description An optional, human-readable status message detailing the result or error. + * May be {@code null}. + */ + void close(int statusCode, @Nullable String description); +} diff --git a/jooby/src/main/java/io/jooby/GrpcProcessor.java b/jooby/src/main/java/io/jooby/GrpcProcessor.java new file mode 100644 index 0000000000..25b1bfdd41 --- /dev/null +++ b/jooby/src/main/java/io/jooby/GrpcProcessor.java @@ -0,0 +1,51 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby; + +import java.nio.ByteBuffer; +import java.util.concurrent.Flow; + +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Core Service Provider Interface (SPI) for the gRPC extension. + * + *

This processor acts as the bridge between the native HTTP/2 web servers and the embedded gRPC + * engine. It is designed to intercept and process gRPC exchanges at the lowest possible network + * level, completely bypassing Jooby's standard HTTP/1.1 routing pipeline. This architecture ensures + * strict HTTP/2 specification compliance, zero-copy buffering, and reactive backpressure. + */ +public interface GrpcProcessor { + + /** + * Checks if the given URI path exactly matches a registered gRPC method. + * + *

Native server interceptors use this method as a lightweight, fail-fast guard. If this + * returns {@code true}, the server will hijack the request and upgrade it to a native gRPC + * stream. If {@code false}, the request safely falls through to the standard Jooby router + * (typically resulting in a standard HTTP 404 Not Found, which gRPC clients gracefully translate + * to Status 12 UNIMPLEMENTED). + * + * @param path The incoming request path (e.g., {@code /fully.qualified.Service/MethodName}). + * @return {@code true} if the path is mapped to an active gRPC service; {@code false} otherwise. + */ + boolean isGrpcMethod(String path); + + /** + * Initiates the reactive gRPC pipeline for an incoming HTTP/2 request. + * + *

When a valid gRPC request is intercepted, the native server wraps the underlying network + * connection into a {@link GrpcExchange} and passes it to this method. The processor uses this + * exchange to asynchronously send response headers, payload byte frames, and HTTP/2 trailers. + * + * @param exchange The native server exchange representing the bidirectional HTTP/2 stream. + * @return A reactive {@link Flow.Subscriber} that the native server must feed incoming request + * payload {@link ByteBuffer} chunks into. Returns {@code null} if the exchange was rejected. + * @throws IllegalStateException If an unregistered path bypasses the {@link + * #isGrpcMethod(String)} guard. + */ + Flow.Subscriber process(@NonNull GrpcExchange exchange); +} diff --git a/jooby/src/main/java/io/jooby/ServerOptions.java b/jooby/src/main/java/io/jooby/ServerOptions.java index 5ea3ab2a31..7e084053f4 100644 --- a/jooby/src/main/java/io/jooby/ServerOptions.java +++ b/jooby/src/main/java/io/jooby/ServerOptions.java @@ -290,7 +290,7 @@ public int getPort() { * @return True when SSL is enabled. Either bc the secure port, httpsOnly or SSL options are set. */ public boolean isSSLEnabled() { - return securePort != null || ssl != null || httpsOnly; + return securePort != null || ssl != null || http2 == Boolean.TRUE || httpsOnly; } /** diff --git a/modules/jooby-apt/src/main/java/io/jooby/internal/apt/ParameterGenerator.java b/modules/jooby-apt/src/main/java/io/jooby/internal/apt/ParameterGenerator.java index 34f1e07d9b..599bf7a6db 100644 --- a/modules/jooby-apt/src/main/java/io/jooby/internal/apt/ParameterGenerator.java +++ b/modules/jooby-apt/src/main/java/io/jooby/internal/apt/ParameterGenerator.java @@ -5,14 +5,15 @@ */ package io.jooby.internal.apt; -import javax.lang.model.element.*; +import static io.jooby.internal.apt.AnnotationSupport.findAnnotationValue; +import static io.jooby.internal.apt.Types.BUILT_IN; +import static java.util.stream.Collectors.joining; + import java.util.*; import java.util.function.Predicate; import java.util.stream.Stream; -import static io.jooby.internal.apt.AnnotationSupport.findAnnotationValue; -import static io.jooby.internal.apt.Types.BUILT_IN; -import static java.util.stream.Collectors.joining; +import javax.lang.model.element.*; public enum ParameterGenerator { ContextParam("getAttribute", "io.jooby.annotation.ContextParam", "jakarta.ws.rs.core.Context") { @@ -440,11 +441,13 @@ protected String defaultValue(VariableElement parameter, AnnotationMirror annota var sources = findAnnotationValue(annotation, AnnotationSupport.VALUE); return sources.isEmpty() ? "" : CodeBlock.of(", ", CodeBlock.string(sources.getFirst())); } else if (annotationType.startsWith("jakarta.ws.rs")) { - var defaultValueAnnotation = AnnotationSupport.findAnnotationByName( - parameter, "jakarta.ws.rs.DefaultValue"); + var defaultValueAnnotation = + AnnotationSupport.findAnnotationByName(parameter, "jakarta.ws.rs.DefaultValue"); if (defaultValueAnnotation != null) { var defaultValue = findAnnotationValue(defaultValueAnnotation, AnnotationSupport.VALUE); - return defaultValue.isEmpty() ? "" : CodeBlock.of(", ", CodeBlock.string(defaultValue.getFirst())); + return defaultValue.isEmpty() + ? "" + : CodeBlock.of(", ", CodeBlock.string(defaultValue.getFirst())); } } return ""; diff --git a/modules/jooby-apt/src/test/java/tests/i3761/C3761Jakarta.java b/modules/jooby-apt/src/test/java/tests/i3761/C3761Jakarta.java index aadae957bd..3dd6c500e0 100644 --- a/modules/jooby-apt/src/test/java/tests/i3761/C3761Jakarta.java +++ b/modules/jooby-apt/src/test/java/tests/i3761/C3761Jakarta.java @@ -8,8 +8,8 @@ import io.jooby.annotation.GET; import io.jooby.annotation.Path; import jakarta.ws.rs.DefaultValue; -import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.FormParam; +import jakarta.ws.rs.QueryParam; @Path("/3761") public class C3761Jakarta { diff --git a/modules/jooby-apt/src/test/java/tests/i3761/Issue3761.java b/modules/jooby-apt/src/test/java/tests/i3761/Issue3761.java index 963f350c74..8cb4621f31 100644 --- a/modules/jooby-apt/src/test/java/tests/i3761/Issue3761.java +++ b/modules/jooby-apt/src/test/java/tests/i3761/Issue3761.java @@ -5,10 +5,11 @@ */ package tests.i3761; -import io.jooby.apt.ProcessorRunner; +import static org.junit.jupiter.api.Assertions.assertTrue; + import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertTrue; +import io.jooby.apt.ProcessorRunner; public class Issue3761 { @Test @@ -26,11 +27,8 @@ public void shouldGenerateJakartaDefaultValues() throws Exception { private static void assertSourceCodeRespectDefaultValues(String source) { assertTrue(source.contains("return c.number(ctx.query(\"num\", \"5\").intValue());")); assertTrue(source.contains("return c.unset(ctx.query(\"unset\").valueOrNull());")); - assertTrue( - source.contains("return c.emptySet(ctx.query(\"emptySet\", \"\").value());")); - assertTrue( - source.contains("return c.string(ctx.query(\"stringVal\", \"Hello\").value());")); - assertTrue( - source.contains("return c.bool(ctx.form(\"boolVal\", \"false\").booleanValue());")); + assertTrue(source.contains("return c.emptySet(ctx.query(\"emptySet\", \"\").value());")); + assertTrue(source.contains("return c.string(ctx.query(\"stringVal\", \"Hello\").value());")); + assertTrue(source.contains("return c.bool(ctx.form(\"boolVal\", \"false\").booleanValue());")); } } diff --git a/modules/jooby-bom/pom.xml b/modules/jooby-bom/pom.xml index 5d332b25cc..19e5a36c97 100644 --- a/modules/jooby-bom/pom.xml +++ b/modules/jooby-bom/pom.xml @@ -115,6 +115,11 @@ jooby-graphql ${project.version} + + io.jooby + jooby-grpc + ${project.version} + io.jooby jooby-gson diff --git a/modules/jooby-grpc/pom.xml b/modules/jooby-grpc/pom.xml new file mode 100644 index 0000000000..f215d25fab --- /dev/null +++ b/modules/jooby-grpc/pom.xml @@ -0,0 +1,67 @@ + + + + 4.0.0 + + + io.jooby + modules + 4.0.17-SNAPSHOT + + jooby-grpc + jooby-grpc + + + + io.jooby + jooby + ${jooby.version} + + + + org.slf4j + jul-to-slf4j + ${slf4j.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-inprocess + ${grpc.version} + + + io.grpc + grpc-services + ${grpc.version} + + + + + org.junit.jupiter + junit-jupiter-engine + test + + + + org.jacoco + org.jacoco.agent + runtime + test + + + + org.mockito + mockito-core + test + + + diff --git a/modules/jooby-grpc/src/main/java/io/jooby/grpc/GrpcModule.java b/modules/jooby-grpc/src/main/java/io/jooby/grpc/GrpcModule.java new file mode 100644 index 0000000000..93f0a0aa32 --- /dev/null +++ b/modules/jooby-grpc/src/main/java/io/jooby/grpc/GrpcModule.java @@ -0,0 +1,194 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.grpc; + +import java.util.*; + +import org.slf4j.bridge.SLF4JBridgeHandler; + +import edu.umd.cs.findbugs.annotations.NonNull; +import io.grpc.BindableService; +import io.grpc.MethodDescriptor; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.jooby.*; +import io.jooby.internal.grpc.DefaultGrpcProcessor; + +/** + * Native gRPC extension for Jooby. * + * + *

This module allows you to run strictly-typed gRPC services alongside standard Jooby HTTP + * routes on the exact same port. It completely bypasses standard HTTP/1.1 pipelines in favor of a + * highly optimized, reactive, native interceptor tailored for HTTP/2 multiplexing and trailing + * headers. + * + *

Usage

+ * + *

gRPC requires HTTP/2. Ensure your Jooby application is configured to use a supported server + * with HTTP/2 enabled. + * + *

{@code
+ * import io.jooby.Jooby;
+ * import io.jooby.ServerOptions;
+ * import io.jooby.grpc.GrpcModule;
+ * * public class App extends Jooby {
+ * {
+ * setServerOptions(new ServerOptions().setHttp2(true).setSecurePort(8443));
+ * * // Install the extension and register your services
+ * install(new GrpcModule(new GreeterService()));
+ * }
+ * }
+ * }
+ * + *

Dependency Injection

+ * + *

If your gRPC services require external dependencies (like repositories or configuration), you + * can register the service classes instead of instances. The module will automatically provision + * them using Jooby's DI registry (e.g., Guice, Spring) during the application startup phase. + * + *

{@code
+ * public class App extends Jooby {
+ * {
+ * install(new GuiceModule());
+ * * // Pass the class reference. Guice will instantiate it!
+ * install(new GrpcModule(GreeterService.class));
+ * }
+ * }
+ * }
+ * + * * + * + *

Note: gRPC services are inherently registered as Singletons. Ensure your + * service implementations are thread-safe and do not hold request-scoped state in instance + * variables. + * + *

Logging

+ * + *

gRPC internally uses {@code java.util.logging}. This module automatically installs the {@link + * SLF4JBridgeHandler} to redirect all internal gRPC logs to your configured SLF4J backend. + */ +public class GrpcModule implements Extension { + private final List services = new ArrayList<>(); + private final List> serviceClasses = new ArrayList<>(); + + static { + // Optionally remove existing handlers attached to the j.u.l root logger + SLF4JBridgeHandler.removeHandlersForRootLogger(); + // Install the SLF4J bridge + SLF4JBridgeHandler.install(); + } + + /** + * Creates a new gRPC module with pre-instantiated service objects. * @param services One or more + * fully instantiated gRPC services. + */ + public GrpcModule(BindableService... services) { + this.services.addAll(Arrays.asList(services)); + } + + /** + * Creates a new gRPC module with service classes to be provisioned via Dependency Injection. + * * @param serviceClasses One or more gRPC service classes to be resolved from the Jooby + * registry. + */ + @SafeVarargs + public GrpcModule(Class... serviceClasses) { + bind(serviceClasses); + } + + /** + * Registers additional gRPC service classes to be provisioned via Dependency Injection. * @param + * serviceClasses One or more gRPC service classes to be resolved from the Jooby registry. + * + * @return This module for chaining. + */ + @SafeVarargs + public final GrpcModule bind(Class... serviceClasses) { + this.serviceClasses.addAll(List.of(serviceClasses)); + return this; + } + + /** + * Installs the gRPC extension into the Jooby application. * + * + *

This method sets up the {@link GrpcProcessor} SPI, registers native fallback routes, and + * defers DI resolution and the starting of the embedded in-process gRPC server to the {@code + * onStarting} lifecycle hook. * @param app The target Jooby application. + * + * @throws Exception If an error occurs during installation. + */ + @Override + public void install(@NonNull Jooby app) throws Exception { + var serverName = app.getName(); + var builder = InProcessServerBuilder.forName(serverName); + final Map> registry = new HashMap<>(); + + // 1. Register user-provided services + for (var service : services) { + bindService(app, builder, registry, service); + } + + var services = app.getServices(); + var processor = new DefaultGrpcProcessor(registry); + services.put(GrpcProcessor.class, processor); + + // Lazy init service from DI. + app.onStarting( + () -> { + for (Class serviceClass : serviceClasses) { + var service = app.require(serviceClass); + bindService(app, builder, registry, service); + } + var grpcServer = builder.build().start(); + + // KEEP .directExecutor() here! + // This ensures that when the background gRPC worker finishes, it instantly pushes + // the response back to Undertow/Netty without wasting time on another thread hop. + var channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + processor.setChannel(channel); + + app.onStop(channel::shutdownNow); + app.onStop(grpcServer::shutdownNow); + }); + } + + /** + * Internal helper to register a service with the gRPC builder, extract its method descriptors, + * and map a fail-fast route in the Jooby router. + * + * @param app The target Jooby application. + * @param server The in-process server builder. + * @param registry The method descriptor registry. + * @param service The provisioned gRPC service to bind. + */ + private static void bindService( + Jooby app, + InProcessServerBuilder server, + Map> registry, + BindableService service) { + server.addService(service); + for (var method : service.bindService().getMethods()) { + var descriptor = method.getMethodDescriptor(); + String methodFullName = descriptor.getFullMethodName(); + registry.put(methodFullName, descriptor); + String routePath = "/" + methodFullName; + + // Map a fallback route. If a request hits this, it means the native SPI interceptor + // failed to upgrade the request, typically due to a missing HTTP/2 configuration. + app.post( + routePath, + ctx -> { + throw new IllegalStateException( + "gRPC request reached the standard HTTP router for: " + + routePath + + ". " + + "This means the native gRPC server interceptor was bypassed. " + + "Ensure you are running with HTTP/2 enabled, " + + "and that the GrpcProcessor SPI is correctly loaded."); + }); + } + } +} diff --git a/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/DefaultGrpcProcessor.java b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/DefaultGrpcProcessor.java new file mode 100644 index 0000000000..16205a9b2d --- /dev/null +++ b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/DefaultGrpcProcessor.java @@ -0,0 +1,238 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.grpc; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.NonNull; +import io.grpc.CallOptions; +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.ClientResponseObserver; +import io.jooby.GrpcExchange; +import io.jooby.GrpcProcessor; + +public class DefaultGrpcProcessor implements GrpcProcessor { + + // Minimal Marshaller to pass raw bytes through the bridge + private static class RawMarshaller implements MethodDescriptor.Marshaller { + @Override + public InputStream stream(byte[] value) { + return new ByteArrayInputStream(value); + } + + @Override + public byte[] parse(InputStream stream) { + try { + return stream.readAllBytes(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private final Logger log = LoggerFactory.getLogger(getClass()); + private ManagedChannel channel; + private final Map> registry; + + public DefaultGrpcProcessor(Map> registry) { + this.registry = registry; + } + + public void setChannel(ManagedChannel channel) { + this.channel = channel; + } + + @Override + public boolean isGrpcMethod(String path) { + // gRPC paths typically come in as "/package.Service/Method" + // Our registry stores them as "package.Service/Method" + String methodName = path.startsWith("/") ? path.substring(1) : path; + + // Quick O(1) hash map lookup + return registry.get(methodName) != null; + } + + @Override + public @NonNull Flow.Subscriber process(@NonNull GrpcExchange exchange) { + // Route paths: /{package.Service}/{Method} + String path = exchange.getRequestPath(); + // Remove the leading slash to match the gRPC method registry format + var descriptor = registry.get(path.substring(1)); + + if (descriptor == null) { + // MUST never occur, it is guarded by {@link #isGrpcMethod} + throw new IllegalStateException( + "Unregistered gRPC method: '" + + path + + "'. " + + "This request bypassed the GrpcProcessor.isGrpcMethod() guard, " + + "indicating a bug or misconfiguration in the native server interceptor."); + } + + var method = + MethodDescriptor.newBuilder() + .setType(descriptor.getType()) + .setFullMethodName(descriptor.getFullMethodName()) + .setRequestMarshaller(new RawMarshaller()) + .setResponseMarshaller(new RawMarshaller()) + .build(); + + var callOptions = extractCallOptions(exchange); + var metadata = extractMetadata(exchange); + + var interceptedChannel = + io.grpc.ClientInterceptors.intercept( + channel, io.grpc.stub.MetadataUtils.newAttachHeadersInterceptor(metadata)); + + var call = interceptedChannel.newCall(method, callOptions); + var isFinished = new AtomicBoolean(false); + + boolean isUnaryOrServerStreaming = + method.getType() == MethodDescriptor.MethodType.UNARY + || method.getType() == MethodDescriptor.MethodType.SERVER_STREAMING; + + var requestBridge = new GrpcRequestBridge(call, method.getType()); + + var responseObserver = + new ClientResponseObserver() { + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + if (!isUnaryOrServerStreaming) { + // requestStream.disableAutoInboundFlowControl(); + // Wire the readiness callback securely to the bridge + requestStream.setOnReadyHandler(requestBridge::onGrpcReady); + requestBridge.setRequestObserver(requestStream); + } + } + + @Override + public void onNext(byte[] value) { + if (isFinished.get()) return; + + ByteBuffer framed = addGrpcHeader(value); + + exchange.send( + framed, + cause -> { + if (cause != null) { + onError(cause); + } + }); + } + + @Override + public void onError(Throwable t) { + if (isFinished.compareAndSet(false, true)) { + log.debug("gRPC stream error", t); + Status status = Status.fromThrowable(t); + exchange.close(status.getCode().value(), status.getDescription()); + } + } + + @Override + public void onCompleted() { + if (isFinished.compareAndSet(false, true)) { + exchange.close(Status.OK.getCode().value(), null); + } + } + }; + + // 2. Inject the observer to break the circular dependency + requestBridge.setResponseObserver(responseObserver); + + if (!isUnaryOrServerStreaming) { + ClientCalls.asyncBidiStreamingCall(call, responseObserver); + } + + return requestBridge; + } + + /** Extracts the grpc-timeout header and applies it to CallOptions. */ + private CallOptions extractCallOptions(GrpcExchange exchange) { + CallOptions options = CallOptions.DEFAULT; + String timeout = exchange.getHeader("grpc-timeout"); + + if (timeout == null || timeout.isEmpty()) { + return options; + } + + try { + var unit = timeout.charAt(timeout.length() - 1); + var value = Long.parseLong(timeout.substring(0, timeout.length() - 1)); + + var timeUnit = + switch (unit) { + case 'H' -> java.util.concurrent.TimeUnit.HOURS; + case 'M' -> java.util.concurrent.TimeUnit.MINUTES; + case 'S' -> java.util.concurrent.TimeUnit.SECONDS; + case 'm' -> java.util.concurrent.TimeUnit.MILLISECONDS; + case 'u' -> java.util.concurrent.TimeUnit.MICROSECONDS; + case 'n' -> java.util.concurrent.TimeUnit.NANOSECONDS; + default -> null; + }; + + if (timeUnit != null) { + options = options.withDeadlineAfter(value, timeUnit); + } + } catch (Exception e) { + log.debug("Failed to parse grpc-timeout header: {}", timeout); + } + + return options; + } + + /** Maps standard HTTP headers from the GrpcExchange into gRPC Metadata. */ + private io.grpc.Metadata extractMetadata(GrpcExchange exchange) { + var metadata = new io.grpc.Metadata(); + + for (var header : exchange.getHeaders().entrySet()) { + var key = header.getKey().toLowerCase(); + + if (key.startsWith(":") + || key.startsWith("grpc-") + || key.equals("content-type") + || key.equals("te")) { + continue; + } + + if (key.endsWith("-bin")) { + io.grpc.Metadata.Key metaKey = + io.grpc.Metadata.Key.of(key, io.grpc.Metadata.BINARY_BYTE_MARSHALLER); + byte[] decoded = java.util.Base64.getDecoder().decode(header.getValue()); + metadata.put(metaKey, decoded); + } else { + io.grpc.Metadata.Key metaKey = + io.grpc.Metadata.Key.of(key, io.grpc.Metadata.ASCII_STRING_MARSHALLER); + metadata.put(metaKey, header.getValue()); + } + } + return metadata; + } + + /** Prepends the 5-byte gRPC header and returns a ready-to-write ByteBuffer. */ + private ByteBuffer addGrpcHeader(byte[] payload) { + var buffer = ByteBuffer.allocate(5 + payload.length); + buffer.put((byte) 0); // Compressed flag (0 = none) + buffer.putInt(payload.length); + buffer.put(payload); + buffer.flip(); // Prepare the buffer for reading by the server socket + return buffer; + } +} diff --git a/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcDeframer.java b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcDeframer.java new file mode 100644 index 0000000000..e011293c0f --- /dev/null +++ b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcDeframer.java @@ -0,0 +1,65 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.grpc; + +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +public class GrpcDeframer { + private enum State { + HEADER, + PAYLOAD + } + + private State state = State.HEADER; + private final ByteBuffer headerBuffer = ByteBuffer.allocate(5); + private ByteBuffer payloadBuffer; + + /** Processes a chunk of data directly from the server's native ByteBuffer. */ + public void process(ByteBuffer input, Consumer onMessage) { + while (input.hasRemaining()) { + if (state == State.HEADER) { + int toRead = Math.min(headerBuffer.remaining(), input.remaining()); + + // Bulk read into header buffer + int oldLimit = input.limit(); + input.limit(input.position() + toRead); + headerBuffer.put(input); + input.limit(oldLimit); + + if (!headerBuffer.hasRemaining()) { + headerBuffer.flip(); + headerBuffer.get(); // skip compressed flag + int length = headerBuffer.getInt(); + + if (length == 0) { + onMessage.accept(new byte[0]); + headerBuffer.clear(); + } else { + payloadBuffer = ByteBuffer.allocate(length); + state = State.PAYLOAD; + } + } + } else if (state == State.PAYLOAD) { + int toRead = Math.min(payloadBuffer.remaining(), input.remaining()); + + // Bulk read into payload buffer + int oldLimit = input.limit(); + input.limit(input.position() + toRead); + payloadBuffer.put(input); + input.limit(oldLimit); + + if (!payloadBuffer.hasRemaining()) { + // The full gRPC message is assembled. Emit it. + onMessage.accept(payloadBuffer.array()); + headerBuffer.clear(); + payloadBuffer = null; + state = State.HEADER; + } + } + } + } +} diff --git a/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcRequestBridge.java b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcRequestBridge.java new file mode 100644 index 0000000000..4a65e74bd3 --- /dev/null +++ b/modules/jooby-grpc/src/main/java/io/jooby/internal/grpc/GrpcRequestBridge.java @@ -0,0 +1,121 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.grpc; + +import java.nio.ByteBuffer; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.grpc.ClientCall; +import io.grpc.MethodDescriptor; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.ClientResponseObserver; + +public class GrpcRequestBridge implements Subscriber { + + private final Logger log = LoggerFactory.getLogger(getClass()); + private final GrpcDeframer deframer = new GrpcDeframer(); + private final AtomicBoolean completed = new AtomicBoolean(false); + + private final ClientCall call; + private final MethodDescriptor.MethodType methodType; + private final boolean isUnaryOrServerStreaming; + + private ClientResponseObserver responseObserver; + private ClientCallStreamObserver requestObserver; + private Subscription subscription; + private byte[] singlePayload; + + public GrpcRequestBridge( + ClientCall call, MethodDescriptor.MethodType methodType) { + this.call = call; + this.methodType = methodType; + this.isUnaryOrServerStreaming = + methodType == MethodDescriptor.MethodType.UNARY + || methodType == MethodDescriptor.MethodType.SERVER_STREAMING; + } + + public void setResponseObserver(ClientResponseObserver responseObserver) { + this.responseObserver = responseObserver; + } + + public void setRequestObserver(ClientCallStreamObserver requestObserver) { + this.requestObserver = requestObserver; + } + + public void onGrpcReady() { + if (subscription != null + && requestObserver != null + && requestObserver.isReady() + && !completed.get()) { + subscription.request(1); + } + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + // Initial demand to kick off the network body reader + subscription.request(1); + } + + @Override + public void onNext(ByteBuffer item) { + try { + deframer.process( + item, + msg -> { + if (isUnaryOrServerStreaming) { + singlePayload = msg; + } else { + requestObserver.onNext(msg); + } + }); + + if (isUnaryOrServerStreaming) { + subscription.request(1); // Keep reading until EOF for unary/server-streaming + } else if (requestObserver != null && requestObserver.isReady()) { + subscription.request(1); // Ask for more if the streaming gRPC buffer is ready + } + } catch (Throwable t) { + subscription.cancel(); + onError(t); + } + } + + @Override + public void onError(Throwable throwable) { + if (completed.compareAndSet(false, true)) { + log.error("Error in gRPC request stream", throwable); + if (requestObserver != null) { + requestObserver.onError(throwable); + } else if (responseObserver != null) { + responseObserver.onError(throwable); + } + } + } + + @Override + public void onComplete() { + if (completed.compareAndSet(false, true)) { + if (isUnaryOrServerStreaming) { + byte[] payload = singlePayload == null ? new byte[0] : singlePayload; + if (methodType == MethodDescriptor.MethodType.UNARY) { + ClientCalls.asyncUnaryCall(call, payload, responseObserver); + } else { + ClientCalls.asyncServerStreamingCall(call, payload, responseObserver); + } + } else if (requestObserver != null) { + requestObserver.onCompleted(); + } + } + } +} diff --git a/modules/jooby-grpc/src/test/java/io/jooby/grpc/DefaultGrpcProcessorTest.java b/modules/jooby-grpc/src/test/java/io/jooby/grpc/DefaultGrpcProcessorTest.java new file mode 100644 index 0000000000..940ef8acfc --- /dev/null +++ b/modules/jooby-grpc/src/test/java/io/jooby/grpc/DefaultGrpcProcessorTest.java @@ -0,0 +1,221 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.grpc; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.Flow; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.jooby.GrpcExchange; +import io.jooby.internal.grpc.DefaultGrpcProcessor; +import io.jooby.internal.grpc.GrpcRequestBridge; + +public class DefaultGrpcProcessorTest { + + private ManagedChannel channel; + private Map> registry; + private GrpcExchange exchange; + private ClientCall call; + private DefaultGrpcProcessor bridge; + + @BeforeEach + @SuppressWarnings("unchecked") + public void setUp() { + channel = mock(ManagedChannel.class); + registry = mock(Map.class); + exchange = mock(GrpcExchange.class); + call = mock(ClientCall.class); + + // The interceptor wraps the channel, but eventually delegates to the real one + when(channel.newCall(any(MethodDescriptor.class), any(CallOptions.class))).thenReturn(call); + + bridge = new DefaultGrpcProcessor(registry); + bridge.setChannel(channel); + } + + @Test + @DisplayName( + "Should throw IllegalStateException if an unknown method bypasses the isGrpcMethod guard") + public void shouldRejectUnknownMethod() { + when(exchange.getRequestPath()).thenReturn("/unknown.Service/Method"); + when(registry.get("unknown.Service/Method")).thenReturn(null); + + // Assert that the bridge correctly identifies the illegal state + IllegalStateException exception = + org.junit.jupiter.api.Assertions.assertThrows( + IllegalStateException.class, () -> bridge.process(exchange)); + + // Ensure the exchange wasn't manipulated or closed, because the framework + // should crash the thread instead of trying to gracefully close a gRPC stream. + verify(exchange, org.mockito.Mockito.never()).close(anyInt(), any()); + } + + @Test + @DisplayName("Should successfully bridge a valid Bidi-Streaming gRPC call") + public void shouldProcessValidStreamingCall() { + setupValidMethod("test.Chat/Stream", MethodDescriptor.MethodType.BIDI_STREAMING); + + Flow.Subscriber subscriber = bridge.process(exchange); + + assertNotNull(subscriber); + assertTrue(subscriber instanceof GrpcRequestBridge); + + // Verify call was actually created + verify(channel).newCall(any(MethodDescriptor.class), any(CallOptions.class)); + } + + @Test + @DisplayName("Should parse grpc-timeout header into CallOptions deadline") + public void shouldParseGrpcTimeout() { + setupValidMethod("test.Chat/TimeoutCall", MethodDescriptor.MethodType.UNARY); + + // 1000m = 1000 milliseconds + when(exchange.getHeader("grpc-timeout")).thenReturn("1000m"); + + bridge.process(exchange); + + ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(CallOptions.class); + verify(channel).newCall(any(MethodDescriptor.class), optionsCaptor.capture()); + + CallOptions options = optionsCaptor.getValue(); + assertNotNull(options.getDeadline()); + assertTrue(options.getDeadline().isExpired() == false, "Deadline should be in the future"); + } + + @Test + @DisplayName("Should correctly deframe and send gRPC payload to the client") + public void shouldFrameAndSendResponsePayload() { + setupValidMethod("test.Chat/Stream", MethodDescriptor.MethodType.BIDI_STREAMING); + bridge.process(exchange); + + // Capture the internal listener that gRPC uses to push data back to us + ArgumentCaptor> listenerCaptor = + ArgumentCaptor.forClass(ClientCall.Listener.class); + verify(call).start(listenerCaptor.capture(), any(Metadata.class)); + ClientCall.Listener responseListener = listenerCaptor.getValue(); + + // Simulate the server pushing a payload back + byte[] serverResponse = "hello".getBytes(); + responseListener.onMessage(serverResponse); + + // Verify our bridge framed it with the 5-byte header and sent it to Jooby + ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class); + verify(exchange).send(bufferCaptor.capture(), any()); + + ByteBuffer framedBuffer = bufferCaptor.getValue(); + assertEquals(5 + serverResponse.length, framedBuffer.limit()); + assertEquals((byte) 0, framedBuffer.get(), "Compressed flag should be 0"); + assertEquals(serverResponse.length, framedBuffer.getInt(), "Length should match payload"); + + byte[] capturedPayload = new byte[serverResponse.length]; + framedBuffer.get(capturedPayload); + assertArrayEquals(serverResponse, capturedPayload); + } + + @Test + @DisplayName("Should close exchange with HTTP/2 trailing status when server throws error") + public void shouldCloseExchangeOnError() { + setupValidMethod("test.Chat/Stream", MethodDescriptor.MethodType.BIDI_STREAMING); + bridge.process(exchange); + + ArgumentCaptor> listenerCaptor = + ArgumentCaptor.forClass(ClientCall.Listener.class); + verify(call).start(listenerCaptor.capture(), any(Metadata.class)); + ClientCall.Listener responseListener = listenerCaptor.getValue(); + + // Simulate an internal server error from the gRPC engine + Status errorStatus = Status.INVALID_ARGUMENT.withDescription("Bad data"); + responseListener.onClose(errorStatus, new Metadata()); + + // Verify it mapped down to the core exchange SPI + verify(exchange).close(errorStatus.getCode().value(), "Bad data"); + } + + @Test + @DisplayName("Should gracefully close exchange with Status 0 on completion") + public void shouldCloseExchangeOnComplete() { + setupValidMethod("test.Chat/Stream", MethodDescriptor.MethodType.BIDI_STREAMING); + bridge.process(exchange); + + ArgumentCaptor> listenerCaptor = + ArgumentCaptor.forClass(ClientCall.Listener.class); + verify(call).start(listenerCaptor.capture(), any(Metadata.class)); + ClientCall.Listener responseListener = listenerCaptor.getValue(); + + // Simulate clean completion + responseListener.onClose(Status.OK, new Metadata()); + + verify(exchange).close(Status.OK.getCode().value(), null); + } + + @Test + @DisplayName("Should extract and decode custom metadata headers") + public void shouldExtractMetadata() { + // CRITICAL FIX: Use BIDI_STREAMING instead of UNARY. + // Unary delays call.start() until the request stream completes. Bidi starts immediately. + setupValidMethod("test.Chat/Headers", MethodDescriptor.MethodType.BIDI_STREAMING); + + Map incomingHeaders = + Map.of( + "x-custom-id", + "12345", + "x-custom-bin", + Base64.getEncoder().encodeToString("binary_data".getBytes())); + when(exchange.getHeaders()).thenReturn(incomingHeaders); + + bridge.process(exchange); + + // Verify that the metadata was extracted and attached to the call + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); + verify(call).start(any(), metadataCaptor.capture()); + + Metadata metadata = metadataCaptor.getValue(); + + assertEquals( + "12345", metadata.get(Metadata.Key.of("x-custom-id", Metadata.ASCII_STRING_MARSHALLER))); + assertArrayEquals( + "binary_data".getBytes(), + metadata.get(Metadata.Key.of("x-custom-bin", Metadata.BINARY_BYTE_MARSHALLER))); + } + + /** Helper to mock out a valid MethodDescriptor in the registry. */ + @SuppressWarnings("unchecked") + private void setupValidMethod(String methodPath, MethodDescriptor.MethodType type) { + MethodDescriptor descriptor = + MethodDescriptor.newBuilder() + .setType(type) + .setFullMethodName(methodPath) + .setRequestMarshaller(mock(MethodDescriptor.Marshaller.class)) + .setResponseMarshaller(mock(MethodDescriptor.Marshaller.class)) + .build(); + + when(exchange.getRequestPath()).thenReturn("/" + methodPath); + //noinspection rawtypes + when(registry.get(methodPath)).thenReturn((MethodDescriptor) descriptor); + } +} diff --git a/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcDeframerTest.java b/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcDeframerTest.java new file mode 100644 index 0000000000..5239681cc9 --- /dev/null +++ b/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcDeframerTest.java @@ -0,0 +1,126 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.grpc; + +import static org.junit.jupiter.api.Assertions.*; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.jooby.internal.grpc.GrpcDeframer; + +public class GrpcDeframerTest { + + private GrpcDeframer deframer; + private List outputMessages; + + @BeforeEach + public void setUp() { + deframer = new GrpcDeframer(); + outputMessages = new ArrayList<>(); + } + + @Test + public void shouldParseSingleCompleteMessage() { + byte[] payload = "hello grpc".getBytes(); + ByteBuffer frame = createGrpcFrame(payload); + + deframer.process(frame, msg -> outputMessages.add(msg)); + + assertEquals(1, outputMessages.size()); + assertArrayEquals(payload, outputMessages.get(0)); + } + + @Test + public void shouldParseFragmentedHeader() { + byte[] payload = "fragmented header".getBytes(); + byte[] frame = createGrpcFrame(payload).array(); + + // Send the first 2 bytes of the header + deframer.process(ByteBuffer.wrap(frame, 0, 2), msg -> outputMessages.add(msg)); + assertEquals(0, outputMessages.size(), "Should not emit message yet"); + + // Send the rest of the header and the payload + deframer.process(ByteBuffer.wrap(frame, 2, frame.length - 2), msg -> outputMessages.add(msg)); + + assertEquals(1, outputMessages.size()); + assertArrayEquals(payload, outputMessages.get(0)); + } + + @Test + public void shouldParseFragmentedPayload() { + byte[] payload = "this is a very long payload that gets split".getBytes(); + byte[] frame = createGrpcFrame(payload).array(); + + // Send the 5-byte header + first 10 bytes of payload (15 bytes total) + deframer.process(ByteBuffer.wrap(frame, 0, 15), msg -> outputMessages.add(msg)); + assertEquals(0, outputMessages.size(), "Should not emit message until full payload arrives"); + + // Send the remainder of the payload + deframer.process(ByteBuffer.wrap(frame, 15, frame.length - 15), msg -> outputMessages.add(msg)); + + assertEquals(1, outputMessages.size()); + assertArrayEquals(payload, outputMessages.get(0)); + } + + @Test + public void shouldParseMultipleMessagesInSingleBuffer() { + byte[] payload1 = "message 1".getBytes(); + byte[] payload2 = "message 2".getBytes(); + + ByteBuffer frame1 = createGrpcFrame(payload1); + ByteBuffer frame2 = createGrpcFrame(payload2); + + // Combine both frames into a single buffer + ByteBuffer combined = ByteBuffer.allocate(frame1.capacity() + frame2.capacity()); + combined.put(frame1).put(frame2); + combined.flip(); + + deframer.process(combined, msg -> outputMessages.add(msg)); + + assertEquals(2, outputMessages.size()); + assertArrayEquals(payload1, outputMessages.get(0)); + assertArrayEquals(payload2, outputMessages.get(1)); + } + + @Test + public void shouldHandleZeroLengthPayload() { + byte[] payload = new byte[0]; + ByteBuffer frame = createGrpcFrame(payload); + + deframer.process(frame, msg -> outputMessages.add(msg)); + + assertEquals(1, outputMessages.size()); + assertArrayEquals(payload, outputMessages.get(0)); + } + + @Test + public void shouldHandleExtremeFragmentationByteByByte() { + byte[] payload = "byte by byte".getBytes(); + byte[] frame = createGrpcFrame(payload).array(); + + for (byte b : frame) { + deframer.process(ByteBuffer.wrap(new byte[] {b}), msg -> outputMessages.add(msg)); + } + + assertEquals(1, outputMessages.size()); + assertArrayEquals(payload, outputMessages.get(0)); + } + + /** Helper to wrap a raw payload in the standard 5-byte gRPC framing. */ + private ByteBuffer createGrpcFrame(byte[] payload) { + ByteBuffer buffer = ByteBuffer.allocate(5 + payload.length); + buffer.put((byte) 0); // Compressed flag + buffer.putInt(payload.length); // Length + buffer.put(payload); // Data + buffer.flip(); + return buffer; + } +} diff --git a/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcRequestBridgeTest.java b/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcRequestBridgeTest.java new file mode 100644 index 0000000000..2d6ed11699 --- /dev/null +++ b/modules/jooby-grpc/src/test/java/io/jooby/grpc/GrpcRequestBridgeTest.java @@ -0,0 +1,159 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.grpc; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.concurrent.Flow.Subscription; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import io.grpc.ClientCall; +import io.grpc.MethodDescriptor; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.jooby.internal.grpc.GrpcRequestBridge; + +public class GrpcRequestBridgeTest { + + private ClientCall call; + private Subscription subscription; + private ClientCallStreamObserver requestObserver; + private ClientResponseObserver responseObserver; + private GrpcRequestBridge bridge; + + @BeforeEach + @SuppressWarnings("unchecked") + public void setUp() { + call = mock(ClientCall.class); + subscription = mock(Subscription.class); + requestObserver = mock(ClientCallStreamObserver.class); + responseObserver = mock(ClientResponseObserver.class); + + // Default to BIDI_STREAMING to test standard flow-control and backpressure + bridge = new GrpcRequestBridge(call, MethodDescriptor.MethodType.BIDI_STREAMING); + bridge.setRequestObserver(requestObserver); + bridge.setResponseObserver(responseObserver); + } + + @Test + @DisplayName("Should request initial demand (1) upon subscription") + public void shouldRequestInitialDemandOnSubscribe() { + bridge.onSubscribe(subscription); + + verify(subscription).request(1); + } + + @Test + @DisplayName("Should forward payload to requestObserver and request more if gRPC buffer is ready") + public void shouldSendMessageAndRequestMoreIfReady() { + bridge.onSubscribe(subscription); + reset(subscription); // Clear the initial request(1) counter + + when(requestObserver.isReady()).thenReturn(true); + + byte[] payload = "test".getBytes(); + ByteBuffer frame = createFrame(payload); + + bridge.onNext(frame); + + ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class); + verify(requestObserver).onNext(captor.capture()); + assertArrayEquals(payload, captor.getValue(), "The deframed payload should match exactly"); + + // Because isReady() is true, it should demand the next network chunk + verify(subscription).request(1); + } + + @Test + @DisplayName( + "Should forward payload but apply backpressure (do not request) if gRPC is not ready") + public void shouldNotRequestMoreIfNotReady() { + bridge.onSubscribe(subscription); + reset(subscription); + + when(requestObserver.isReady()).thenReturn(false); + + byte[] payload = "test".getBytes(); + bridge.onNext(createFrame(payload)); + + verify(requestObserver).onNext(any()); + + // Since isReady() is false, it should NOT request more data, effectively applying backpressure + verify(subscription, never()).request(anyLong()); + } + + @Test + @DisplayName("Should complete the requestObserver when the network stream completes") + public void shouldCompleteRequestObserverOnComplete() { + bridge.onSubscribe(subscription); + bridge.onComplete(); + + verify(requestObserver).onCompleted(); + } + + @Test + @DisplayName("Should propagate network errors to the requestObserver") + public void shouldPropagateErrorToObserver() { + bridge.onSubscribe(subscription); + Throwable error = new RuntimeException("Stream network failure"); + + bridge.onError(error); + + verify(requestObserver).onError(error); + } + + @Test + @DisplayName("Unary calls should accumulate payload without forwarding until EOF") + public void shouldHandleUnaryCallsDifferently() { + bridge = new GrpcRequestBridge(call, MethodDescriptor.MethodType.UNARY); + bridge.setResponseObserver(responseObserver); + bridge.onSubscribe(subscription); + reset(subscription); + + byte[] payload = "unary".getBytes(); + bridge.onNext(createFrame(payload)); + + // For Unary and Server Streaming, chunks are NOT passed via onNext + verify(requestObserver, never()).onNext(any()); + + // It should keep requesting data from the network until EOF is reached + verify(subscription).request(1); + } + + @Test + @DisplayName("onGrpcReady callback should trigger network demand if stream is active") + public void shouldRequestMoreOnGrpcReady() { + bridge.onSubscribe(subscription); + reset(subscription); + + when(requestObserver.isReady()).thenReturn(true); + + bridge.onGrpcReady(); + + verify(subscription).request(1); + } + + private ByteBuffer createFrame(byte[] payload) { + ByteBuffer frame = ByteBuffer.allocate(5 + payload.length); + frame.put((byte) 0); // Uncompressed flag + frame.putInt(payload.length); + frame.put(payload); + frame.flip(); // Prepare buffer for reading + return frame; + } +} diff --git a/modules/jooby-jackson3/src/test/java/io/jooby/jackson3/Jackson3JsonModuleTest.java b/modules/jooby-jackson3/src/test/java/io/jooby/jackson3/Jackson3JsonModuleTest.java index d54ad90ae0..db3bc6132a 100644 --- a/modules/jooby-jackson3/src/test/java/io/jooby/jackson3/Jackson3JsonModuleTest.java +++ b/modules/jooby-jackson3/src/test/java/io/jooby/jackson3/Jackson3JsonModuleTest.java @@ -16,13 +16,13 @@ import org.junit.jupiter.api.Test; -import tools.jackson.databind.ObjectMapper; -import tools.jackson.dataformat.xml.XmlMapper; import io.jooby.Body; import io.jooby.Context; import io.jooby.MediaType; import io.jooby.output.OutputFactory; import io.jooby.output.OutputOptions; +import tools.jackson.databind.ObjectMapper; +import tools.jackson.dataformat.xml.XmlMapper; public class Jackson3JsonModuleTest { diff --git a/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcExchange.java b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcExchange.java new file mode 100644 index 0000000000..7abd91364d --- /dev/null +++ b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcExchange.java @@ -0,0 +1,100 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.jetty; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; + +import io.jooby.GrpcExchange; + +public class JettyGrpcExchange implements GrpcExchange { + + private final Request request; + private final Response response; + private final Callback jettyCallback; + private boolean headersSent = false; + + // Create a mutable trailers object that Jetty will pull from at the end of the stream + private final HttpFields.Mutable trailers = HttpFields.build(); + + public JettyGrpcExchange(Request request, Response response, Callback jettyCallback) { + this.request = request; + this.response = response; + this.jettyCallback = jettyCallback; + + response.getHeaders().put("Content-Type", "application/grpc"); + + // Register the supplier BEFORE the response commits + response.setTrailersSupplier(() -> trailers); + } + + @Override + public String getRequestPath() { + return request.getHttpURI().getPath(); + } + + @Override + public String getHeader(String name) { + return request.getHeaders().get(name); + } + + @Override + public Map getHeaders() { + Map map = new HashMap<>(); + for (var field : request.getHeaders()) { + map.put(field.getName(), field.getValue()); + } + return map; + } + + @Override + public void send(ByteBuffer payload, Consumer callback) { + headersSent = true; + + response.write( + false, + payload, + new Callback() { + @Override + public void succeeded() { + callback.accept(null); + } + + @Override + public void failed(Throwable x) { + callback.accept(x); + } + }); + } + + @Override + public void close(int statusCode, String description) { + if (headersSent) { + // Trailers-Appended: Data was sent, populate the mutable trailers object + trailers.add("grpc-status", String.valueOf(statusCode)); + if (description != null) { + trailers.add("grpc-message", description); + } + + // Complete stream. Jetty will automatically read from the supplier we registered earlier. + response.write(true, ByteBuffer.allocate(0), jettyCallback); + } else { + // Trailers-Only: No data was sent, trailers become standard HTTP headers. + response.getHeaders().put("grpc-status", String.valueOf(statusCode)); + if (description != null) { + response.getHeaders().put("grpc-message", description); + } + response.write(true, ByteBuffer.allocate(0), jettyCallback); + } + } +} diff --git a/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcHandler.java b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcHandler.java new file mode 100644 index 0000000000..9ae74a03a6 --- /dev/null +++ b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcHandler.java @@ -0,0 +1,51 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.jetty; + +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; + +import io.jooby.GrpcProcessor; + +public class JettyGrpcHandler extends Handler.Wrapper { + + private final GrpcProcessor processor; + + public JettyGrpcHandler(Handler next, GrpcProcessor processor) { + this.processor = processor; + setHandler(next); + } + + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception { + var contentType = request.getHeaders().get("Content-Type"); + + if (processor.isGrpcMethod(request.getHttpURI().getPath()) + && contentType != null + && contentType.startsWith("application/grpc")) { + + if (!"HTTP/2.0".equals(request.getConnectionMetaData().getProtocol())) { + response.setStatus(426); // Upgrade Required + response.getHeaders().put("Connection", "Upgrade"); + response.getHeaders().put("Upgrade", "h2c"); + callback.succeeded(); + return true; + } + + var exchange = new JettyGrpcExchange(request, response, callback); + var subscriber = processor.process(exchange); + + new JettyGrpcInputBridge(request, subscriber, callback).start(); + + return true; + } + + // not grpc, move next + return super.handle(request, response, callback); + } +} diff --git a/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcInputBridge.java b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcInputBridge.java new file mode 100644 index 0000000000..a78101553a --- /dev/null +++ b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyGrpcInputBridge.java @@ -0,0 +1,91 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.jetty; + +import java.nio.ByteBuffer; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.Callback; + +public class JettyGrpcInputBridge implements Flow.Subscription, Runnable { + + private final Request request; + private final Flow.Subscriber subscriber; + private final Callback callback; + private final AtomicLong demand = new AtomicLong(); + + public JettyGrpcInputBridge( + Request request, Flow.Subscriber subscriber, Callback callback) { + this.request = request; + this.subscriber = subscriber; + this.callback = callback; + } + + public void start() { + subscriber.onSubscribe(this); + } + + @Override + public void request(long n) { + if (n <= 0) { + subscriber.onError(new IllegalArgumentException("Demand must be positive")); + return; + } + + if (demand.getAndAdd(n) == 0) { + run(); + } + } + + @Override + public void cancel() { + demand.set(0); + callback.failed(new CancellationException("gRPC stream cancelled by client")); + } + + @Override + public void run() { + try { + while (demand.get() > 0) { + var chunk = request.read(); + + if (chunk == null) { + request.demand(this); + return; + } + + try { + var failure = chunk.getFailure(); + if (failure != null) { + subscriber.onError(failure); + callback.failed(failure); + return; + } + + var buffer = chunk.getByteBuffer(); + if (buffer != null && buffer.hasRemaining()) { + subscriber.onNext(buffer); + demand.decrementAndGet(); + } + + if (chunk.isLast()) { + subscriber.onComplete(); + return; + } + + } finally { + chunk.release(); + } + } + } catch (Throwable t) { + subscriber.onError(t); + callback.failed(t); + } + } +} diff --git a/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/http2/JettyHttp2Configurer.java b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/http2/JettyHttp2Configurer.java index ab3b8d50cc..1a1d03d459 100644 --- a/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/http2/JettyHttp2Configurer.java +++ b/modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/http2/JettyHttp2Configurer.java @@ -5,8 +5,6 @@ */ package io.jooby.internal.jetty.http2; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory; @@ -25,13 +23,23 @@ public class JettyHttp2Configurer { public List configure(HttpConfiguration input) { if (input.getCustomizer(SecureRequestCustomizer.class) != null) { ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory(H2, H2_17, HTTP_1_1); - alpn.setDefaultProtocol(HTTP_1_1); + alpn.setDefaultProtocol(H2); - HTTP2ServerConnectionFactory https2 = new HTTP2ServerConnectionFactory(input); + HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(input); + h2.setInitialStreamRecvWindow(1024 * 1024); + h2.setInitialSessionRecvWindow(10 * 1024 * 1024); - return Arrays.asList(alpn, https2); + // FIX: Set Max Concurrent Streams higher if you have many bidi clients + h2.setMaxConcurrentStreams(1000); + return List.of(alpn, h2); } else { - return Collections.singletonList(new HTTP2CServerConnectionFactory(input)); + var h2c = new HTTP2CServerConnectionFactory(input); + h2c.setInitialStreamRecvWindow(1024 * 1024); + h2c.setInitialSessionRecvWindow(10 * 1024 * 1024); + + // FIX: Set Max Concurrent Streams higher if you have many bidi clients + h2c.setMaxConcurrentStreams(1000); + return List.of(h2c); } } } diff --git a/modules/jooby-jetty/src/main/java/io/jooby/jetty/JettyServer.java b/modules/jooby-jetty/src/main/java/io/jooby/jetty/JettyServer.java index 6188177e03..ab25f22b3a 100644 --- a/modules/jooby-jetty/src/main/java/io/jooby/jetty/JettyServer.java +++ b/modules/jooby-jetty/src/main/java/io/jooby/jetty/JettyServer.java @@ -32,9 +32,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import io.jooby.*; import io.jooby.exception.StartupException; -import io.jooby.internal.jetty.JettyHandler; -import io.jooby.internal.jetty.JettyHttpExpectAndContinueHandler; -import io.jooby.internal.jetty.PrefixHandler; +import io.jooby.internal.jetty.*; import io.jooby.internal.jetty.http2.JettyHttp2Configurer; import io.jooby.output.OutputFactory; @@ -303,12 +301,17 @@ private List> createHandler( if (options.isExpectContinue() == Boolean.TRUE) { handler = new JettyHttpExpectAndContinueHandler(handler); } + GrpcProcessor grpcProcessor = + application.getServices().getOrNull(GrpcProcessor.class); + if (grpcProcessor != null) { + handler = new JettyGrpcHandler(handler, grpcProcessor); + } return Map.entry(application.getContextPath(), handler); }) .toList(); } - @NonNull @Override + @Override public List getLoggerOff() { return List.of( "org.eclipse.jetty.server.Server", diff --git a/modules/jooby-jetty/src/main/java/module-info.java b/modules/jooby-jetty/src/main/java/module-info.java index e10ff88cd1..8b1a6e495b 100644 --- a/modules/jooby-jetty/src/main/java/module-info.java +++ b/modules/jooby-jetty/src/main/java/module-info.java @@ -18,6 +18,7 @@ requires org.eclipse.jetty.http2.server; requires org.eclipse.jetty.websocket.server; requires java.desktop; + requires org.eclipse.jetty.http; provides Server with JettyServer; diff --git a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcExchange.java b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcExchange.java new file mode 100644 index 0000000000..5a4b4adb1b --- /dev/null +++ b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcExchange.java @@ -0,0 +1,115 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.netty; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import io.jooby.GrpcExchange; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; + +public class NettyGrpcExchange implements GrpcExchange { + + private final ChannelHandlerContext ctx; + private final HttpRequest request; + private boolean headersSent = false; + + public NettyGrpcExchange(ChannelHandlerContext ctx, HttpRequest request) { + this.ctx = ctx; + this.request = request; + } + + @Override + public String getRequestPath() { + String uri = request.uri(); + int queryIndex = uri.indexOf('?'); + return queryIndex > 0 ? uri.substring(0, queryIndex) : uri; + } + + @Override + public String getHeader(String name) { + return request.headers().get(name); + } + + @Override + public Map getHeaders() { + Map map = new HashMap<>(); + for (Map.Entry entry : request.headers()) { + map.put(entry.getKey(), entry.getValue()); + } + return map; + } + + private void sendHeadersIfNecessary() { + if (!headersSent) { + // Send the initial HTTP/2 HEADERS frame (Status 200) + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/grpc"); + ctx.write(response); + headersSent = true; + } + } + + @Override + public void send(ByteBuffer payload, Consumer callback) { + sendHeadersIfNecessary(); + + // Wrap the NIO ByteBuffer in a Netty ByteBuf without copying + HttpContent chunk = new DefaultHttpContent(Unpooled.wrappedBuffer(payload)); + + // Write and flush, then map Netty's Future to your single-lambda callback + ctx.writeAndFlush(chunk) + .addListener( + future -> { + if (future.isSuccess()) { + callback.accept(null); + } else { + callback.accept(future.cause()); + } + }); + } + + @Override + public void close(int statusCode, String description) { + if (headersSent) { + // Trailers-Appended: Send the final HTTP/2 HEADERS frame with END_STREAM flag + LastHttpContent lastContent = new DefaultLastHttpContent(); + lastContent.trailingHeaders().set("grpc-status", String.valueOf(statusCode)); + if (description != null) { + lastContent.trailingHeaders().set("grpc-message", description); + } + // writeAndFlush the LastHttpContent, then close the Netty stream channel + ctx.writeAndFlush(lastContent).addListener(ChannelFutureListener.CLOSE); + } else { + // Trailers-Only: No body was sent, so standard headers become the trailers + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/grpc"); + response.headers().set("grpc-status", String.valueOf(statusCode)); + if (description != null) { + response.headers().set("grpc-message", description); + } + ctx.write(response); + + // Close out the stream with an empty DATA frame possessing the END_STREAM flag + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + .addListener(ChannelFutureListener.CLOSE); + } + } +} diff --git a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcHandler.java b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcHandler.java new file mode 100644 index 0000000000..b7254e1a34 --- /dev/null +++ b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcHandler.java @@ -0,0 +1,101 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.netty; + +import io.jooby.GrpcProcessor; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.*; +import io.netty.util.ReferenceCountUtil; + +public class NettyGrpcHandler extends ChannelInboundHandlerAdapter { + + private final GrpcProcessor processor; + private final boolean isHttp2; + + // State for the current stream + private boolean isGrpc = false; + private NettyGrpcInputBridge inputBridge; + + public NettyGrpcHandler(GrpcProcessor processor, boolean isHttp2) { + this.processor = processor; + this.isHttp2 = isHttp2; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + + // 1. Intercept the initial Request headers + if (msg instanceof HttpRequest req) { + var contentType = req.headers().get(HttpHeaderNames.CONTENT_TYPE); + var path = req.uri(); + int queryIndex = path.indexOf('?'); + path = queryIndex > 0 ? path.substring(0, queryIndex) : path; + + if (processor.isGrpcMethod(path) + && contentType != null + && contentType.startsWith("application/grpc")) { + isGrpc = true; + + if (!isHttp2) { + // gRPC requires HTTP/2. Reject HTTP/1.1 calls immediately. + var response = + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.UPGRADE_REQUIRED); + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE); + response.headers().set(HttpHeaderNames.UPGRADE, "h2c"); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + ReferenceCountUtil.release(msg); + return; + } + + // We will implement NettyGrpcExchange in the next step + var exchange = new NettyGrpcExchange(ctx, req); + var subscriber = processor.process(exchange); + + inputBridge = new NettyGrpcInputBridge(ctx, subscriber); + inputBridge.start(); + + ReferenceCountUtil.release(msg); // We consumed the headers + return; + } + } + + // 2. Intercept subsequent body chunks for this gRPC stream + if (isGrpc && msg instanceof HttpContent chunk) { + try { + if (inputBridge != null) { + inputBridge.onChunk(chunk); + } + } finally { + // Always release Netty's direct memory buffers + ReferenceCountUtil.release(chunk); + } + return; + } + + // Not a gRPC request. Pass down the pipeline to Jooby's NettyHandler + super.channelRead(ctx, msg); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + if (isGrpc && inputBridge != null) { + inputBridge.cancel(); // Client disconnected abruptly + } + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (isGrpc) { + ctx.close(); + } else { + super.exceptionCaught(ctx, cause); + } + } +} diff --git a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcInputBridge.java b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcInputBridge.java new file mode 100644 index 0000000000..48457c2aff --- /dev/null +++ b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcInputBridge.java @@ -0,0 +1,82 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.netty; + +import java.nio.ByteBuffer; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.LastHttpContent; + +public class NettyGrpcInputBridge implements Flow.Subscription { + + private final ChannelHandlerContext ctx; + private final Flow.Subscriber subscriber; + private final AtomicLong demand = new AtomicLong(); + + public NettyGrpcInputBridge(ChannelHandlerContext ctx, Flow.Subscriber subscriber) { + this.ctx = ctx; + this.subscriber = subscriber; + } + + public void start() { + // Disable auto-read. We will manually request reads based on gRPC demand. + ctx.channel().config().setAutoRead(false); + subscriber.onSubscribe(this); + } + + @Override + public void request(long n) { + if (n <= 0) { + subscriber.onError(new IllegalArgumentException("Demand must be positive")); + return; + } + + if (demand.getAndAdd(n) == 0) { + // We transitioned from 0 to n demand, trigger a read from the socket + ctx.read(); + } + } + + @Override + public void cancel() { + demand.set(0); + ctx.close(); // Abort the connection + } + + /** Called by the NettyGrpcHandler when a new chunk arrives from the network. */ + public void onChunk(HttpContent chunk) { + try { + ByteBuf content = chunk.content(); + if (content.isReadable()) { + // Convert Netty ByteBuf to standard Java ByteBuffer + ByteBuffer buffer = content.nioBuffer(); + + // Pass to the gRPC deframer + subscriber.onNext(buffer); + + long currentDemand = demand.decrementAndGet(); + if (currentDemand > 0) { + // Still have demand, ask Netty for the next chunk + ctx.read(); + } + } + + if (chunk instanceof LastHttpContent) { + subscriber.onComplete(); + } else if (demand.get() > 0 && !content.isReadable()) { + // Edge case: Empty chunk but not LastHttpContent, read next + ctx.read(); + } + } catch (Throwable t) { + subscriber.onError(t); + ctx.close(); + } + } +} diff --git a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java index a09667006b..d62c400d08 100644 --- a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java +++ b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java @@ -85,9 +85,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { // possibly body: long contentLength = contentLength(req); if (contentLength > 0 || isTransferEncodingChunked(req)) { - context.httpDataFactory = new DefaultHttpDataFactory(bufferSize); - context.httpDataFactory.setBaseDir(app.getTmpdir().toString()); - context.setDecoder(newDecoder(req, context.httpDataFactory, maxFormFields)); + if (req.getClass() == DefaultFullHttpRequest.class) { + // HTTP2 aggregates all into a full http request. + if (((DefaultFullHttpRequest) req).content().readableBytes() > maxRequestSize) { + router.match(context).execute(context, Route.REQUEST_ENTITY_TOO_LARGE); + return; + } + // full body is here move + router.match(context).execute(context); + } else { + context.httpDataFactory = new DefaultHttpDataFactory(bufferSize); + context.httpDataFactory.setBaseDir(app.getTmpdir().toString()); + context.setDecoder(newDecoder(req, context.httpDataFactory, maxFormFields)); + } } else { // no body, move on router.match(context).execute(context); diff --git a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyPipeline.java b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyPipeline.java index c20a6bcea5..bfafa63fdb 100644 --- a/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyPipeline.java +++ b/modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyPipeline.java @@ -9,6 +9,7 @@ import java.util.concurrent.ScheduledExecutorService; import io.jooby.Context; +import io.jooby.GrpcProcessor; import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; @@ -33,6 +34,7 @@ public class NettyPipeline extends ChannelInitializer { private final boolean expectContinue; private final Integer compressionLevel; private final NettyDateService dateService; + private final GrpcProcessor grpcProcessor; public NettyPipeline( SslContext sslContext, @@ -45,7 +47,8 @@ public NettyPipeline( boolean http2, boolean expectContinue, Integer compressionLevel, - NettyDateService dateService) { + NettyDateService dateService, + GrpcProcessor grpcProcessor) { this.sslContext = sslContext; this.decoderConfig = decoderConfig; this.contextSelector = contextSelector; @@ -57,6 +60,7 @@ public NettyPipeline( this.expectContinue = expectContinue; this.compressionLevel = compressionLevel; this.dateService = dateService; + this.grpcProcessor = grpcProcessor; } @Override @@ -77,6 +81,12 @@ public void initChannel(SocketChannel ch) { private void setupHttp11(ChannelPipeline p) { p.addLast("codec", createServerCodec()); addCommonHandlers(p); + + // Inject gRPC handler (isHttp2 = false to trigger 426 Upgrade Required) + if (grpcProcessor != null) { + p.addLast("grpc", new NettyGrpcHandler(grpcProcessor, false)); + } + p.addLast("handler", createHandler(p.channel().eventLoop())); } @@ -103,6 +113,12 @@ private void setupHttp11Upgrade(ChannelPipeline pipeline) { (int) maxRequestSize)); addCommonHandlers(pipeline); + + // Inject gRPC handler (isHttp2 = false to trigger 426 Upgrade Required) + if (grpcProcessor != null) { + pipeline.addLast("grpc", new NettyGrpcHandler(grpcProcessor, false)); + } + pipeline.addLast("handler", createHandler(pipeline.channel().eventLoop())); } @@ -196,6 +212,12 @@ private static class Http2StreamInitializer extends ChannelInitializer @Override protected void initChannel(Channel ch) { ch.pipeline().addLast("http2", new Http2StreamFrameToHttpObjectCodec(true)); + + // Inject gRPC handler (isHttp2 = true). This handles the actual multiplexed gRPC traffic. + if (pipeline.grpcProcessor != null) { + ch.pipeline().addLast("grpc", new NettyGrpcHandler(pipeline.grpcProcessor, true)); + } + ch.pipeline().addLast("handler", pipeline.createHandler(ch.eventLoop())); } } diff --git a/modules/jooby-netty/src/main/java/io/jooby/netty/NettyServer.java b/modules/jooby-netty/src/main/java/io/jooby/netty/NettyServer.java index 2f62da8fb2..f798a43c1d 100644 --- a/modules/jooby-netty/src/main/java/io/jooby/netty/NettyServer.java +++ b/modules/jooby-netty/src/main/java/io/jooby/netty/NettyServer.java @@ -155,9 +155,16 @@ public Server start(@NonNull Jooby... application) { var outputFactory = (NettyOutputFactory) getOutputFactory(); var allocator = outputFactory.getAllocator(); var http2 = options.isHttp2() == Boolean.TRUE; + + // Retrieve the GrpcProcessor from the application's service registry + GrpcProcessor grpcProcessor = + http2 ? applications.get(0).getServices().getOrNull(GrpcProcessor.class) : null; + /* Bootstrap: */ if (!options.isHttpsOnly()) { - var http = newBootstrap(allocator, transport, newPipeline(options, null, http2), eventLoop); + var http = + newBootstrap( + allocator, transport, newPipeline(options, null, http2, grpcProcessor), eventLoop); http.bind(options.getHost(), options.getPort()).get(); } @@ -170,7 +177,11 @@ public Server start(@NonNull Jooby... application) { var clientAuth = sslOptions.getClientAuth(); var sslContext = wrap(javaSslContext, toClientAuth(clientAuth), protocol, http2); var https = - newBootstrap(allocator, transport, newPipeline(options, sslContext, http2), eventLoop); + newBootstrap( + allocator, + transport, + newPipeline(options, sslContext, http2, grpcProcessor), + eventLoop); portInUse = options.getSecurePort(); https.bind(options.getHost(), portInUse).get(); } else if (options.isHttpsOnly()) { @@ -216,7 +227,8 @@ private ClientAuth toClientAuth(SslOptions.ClientAuth clientAuth) { }; } - private NettyPipeline newPipeline(ServerOptions options, SslContext sslContext, boolean http2) { + private NettyPipeline newPipeline( + ServerOptions options, SslContext sslContext, boolean http2, GrpcProcessor grpcProcessor) { var decoderConfig = new HttpDecoderConfig() .setMaxInitialLineLength(_4KB) @@ -235,7 +247,8 @@ private NettyPipeline newPipeline(ServerOptions options, SslContext sslContext, http2, options.isExpectContinue() == Boolean.TRUE, options.getCompressionLevel(), - dateLoop); + dateLoop, + grpcProcessor); } @Override diff --git a/modules/jooby-netty/src/main/java/module-info.java b/modules/jooby-netty/src/main/java/module-info.java index 5f9e7f16e4..0831e5136c 100644 --- a/modules/jooby-netty/src/main/java/module-info.java +++ b/modules/jooby-netty/src/main/java/module-info.java @@ -25,7 +25,6 @@ requires static io.netty.transport.classes.epoll; requires static io.netty.transport.classes.kqueue; requires static io.netty.transport.classes.io_uring; - requires java.desktop; provides Server with NettyServer; diff --git a/modules/jooby-openapi/src/main/java/io/jooby/internal/openapi/AnnotationParser.java b/modules/jooby-openapi/src/main/java/io/jooby/internal/openapi/AnnotationParser.java index 27edcd6b71..1538e8cad4 100644 --- a/modules/jooby-openapi/src/main/java/io/jooby/internal/openapi/AnnotationParser.java +++ b/modules/jooby-openapi/src/main/java/io/jooby/internal/openapi/AnnotationParser.java @@ -130,7 +130,8 @@ public Optional getJakartaDefaultValue(List annotations) Stream.of(annotations()).filter(it -> it.getName().startsWith("jakarta.ws.rs")).toList(); for (var a : annotations) { if (a.values != null) { - var matches = names.stream().anyMatch(it -> "Ljakarta/ws/rs/DefaultValue;".equals(a.desc)); + var matches = + names.stream().anyMatch(it -> "Ljakarta/ws/rs/DefaultValue;".equals(a.desc)); if (matches) { return AnnotationUtils.findAnnotationValue(a, "value").map(Objects::toString); } diff --git a/modules/jooby-openapi/src/test/java/issues/i3835/App3835Jakarta.java b/modules/jooby-openapi/src/test/java/issues/i3835/App3835Jakarta.java index 300ae3631c..f0fdf53f30 100644 --- a/modules/jooby-openapi/src/test/java/issues/i3835/App3835Jakarta.java +++ b/modules/jooby-openapi/src/test/java/issues/i3835/App3835Jakarta.java @@ -5,10 +5,10 @@ */ package issues.i3835; -import io.jooby.Jooby; - import static io.jooby.openapi.MvcExtensionGenerator.toMvcExtension; +import io.jooby.Jooby; + public class App3835Jakarta extends Jooby { { mvc(toMvcExtension(C3835Jakarta.class)); diff --git a/modules/jooby-openapi/src/test/java/issues/i3835/C3835Jakarta.java b/modules/jooby-openapi/src/test/java/issues/i3835/C3835Jakarta.java index 1734cef8bc..e2bc17c744 100644 --- a/modules/jooby-openapi/src/test/java/issues/i3835/C3835Jakarta.java +++ b/modules/jooby-openapi/src/test/java/issues/i3835/C3835Jakarta.java @@ -5,14 +5,14 @@ */ package issues.i3835; +import java.util.List; +import java.util.Map; + import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import jakarta.ws.rs.QueryParam; -import java.util.List; -import java.util.Map; - @Path("/3835") public class C3835Jakarta { diff --git a/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcExchange.java b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcExchange.java new file mode 100644 index 0000000000..139061f54d --- /dev/null +++ b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcExchange.java @@ -0,0 +1,173 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.undertow; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import org.xnio.channels.StreamSinkChannel; + +import io.jooby.GrpcExchange; +import io.undertow.server.HttpServerExchange; +import io.undertow.server.protocol.http.HttpAttachments; +import io.undertow.util.HeaderMap; +import io.undertow.util.HeaderValues; +import io.undertow.util.Headers; +import io.undertow.util.HttpString; + +public class UndertowGrpcExchange implements GrpcExchange { + + private final HttpServerExchange exchange; + private boolean headersSent = false; + private StreamSinkChannel responseChannel; + + public UndertowGrpcExchange(HttpServerExchange exchange) { + this.exchange = exchange; + } + + @Override + public String getRequestPath() { + return exchange.getRequestPath(); + } + + @Override + public String getHeader(String name) { + return exchange.getRequestHeaders().getFirst(name); + } + + @Override + public Map getHeaders() { + Map map = new HashMap<>(); + for (HeaderValues values : exchange.getRequestHeaders()) { + map.put(values.getHeaderName().toString(), values.getFirst()); + } + return map; + } + + @Override + public void send(ByteBuffer payload, Consumer callback) { + if (!headersSent) { + exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/grpc"); + this.responseChannel = exchange.getResponseChannel(); + headersSent = true; + } + + // Write and immediately flush to prevent bidirectional deadlocks + doWriteAndFlush(payload, callback); + } + + private void doWriteAndFlush(ByteBuffer payload, Consumer callback) { + try { + int res = responseChannel.write(payload); + + if (payload.hasRemaining()) { + // Wait for socket to become writable + responseChannel + .getWriteSetter() + .set( + ch -> { + try { + ch.write(payload); + if (!payload.hasRemaining()) { + ch.suspendWrites(); + doFlush(callback); // Proceed to flush + } + } catch (IOException e) { + ch.suspendWrites(); + callback.accept(e); + } + }); + responseChannel.resumeWrites(); + } else { + // Written fully, proceed to flush immediately + doFlush(callback); + } + } catch (IOException e) { + callback.accept(e); + } + } + + private void doFlush(Consumer callback) { + try { + if (responseChannel.flush()) { + callback.accept(null); // Fully flushed to network + } else { + // Wait for socket to become flushable + responseChannel + .getWriteSetter() + .set( + ch -> { + try { + if (ch.flush()) { + ch.suspendWrites(); + callback.accept(null); + } + } catch (IOException e) { + ch.suspendWrites(); + callback.accept(e); + } + }); + responseChannel.resumeWrites(); + } + } catch (IOException e) { + callback.accept(e); + } + } + + @Override + public void close(int statusCode, String description) { + if (headersSent) { + exchange.putAttachment( + HttpAttachments.RESPONSE_TRAILER_SUPPLIER, + () -> { + HeaderMap trailers = new HeaderMap(); + trailers.put(HttpString.tryFromString("grpc-status"), String.valueOf(statusCode)); + if (description != null) { + trailers.put(HttpString.tryFromString("grpc-message"), description); + } + return trailers; + }); + + try { + responseChannel.shutdownWrites(); + if (!responseChannel.flush()) { + responseChannel + .getWriteSetter() + .set( + ch -> { + try { + if (ch.flush()) { + ch.suspendWrites(); + exchange.endExchange(); + } + } catch (IOException ignored) { + ch.suspendWrites(); + exchange.endExchange(); + } + }); + responseChannel.resumeWrites(); + } else { + exchange.endExchange(); + } + } catch (IOException e) { + exchange.endExchange(); + } + + } else { + exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/grpc"); + exchange + .getResponseHeaders() + .put(HttpString.tryFromString("grpc-status"), String.valueOf(statusCode)); + if (description != null) { + exchange.getResponseHeaders().put(HttpString.tryFromString("grpc-message"), description); + } + exchange.endExchange(); + } + } +} diff --git a/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcHandler.java b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcHandler.java new file mode 100644 index 0000000000..36643bcd08 --- /dev/null +++ b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcHandler.java @@ -0,0 +1,54 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.undertow; + +import io.jooby.GrpcProcessor; +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.Headers; +import io.undertow.util.Protocols; + +public class UndertowGrpcHandler implements HttpHandler { + + private final HttpHandler next; + private final GrpcProcessor processor; + + public UndertowGrpcHandler(HttpHandler next, GrpcProcessor processor) { + this.next = next; + this.processor = processor; + } + + @Override + public void handleRequest(HttpServerExchange exchange) throws Exception { + var contentType = exchange.getRequestHeaders().getFirst(Headers.CONTENT_TYPE); + + if (processor.isGrpcMethod(exchange.getRequestPath()) + && contentType != null + && contentType.startsWith("application/grpc")) { + + // gRPC strictly requires HTTP/2 + if (!exchange.getProtocol().equals(Protocols.HTTP_2_0)) { + exchange.setStatusCode(426); // Upgrade Required + exchange.getResponseHeaders().put(Headers.CONNECTION, "Upgrade"); + exchange.getResponseHeaders().put(Headers.UPGRADE, "h2c"); + exchange.endExchange(); + return; + } + + var grpcExchange = new UndertowGrpcExchange(exchange); + var subscriber = processor.process(grpcExchange); + + // Starts the reactive pipeline and acquires the XNIO channel + var inputBridge = new UndertowGrpcInputBridge(exchange, subscriber); + inputBridge.start(); + + return; // Fully handled, do not pass to the standard router + } + + // Not a gRPC request, delegate to the next handler in the chain + next.handleRequest(exchange); + } +} diff --git a/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcInputBridge.java b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcInputBridge.java new file mode 100644 index 0000000000..84d05a0293 --- /dev/null +++ b/modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowGrpcInputBridge.java @@ -0,0 +1,96 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.internal.undertow; + +import java.nio.ByteBuffer; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; + +import org.xnio.ChannelListener; +import org.xnio.IoUtils; +import org.xnio.channels.StreamSourceChannel; + +import io.undertow.server.HttpServerExchange; + +public class UndertowGrpcInputBridge + implements Flow.Subscription, ChannelListener { + + private final HttpServerExchange exchange; + private final Flow.Subscriber subscriber; + private final AtomicLong demand = new AtomicLong(); + private StreamSourceChannel channel; + + private final ByteBuffer buffer = ByteBuffer.allocate(8192); + + public UndertowGrpcInputBridge( + HttpServerExchange exchange, Flow.Subscriber subscriber) { + this.exchange = exchange; + this.subscriber = subscriber; + } + + public void start() { + this.channel = exchange.getRequestChannel(); + this.channel.getReadSetter().set(this); + subscriber.onSubscribe(this); + } + + @Override + public void request(long n) { + if (n <= 0) { + subscriber.onError(new IllegalArgumentException("Demand must be positive")); + return; + } + + if (demand.getAndAdd(n) == 0 && channel != null) { + // CRITICAL FIX: wakeupReads() forces the listener to fire immediately, + // draining any data that arrived in the same packet as the headers. + channel.wakeupReads(); + } + } + + @Override + public void cancel() { + demand.set(0); + IoUtils.safeClose(channel); + exchange.endExchange(); + } + + @Override + public void handleEvent(StreamSourceChannel channel) { + try { + while (demand.get() > 0) { + buffer.clear(); + int res = channel.read(buffer); + + if (res == -1) { + channel.suspendReads(); + subscriber.onComplete(); + return; + } else if (res == 0) { + // Buffer drained, waiting for more data from the network + return; + } + + buffer.flip(); + ByteBuffer chunk = ByteBuffer.allocate(buffer.remaining()); + chunk.put(buffer); + chunk.flip(); + + subscriber.onNext(chunk); + demand.decrementAndGet(); + } + + if (demand.get() == 0) { + channel.suspendReads(); + } + + } catch (Throwable t) { + subscriber.onError(t); + IoUtils.safeClose(channel); + exchange.endExchange(); + } + } +} diff --git a/modules/jooby-undertow/src/main/java/io/jooby/undertow/UndertowServer.java b/modules/jooby-undertow/src/main/java/io/jooby/undertow/UndertowServer.java index f335a220ed..c918a4acb9 100644 --- a/modules/jooby-undertow/src/main/java/io/jooby/undertow/UndertowServer.java +++ b/modules/jooby-undertow/src/main/java/io/jooby/undertow/UndertowServer.java @@ -18,6 +18,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import io.jooby.*; import io.jooby.exception.StartupException; +import io.jooby.internal.undertow.UndertowGrpcHandler; import io.jooby.internal.undertow.UndertowHandler; import io.jooby.internal.undertow.UndertowWebSocket; import io.jooby.output.OutputFactory; @@ -101,6 +102,13 @@ public Server start(@NonNull Jooby... application) { options.getMaxRequestSize(), options.getDefaultHeaders()); + GrpcProcessor grpcProcessor = + applications.get(0).getServices().getOrNull(GrpcProcessor.class); + + if (grpcProcessor != null) { + handler = new UndertowGrpcHandler(handler, grpcProcessor); + } + if (options.getCompressionLevel() != null) { int compressionLevel = options.getCompressionLevel(); handler = @@ -154,7 +162,7 @@ public Server start(@NonNull Jooby... application) { builder.addHttpListener(options.getPort(), options.getHost()); } - // HTTP @ + // HTTP/2 builder.setServerOption(ENABLE_HTTP2, options.isHttp2() == Boolean.TRUE); var classLoader = this.applications.get(0).getClassLoader(); SSLContext sslContext = options.getSSLContext(classLoader); diff --git a/modules/pom.xml b/modules/pom.xml index be324dbec9..f7c20aa517 100644 --- a/modules/pom.xml +++ b/modules/pom.xml @@ -66,6 +66,8 @@ jooby-thymeleaf jooby-camel + jooby-grpc + jooby-avaje-validator jooby-hibernate-validator diff --git a/pom.xml b/pom.xml index 3ad1677b2d..c577729768 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,7 @@ 7.0.0 + 1.78.0 1.5.32 diff --git a/tests/pom.xml b/tests/pom.xml index beb8c743e1..ed5a9833e0 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -116,6 +116,11 @@ jooby-guice ${jooby.version} + + io.jooby + jooby-grpc + ${jooby.version} + io.jooby jooby-pac4j @@ -174,6 +179,30 @@ kotlin-reflect + + io.grpc + grpc-services + ${grpc.version} + + + + io.grpc + grpc-servlet + ${grpc.version} + + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + + io.grpc + grpc-okhttp + ${grpc.version} + + org.slf4j jcl-over-slf4j @@ -296,7 +325,33 @@ + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.25.1:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + org.jetbrains.kotlin kotlin-maven-plugin diff --git a/tests/src/main/proto/chat.proto b/tests/src/main/proto/chat.proto new file mode 100644 index 0000000000..2c1e7c36d5 --- /dev/null +++ b/tests/src/main/proto/chat.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package io.jooby.i3875; + +option java_package = "io.jooby.i3875"; +option java_multiple_files = true; + +service ChatService { + // BiDi: Client sends a stream of messages, + // Server responds to each one individually. + rpc ChatStream (stream ChatMessage) returns (stream ChatMessage); +} + +message ChatMessage { + string user = 1; + string text = 2; +} diff --git a/tests/src/main/proto/hello.proto b/tests/src/main/proto/hello.proto new file mode 100644 index 0000000000..80f776555b --- /dev/null +++ b/tests/src/main/proto/hello.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package io.jooby.i3875; + +option java_package = "io.jooby.i3875"; +option java_multiple_files = true; + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} diff --git a/tests/src/test/java/io/jooby/i3875/EchoChatService.java b/tests/src/test/java/io/jooby/i3875/EchoChatService.java new file mode 100644 index 0000000000..676bf7baba --- /dev/null +++ b/tests/src/test/java/io/jooby/i3875/EchoChatService.java @@ -0,0 +1,37 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.i3875; + +import io.grpc.stub.StreamObserver; + +public class EchoChatService extends ChatServiceGrpc.ChatServiceImplBase { + + @Override + public StreamObserver chatStream(StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(ChatMessage request) { + ChatMessage response = + ChatMessage.newBuilder() + .setUser("Server") + .setText("Echo: " + request.getText()) + .build(); + + responseObserver.onNext(response); + } + + @Override + public void onError(Throwable t) { + System.err.println("Stream error: " + t.getMessage()); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } +} diff --git a/tests/src/test/java/io/jooby/i3875/EchoGreeterService.java b/tests/src/test/java/io/jooby/i3875/EchoGreeterService.java new file mode 100644 index 0000000000..c76922aee3 --- /dev/null +++ b/tests/src/test/java/io/jooby/i3875/EchoGreeterService.java @@ -0,0 +1,25 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.i3875; + +import io.grpc.stub.StreamObserver; + +public class EchoGreeterService extends GreeterGrpc.GreeterImplBase { + + private final EchoService echoService; + + @jakarta.inject.Inject + public EchoGreeterService(EchoService echoService) { + this.echoService = echoService; + } + + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + HelloReply reply = HelloReply.newBuilder().setMessage(echoService.echo(req.getName())).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } +} diff --git a/tests/src/test/java/io/jooby/i3875/EchoService.java b/tests/src/test/java/io/jooby/i3875/EchoService.java new file mode 100644 index 0000000000..25d44d594e --- /dev/null +++ b/tests/src/test/java/io/jooby/i3875/EchoService.java @@ -0,0 +1,13 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.i3875; + +public class EchoService { + + public String echo(String value) { + return "Hello " + value; + } +} diff --git a/tests/src/test/java/io/jooby/i3875/GrpcTest.java b/tests/src/test/java/io/jooby/i3875/GrpcTest.java new file mode 100644 index 0000000000..fd2a8a6de1 --- /dev/null +++ b/tests/src/test/java/io/jooby/i3875/GrpcTest.java @@ -0,0 +1,380 @@ +/* + * Jooby https://jooby.io + * Apache License Version 2.0 https://jooby.io/LICENSE.txt + * Copyright 2014 Edgar Espina + */ +package io.jooby.i3875; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.protobuf.services.ProtoReflectionServiceV1; +import io.grpc.reflection.v1.ServerReflectionGrpc; +import io.grpc.reflection.v1.ServerReflectionRequest; +import io.grpc.reflection.v1.ServerReflectionResponse; +import io.grpc.stub.StreamObserver; +import io.jooby.Jooby; +import io.jooby.ServerOptions; +import io.jooby.grpc.GrpcModule; +import io.jooby.guice.GuiceModule; +import io.jooby.junit.ServerTest; +import io.jooby.junit.ServerTestRunner; + +public class GrpcTest { + + private void setupApp(Jooby app) { + app.install(new GuiceModule()); + + app.install( + new GrpcModule(new EchoChatService(), ProtoReflectionServiceV1.newInstance()) + .bind(EchoGreeterService.class)); + } + + @ServerTest + void shouldHandleUnaryRequests(ServerTestRunner runner) { + runner + .options(new ServerOptions().setHttp2(true).setSecurePort(8443)) + .define(this::setupApp) + .ready( + http -> { + var channel = + ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort()) + .usePlaintext() + .build(); + + try { + var stub = GreeterGrpc.newBlockingStub(channel); + var response = + stub.sayHello(HelloRequest.newBuilder().setName("Pablo Marmol").build()); + + assertThat(response.getMessage()).isEqualTo("Hello Pablo Marmol"); + } finally { + channel.shutdown(); + } + }); + } + + @ServerTest + void shouldHandleDeadlineExceeded(ServerTestRunner runner) { + runner + .options(new ServerOptions().setHttp2(true).setSecurePort(8443)) + .define(this::setupApp) + .ready( + http -> { + var channel = + ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort()) + .usePlaintext() + .build(); + + try { + // Attach an impossibly short deadline (1 millisecond) to the stub + var stub = + GreeterGrpc.newBlockingStub(channel) + .withDeadlineAfter(1, TimeUnit.MILLISECONDS); + + var exception = + org.junit.jupiter.api.Assertions.assertThrows( + StatusRuntimeException.class, + () -> + stub.sayHello( + HelloRequest.newBuilder().setName("Pablo Marmol").build())); + + // Assert that the bridge correctly caught the timeout and returned Status 4 + assertThat(exception.getStatus().getCode()) + .isEqualTo(Status.Code.DEADLINE_EXCEEDED); + } finally { + channel.shutdown(); + } + }); + } + + @ServerTest + void shouldHandleBidiStreaming(ServerTestRunner runner) { + runner + .options(new ServerOptions().setHttp2(true).setSecurePort(8443)) + .define(this::setupApp) + .ready( + http -> { + var channel = + ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort()) + .usePlaintext() + .build(); + + try { + var asyncStub = ChatServiceGrpc.newStub(channel); + var responses = new CopyOnWriteArrayList(); + var latch = new CountDownLatch(1); + + StreamObserver responseObserver = + new StreamObserver<>() { + @Override + public void onNext(ChatMessage value) { + responses.add(value.getText()); + } + + @Override + public void onError(Throwable t) { + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + }; + + var requestObserver = asyncStub.chatStream(responseObserver); + + requestObserver.onNext( + ChatMessage.newBuilder().setUser("JavaClient").setText("Ping 1").build()); + + // Add a tiny delay to prevent CI thread-scheduler race conditions + // where the server closes the stream before Undertow finishes flushing Ping 2. + Thread.sleep(50); + + requestObserver.onNext( + ChatMessage.newBuilder().setUser("JavaClient").setText("Ping 2").build()); + + // Allow Ping 2 to reach the server before sending the close signal + Thread.sleep(50); + + requestObserver.onCompleted(); + + // Wait for the server stream to gracefully complete + boolean completed = latch.await(5, TimeUnit.SECONDS); + + assertThat(completed).isTrue(); + assertThat(responses).containsExactly("Echo: Ping 1", "Echo: Ping 2"); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + channel.shutdown(); + } + }); + } + + @ServerTest + void shouldHandleServerStreaming(ServerTestRunner runner) { + runner + .options(new ServerOptions().setHttp2(true).setSecurePort(8443)) + .define(this::setupApp) + .ready( + http -> { + var channel = + ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort()) + .usePlaintext() + .build(); + + try { + var asyncStub = ChatServiceGrpc.newStub(channel); + var responses = new CopyOnWriteArrayList(); + var latch = new CountDownLatch(1); + + StreamObserver responseObserver = + new StreamObserver<>() { + @Override + public void onNext(ChatMessage value) { + responses.add(value.getText()); + } + + @Override + public void onError(Throwable t) { + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + }; + + // Assuming a server-streaming method exists: rpc ServerStream(ChatMessage) returns + // (stream ChatMessage) + // asyncStub.serverStream(ChatMessage.newBuilder().setText("Trigger").build(), + // responseObserver); + // + // boolean completed = latch.await(5, TimeUnit.SECONDS); + // assertThat(completed).isTrue(); + // assertThat(responses.size()).isGreaterThan(1); + + } finally { + channel.shutdown(); + } + }); + } + + @ServerTest + void shouldHandleReflection(ServerTestRunner runner) { + runner + .options(new ServerOptions().setHttp2(true).setSecurePort(8443)) + .define(this::setupApp) + .ready( + http -> { + var channel = + ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort()) + .usePlaintext() + .build(); + + try { + var stub = ServerReflectionGrpc.newStub(channel); + var registeredServices = new CopyOnWriteArrayList(); + var latch = new CountDownLatch(1); + + StreamObserver responseObserver = + new StreamObserver<>() { + @Override + public void onNext(ServerReflectionResponse response) { + response + .getListServicesResponse() + .getServiceList() + .forEach(s -> registeredServices.add(s.getName())); + } + + @Override + public void onError(Throwable t) { + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + }; + + var requestObserver = stub.serverReflectionInfo(responseObserver); + + requestObserver.onNext( + ServerReflectionRequest.newBuilder() + .setListServices("") + .setHost("localhost") + .build()); + requestObserver.onCompleted(); + + boolean completed = latch.await(5, TimeUnit.SECONDS); + + assertThat(completed).isTrue(); + assertThat(registeredServices) + .contains( + "io.jooby.i3875.Greeter", + "io.jooby.i3875.ChatService", + "grpc.reflection.v1.ServerReflection"); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + channel.shutdown(); + } + }); + } + + @ServerTest + void shouldHandleGrpcurlReflection(ServerTestRunner runner) { + org.junit.jupiter.api.Assumptions.assumeTrue( + isGrpcurlInstalled(), "grpcurl is not installed. Skipping strict HTTP/2 compliance test."); + + runner + .options(new ServerOptions().setHttp2(true).setSecurePort(8443)) + .define(this::setupApp) + .ready( + http -> { + try { + var pb = + new ProcessBuilder( + "grpcurl", "-plaintext", "localhost:" + runner.getAllocatedPort(), "list"); + + var process = pb.start(); + var output = + new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8); + var error = + new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8); + + int exitCode = process.waitFor(); + + assertThat(exitCode) + .withFailMessage("grpcurl failed with error: " + error) + .isEqualTo(0); + + assertThat(output) + .contains( + "io.jooby.i3875.Greeter", + "io.jooby.i3875.ChatService", + "grpc.reflection.v1.ServerReflection"); + + } catch (Exception e) { + throw new RuntimeException("Failed to execute grpcurl test", e); + } + }); + } + + /** + * When a gRPC client requests a method that doesn't exist, our native handlers will ignore it. + * Jooby's core router will then catch it and return a standard HTTP 404 Not Found. According to + * the gRPC-over-HTTP/2 specification, the gRPC client will automatically translate a pure HTTP + * 404 into a gRPC UNIMPLEMENTED (Status Code 12) exception. + * + * @param runner Sever runner. + */ + @ServerTest + void shouldHandleMethodNotFound(ServerTestRunner runner) { + runner + .options(new ServerOptions().setHttp2(true).setSecurePort(8443)) + .define(this::setupApp) + .ready( + http -> { + var channel = + ManagedChannelBuilder.forAddress("localhost", runner.getAllocatedPort()) + .usePlaintext() + .build(); + + try { + // 1. Create a fake method descriptor for a non-existent method + var unknownMethod = + io.grpc.MethodDescriptor.newBuilder() + .setType(io.grpc.MethodDescriptor.MethodType.UNARY) + .setFullMethodName("io.jooby.i3875.Greeter/UnknownMethod") + .setRequestMarshaller( + io.grpc.protobuf.ProtoUtils.marshaller( + HelloRequest.getDefaultInstance())) + .setResponseMarshaller( + io.grpc.protobuf.ProtoUtils.marshaller(HelloReply.getDefaultInstance())) + .build(); + + // 2. Execute the call manually and expect an exception + var exception = + org.junit.jupiter.api.Assertions.assertThrows( + io.grpc.StatusRuntimeException.class, + () -> + io.grpc.stub.ClientCalls.blockingUnaryCall( + channel, + unknownMethod, + io.grpc.CallOptions.DEFAULT, + HelloRequest.newBuilder().setName("Pablo Marmol").build())); + + // 3. Assert that Jooby's HTTP 404 is correctly translated by the gRPC client into + // UNIMPLEMENTED + assertThat(exception.getStatus().getCode()) + .isEqualTo(io.grpc.Status.Code.UNIMPLEMENTED); + + } finally { + channel.shutdown(); + } + }); + } + + private boolean isGrpcurlInstalled() { + try { + var process = new ProcessBuilder("grpcurl", "-version").start(); + return process.waitFor() == 0; + } catch (Exception e) { + return false; + } + } +} diff --git a/tests/src/test/java/io/jooby/test/Http2Test.java b/tests/src/test/java/io/jooby/test/Http2Test.java index f0405aeda1..89796b2e35 100644 --- a/tests/src/test/java/io/jooby/test/Http2Test.java +++ b/tests/src/test/java/io/jooby/test/Http2Test.java @@ -42,7 +42,7 @@ public class Http2Test { @ServerTest public void h2body(ServerTestRunner runner) { runner - .options(new ServerOptions().setHttp2(true).setSecurePort(8443)) + .options(new ServerOptions().setHttp2(true)) .define( app -> { app.install(new JacksonModule());