From 98772768d9db29e9264d2f0c82ea53853eea0d78 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:29:05 -0700 Subject: [PATCH 1/5] Migrate entry point from #[fastly::main] to undecorated main() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace #[fastly::main] with an undecorated main() that calls Request::from_client() and explicitly sends responses via send_to_client(). This is required for Phase 2's stream_to_client() support — #[fastly::main] auto-calls send_to_client() on the returned Response, which is incompatible with streaming. The program still compiles to wasm32-wasip1 and runs on Fastly Compute — #[fastly::main] was just syntactic sugar. Also simplifies route_request to return Response directly instead of Result, since it already converts all errors to HTTP responses internally. --- .../trusted-server-adapter-fastly/src/main.rs | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index d97c8402..38c74cb0 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -1,6 +1,6 @@ use error_stack::Report; use fastly::http::Method; -use fastly::{Error, Request, Response}; +use fastly::{Request, Response}; use log_fastly::Logger; use trusted_server_core::auction::endpoints::handle_auction; @@ -29,21 +29,33 @@ use trusted_server_core::settings_data::get_settings; mod error; use crate::error::to_error_response; -#[fastly::main] -fn main(req: Request) -> Result { +/// Entry point for the Fastly Compute program. +/// +/// Uses an undecorated `main()` with `Request::from_client()` instead of +/// `#[fastly::main]` so we can call `stream_to_client()` or `send_to_client()` +/// explicitly. `#[fastly::main]` is syntactic sugar that auto-calls +/// `send_to_client()` on the returned `Response`, which is incompatible with +/// streaming. +fn main() { init_logger(); + let req = Request::from_client(); + // Keep the health probe independent from settings loading and routing so // readiness checks still get a cheap liveness response during startup. if req.get_method() == Method::GET && req.get_path() == "/health" { - return Ok(Response::from_status(200).with_body_text_plain("ok")); + Response::from_status(200) + .with_body_text_plain("ok") + .send_to_client(); + return; } let settings = match get_settings() { Ok(s) => s, Err(e) => { log::error!("Failed to load settings: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; log::debug!("Settings {settings:?}"); @@ -55,16 +67,19 @@ fn main(req: Request) -> Result { Ok(r) => r, Err(e) => { log::error!("Failed to create integration registry: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; - futures::executor::block_on(route_request( + let response = futures::executor::block_on(route_request( &settings, &orchestrator, &integration_registry, req, - )) + )); + + response.send_to_client(); } async fn route_request( @@ -72,7 +87,7 @@ async fn route_request( orchestrator: &AuctionOrchestrator, integration_registry: &IntegrationRegistry, mut req: Request, -) -> Result { +) -> Response { // Strip client-spoofable forwarded headers at the edge. // On Fastly this service IS the first proxy — these headers from // clients are untrusted and can hijack URL rewriting (see #409). @@ -83,7 +98,7 @@ async fn route_request( if let Some(mut response) = enforce_basic_auth(settings, &req) { finalize_response(settings, geo_info.as_ref(), &mut response); - return Ok(response); + return response; } // Get path and method for routing @@ -153,7 +168,7 @@ async fn route_request( finalize_response(settings, geo_info.as_ref(), &mut response); - Ok(response) + response } /// Applies all standard response headers: geo, version, staging, and configured headers. From d59f9bccf75dc4c39f757004d0f6cb973c4956af Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:30:19 -0700 Subject: [PATCH 2/5] Refactor process_response_streaming to accept W: Write Change signature from returning Body (with internal Vec) to writing into a generic &mut W: Write parameter. This enables Task 8 to pass StreamingBody directly as the output sink. The call site in handle_publisher_request passes &mut Vec for now, preserving the buffered behavior until the streaming path is wired up. --- crates/trusted-server-core/src/publisher.rs | 92 ++++++--------------- 1 file changed, 26 insertions(+), 66 deletions(-) diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index a2f54441..6a010c5f 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -93,12 +93,21 @@ struct ProcessResponseParams<'a> { integration_registry: &'a IntegrationRegistry, } -/// Process response body in streaming fashion with compression preservation -fn process_response_streaming( +/// Process response body through the streaming pipeline. +/// +/// Selects the appropriate processor based on content type (HTML rewriter, +/// RSC Flight rewriter, or URL replacer) and pipes chunks from `body` +/// through it into `output`. The caller decides what `output` is — a +/// `Vec` for buffered responses, or a `StreamingBody` for streaming. +/// +/// # Errors +/// +/// Returns an error if processor creation or chunk processing fails. +fn process_response_streaming( body: Body, + output: &mut W, params: &ProcessResponseParams, -) -> Result> { - // Check if this is HTML content +) -> Result<(), Report> { let is_html = params.content_type.contains("text/html"); let is_rsc_flight = params.content_type.contains("text/x-component"); log::debug!( @@ -110,15 +119,14 @@ fn process_response_streaming( params.origin_host ); - // Determine compression type let compression = Compression::from_content_encoding(params.content_encoding); + let config = PipelineConfig { + input_compression: compression, + output_compression: compression, + chunk_size: 8192, + }; - // Create output body to collect results - let mut output = Vec::new(); - - // Choose processor based on content type if is_html { - // Use HTML rewriter for HTML content let processor = create_html_stream_processor( params.origin_host, params.request_host, @@ -126,57 +134,26 @@ fn process_response_streaming( params.settings, params.integration_registry, )?; - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, processor).process(body, output)?; } else if is_rsc_flight { - // RSC Flight responses are length-prefixed (T rows). A naive string replacement will - // corrupt the stream by changing byte lengths without updating the prefixes. let processor = RscFlightUrlRewriter::new( params.origin_host, params.origin_url, params.request_host, params.request_scheme, ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, processor).process(body, output)?; } else { - // Use simple text replacer for non-HTML content let replacer = create_url_replacer( params.origin_host, params.origin_url, params.request_host, params.request_scheme, ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, replacer); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, replacer).process(body, output)?; } - log::debug!( - "Streaming processing complete - output size: {} bytes", - output.len() - ); - Ok(Body::from(output)) + Ok(()) } /// Create a unified HTML stream processor @@ -335,28 +312,11 @@ pub fn handle_publisher_request( content_type: &content_type, integration_registry, }; - match process_response_streaming(body, ¶ms) { - Ok(processed_body) => { - // Set the processed body back - response.set_body(processed_body); + let mut output = Vec::new(); + process_response_streaming(body, &mut output, ¶ms)?; - // Remove Content-Length as the size has likely changed - response.remove_header(header::CONTENT_LENGTH); - - // Keep Content-Encoding header since we're returning compressed content - log::debug!( - "Preserved Content-Encoding: {} for compressed response", - content_encoding - ); - - log::debug!("Completed streaming processing of response body"); - } - Err(e) => { - log::error!("Failed to process response body: {:?}", e); - // Return an error response - return Err(e); - } - } + response.set_body(Body::from(output)); + response.remove_header(header::CONTENT_LENGTH); } else { log::debug!( "Skipping response processing - should_process: {}, request_host: '{}'", From 986f92dd75b4b471d5d5e06a18cac4210837867b Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:35:32 -0700 Subject: [PATCH 3/5] Add streaming path to publisher proxy via StreamingBody MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split handle_publisher_request into streaming and buffered paths based on the streaming gate: - Streaming: 2xx + processable content + no HTML post-processors - Buffered: post-processors registered (Next.js) or non-processable Streaming path returns PublisherResponse::Stream with the origin body and processing params. The adapter calls finalize_response() to set all headers, then stream_to_client() to commit them, and pipes the body through stream_publisher_body() into StreamingBody. Synthetic ID/cookie headers are set before body processing (they are body-independent), so they are included in the streamed headers. Mid-stream errors log and drop the StreamingBody — client sees a truncated response, standard proxy behavior. --- .../trusted-server-adapter-fastly/src/main.rs | 49 ++++- crates/trusted-server-core/src/publisher.rs | 190 +++++++++++++----- 2 files changed, 184 insertions(+), 55 deletions(-) diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index 38c74cb0..4e4e62f1 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -18,7 +18,9 @@ use trusted_server_core::proxy::{ handle_first_party_click, handle_first_party_proxy, handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, }; -use trusted_server_core::publisher::{handle_publisher_request, handle_tsjs_dynamic}; +use trusted_server_core::publisher::{ + handle_publisher_request, handle_tsjs_dynamic, stream_publisher_body, PublisherResponse, +}; use trusted_server_core::request_signing::{ handle_deactivate_key, handle_rotate_key, handle_trusted_server_discovery, handle_verify_signature, @@ -72,14 +74,16 @@ fn main() { } }; - let response = futures::executor::block_on(route_request( + // route_request may send the response directly (streaming path) or + // return it for us to send (buffered path). + if let Some(response) = futures::executor::block_on(route_request( &settings, &orchestrator, &integration_registry, req, - )); - - response.send_to_client(); + )) { + response.send_to_client(); + } } async fn route_request( @@ -87,7 +91,7 @@ async fn route_request( orchestrator: &AuctionOrchestrator, integration_registry: &IntegrationRegistry, mut req: Request, -) -> Response { +) -> Option { // Strip client-spoofable forwarded headers at the edge. // On Fastly this service IS the first proxy — these headers from // clients are untrusted and can hijack URL rewriting (see #409). @@ -98,7 +102,7 @@ async fn route_request( if let Some(mut response) = enforce_basic_auth(settings, &req) { finalize_response(settings, geo_info.as_ref(), &mut response); - return response; + return Some(response); } // Get path and method for routing @@ -154,7 +158,34 @@ async fn route_request( ); match handle_publisher_request(settings, integration_registry, req) { - Ok(response) => Ok(response), + Ok(PublisherResponse::Stream { + mut response, + body, + params, + }) => { + // Streaming path: finalize headers, then stream body to client. + finalize_response(settings, geo_info.as_ref(), &mut response); + let mut streaming_body = response.stream_to_client(); + if let Err(e) = stream_publisher_body( + body, + &mut streaming_body, + ¶ms, + settings, + integration_registry, + ) { + // Headers already sent (200 OK). Log and abort — client + // sees a truncated response. Standard proxy behavior. + log::error!("Streaming processing failed: {e:?}"); + drop(streaming_body); + } else { + streaming_body + .finish() + .expect("should finish streaming body"); + } + // Response already sent via stream_to_client() + return None; + } + Ok(PublisherResponse::Buffered(response)) => Ok(response), Err(e) => { log::error!("Failed to proxy to publisher origin: {:?}", e); Err(e) @@ -168,7 +199,7 @@ async fn route_request( finalize_response(settings, geo_info.as_ref(), &mut response); - response + Some(response) } /// Applies all standard response headers: geo, version, staging, and configured headers. diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index 6a010c5f..efd65fa1 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -1,3 +1,5 @@ +use std::io::Write; + use error_stack::{Report, ResultExt}; use fastly::http::{header, StatusCode}; use fastly::{Body, Request, Response}; @@ -177,28 +179,87 @@ fn create_html_stream_processor( Ok(create_html_processor(config)) } +/// Result of publisher request handling, indicating whether the response +/// body should be streamed or has already been buffered. +pub enum PublisherResponse { + /// Response is fully buffered and ready to send via `send_to_client()`. + Buffered(Response), + /// Response headers are ready. The caller must: + /// 1. Call `finalize_response()` on the response + /// 2. Call `response.stream_to_client()` to get a `StreamingBody` + /// 3. Call `stream_publisher_body()` with the body and streaming writer + /// 4. Call `StreamingBody::finish()` + Stream { + /// Response with all headers set (synthetic ID, cookies, etc.) + /// but body not yet written. `Content-Length` already removed. + response: Response, + /// Origin body to be piped through the streaming pipeline. + body: Body, + /// Parameters for `process_response_streaming`. + params: OwnedProcessResponseParams, + }, +} + +/// Owned version of [`ProcessResponseParams`] for returning from +/// `handle_publisher_request` without lifetime issues. +pub struct OwnedProcessResponseParams { + pub content_encoding: String, + pub origin_host: String, + pub origin_url: String, + pub request_host: String, + pub request_scheme: String, + pub content_type: String, +} + +/// Stream the publisher response body through the processing pipeline. +/// +/// Called by the adapter after `stream_to_client()` has committed the +/// response headers. Writes processed chunks directly to `output`. +/// +/// # Errors +/// +/// Returns an error if processing fails mid-stream. Since headers are +/// already committed, the caller should log the error and drop the +/// `StreamingBody` (client sees a truncated response). +pub fn stream_publisher_body( + body: Body, + output: &mut W, + params: &OwnedProcessResponseParams, + settings: &Settings, + integration_registry: &IntegrationRegistry, +) -> Result<(), Report> { + let borrowed = ProcessResponseParams { + content_encoding: ¶ms.content_encoding, + origin_host: ¶ms.origin_host, + origin_url: ¶ms.origin_url, + request_host: ¶ms.request_host, + request_scheme: ¶ms.request_scheme, + settings, + content_type: ¶ms.content_type, + integration_registry, + }; + process_response_streaming(body, output, &borrowed) +} + /// Proxies requests to the publisher's origin server. /// -/// This function forwards incoming requests to the configured origin URL, -/// preserving headers and request body. It's used as a fallback for routes -/// not explicitly handled by the trusted server. +/// Returns a [`PublisherResponse`] indicating whether the response can be +/// streamed or must be sent buffered. The streaming path is chosen when: +/// - The backend returns a 2xx status +/// - The response has a processable content type +/// - No HTML post-processors are registered (the streaming gate) /// /// # Errors /// -/// Returns a [`TrustedServerError`] if: -/// - The proxy request fails -/// - The origin backend is unreachable +/// Returns a [`TrustedServerError`] if the proxy request fails or the +/// origin backend is unreachable. pub fn handle_publisher_request( settings: &Settings, integration_registry: &IntegrationRegistry, mut req: Request, -) -> Result> { +) -> Result> { log::debug!("Proxying request to publisher_origin"); - // Prebid.js requests are not intercepted here anymore. The HTML processor removes - // publisher-supplied Prebid scripts; the unified TSJS bundle includes Prebid.js when enabled. - - // Extract request host and scheme (uses Host header and TLS detection after edge sanitization) let request_info = RequestInfo::from_request(&req); let request_host = &request_info.host; let request_scheme = &request_info.scheme; @@ -212,27 +273,14 @@ pub fn handle_publisher_request( req.get_header("x-forwarded-proto"), ); - // Parse cookies once for reuse by both consent extraction and synthetic ID logic. let cookie_jar = handle_request_cookies(&req)?; - - // Capture the current SSC cookie value for revocation handling. - // This must come from the cookie itself (not the x-synthetic-id header) - // to ensure KV deletion targets the same identifier being revoked. let existing_ssc_cookie = cookie_jar .as_ref() .and_then(|jar| jar.get(COOKIE_SYNTHETIC_ID)) .map(|cookie| cookie.value().to_owned()); - // Generate synthetic identifiers before the request body is consumed. - // Always generated for internal use (KV lookups, logging) even when - // consent is absent — the cookie is only *set* when consent allows it. let synthetic_id = get_or_generate_synthetic_id(settings, &req)?; - // Extract, decode, and log consent signals (TCF, GPP, US Privacy, GPC) - // from the incoming request. The ConsentContext carries both raw strings - // (for OpenRTB forwarding) and decoded data (for enforcement). - // When a consent_store is configured, this also persists consent to KV - // and falls back to stored consent when cookies are absent. let geo = crate::geo::GeoInfo::from_request(&req); let consent_context = build_consent_context(&ConsentPipelineInput { jar: cookie_jar.as_ref(), @@ -267,13 +315,22 @@ pub fn handle_publisher_request( message: "Failed to proxy request to origin".to_string(), })?; - // Log all response headers for debugging log::debug!("Response headers:"); for (name, value) in response.get_headers() { log::debug!(" {}: {:?}", name, value); } - // Check if the response has a text-based content type that we should process + // Set synthetic ID / cookie headers BEFORE body processing. + // These are body-independent (computed from request cookies + consent). + apply_synthetic_id_headers( + settings, + &mut response, + &synthetic_id, + ssc_allowed, + existing_ssc_cookie.as_deref(), + &consent_context, + ); + let content_type = response .get_header(header::CONTENT_TYPE) .map(|h| h.to_str().unwrap_or_default()) @@ -284,24 +341,60 @@ pub fn handle_publisher_request( || content_type.contains("application/javascript") || content_type.contains("application/json"); - if should_process && !request_host.is_empty() { - // Check if the response is compressed + // Streaming gate: can we stream this response? + // - Must have processable content + // - Must have a request host for URL rewriting + // - Backend must return success (already guaranteed — errors propagated above) + // - No HTML post-processors registered (they need the full document) + let is_html = content_type.contains("text/html"); + let has_post_processors = !integration_registry.html_post_processors().is_empty(); + let can_stream = + should_process && !request_host.is_empty() && (!is_html || !has_post_processors); + + if can_stream { let content_encoding = response .get_header(header::CONTENT_ENCODING) .map(|h| h.to_str().unwrap_or_default()) .unwrap_or_default() .to_lowercase(); - // Log response details for debugging log::debug!( - "Processing response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + "Streaming response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", content_type, content_encoding, request_host, origin_host ); - // Take the response body for streaming processing let body = response.take_body(); + response.remove_header(header::CONTENT_LENGTH); + + return Ok(PublisherResponse::Stream { + response, + body, + params: OwnedProcessResponseParams { + content_encoding, + origin_host, + origin_url: settings.publisher.origin_url.clone(), + request_host: request_host.to_string(), + request_scheme: request_scheme.to_string(), + content_type, + }, + }); + } - // Process the body using streaming approach + // Buffered fallback: process body in memory (post-processors need full document, + // or content type doesn't need processing). + if should_process && !request_host.is_empty() { + let content_encoding = response + .get_header(header::CONTENT_ENCODING) + .map(|h| h.to_str().unwrap_or_default()) + .unwrap_or_default() + .to_lowercase(); + + log::debug!( + "Buffered response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + content_type, content_encoding, request_host, origin_host + ); + + let body = response.take_body(); let params = ProcessResponseParams { content_encoding: &content_encoding, origin_host: &origin_host, @@ -325,24 +418,31 @@ pub fn handle_publisher_request( ); } - // Consent-gated SSC creation: - // - Consent given → set synthetic ID header + cookie. - // - Consent absent + existing cookie → revoke (expire cookie + delete KV entry). - // - Consent absent + no cookie → do nothing. + Ok(PublisherResponse::Buffered(response)) +} + +/// Apply synthetic ID and cookie headers to the response. +/// +/// Extracted so headers can be set before streaming begins (headers must +/// be finalized before `stream_to_client()` commits them). +fn apply_synthetic_id_headers( + settings: &Settings, + response: &mut Response, + synthetic_id: &str, + ssc_allowed: bool, + existing_ssc_cookie: Option<&str>, + consent_context: &crate::consent::ConsentContext, +) { if ssc_allowed { - // Fastly's HeaderValue API rejects \r, \n, and \0, so the synthetic ID - // cannot inject additional response headers. - response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str()); - // Cookie persistence is skipped if the synthetic ID contains RFC 6265-illegal - // characters. The header is still emitted when consent allows it. - set_synthetic_cookie(settings, &mut response, synthetic_id.as_str()); - } else if let Some(cookie_synthetic_id) = existing_ssc_cookie.as_deref() { + response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id); + set_synthetic_cookie(settings, response, synthetic_id); + } else if let Some(cookie_synthetic_id) = existing_ssc_cookie { log::info!( "SSC revoked for '{}': consent withdrawn (jurisdiction={})", cookie_synthetic_id, consent_context.jurisdiction, ); - expire_synthetic_cookie(settings, &mut response); + expire_synthetic_cookie(settings, response); if let Some(store_name) = &settings.consent.consent_store { crate::consent::kv::delete_consent_from_kv(store_name, cookie_synthetic_id); } @@ -352,8 +452,6 @@ pub fn handle_publisher_request( consent_context.jurisdiction, ); } - - Ok(response) } #[cfg(test)] From 3873e14452dc0415b29aedb7981b5659be428b4d Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:41:10 -0700 Subject: [PATCH 4/5] Address review: replace expect with log, restore stripped comments - Replace streaming_body.finish().expect() with log::error on failure (expect panics in WASM, and headers are already committed anyway) - Restore explanatory comments for cookie parsing, SSC capture, synthetic ID generation, and consent extraction ordering --- crates/trusted-server-adapter-fastly/src/main.rs | 6 ++---- crates/trusted-server-core/src/publisher.rs | 13 +++++++++++++ 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index 4e4e62f1..bf90880f 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -177,10 +177,8 @@ async fn route_request( // sees a truncated response. Standard proxy behavior. log::error!("Streaming processing failed: {e:?}"); drop(streaming_body); - } else { - streaming_body - .finish() - .expect("should finish streaming body"); + } else if let Err(e) = streaming_body.finish() { + log::error!("Failed to finish streaming body: {e}"); } // Response already sent via stream_to_client() return None; diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index efd65fa1..2f10479f 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -273,14 +273,27 @@ pub fn handle_publisher_request( req.get_header("x-forwarded-proto"), ); + // Parse cookies once for reuse by both consent extraction and synthetic ID logic. let cookie_jar = handle_request_cookies(&req)?; + + // Capture the current SSC cookie value for revocation handling. + // This must come from the cookie itself (not the x-synthetic-id header) + // to ensure KV deletion targets the same identifier being revoked. let existing_ssc_cookie = cookie_jar .as_ref() .and_then(|jar| jar.get(COOKIE_SYNTHETIC_ID)) .map(|cookie| cookie.value().to_owned()); + // Generate synthetic identifiers before the request body is consumed. + // Always generated for internal use (KV lookups, logging) even when + // consent is absent — the cookie is only *set* when consent allows it. let synthetic_id = get_or_generate_synthetic_id(settings, &req)?; + // Extract, decode, and log consent signals (TCF, GPP, US Privacy, GPC) + // from the incoming request. The ConsentContext carries both raw strings + // (for OpenRTB forwarding) and decoded data (for enforcement). + // When a consent_store is configured, this also persists consent to KV + // and falls back to stored consent when cookies are absent. let geo = crate::geo::GeoInfo::from_request(&req); let consent_context = build_consent_context(&ConsentPipelineInput { jar: cookie_jar.as_ref(), From c7edd82e291e9c99c457e0bc8620c44020a9575a Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:46:00 -0700 Subject: [PATCH 5/5] Deduplicate content-encoding extraction and simplify flow Hoist the non-processable early return above the streaming gate so content_encoding extraction happens once. The streaming gate condition is also simplified since should_process and request_host are already guaranteed at that point. --- crates/trusted-server-core/src/publisher.rs | 83 +++++++++------------ 1 file changed, 37 insertions(+), 46 deletions(-) diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index 2f10479f..6a450623 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -354,23 +354,29 @@ pub fn handle_publisher_request( || content_type.contains("application/javascript") || content_type.contains("application/json"); + if !should_process || request_host.is_empty() { + log::debug!( + "Skipping response processing - should_process: {}, request_host: '{}'", + should_process, + request_host + ); + return Ok(PublisherResponse::Buffered(response)); + } + + let content_encoding = response + .get_header(header::CONTENT_ENCODING) + .map(|h| h.to_str().unwrap_or_default()) + .unwrap_or_default() + .to_lowercase(); + // Streaming gate: can we stream this response? - // - Must have processable content - // - Must have a request host for URL rewriting - // - Backend must return success (already guaranteed — errors propagated above) // - No HTML post-processors registered (they need the full document) + // - Non-HTML content always streams (post-processors only apply to HTML) let is_html = content_type.contains("text/html"); let has_post_processors = !integration_registry.html_post_processors().is_empty(); - let can_stream = - should_process && !request_host.is_empty() && (!is_html || !has_post_processors); + let can_stream = !is_html || !has_post_processors; if can_stream { - let content_encoding = response - .get_header(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) - .unwrap_or_default() - .to_lowercase(); - log::debug!( "Streaming response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", content_type, content_encoding, request_host, origin_host @@ -393,43 +399,28 @@ pub fn handle_publisher_request( }); } - // Buffered fallback: process body in memory (post-processors need full document, - // or content type doesn't need processing). - if should_process && !request_host.is_empty() { - let content_encoding = response - .get_header(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) - .unwrap_or_default() - .to_lowercase(); + // Buffered fallback: post-processors need the full document. + log::debug!( + "Buffered response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + content_type, content_encoding, request_host, origin_host + ); - log::debug!( - "Buffered response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", - content_type, content_encoding, request_host, origin_host - ); + let body = response.take_body(); + let params = ProcessResponseParams { + content_encoding: &content_encoding, + origin_host: &origin_host, + origin_url: &settings.publisher.origin_url, + request_host, + request_scheme, + settings, + content_type: &content_type, + integration_registry, + }; + let mut output = Vec::new(); + process_response_streaming(body, &mut output, ¶ms)?; - let body = response.take_body(); - let params = ProcessResponseParams { - content_encoding: &content_encoding, - origin_host: &origin_host, - origin_url: &settings.publisher.origin_url, - request_host, - request_scheme, - settings, - content_type: &content_type, - integration_registry, - }; - let mut output = Vec::new(); - process_response_streaming(body, &mut output, ¶ms)?; - - response.set_body(Body::from(output)); - response.remove_header(header::CONTENT_LENGTH); - } else { - log::debug!( - "Skipping response processing - should_process: {}, request_host: '{}'", - should_process, - request_host - ); - } + response.set_body(Body::from(output)); + response.remove_header(header::CONTENT_LENGTH); Ok(PublisherResponse::Buffered(response)) }