diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index d97c8402..bf90880f 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -1,6 +1,6 @@ use error_stack::Report; use fastly::http::Method; -use fastly::{Error, Request, Response}; +use fastly::{Request, Response}; use log_fastly::Logger; use trusted_server_core::auction::endpoints::handle_auction; @@ -18,7 +18,9 @@ use trusted_server_core::proxy::{ handle_first_party_click, handle_first_party_proxy, handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, }; -use trusted_server_core::publisher::{handle_publisher_request, handle_tsjs_dynamic}; +use trusted_server_core::publisher::{ + handle_publisher_request, handle_tsjs_dynamic, stream_publisher_body, PublisherResponse, +}; use trusted_server_core::request_signing::{ handle_deactivate_key, handle_rotate_key, handle_trusted_server_discovery, handle_verify_signature, @@ -29,21 +31,33 @@ use trusted_server_core::settings_data::get_settings; mod error; use crate::error::to_error_response; -#[fastly::main] -fn main(req: Request) -> Result { +/// Entry point for the Fastly Compute program. +/// +/// Uses an undecorated `main()` with `Request::from_client()` instead of +/// `#[fastly::main]` so we can call `stream_to_client()` or `send_to_client()` +/// explicitly. `#[fastly::main]` is syntactic sugar that auto-calls +/// `send_to_client()` on the returned `Response`, which is incompatible with +/// streaming. +fn main() { init_logger(); + let req = Request::from_client(); + // Keep the health probe independent from settings loading and routing so // readiness checks still get a cheap liveness response during startup. if req.get_method() == Method::GET && req.get_path() == "/health" { - return Ok(Response::from_status(200).with_body_text_plain("ok")); + Response::from_status(200) + .with_body_text_plain("ok") + .send_to_client(); + return; } let settings = match get_settings() { Ok(s) => s, Err(e) => { log::error!("Failed to load settings: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; log::debug!("Settings {settings:?}"); @@ -55,16 +69,21 @@ fn main(req: Request) -> Result { Ok(r) => r, Err(e) => { log::error!("Failed to create integration registry: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; - futures::executor::block_on(route_request( + // route_request may send the response directly (streaming path) or + // return it for us to send (buffered path). + if let Some(response) = futures::executor::block_on(route_request( &settings, &orchestrator, &integration_registry, req, - )) + )) { + response.send_to_client(); + } } async fn route_request( @@ -72,7 +91,7 @@ async fn route_request( orchestrator: &AuctionOrchestrator, integration_registry: &IntegrationRegistry, mut req: Request, -) -> Result { +) -> Option { // Strip client-spoofable forwarded headers at the edge. // On Fastly this service IS the first proxy — these headers from // clients are untrusted and can hijack URL rewriting (see #409). @@ -83,7 +102,7 @@ async fn route_request( if let Some(mut response) = enforce_basic_auth(settings, &req) { finalize_response(settings, geo_info.as_ref(), &mut response); - return Ok(response); + return Some(response); } // Get path and method for routing @@ -139,7 +158,32 @@ async fn route_request( ); match handle_publisher_request(settings, integration_registry, req) { - Ok(response) => Ok(response), + Ok(PublisherResponse::Stream { + mut response, + body, + params, + }) => { + // Streaming path: finalize headers, then stream body to client. + finalize_response(settings, geo_info.as_ref(), &mut response); + let mut streaming_body = response.stream_to_client(); + if let Err(e) = stream_publisher_body( + body, + &mut streaming_body, + ¶ms, + settings, + integration_registry, + ) { + // Headers already sent (200 OK). Log and abort — client + // sees a truncated response. Standard proxy behavior. + log::error!("Streaming processing failed: {e:?}"); + drop(streaming_body); + } else if let Err(e) = streaming_body.finish() { + log::error!("Failed to finish streaming body: {e}"); + } + // Response already sent via stream_to_client() + return None; + } + Ok(PublisherResponse::Buffered(response)) => Ok(response), Err(e) => { log::error!("Failed to proxy to publisher origin: {:?}", e); Err(e) @@ -153,7 +197,7 @@ async fn route_request( finalize_response(settings, geo_info.as_ref(), &mut response); - Ok(response) + Some(response) } /// Applies all standard response headers: geo, version, staging, and configured headers. diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index a2f54441..6a450623 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -1,3 +1,5 @@ +use std::io::Write; + use error_stack::{Report, ResultExt}; use fastly::http::{header, StatusCode}; use fastly::{Body, Request, Response}; @@ -93,12 +95,21 @@ struct ProcessResponseParams<'a> { integration_registry: &'a IntegrationRegistry, } -/// Process response body in streaming fashion with compression preservation -fn process_response_streaming( +/// Process response body through the streaming pipeline. +/// +/// Selects the appropriate processor based on content type (HTML rewriter, +/// RSC Flight rewriter, or URL replacer) and pipes chunks from `body` +/// through it into `output`. The caller decides what `output` is — a +/// `Vec` for buffered responses, or a `StreamingBody` for streaming. +/// +/// # Errors +/// +/// Returns an error if processor creation or chunk processing fails. +fn process_response_streaming( body: Body, + output: &mut W, params: &ProcessResponseParams, -) -> Result> { - // Check if this is HTML content +) -> Result<(), Report> { let is_html = params.content_type.contains("text/html"); let is_rsc_flight = params.content_type.contains("text/x-component"); log::debug!( @@ -110,15 +121,14 @@ fn process_response_streaming( params.origin_host ); - // Determine compression type let compression = Compression::from_content_encoding(params.content_encoding); + let config = PipelineConfig { + input_compression: compression, + output_compression: compression, + chunk_size: 8192, + }; - // Create output body to collect results - let mut output = Vec::new(); - - // Choose processor based on content type if is_html { - // Use HTML rewriter for HTML content let processor = create_html_stream_processor( params.origin_host, params.request_host, @@ -126,57 +136,26 @@ fn process_response_streaming( params.settings, params.integration_registry, )?; - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, processor).process(body, output)?; } else if is_rsc_flight { - // RSC Flight responses are length-prefixed (T rows). A naive string replacement will - // corrupt the stream by changing byte lengths without updating the prefixes. let processor = RscFlightUrlRewriter::new( params.origin_host, params.origin_url, params.request_host, params.request_scheme, ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, processor).process(body, output)?; } else { - // Use simple text replacer for non-HTML content let replacer = create_url_replacer( params.origin_host, params.origin_url, params.request_host, params.request_scheme, ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, replacer); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, replacer).process(body, output)?; } - log::debug!( - "Streaming processing complete - output size: {} bytes", - output.len() - ); - Ok(Body::from(output)) + Ok(()) } /// Create a unified HTML stream processor @@ -200,28 +179,87 @@ fn create_html_stream_processor( Ok(create_html_processor(config)) } +/// Result of publisher request handling, indicating whether the response +/// body should be streamed or has already been buffered. +pub enum PublisherResponse { + /// Response is fully buffered and ready to send via `send_to_client()`. + Buffered(Response), + /// Response headers are ready. The caller must: + /// 1. Call `finalize_response()` on the response + /// 2. Call `response.stream_to_client()` to get a `StreamingBody` + /// 3. Call `stream_publisher_body()` with the body and streaming writer + /// 4. Call `StreamingBody::finish()` + Stream { + /// Response with all headers set (synthetic ID, cookies, etc.) + /// but body not yet written. `Content-Length` already removed. + response: Response, + /// Origin body to be piped through the streaming pipeline. + body: Body, + /// Parameters for `process_response_streaming`. + params: OwnedProcessResponseParams, + }, +} + +/// Owned version of [`ProcessResponseParams`] for returning from +/// `handle_publisher_request` without lifetime issues. +pub struct OwnedProcessResponseParams { + pub content_encoding: String, + pub origin_host: String, + pub origin_url: String, + pub request_host: String, + pub request_scheme: String, + pub content_type: String, +} + +/// Stream the publisher response body through the processing pipeline. +/// +/// Called by the adapter after `stream_to_client()` has committed the +/// response headers. Writes processed chunks directly to `output`. +/// +/// # Errors +/// +/// Returns an error if processing fails mid-stream. Since headers are +/// already committed, the caller should log the error and drop the +/// `StreamingBody` (client sees a truncated response). +pub fn stream_publisher_body( + body: Body, + output: &mut W, + params: &OwnedProcessResponseParams, + settings: &Settings, + integration_registry: &IntegrationRegistry, +) -> Result<(), Report> { + let borrowed = ProcessResponseParams { + content_encoding: ¶ms.content_encoding, + origin_host: ¶ms.origin_host, + origin_url: ¶ms.origin_url, + request_host: ¶ms.request_host, + request_scheme: ¶ms.request_scheme, + settings, + content_type: ¶ms.content_type, + integration_registry, + }; + process_response_streaming(body, output, &borrowed) +} + /// Proxies requests to the publisher's origin server. /// -/// This function forwards incoming requests to the configured origin URL, -/// preserving headers and request body. It's used as a fallback for routes -/// not explicitly handled by the trusted server. +/// Returns a [`PublisherResponse`] indicating whether the response can be +/// streamed or must be sent buffered. The streaming path is chosen when: +/// - The backend returns a 2xx status +/// - The response has a processable content type +/// - No HTML post-processors are registered (the streaming gate) /// /// # Errors /// -/// Returns a [`TrustedServerError`] if: -/// - The proxy request fails -/// - The origin backend is unreachable +/// Returns a [`TrustedServerError`] if the proxy request fails or the +/// origin backend is unreachable. pub fn handle_publisher_request( settings: &Settings, integration_registry: &IntegrationRegistry, mut req: Request, -) -> Result> { +) -> Result> { log::debug!("Proxying request to publisher_origin"); - // Prebid.js requests are not intercepted here anymore. The HTML processor removes - // publisher-supplied Prebid scripts; the unified TSJS bundle includes Prebid.js when enabled. - - // Extract request host and scheme (uses Host header and TLS detection after edge sanitization) let request_info = RequestInfo::from_request(&req); let request_host = &request_info.host; let request_scheme = &request_info.scheme; @@ -290,13 +328,22 @@ pub fn handle_publisher_request( message: "Failed to proxy request to origin".to_string(), })?; - // Log all response headers for debugging log::debug!("Response headers:"); for (name, value) in response.get_headers() { log::debug!(" {}: {:?}", name, value); } - // Check if the response has a text-based content type that we should process + // Set synthetic ID / cookie headers BEFORE body processing. + // These are body-independent (computed from request cookies + consent). + apply_synthetic_id_headers( + settings, + &mut response, + &synthetic_id, + ssc_allowed, + existing_ssc_cookie.as_deref(), + &consent_context, + ); + let content_type = response .get_header(header::CONTENT_TYPE) .map(|h| h.to_str().unwrap_or_default()) @@ -307,82 +354,99 @@ pub fn handle_publisher_request( || content_type.contains("application/javascript") || content_type.contains("application/json"); - if should_process && !request_host.is_empty() { - // Check if the response is compressed - let content_encoding = response - .get_header(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) - .unwrap_or_default() - .to_lowercase(); - - // Log response details for debugging + if !should_process || request_host.is_empty() { log::debug!( - "Processing response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", - content_type, content_encoding, request_host, origin_host + "Skipping response processing - should_process: {}, request_host: '{}'", + should_process, + request_host ); + return Ok(PublisherResponse::Buffered(response)); + } - // Take the response body for streaming processing - let body = response.take_body(); + let content_encoding = response + .get_header(header::CONTENT_ENCODING) + .map(|h| h.to_str().unwrap_or_default()) + .unwrap_or_default() + .to_lowercase(); - // Process the body using streaming approach - let params = ProcessResponseParams { - content_encoding: &content_encoding, - origin_host: &origin_host, - origin_url: &settings.publisher.origin_url, - request_host, - request_scheme, - settings, - content_type: &content_type, - integration_registry, - }; - match process_response_streaming(body, ¶ms) { - Ok(processed_body) => { - // Set the processed body back - response.set_body(processed_body); - - // Remove Content-Length as the size has likely changed - response.remove_header(header::CONTENT_LENGTH); - - // Keep Content-Encoding header since we're returning compressed content - log::debug!( - "Preserved Content-Encoding: {} for compressed response", - content_encoding - ); - - log::debug!("Completed streaming processing of response body"); - } - Err(e) => { - log::error!("Failed to process response body: {:?}", e); - // Return an error response - return Err(e); - } - } - } else { + // Streaming gate: can we stream this response? + // - No HTML post-processors registered (they need the full document) + // - Non-HTML content always streams (post-processors only apply to HTML) + let is_html = content_type.contains("text/html"); + let has_post_processors = !integration_registry.html_post_processors().is_empty(); + let can_stream = !is_html || !has_post_processors; + + if can_stream { log::debug!( - "Skipping response processing - should_process: {}, request_host: '{}'", - should_process, - request_host + "Streaming response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + content_type, content_encoding, request_host, origin_host ); + + let body = response.take_body(); + response.remove_header(header::CONTENT_LENGTH); + + return Ok(PublisherResponse::Stream { + response, + body, + params: OwnedProcessResponseParams { + content_encoding, + origin_host, + origin_url: settings.publisher.origin_url.clone(), + request_host: request_host.to_string(), + request_scheme: request_scheme.to_string(), + content_type, + }, + }); } - // Consent-gated SSC creation: - // - Consent given → set synthetic ID header + cookie. - // - Consent absent + existing cookie → revoke (expire cookie + delete KV entry). - // - Consent absent + no cookie → do nothing. + // Buffered fallback: post-processors need the full document. + log::debug!( + "Buffered response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + content_type, content_encoding, request_host, origin_host + ); + + let body = response.take_body(); + let params = ProcessResponseParams { + content_encoding: &content_encoding, + origin_host: &origin_host, + origin_url: &settings.publisher.origin_url, + request_host, + request_scheme, + settings, + content_type: &content_type, + integration_registry, + }; + let mut output = Vec::new(); + process_response_streaming(body, &mut output, ¶ms)?; + + response.set_body(Body::from(output)); + response.remove_header(header::CONTENT_LENGTH); + + Ok(PublisherResponse::Buffered(response)) +} + +/// Apply synthetic ID and cookie headers to the response. +/// +/// Extracted so headers can be set before streaming begins (headers must +/// be finalized before `stream_to_client()` commits them). +fn apply_synthetic_id_headers( + settings: &Settings, + response: &mut Response, + synthetic_id: &str, + ssc_allowed: bool, + existing_ssc_cookie: Option<&str>, + consent_context: &crate::consent::ConsentContext, +) { if ssc_allowed { - // Fastly's HeaderValue API rejects \r, \n, and \0, so the synthetic ID - // cannot inject additional response headers. - response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str()); - // Cookie persistence is skipped if the synthetic ID contains RFC 6265-illegal - // characters. The header is still emitted when consent allows it. - set_synthetic_cookie(settings, &mut response, synthetic_id.as_str()); - } else if let Some(cookie_synthetic_id) = existing_ssc_cookie.as_deref() { + response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id); + set_synthetic_cookie(settings, response, synthetic_id); + } else if let Some(cookie_synthetic_id) = existing_ssc_cookie { log::info!( "SSC revoked for '{}': consent withdrawn (jurisdiction={})", cookie_synthetic_id, consent_context.jurisdiction, ); - expire_synthetic_cookie(settings, &mut response); + expire_synthetic_cookie(settings, response); if let Some(store_name) = &settings.consent.consent_store { crate::consent::kv::delete_consent_from_kv(store_name, cookie_synthetic_id); } @@ -392,8 +456,6 @@ pub fn handle_publisher_request( consent_context.jurisdiction, ); } - - Ok(response) } #[cfg(test)]