diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index 540ab29d..079681db 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -17,9 +17,22 @@ use crate::settings::Settings; use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor}; use crate::tsjs; +/// Wraps [`HtmlRewriterAdapter`] with optional post-processing. +/// +/// When `post_processors` is empty (the common streaming path), chunks pass +/// through immediately with no extra copying. When post-processors are +/// registered, intermediate output is accumulated in `accumulated_output` +/// until `is_last`, then post-processors run on the full document. This adds +/// an extra copy per chunk compared to the pre-streaming adapter (which +/// accumulated raw input instead of rewriter output). The overhead is +/// acceptable because the post-processor path is already fully buffered — +/// the real streaming win comes from the empty-post-processor path in Phase 2. struct HtmlWithPostProcessing { inner: HtmlRewriterAdapter, post_processors: Vec>, + /// Buffer that accumulates all intermediate output when post-processors + /// need the full document. Left empty on the streaming-only path. + accumulated_output: Vec, origin_host: String, request_host: String, request_scheme: String, @@ -29,12 +42,26 @@ struct HtmlWithPostProcessing { impl StreamProcessor for HtmlWithPostProcessing { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { let output = self.inner.process_chunk(chunk, is_last)?; - if !is_last || output.is_empty() || self.post_processors.is_empty() { + + // Streaming-optimized path: no post-processors, pass through immediately. + if self.post_processors.is_empty() { return Ok(output); } - let Ok(output_str) = std::str::from_utf8(&output) else { - return Ok(output); + // Post-processors need the full document. 
Accumulate until the last chunk. + self.accumulated_output.extend_from_slice(&output); + if !is_last { + return Ok(Vec::new()); + } + + // Final chunk: run post-processors on the full accumulated output. + let full_output = std::mem::take(&mut self.accumulated_output); + if full_output.is_empty() { + return Ok(full_output); + } + + let Ok(output_str) = std::str::from_utf8(&full_output) else { + return Ok(full_output); }; let ctx = IntegrationHtmlContext { @@ -50,10 +77,10 @@ impl StreamProcessor for HtmlWithPostProcessing { .iter() .any(|p| p.should_process(output_str, &ctx)) { - return Ok(output); + return Ok(full_output); } - let mut html = String::from_utf8(output).map_err(|e| { + let mut html = String::from_utf8(full_output).map_err(|e| { io::Error::other(format!( "HTML post-processing expected valid UTF-8 output: {e}" )) @@ -77,10 +104,11 @@ impl StreamProcessor for HtmlWithPostProcessing { Ok(html.into_bytes()) } - fn reset(&mut self) { - self.inner.reset(); - self.document_state.clear(); - } + /// No-op. `HtmlWithPostProcessing` wraps a single-use + /// [`HtmlRewriterAdapter`] that cannot be reset. Clearing auxiliary + /// state without resetting the rewriter would leave the processor + /// in an inconsistent state, so this method intentionally does nothing. + fn reset(&mut self) {} } /// Configuration for HTML processing @@ -427,6 +455,7 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso }), ]; + let has_script_rewriters = !script_rewriters.is_empty(); for script_rewriter in script_rewriters { let selector = script_rewriter.selector(); let rewriter = script_rewriter.clone(); @@ -464,9 +493,21 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso ..RewriterSettings::default() }; + // Use buffered mode when script rewriters are registered. lol_html fragments + // text nodes across input chunk boundaries, breaking rewriters that expect + // complete text (e.g., __NEXT_DATA__, GTM). 
Buffered mode feeds the entire + // document in one write() call, preserving text node integrity. + // Phase 3 will make rewriters fragment-safe, enabling streaming for all configs. + let inner = if has_script_rewriters { + HtmlRewriterAdapter::new_buffered(rewriter_settings) + } else { + HtmlRewriterAdapter::new(rewriter_settings) + }; + HtmlWithPostProcessing { - inner: HtmlRewriterAdapter::new(rewriter_settings), + inner, post_processors, + accumulated_output: Vec::new(), origin_host: config.origin_host, request_host: config.request_host, request_scheme: config.request_scheme, @@ -991,4 +1032,151 @@ mod tests { .collect::() ); } + + #[test] + fn post_processors_accumulate_while_streaming_path_passes_through() { + use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor}; + use lol_html::Settings; + + // --- Streaming path: no post-processors → output emitted per chunk --- + let mut streaming = HtmlWithPostProcessing { + inner: HtmlRewriterAdapter::new(Settings::default()), + post_processors: Vec::new(), + accumulated_output: Vec::new(), + origin_host: String::new(), + request_host: String::new(), + request_scheme: String::new(), + document_state: IntegrationDocumentState::default(), + }; + + let chunk1 = streaming + .process_chunk(b"", false) + .expect("should process chunk1"); + let chunk2 = streaming + .process_chunk(b"

hello

", false) + .expect("should process chunk2"); + let chunk3 = streaming + .process_chunk(b"", true) + .expect("should process final chunk"); + + assert!( + !chunk1.is_empty() || !chunk2.is_empty(), + "should emit intermediate output on streaming path" + ); + + let mut streaming_all = chunk1; + streaming_all.extend_from_slice(&chunk2); + streaming_all.extend_from_slice(&chunk3); + + // --- Buffered path: post-processor registered → accumulates until is_last --- + struct NoopPostProcessor; + impl IntegrationHtmlPostProcessor for NoopPostProcessor { + fn integration_id(&self) -> &'static str { + "test-noop" + } + fn post_process(&self, _html: &mut String, _ctx: &IntegrationHtmlContext<'_>) -> bool { + false + } + } + + let mut buffered = HtmlWithPostProcessing { + inner: HtmlRewriterAdapter::new(Settings::default()), + post_processors: vec![Arc::new(NoopPostProcessor)], + accumulated_output: Vec::new(), + origin_host: String::new(), + request_host: String::new(), + request_scheme: String::new(), + document_state: IntegrationDocumentState::default(), + }; + + let buf1 = buffered + .process_chunk(b"", false) + .expect("should process chunk1"); + let buf2 = buffered + .process_chunk(b"

hello

", false) + .expect("should process chunk2"); + let buf3 = buffered + .process_chunk(b"", true) + .expect("should process final chunk"); + + assert!( + buf1.is_empty() && buf2.is_empty(), + "should return empty for intermediate chunks when post-processors are registered" + ); + assert!( + !buf3.is_empty(), + "should emit all output in final chunk when post-processors are registered" + ); + + // Both paths should produce identical output + let streaming_str = + String::from_utf8(streaming_all).expect("streaming output should be valid UTF-8"); + let buffered_str = String::from_utf8(buf3).expect("buffered output should be valid UTF-8"); + assert_eq!( + streaming_str, buffered_str, + "streaming and buffered paths should produce identical output" + ); + } + + #[test] + fn active_post_processor_receives_full_document_and_mutates_output() { + use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor}; + use lol_html::Settings; + + struct AppendCommentProcessor; + impl IntegrationHtmlPostProcessor for AppendCommentProcessor { + fn integration_id(&self) -> &'static str { + "test-append" + } + fn should_process(&self, html: &str, _ctx: &IntegrationHtmlContext<'_>) -> bool { + html.contains("") + } + fn post_process(&self, html: &mut String, _ctx: &IntegrationHtmlContext<'_>) -> bool { + html.push_str(""); + true + } + } + + let mut processor = HtmlWithPostProcessing { + inner: HtmlRewriterAdapter::new(Settings::default()), + post_processors: vec![Arc::new(AppendCommentProcessor)], + accumulated_output: Vec::new(), + origin_host: String::new(), + request_host: String::new(), + request_scheme: String::new(), + document_state: IntegrationDocumentState::default(), + }; + + // Feed multiple chunks + let r1 = processor + .process_chunk(b"", false) + .expect("should process chunk1"); + let r2 = processor + .process_chunk(b"

content

", false) + .expect("should process chunk2"); + let r3 = processor + .process_chunk(b"", true) + .expect("should process final chunk"); + + // Intermediate chunks return empty (buffered for post-processor) + assert!( + r1.is_empty() && r2.is_empty(), + "should buffer intermediate chunks" + ); + + // Final chunk contains the full document with post-processor mutation + let output = String::from_utf8(r3).expect("should be valid UTF-8"); + assert!( + output.contains("

content

"), + "should contain original content" + ); + assert!( + output.contains(""), + "should contain complete document" + ); + assert!( + output.contains(""), + "should contain post-processor mutation" + ); + } } diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index cda62e6f..5a4ea290 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -6,8 +6,11 @@ //! - Memory-efficient streaming //! - UTF-8 boundary handling -use error_stack::{Report, ResultExt}; +use std::cell::RefCell; use std::io::{self, Read, Write}; +use std::rc::Rc; + +use error_stack::{Report, ResultExt}; use crate::error::TrustedServerError; @@ -32,7 +35,7 @@ pub trait StreamProcessor { } /// Compression type for the stream -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Compression { None, Gzip, @@ -53,7 +56,21 @@ impl Compression { } } -/// Configuration for the streaming pipeline +/// Configuration for the streaming pipeline. +/// +/// # Supported compression combinations +/// +/// | Input | Output | Behavior | +/// |-------|--------|----------| +/// | None | None | Pass-through processing | +/// | Gzip | Gzip | Decompress → process → recompress | +/// | Gzip | None | Decompress → process | +/// | Deflate | Deflate | Decompress → process → recompress | +/// | Deflate | None | Decompress → process | +/// | Brotli | Brotli | Decompress → process → recompress | +/// | Brotli | None | Decompress → process | +/// +/// All other combinations return an error at runtime. pub struct PipelineConfig { /// Input compression type pub input_compression: Compression, @@ -91,6 +108,10 @@ impl StreamingPipeline

{ /// Process a stream from input to output /// + /// Handles all supported compression transformations by wrapping the raw + /// reader/writer in the appropriate decoder/encoder, then delegating to + /// [`Self::process_chunks`]. + /// /// # Errors /// /// Returns an error if the compression transformation is unsupported or if reading/writing fails. @@ -103,44 +124,104 @@ impl StreamingPipeline

{ self.config.input_compression, self.config.output_compression, ) { - (Compression::None, Compression::None) => self.process_uncompressed(input, output), - (Compression::Gzip, Compression::Gzip) => self.process_gzip_to_gzip(input, output), - (Compression::Gzip, Compression::None) => self.process_gzip_to_none(input, output), + (Compression::None, Compression::None) => self.process_chunks(input, output), + (Compression::Gzip, Compression::Gzip) => { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + + let decoder = GzDecoder::new(input); + let mut encoder = GzEncoder::new(output, flate2::Compression::default()); + self.process_chunks(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize gzip encoder".to_string(), + })?; + Ok(()) + } + (Compression::Gzip, Compression::None) => { + use flate2::read::GzDecoder; + + self.process_chunks(GzDecoder::new(input), output) + } (Compression::Deflate, Compression::Deflate) => { - self.process_deflate_to_deflate(input, output) + use flate2::read::ZlibDecoder; + use flate2::write::ZlibEncoder; + + let decoder = ZlibDecoder::new(input); + let mut encoder = ZlibEncoder::new(output, flate2::Compression::default()); + self.process_chunks(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize deflate encoder".to_string(), + })?; + Ok(()) } (Compression::Deflate, Compression::None) => { - self.process_deflate_to_none(input, output) + use flate2::read::ZlibDecoder; + + self.process_chunks(ZlibDecoder::new(input), output) } (Compression::Brotli, Compression::Brotli) => { - self.process_brotli_to_brotli(input, output) + use brotli::enc::writer::CompressorWriter; + use brotli::enc::BrotliEncoderParams; + use brotli::Decompressor; + + let decoder = Decompressor::new(input, 4096); + let params = BrotliEncoderParams { + quality: 4, + lgwin: 22, + ..Default::default() + }; + let mut encoder = 
CompressorWriter::with_params(output, 4096, &params); + self.process_chunks(decoder, &mut encoder)?; + // CompressorWriter emits the brotli stream trailer via flush(), + // which process_chunks already called. into_inner() avoids a + // redundant flush on drop and makes finalization explicit. + // Note: unlike flate2's finish(), CompressorWriter has no + // fallible finalization method — flush() is the only option. + let _ = encoder.into_inner(); + Ok(()) + } + (Compression::Brotli, Compression::None) => { + use brotli::Decompressor; + + self.process_chunks(Decompressor::new(input, 4096), output) + } - (Compression::Brotli, Compression::None) => self.process_brotli_to_none(input, output), _ => Err(Report::new(TrustedServerError::Proxy { message: "Unsupported compression transformation".to_string(), })), } } - /// Process uncompressed stream - fn process_uncompressed( + /// Read chunks from `reader`, pass each through the processor, and write output to `writer`. + /// + /// This is the single unified chunk loop used by all compression paths. + /// The method calls `writer.flush()` before returning. For the `None → None` + /// path this is the only finalization needed. For compressed paths, the caller + /// must still call the encoder's type-specific finalization after this returns: + /// - **flate2** (`GzEncoder`, `ZlibEncoder`): call `finish()` — `flush()` does + /// not write the gzip/deflate trailer. + /// - **brotli** (`CompressorWriter`): `flush()` does finalize the stream, so + /// the caller only needs `into_inner()` to reclaim the writer. + /// + /// # Errors + /// + /// Returns an error if reading, processing, or writing any chunk fails.
+ fn process_chunks( &mut self, - mut input: R, - mut output: W, + mut reader: R, + mut writer: W, ) -> Result<(), Report> { let mut buffer = vec![0u8; self.config.chunk_size]; loop { - match input.read(&mut buffer) { + match reader.read(&mut buffer) { Ok(0) => { - // End of stream - process any remaining data let final_chunk = self.processor.process_chunk(&[], true).change_context( TrustedServerError::Proxy { message: "Failed to process final chunk".to_string(), }, )?; if !final_chunk.is_empty() { - output.write_all(&final_chunk).change_context( + writer.write_all(&final_chunk).change_context( TrustedServerError::Proxy { message: "Failed to write final chunk".to_string(), }, @@ -149,7 +230,6 @@ impl StreamingPipeline

{ break; } Ok(n) => { - // Process this chunk let processed = self .processor .process_chunk(&buffer[..n], false) @@ -157,7 +237,7 @@ impl StreamingPipeline

{ message: "Failed to process chunk".to_string(), })?; if !processed.is_empty() { - output + writer .write_all(&processed) .change_context(TrustedServerError::Proxy { message: "Failed to write processed chunk".to_string(), @@ -166,248 +246,89 @@ impl StreamingPipeline

{ } Err(e) => { return Err(Report::new(TrustedServerError::Proxy { - message: format!("Failed to read from input: {}", e), + message: format!("Failed to read: {e}"), })); } } } - output.flush().change_context(TrustedServerError::Proxy { + writer.flush().change_context(TrustedServerError::Proxy { message: "Failed to flush output".to_string(), })?; Ok(()) } +} - /// Process gzip compressed stream - fn process_gzip_to_gzip( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::GzDecoder; - use flate2::write::GzEncoder; - use flate2::Compression; - - // Decompress input - let mut decoder = GzDecoder::new(input); - let mut decompressed = Vec::new(); - decoder - .read_to_end(&mut decompressed) - .change_context(TrustedServerError::Proxy { - message: "Failed to decompress gzip".to_string(), - })?; - - log::info!("Decompressed size: {} bytes", decompressed.len()); - - // Process the decompressed content - let processed = self - .processor - .process_chunk(&decompressed, true) - .change_context(TrustedServerError::Proxy { - message: "Failed to process content".to_string(), - })?; - - log::info!("Processed size: {} bytes", processed.len()); - - // Recompress the output - let mut encoder = GzEncoder::new(output, Compression::default()); - encoder - .write_all(&processed) - .change_context(TrustedServerError::Proxy { - message: "Failed to write to gzip encoder".to_string(), - })?; - encoder.finish().change_context(TrustedServerError::Proxy { - message: "Failed to finish gzip encoder".to_string(), - })?; - - Ok(()) - } - - /// Decompress input, process content, and write uncompressed output. 
- fn decompress_and_process( - &mut self, - mut decoder: R, - mut output: W, - codec_name: &str, - ) -> Result<(), Report> { - let mut decompressed = Vec::new(); - decoder - .read_to_end(&mut decompressed) - .change_context(TrustedServerError::Proxy { - message: format!("Failed to decompress {codec_name}"), - })?; - - log::info!( - "{codec_name} decompressed size: {} bytes", - decompressed.len() - ); - - let processed = self - .processor - .process_chunk(&decompressed, true) - .change_context(TrustedServerError::Proxy { - message: "Failed to process content".to_string(), - })?; - - log::info!("{codec_name} processed size: {} bytes", processed.len()); - - output - .write_all(&processed) - .change_context(TrustedServerError::Proxy { - message: "Failed to write output".to_string(), - })?; - - Ok(()) - } - - /// Process gzip compressed input to uncompressed output (decompression only) - fn process_gzip_to_none( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::GzDecoder; - - self.decompress_and_process(GzDecoder::new(input), output, "gzip") - } - - /// Process deflate compressed stream - fn process_deflate_to_deflate( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::ZlibDecoder; - use flate2::write::ZlibEncoder; - use flate2::Compression; - - let decoder = ZlibDecoder::new(input); - let encoder = ZlibEncoder::new(output, Compression::default()); - - self.process_through_compression(decoder, encoder) - } - - /// Process deflate compressed input to uncompressed output (decompression only) - fn process_deflate_to_none( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::ZlibDecoder; - - self.decompress_and_process(ZlibDecoder::new(input), output, "deflate") - } - - /// Process brotli compressed stream - fn process_brotli_to_brotli( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use brotli::enc::writer::CompressorWriter; - use 
brotli::enc::BrotliEncoderParams; - use brotli::Decompressor; - - let decoder = Decompressor::new(input, 4096); - let params = BrotliEncoderParams { - quality: 4, - lgwin: 22, - ..Default::default() - }; - let encoder = CompressorWriter::with_params(output, 4096, ¶ms); - - self.process_through_compression(decoder, encoder) - } - - /// Process brotli compressed input to uncompressed output (decompression only) - fn process_brotli_to_none( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use brotli::Decompressor; - - self.decompress_and_process(Decompressor::new(input, 4096), output, "brotli") - } - - /// Generic processing through compression layers - fn process_through_compression( - &mut self, - mut decoder: R, - mut encoder: W, - ) -> Result<(), Report> { - let mut buffer = vec![0u8; self.config.chunk_size]; - - loop { - match decoder.read(&mut buffer) { - Ok(0) => { - // End of stream - let final_chunk = self.processor.process_chunk(&[], true).change_context( - TrustedServerError::Proxy { - message: "Failed to process final chunk".to_string(), - }, - )?; - if !final_chunk.is_empty() { - encoder.write_all(&final_chunk).change_context( - TrustedServerError::Proxy { - message: "Failed to write final chunk".to_string(), - }, - )?; - } - break; - } - Ok(n) => { - let processed = self - .processor - .process_chunk(&buffer[..n], false) - .change_context(TrustedServerError::Proxy { - message: "Failed to process chunk".to_string(), - })?; - if !processed.is_empty() { - encoder.write_all(&processed).change_context( - TrustedServerError::Proxy { - message: "Failed to write processed chunk".to_string(), - }, - )?; - } - } - Err(e) => { - return Err(Report::new(TrustedServerError::Proxy { - message: format!("Failed to read from decoder: {}", e), - })); - } - } - } - - // Flush encoder (this also finishes compression) - encoder.flush().change_context(TrustedServerError::Proxy { - message: "Failed to flush encoder".to_string(), - })?; - - // For GzEncoder 
and similar, we need to finish() to properly close the stream - // The flush above might not be enough - drop(encoder); +/// Shared output buffer used as an [`lol_html::OutputSink`]. +/// +/// The `HtmlRewriter` invokes [`OutputSink::handle_chunk`] synchronously during +/// each [`HtmlRewriter::write`] call, so the buffer is drained after every +/// `process_chunk` invocation to emit output incrementally. +struct RcVecSink(Rc>>); - Ok(()) +impl lol_html::OutputSink for RcVecSink { + fn handle_chunk(&mut self, chunk: &[u8]) { + self.0.borrow_mut().extend_from_slice(chunk); } } -/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor` -/// Important: Due to `lol_html`'s ownership model, we must accumulate input -/// and process it all at once when the stream ends. This is a limitation -/// of the `lol_html` library's API design. +/// Adapter to use `lol_html` [`HtmlRewriter`](lol_html::HtmlRewriter) as a [`StreamProcessor`]. +/// +/// Operates in one of two modes: +/// +/// - **Streaming** ([`new`](Self::new)): output is emitted incrementally on every +/// [`process_chunk`](StreamProcessor::process_chunk) call. Use when no script +/// rewriters are registered. +/// - **Buffered** ([`new_buffered`](Self::new_buffered)): input is accumulated and +/// processed in a single `write()` call on `is_last`. Use when script rewriters +/// are registered, because `lol_html` fragments text nodes across chunk boundaries +/// and rewriters that expect complete text content would silently miss rewrites on +/// split fragments. (See Phase 3 plan for making rewriters fragment-safe.) +/// +/// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`] +/// is a no-op because the rewriter consumes its settings on construction. pub struct HtmlRewriterAdapter { - settings: lol_html::Settings<'static, 'static>, + rewriter: Option>, + output: Rc>>, + /// When true, input is accumulated and fed to `lol_html` in one pass on `is_last`. 
+ buffered: bool, + /// Accumulated input for the buffered path. accumulated_input: Vec, } impl HtmlRewriterAdapter { - /// Create a new HTML rewriter adapter + /// Create a new HTML rewriter adapter that streams output per chunk. + /// + /// Use [`Self::new_buffered`] when script rewriters are registered to + /// avoid text node fragmentation. #[must_use] pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self { + let output = Rc::new(RefCell::new(Vec::new())); + let sink = RcVecSink(Rc::clone(&output)); + let rewriter = lol_html::HtmlRewriter::new(settings, sink); + Self { + rewriter: Some(rewriter), + output, + buffered: false, + accumulated_input: Vec::new(), + } + } + + /// Create a new HTML rewriter adapter that buffers all input before processing. + /// + /// This avoids `lol_html` text node fragmentation that breaks script rewriters + /// expecting complete text content. The entire document is fed to the rewriter + /// in a single `write()` call when `is_last` is true. + #[must_use] + pub fn new_buffered(settings: lol_html::Settings<'static, 'static>) -> Self { + let output = Rc::new(RefCell::new(Vec::new())); + let sink = RcVecSink(Rc::clone(&output)); + let rewriter = lol_html::HtmlRewriter::new(settings, sink); Self { - settings, + rewriter: Some(rewriter), + output, + buffered: true, accumulated_input: Vec::new(), } } @@ -415,60 +336,70 @@ impl HtmlRewriterAdapter { impl StreamProcessor for HtmlRewriterAdapter { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { - // Accumulate input chunks - self.accumulated_input.extend_from_slice(chunk); - - if !chunk.is_empty() { - log::debug!( - "Buffering chunk: {} bytes, total buffered: {} bytes", - chunk.len(), - self.accumulated_input.len() - ); + if self.buffered { + // Buffered mode: accumulate input, process all at once on is_last. 
+ if !chunk.is_empty() { + if self.rewriter.is_none() { + log::warn!( + "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", + chunk.len() + ); + } else { + self.accumulated_input.extend_from_slice(chunk); + } + } + if !is_last { + return Ok(Vec::new()); + } + if let Some(rewriter) = &mut self.rewriter { + if !self.accumulated_input.is_empty() { + let input = std::mem::take(&mut self.accumulated_input); + rewriter.write(&input).map_err(|e| { + log::error!("Failed to process HTML: {e}"); + io::Error::other(format!("HTML processing failed: {e}")) + })?; + } + } + } else { + // Streaming mode: feed chunks to `lol_html` incrementally. + match &mut self.rewriter { + Some(rewriter) => { + if !chunk.is_empty() { + rewriter.write(chunk).map_err(|e| { + log::error!("Failed to process HTML chunk: {e}"); + io::Error::other(format!("HTML processing failed: {e}")) + })?; + } + } + None if !chunk.is_empty() => { + log::warn!( + "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", + chunk.len() + ); + } + None => {} + } } - // Only process when we have all the input if is_last { - log::info!( - "Processing complete document: {} bytes", - self.accumulated_input.len() - ); - - // Process all accumulated input at once - let mut output = Vec::new(); - - // Create rewriter with output sink - let mut rewriter = lol_html::HtmlRewriter::new( - std::mem::take(&mut self.settings), - |chunk: &[u8]| { - output.extend_from_slice(chunk); - }, - ); - - // Process the entire document - rewriter.write(&self.accumulated_input).map_err(|e| { - log::error!("Failed to process HTML: {}", e); - io::Error::other(format!("HTML processing failed: {}", e)) - })?; - - // Finalize the rewriter - rewriter.end().map_err(|e| { - log::error!("Failed to finalize: {}", e); - io::Error::other(format!("HTML finalization failed: {}", e)) - })?; - - log::debug!("Output size: {} bytes", output.len()); - self.accumulated_input.clear(); - Ok(output) - } else { - // 
Return empty until we have all input - // This is a limitation of lol_html's API - Ok(Vec::new()) + if let Some(rewriter) = self.rewriter.take() { + rewriter.end().map_err(|e| { + log::error!("Failed to finalize HTML: {e}"); + io::Error::other(format!("HTML finalization failed: {e}")) + })?; + } } - } - fn reset(&mut self) { - self.accumulated_input.clear(); + // Drain whatever lol_html produced since the last call + Ok(std::mem::take(&mut *self.output.borrow_mut())) } + + /// No-op. `HtmlRewriterAdapter` is single-use: the rewriter consumes its + /// [`Settings`](lol_html::Settings) on construction and cannot be recreated. + /// Calling [`process_chunk`](StreamProcessor::process_chunk) after + /// [`process_chunk`](StreamProcessor::process_chunk) with `is_last = true` + /// will produce empty output. + fn reset(&mut self) {} } /// Adapter to use our existing `StreamingReplacer` as a `StreamProcessor` @@ -485,6 +416,110 @@ mod tests { use super::*; use crate::streaming_replacer::{Replacement, StreamingReplacer}; + /// Verify that `lol_html` fragments text nodes when input chunks split + /// mid-text-node. This is critical: if `lol_html` does fragment, then + /// script rewriters (`NextJS` `__NEXT_DATA__`, `GTM`) that expect full + /// text content will silently miss rewrites when the streaming adapter + /// feeds chunks incrementally. 
+ #[test] + fn lol_html_fragments_text_across_chunk_boundaries() { + use std::cell::RefCell; + use std::rc::Rc; + + let fragments: Rc>> = Rc::new(RefCell::new(Vec::new())); + let fragments_clone = Rc::clone(&fragments); + + let mut rewriter = lol_html::HtmlRewriter::new( + lol_html::Settings { + element_content_handlers: vec![lol_html::text!("script", move |text| { + fragments_clone + .borrow_mut() + .push((text.as_str().to_string(), text.last_in_text_node())); + Ok(()) + })], + ..lol_html::Settings::default() + }, + |_chunk: &[u8]| {}, + ); + + // Split "googletagmanager.com/gtm.js" across two chunks + rewriter + .write(b"") + .expect("should write chunk2"); + rewriter.end().expect("should end"); + + let frags = fragments.borrow(); + // lol_html should emit at least 2 text fragments since input was split + assert!( + frags.len() >= 2, + "should fragment text across chunk boundaries, got {} fragments: {:?}", + frags.len(), + *frags + ); + // No single fragment should contain the full domain + assert!( + !frags + .iter() + .any(|(text, _)| text.contains("googletagmanager.com")), + "no individual fragment should contain the full domain when split across chunks: {:?}", + *frags + ); + } + + /// Companion to [`lol_html_fragments_text_across_chunk_boundaries`]: + /// proves that `new_buffered()` prevents fragmentation by feeding the + /// entire document to `lol_html` in one `write()` call. 
+ #[test] + fn buffered_adapter_prevents_text_fragmentation() { + use std::cell::RefCell; + use std::rc::Rc; + + let fragments: Rc>> = Rc::new(RefCell::new(Vec::new())); + let fragments_clone = Rc::clone(&fragments); + + let settings = lol_html::Settings { + element_content_handlers: vec![lol_html::text!("script", move |text| { + fragments_clone + .borrow_mut() + .push((text.as_str().to_string(), text.last_in_text_node())); + Ok(()) + })], + ..lol_html::Settings::default() + }; + + let mut adapter = HtmlRewriterAdapter::new_buffered(settings); + + // Feed the same split chunks as the fragmentation test + let r1 = adapter + .process_chunk(b"", true) + .expect("should process chunk2"); + assert!( + !r2.is_empty(), + "buffered adapter should emit output on is_last" + ); + + let frags = fragments.borrow(); + // With buffered mode, the text handler should see the complete string + assert!( + frags + .iter() + .any(|(text, _)| text.contains("googletagmanager.com")), + "buffered adapter should deliver complete text to handler, got: {:?}", + *frags + ); + } + #[test] fn test_uncompressed_pipeline() { let replacer = StreamingReplacer::new(vec![Replacement { @@ -534,7 +569,7 @@ mod tests { } #[test] - fn test_html_rewriter_adapter_accumulates_until_last() { + fn test_html_rewriter_adapter_streams_incrementally() { use lol_html::{element, Settings}; // Create a simple HTML rewriter that replaces text @@ -548,32 +583,40 @@ mod tests { let mut adapter = HtmlRewriterAdapter::new(settings); - // Test that intermediate chunks return empty let chunk1 = b""; let result1 = adapter .process_chunk(chunk1, false) .expect("should process chunk1"); - assert_eq!(result1.len(), 0, "Should return empty for non-last chunk"); let chunk2 = b"

original

"; let result2 = adapter .process_chunk(chunk2, false) .expect("should process chunk2"); - assert_eq!(result2.len(), 0, "Should return empty for non-last chunk"); - // Test that last chunk processes everything let chunk3 = b""; let result3 = adapter .process_chunk(chunk3, true) .expect("should process final chunk"); + + // Concatenate all outputs and verify the final HTML is correct + let mut all_output = result1; + all_output.extend_from_slice(&result2); + all_output.extend_from_slice(&result3); + assert!( - !result3.is_empty(), - "Should return processed content for last chunk" + !all_output.is_empty(), + "should produce non-empty concatenated output" ); - let output = String::from_utf8(result3).expect("output should be valid UTF-8"); - assert!(output.contains("replaced"), "Should have replaced content"); - assert!(output.contains(""), "Should have complete HTML"); + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); + assert!( + output.contains("replaced"), + "should have replaced content in concatenated output" + ); + assert!( + output.contains(""), + "should have complete HTML in concatenated output" + ); } #[test] @@ -590,59 +633,294 @@ mod tests { } large_html.push_str(""); - // Process in chunks + // Process in chunks and collect all output let chunk_size = 1024; let bytes = large_html.as_bytes(); - let mut chunks = bytes.chunks(chunk_size); - let mut last_chunk = chunks.next().unwrap_or(&[]); + let mut chunks = bytes.chunks(chunk_size).peekable(); + let mut all_output = Vec::new(); - for chunk in chunks { + while let Some(chunk) = chunks.next() { + let is_last = chunks.peek().is_none(); let result = adapter - .process_chunk(last_chunk, false) - .expect("should process intermediate chunk"); - assert_eq!(result.len(), 0, "Intermediate chunks should return empty"); - last_chunk = chunk; + .process_chunk(chunk, is_last) + .expect("should process chunk"); + all_output.extend_from_slice(&result); } - // Process last chunk - let 
result = adapter - .process_chunk(last_chunk, true) - .expect("should process last chunk"); - assert!(!result.is_empty(), "Last chunk should return content"); + assert!( + !all_output.is_empty(), + "should produce non-empty output for large document" + ); - let output = String::from_utf8(result).expect("output should be valid UTF-8"); + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); assert!( output.contains("Paragraph 999"), - "Should contain all content" + "should contain all content from large document" ); } #[test] - fn test_html_rewriter_adapter_reset() { + fn test_html_rewriter_adapter_reset_then_finalize() { use lol_html::Settings; let settings = Settings::default(); let mut adapter = HtmlRewriterAdapter::new(settings); - // Process some content - adapter - .process_chunk(b"", false) - .expect("should process html tag"); - adapter - .process_chunk(b"test", false) - .expect("should process body"); + let result1 = adapter + .process_chunk(b"test", false) + .expect("should process html"); - // Reset should clear accumulated input + // reset() is a documented no-op — adapter is single-use adapter.reset(); - // After reset, adapter should be ready for new input - let result = adapter - .process_chunk(b"

new

", true) - .expect("should process new content after reset"); - let output = String::from_utf8(result).expect("output should be valid UTF-8"); + // Finalize still works; the rewriter is still alive + let result2 = adapter + .process_chunk(b"", true) + .expect("should finalize after reset"); + + let mut all_output = result1; + all_output.extend_from_slice(&result2); + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); + assert!( + output.contains("test"), + "should produce correct output despite no-op reset" + ); + } + + #[test] + fn test_deflate_round_trip_produces_valid_output() { + // Verify that deflate-to-deflate produces valid output that decompresses + // correctly, confirming that encoder finalization works. + use flate2::read::ZlibDecoder; + use flate2::write::ZlibEncoder; + use std::io::{Read as _, Write as _}; + + let input_data = b"hello world"; + + // Compress input + let mut compressed_input = Vec::new(); + { + let mut enc = ZlibEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Deflate, + output_compression: Compression::Deflate, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process deflate-to-deflate"); + + // Decompress output and verify correctness + let mut decompressed = Vec::new(); + ZlibDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + 
"should have replaced content through deflate round-trip" + ); + } + + #[test] + fn test_gzip_to_gzip_produces_correct_output() { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + use std::io::{Read as _, Write as _}; + + // Arrange + let input_data = b"hello world"; + + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::Gzip, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + // Act + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-gzip"); + + // Assert + let mut decompressed = Vec::new(); + GzDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through gzip round-trip" + ); + } + + #[test] + fn test_gzip_to_none_produces_correct_output() { + use flate2::write::GzEncoder; + use std::io::Write as _; + + // Arrange + let input_data = b"hello world"; + + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: 
Compression::Gzip, + output_compression: Compression::None, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + // Act + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-none"); + + // Assert + let result = String::from_utf8(output).expect("should be valid UTF-8 uncompressed output"); assert_eq!( - output, "

new

", - "Should only contain new input after reset" + result, "hi world", + "should have replaced content after gzip decompression" + ); + } + + #[test] + fn test_brotli_round_trip_produces_valid_output() { + use brotli::enc::writer::CompressorWriter; + use brotli::Decompressor; + use std::io::{Read as _, Write as _}; + + let input_data = b"hello world"; + + // Compress input with brotli + let mut compressed_input = Vec::new(); + { + let mut enc = CompressorWriter::new(&mut compressed_input, 4096, 4, 22); + enc.write_all(input_data) + .expect("should compress test input"); + enc.flush().expect("should flush brotli encoder"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Brotli, + output_compression: Compression::Brotli, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process brotli-to-brotli"); + + // Decompress output and verify correctness + let mut decompressed = Vec::new(); + Decompressor::new(&output[..], 4096) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through brotli round-trip" + ); + } + + #[test] + fn test_html_rewriter_adapter_emits_output_per_chunk() { + use lol_html::Settings; + + let settings = Settings::default(); + let mut adapter = HtmlRewriterAdapter::new(settings); + + // Send three chunks — lol_html may buffer internally, so individual + // chunk outputs may vary by version. The contract is that concatenated + // output is correct, and that output is not deferred entirely to is_last. 
+ let result1 = adapter + .process_chunk(b"", false) + .expect("should process chunk1"); + let result2 = adapter + .process_chunk(b"

hello

", false) + .expect("should process chunk2"); + let result3 = adapter + .process_chunk(b"", true) + .expect("should process final chunk"); + + // At least one intermediate chunk should produce output (verifies + // we're not deferring everything to is_last like the old adapter). + assert!( + !result1.is_empty() || !result2.is_empty(), + "should emit some output before is_last" + ); + + // Concatenated output must be correct + let mut all_output = result1; + all_output.extend_from_slice(&result2); + all_output.extend_from_slice(&result3); + + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); + assert!( + output.contains(""), + "should contain html tag in concatenated output" + ); + assert!( + output.contains("

hello

"), + "should contain paragraph in concatenated output" + ); + assert!( + output.contains(""), + "should contain closing html tag in concatenated output" ); } @@ -683,4 +961,61 @@ mod tests { "Should not contain original URL" ); } + + #[test] + fn test_gzip_pipeline_with_html_rewriter() { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + use lol_html::{element, Settings}; + use std::io::{Read as _, Write as _}; + + let settings = Settings { + element_content_handlers: vec![element!("a[href]", |el| { + if let Some(href) = el.get_attribute("href") { + if href.contains("example.com") { + el.set_attribute("href", &href.replace("example.com", "test.com"))?; + } + } + Ok(()) + })], + ..Settings::default() + }; + + let input = b"Link"; + + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input).expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let adapter = HtmlRewriterAdapter::new(settings); + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::Gzip, + chunk_size: 8192, + }; + let mut pipeline = StreamingPipeline::new(config, adapter); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("pipeline should process gzip HTML"); + + let mut decompressed = Vec::new(); + GzDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output"); + + let result = String::from_utf8(decompressed).expect("output should be valid UTF-8"); + assert!( + result.contains("https://test.com"), + "should have replaced URL through gzip HTML pipeline" + ); + assert!( + !result.contains("example.com"), + "should not contain original URL after gzip HTML pipeline" + ); + } } diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md new file mode 
100644 index 00000000..4afca7fe --- /dev/null +++ b/docs/superpowers/plans/2026-03-25-streaming-response.md @@ -0,0 +1,991 @@ +# Streaming Response Optimization — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use +> superpowers:subagent-driven-development (recommended) or +> superpowers:executing-plans to implement this plan task-by-task. Steps use +> checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Stream HTTP responses through the publisher proxy instead of buffering +them, reducing peak memory from ~4x response size to constant and improving +time-to-last-byte. + +**Architecture:** Two independent phases. Phase 1 makes the internal streaming +pipeline truly chunk-emitting (HtmlRewriterAdapter, compression paths, encoder +finalization). Phase 2 wires up Fastly's `StreamingBody` API so processed chunks +flow directly to the client. Each phase is shippable independently. + +**Tech Stack:** Rust 1.91.1, Fastly Compute SDK 0.11.12 +(`stream_to_client`/`send_to_client`/`StreamingBody`), `lol_html` (HTML +rewriting), `flate2` (gzip/deflate), `brotli` (brotli compression). 
+ +**Spec:** `docs/superpowers/specs/2026-03-25-streaming-response-design.md` +**Issue:** #563 + +--- + +## File Map + +| File | Role | Phase | +|------|------|-------| +| `crates/trusted-server-core/src/streaming_processor.rs` | `HtmlRewriterAdapter` rewrite, compression path fixes, encoder finalization | 1 | +| `crates/trusted-server-core/src/publisher.rs` | `process_response_streaming` refactor to `W: Write`, streaming gate, header reordering | 2 | +| `crates/trusted-server-adapter-fastly/src/main.rs` | Entry point migration from `#[fastly::main]` to raw `main()`, response routing | 2 | + +--- + +## Phase 1: Make the Pipeline Chunk-Emitting + +> **Implementation note (2026-03-26):** Tasks 1-3 were implemented as planned, +> then followed by a refactor that unified all 9 `process_*_to_*` methods into +> a single `process_chunks` method with inline decoder/encoder creation in +> `process()`. This eliminated ~150 lines of duplication. The refactor was +> committed as "Unify compression paths into single process_chunks method". +> Tasks 1-3 descriptions below reflect the original plan; the final code is +> cleaner than described. + +### Task 1: Fix encoder finalization in `process_through_compression` + +This is the prerequisite for Task 2. The current code calls `flush()` then +`drop(encoder)`, silently swallowing finalization errors. Must fix before +moving gzip to this path. + +**Files:** +- Modify: `crates/trusted-server-core/src/streaming_processor.rs:334-393` +- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) + +- [ ] **Step 1: Write a test verifying deflate round-trip correctness** + +Add to the `#[cfg(test)]` module at the bottom of +`streaming_processor.rs`: + +```rust +#[test] +fn test_deflate_round_trip_produces_valid_output() { + // Verify that deflate-to-deflate (which uses process_through_compression) + // produces valid output that decompresses correctly. 
This establishes the + // correctness contract before we change the finalization path. + use flate2::read::ZlibDecoder; + use flate2::write::ZlibEncoder; + + let input_data = b"hello world"; + + // Compress input + let mut compressed_input = Vec::new(); + { + let mut enc = ZlibEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Deflate, + output_compression: Compression::Deflate, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process deflate-to-deflate"); + + // Decompress output and verify correctness + let mut decompressed = Vec::new(); + ZlibDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through deflate round-trip" + ); +} +``` + +- [ ] **Step 2: Run test to verify it passes (baseline)** + +Run: `cargo test --package trusted-server-core test_deflate_round_trip_produces_valid_output` + +Expected: PASS (current code happens to work for this case since +`ZlibEncoder::drop` calls `finish` internally — the test establishes the +contract). + +- [ ] **Step 3: Change `process_through_compression` to take `&mut W` and remove `drop(encoder)`** + +`finish()` is not on the `Write` trait — each encoder type +(`GzEncoder`, `ZlibEncoder`, `CompressorWriter`) has its own `finish()`. 
+The fix: change the signature to take `&mut W` so the caller retains +ownership and calls `finish()` explicitly. + +Change signature (line 335-338): + +```rust + fn process_through_compression<R: Read, W: Write>( + &mut self, + mut decoder: R, + encoder: &mut W, + ) -> Result<(), Report<TrustedServerError>> { +``` + +Replace lines 383-393 (the `flush` + `drop` block): + +```rust + encoder.flush().change_context(TrustedServerError::Proxy { + message: "Failed to flush encoder".to_string(), + })?; + + // Caller owns encoder and must call finish() after this returns. + Ok(()) + } +``` + +Then update `process_deflate_to_deflate` (lines 276-289): + +```rust + fn process_deflate_to_deflate<R: Read, W: Write>( + &mut self, + input: R, + output: W, + ) -> Result<(), Report<TrustedServerError>> { + use flate2::read::ZlibDecoder; + use flate2::write::ZlibEncoder; + + let decoder = ZlibDecoder::new(input); + let mut encoder = ZlibEncoder::new(output, flate2::Compression::default()); + self.process_through_compression(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize deflate encoder".to_string(), + })?; + Ok(()) + } +``` + +And update `process_brotli_to_brotli` (lines 303-321): + +```rust + fn process_brotli_to_brotli<R: Read, W: Write>( + &mut self, + input: R, + output: W, + ) -> Result<(), Report<TrustedServerError>> { + use brotli::enc::writer::CompressorWriter; + use brotli::enc::BrotliEncoderParams; + use brotli::Decompressor; + + let decoder = Decompressor::new(input, 4096); + let mut params = BrotliEncoderParams::default(); + params.quality = 4; + params.lgwin = 22; + let mut encoder = CompressorWriter::with_params(output, 4096, &params); + self.process_through_compression(decoder, &mut encoder)?; + // CompressorWriter finalizes on flush (already called) and into_inner + encoder.into_inner(); + Ok(()) + } +``` + +- [ ] **Step 4: Run all tests** + +Run: `cargo test --package trusted-server-core` + +Expected: All existing tests pass plus the new one. 
+ +- [ ] **Step 5: Commit** + +``` +git add crates/trusted-server-core/src/streaming_processor.rs +git commit -m "Fix encoder finalization: explicit finish instead of drop" +``` + +--- + +### Task 2: Convert `process_gzip_to_gzip` to chunk-based processing + +**Files:** +- Modify: `crates/trusted-server-core/src/streaming_processor.rs:183-225` +- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) + +- [ ] **Step 1: Write a test for gzip chunk-based round-trip** + +```rust +#[test] +fn test_gzip_to_gzip_produces_correct_output() { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + + let input_data = b"hello world"; + + // Compress input as gzip + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::Gzip, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-gzip"); + + // Decompress and verify + let mut decompressed = Vec::new(); + GzDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress gzip output"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through gzip round-trip" + ); +} +``` + +- [ ] **Step 2: Run test to verify it passes (baseline)** + +Run: `cargo test --package trusted-server-core test_gzip_to_gzip_produces_correct_output` + +Expected: PASS (current code works, just buffers everything). 
+ +- [ ] **Step 3: Rewrite `process_gzip_to_gzip` to use `process_through_compression`** + +Replace `process_gzip_to_gzip` (lines 183-225): + +```rust + fn process_gzip_to_gzip( + &mut self, + input: R, + output: W, + ) -> Result<(), Report> { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + + let decoder = GzDecoder::new(input); + let mut encoder = GzEncoder::new(output, flate2::Compression::default()); + self.process_through_compression(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize gzip encoder".to_string(), + })?; + Ok(()) + } +``` + +- [ ] **Step 4: Run all tests** + +Run: `cargo test --package trusted-server-core` + +Expected: All tests pass. + +- [ ] **Step 5: Commit** + +``` +git add crates/trusted-server-core/src/streaming_processor.rs +git commit -m "Convert process_gzip_to_gzip to chunk-based processing" +``` + +--- + +### Task 3: Convert `decompress_and_process` to chunk-based processing + +**Files:** +- Modify: `crates/trusted-server-core/src/streaming_processor.rs:227-262` +- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) + +Note: the `*_to_none` callers (`process_gzip_to_none`, +`process_deflate_to_none`, `process_brotli_to_none` at lines 264-332) do +not need changes — they call `decompress_and_process` with the same +signature. 
+ +- [ ] **Step 1: Write a test for gzip-to-none chunk-based processing** + +```rust +#[test] +fn test_gzip_to_none_produces_correct_output() { + use flate2::write::GzEncoder; + + let input_data = b"hello world"; + + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::None, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-none"); + + assert_eq!( + String::from_utf8(output).expect("should be valid UTF-8"), + "hi world", + "should have replaced content and output uncompressed" + ); +} +``` + +- [ ] **Step 2: Run test to verify baseline** + +Run: `cargo test --package trusted-server-core test_gzip_to_none_produces_correct_output` + +Expected: PASS. 
+ +- [ ] **Step 3: Rewrite `decompress_and_process` to use chunk loop** + +Replace `decompress_and_process` (lines 227-262) with a chunk-based +version that mirrors `process_uncompressed`: + +```rust + fn decompress_and_process( + &mut self, + mut decoder: R, + mut output: W, + _codec_name: &str, + ) -> Result<(), Report> { + let mut buffer = vec![0u8; self.config.chunk_size]; + + loop { + match decoder.read(&mut buffer) { + Ok(0) => { + let final_chunk = + self.processor.process_chunk(&[], true).change_context( + TrustedServerError::Proxy { + message: "Failed to process final chunk".to_string(), + }, + )?; + if !final_chunk.is_empty() { + output.write_all(&final_chunk).change_context( + TrustedServerError::Proxy { + message: "Failed to write final chunk".to_string(), + }, + )?; + } + break; + } + Ok(n) => { + let processed = self + .processor + .process_chunk(&buffer[..n], false) + .change_context(TrustedServerError::Proxy { + message: "Failed to process chunk".to_string(), + })?; + if !processed.is_empty() { + output.write_all(&processed).change_context( + TrustedServerError::Proxy { + message: "Failed to write processed chunk".to_string(), + }, + )?; + } + } + Err(e) => { + return Err(Report::new(TrustedServerError::Proxy { + message: format!("Failed to read from decoder: {e}"), + })); + } + } + } + + output.flush().change_context(TrustedServerError::Proxy { + message: "Failed to flush output".to_string(), + })?; + + Ok(()) + } +``` + +- [ ] **Step 4: Run all tests** + +Run: `cargo test --package trusted-server-core` + +Expected: All tests pass. 
+ +- [ ] **Step 5: Commit** + +``` +git add crates/trusted-server-core/src/streaming_processor.rs +git commit -m "Convert decompress_and_process to chunk-based processing" +``` + +--- + +### Task 4: Rewrite `HtmlRewriterAdapter` for incremental streaming + +**Files:** +- Modify: `crates/trusted-server-core/src/streaming_processor.rs:396-472` +- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) + +Important context: `create_html_processor` in `html_processor.rs` returns +`HtmlWithPostProcessing`, which wraps `HtmlRewriterAdapter`. The wrapper's +`process_chunk` (line 31-34 of `html_processor.rs`) returns intermediate +output immediately for `!is_last` chunks — it passes through, not +swallows. When the post-processor list is empty (streaming gate condition), +the wrapper is a no-op passthrough. No changes needed to +`html_processor.rs`. + +- [ ] **Step 1: Write a test proving incremental output** + +```rust +#[test] +fn test_html_rewriter_adapter_emits_output_per_chunk() { + use lol_html::Settings; + + let settings = Settings::default(); + let mut adapter = HtmlRewriterAdapter::new(settings); + + // First chunk should produce output (not empty) + let result1 = adapter + .process_chunk(b"", false) + .expect("should process chunk 1"); + assert!( + !result1.is_empty(), + "should emit output for non-last chunk, got empty" + ); + + // Second chunk should also produce output + let result2 = adapter + .process_chunk(b"

hello

", false) + .expect("should process chunk 2"); + assert!( + !result2.is_empty(), + "should emit output for second non-last chunk, got empty" + ); + + // Final chunk + let result3 = adapter + .process_chunk(b"", true) + .expect("should process final chunk"); + + // Concatenated output should be the full document + let mut full_output = result1; + full_output.extend_from_slice(&result2); + full_output.extend_from_slice(&result3); + let output_str = String::from_utf8(full_output).expect("should be valid UTF-8"); + assert!( + output_str.contains("") && output_str.contains("hello"), + "should contain complete document, got: {output_str}" + ); +} +``` + +- [ ] **Step 2: Run test to verify it fails (current code returns empty for non-last chunks)** + +Run: `cargo test --package trusted-server-core test_html_rewriter_adapter_emits_output_per_chunk` + +Expected: FAIL — assertion `should emit output for non-last chunk` fails. + +- [ ] **Step 3: Rewrite `HtmlRewriterAdapter` to stream incrementally** + +Replace the struct and impl (lines 396-472): + +```rust +/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor`. +/// +/// Creates the rewriter eagerly and emits output on every `process_chunk` +/// call. Single-use: `reset()` is a no-op since `Settings` are consumed +/// by the rewriter constructor. +pub struct HtmlRewriterAdapter { + rewriter: Option>, + output: Rc>>, +} + +/// Output sink that appends to a shared `Vec`. +struct RcVecSink(Rc>>); + +impl lol_html::OutputSink for RcVecSink { + fn handle_chunk(&mut self, chunk: &[u8]) { + self.0.borrow_mut().extend_from_slice(chunk); + } +} + +impl HtmlRewriterAdapter { + /// Create a new HTML rewriter adapter. + /// + /// The rewriter is created immediately, consuming the settings. 
+ #[must_use] + pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self { + let output = Rc::new(RefCell::new(Vec::new())); + let sink = RcVecSink(Rc::clone(&output)); + let rewriter = lol_html::HtmlRewriter::new(settings, sink); + Self { + rewriter: Some(rewriter), + output, + } + } +} + +impl StreamProcessor for HtmlRewriterAdapter { + fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { + if let Some(rewriter) = &mut self.rewriter { + if !chunk.is_empty() { + rewriter.write(chunk).map_err(|e| { + log::error!("Failed to process HTML chunk: {e}"); + io::Error::other(format!("HTML processing failed: {e}")) + })?; + } + } + + if is_last { + if let Some(rewriter) = self.rewriter.take() { + rewriter.end().map_err(|e| { + log::error!("Failed to finalize HTML: {e}"); + io::Error::other(format!("HTML finalization failed: {e}")) + })?; + } + } + + // Drain whatever lol_html produced since last call. + // Safe: sink borrow released before we borrow here. + Ok(std::mem::take(&mut *self.output.borrow_mut())) + } + + fn reset(&mut self) { + // No-op: rewriter consumed Settings on construction. + // Single-use by design (one per request). + } +} +``` + +Add these imports at the top of `streaming_processor.rs`: + +```rust +use std::cell::RefCell; +use std::rc::Rc; +``` + +- [ ] **Step 4: Run all tests** + +Run: `cargo test --package trusted-server-core` + +Expected: The new per-chunk test passes. Some existing tests that assert +"intermediate chunks return empty" will now fail and need updating. + +- [ ] **Step 5: Update existing tests for new behavior** + +Update `test_html_rewriter_adapter_accumulates_until_last` — it currently +asserts empty output for non-last chunks. Change assertions to expect +non-empty intermediate output and verify the concatenated result. + +Update `test_html_rewriter_adapter_handles_large_input` — same: remove +assertions that intermediate chunks are empty. 
+ +Update `test_html_rewriter_adapter_reset` — `reset()` is now a no-op. +Remove or update this test since the adapter is single-use. + +- [ ] **Step 6: Run all tests again** + +Run: `cargo test --package trusted-server-core` + +Expected: All tests pass. + +- [ ] **Step 7: Run clippy** + +Run: `cargo clippy --workspace --all-targets --all-features -- -D warnings` + +Expected: No warnings. + +- [ ] **Step 8: Commit** + +``` +git add crates/trusted-server-core/src/streaming_processor.rs +git commit -m "Rewrite HtmlRewriterAdapter for incremental lol_html streaming" +``` + +--- + +### Task 5: Phase 1 full verification + +- [ ] **Step 1: Run full test suite** + +Run: `cargo test --workspace` + +Expected: All tests pass. + +- [ ] **Step 2: Run JS tests** + +Run: `cd crates/js/lib && npx vitest run` + +Expected: All tests pass. + +- [ ] **Step 3: Run clippy and fmt** + +Run: `cargo clippy --workspace --all-targets --all-features -- -D warnings && cargo fmt --all -- --check` + +Expected: Clean. + +- [ ] **Step 4: Build for WASM target** + +Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1` + +Expected: Builds successfully. + +--- + +## Phase 2: Stream Response to Client + +### Task 6: Migrate entry point from `#[fastly::main]` to raw `main()` + +**Files:** +- Modify: `crates/trusted-server-adapter-fastly/src/main.rs:32-68` + +- [ ] **Step 1: Rewrite `main` function** + +Replace lines 32-68: + +```rust +fn main() { + init_logger(); + + let req = Request::from_client(); + + // Health probe: independent from settings/routing. 
+ if req.get_method() == Method::GET && req.get_path() == "/health" { + Response::from_status(200) + .with_body_text_plain("ok") + .send_to_client(); + return; + } + + let settings = match get_settings() { + Ok(s) => s, + Err(e) => { + log::error!("Failed to load settings: {:?}", e); + to_error_response(&e).send_to_client(); + return; + } + }; + log::debug!("Settings {settings:?}"); + + let orchestrator = build_orchestrator(&settings); + + let integration_registry = match IntegrationRegistry::new(&settings) { + Ok(r) => r, + Err(e) => { + log::error!("Failed to create integration registry: {:?}", e); + to_error_response(&e).send_to_client(); + return; + } + }; + + let response = futures::executor::block_on(route_request( + &settings, + &orchestrator, + &integration_registry, + req, + )); + + match response { + Ok(resp) => resp.send_to_client(), + Err(e) => to_error_response(&e).send_to_client(), + } +} +``` + +- [ ] **Step 2: Run all tests** + +Run: `cargo test --workspace` + +Expected: All tests pass. + +- [ ] **Step 3: Build for WASM target** + +Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1` + +Expected: Builds successfully. + +- [ ] **Step 4: Commit** + +``` +git add crates/trusted-server-adapter-fastly/src/main.rs +git commit -m "Migrate entry point from #[fastly::main] to raw main()" +``` + +--- + +### Task 7: Refactor `process_response_streaming` to accept `W: Write` + +**Files:** +- Modify: `crates/trusted-server-core/src/publisher.rs:97-180` + +- [ ] **Step 1: Change signature to accept generic writer** + +Change `process_response_streaming` from returning `Body` to writing into +a generic `W: Write`: + +```rust +fn process_response_streaming( + body: Body, + output: &mut W, + params: &ProcessResponseParams, +) -> Result<(), Report> { +``` + +Remove `let mut output = Vec::new();` (line 117) and +`Ok(Body::from(output))` (line 179). The caller passes the output writer. 
+
+- [ ] **Step 2: Update the call site in `handle_publisher_request`**
+
+In `handle_publisher_request`, replace the current call (lines 338-341):
+
+```rust
+// Before:
+match process_response_streaming(body, &params) {
+    Ok(processed_body) => {
+        response.set_body(processed_body);
+
+// After:
+let mut output = Vec::new();
+match process_response_streaming(body, &mut output, &params) {
+    Ok(()) => {
+        response.set_body(Body::from(output));
+```
+
+This preserves existing behavior — the buffered path still works.
+
+- [ ] **Step 3: Run all tests**
+
+Run: `cargo test --workspace`
+
+Expected: All tests pass (behavior unchanged).
+
+- [ ] **Step 4: Commit**
+
+```
+git add crates/trusted-server-core/src/publisher.rs
+git commit -m "Refactor process_response_streaming to accept generic writer"
+```
+
+---
+
+### Task 8: Add streaming path to publisher proxy
+
+**Files:**
+- Modify: `crates/trusted-server-core/src/publisher.rs`
+- Modify: `crates/trusted-server-adapter-fastly/src/main.rs`
+
+This is the core change. `handle_publisher_request` needs to support two
+modes: buffered (returns `Response`) and streaming (sends response directly
+via `StreamingBody`). The streaming path requires access to Fastly-specific
+types (`StreamingBody`, `send_to_client`), but `publisher.rs` lives in
+`trusted-server-core` which is platform-agnostic.
+
+**Approach:** Add a `ResponseMode` enum or callback that
+`handle_publisher_request` uses to decide how to send the response. The
+simplest approach: split into a preparation phase (returns headers + body
+stream + processing params) and a send phase (in the fastly adapter).
+
+Alternatively, since `StreamingPipeline::process` already takes `W: Write`,
+the adapter can call `process_response_streaming` with a `StreamingBody`
+directly. The key is that the adapter needs to:
+
+1. Call `handle_publisher_request` logic up to the point of body processing
+2. Decide buffered vs streaming
+3. 
Either buffer or stream + +This task is complex — the implementer should read the spec's Step 2 +carefully and adapt the approach to minimize changes. The plan provides the +structure; exact code depends on how the publisher function is decomposed. + +- [ ] **Step 1: Export `finalize_response` or its logic for use before streaming** + +In `main.rs`, make `finalize_response` callable from the publisher path. +Either make it `pub` and move to `trusted-server-core`, or pass a +pre-finalized response to the streaming path. + +- [ ] **Step 2: Add streaming gate check** + +Add a helper in `publisher.rs`: + +```rust +fn should_stream( + status: u16, + content_type: &str, + integration_registry: &IntegrationRegistry, +) -> bool { + if !(200..300).contains(&status) { + return false; + } + // Only html_post_processors gate streaming — NOT script_rewriters. + // Script rewriters (Next.js, GTM) run inside lol_html element handlers + // during streaming and do not require full-document buffering. + // Currently only Next.js registers a post-processor. + let is_html = content_type.contains("text/html"); + if is_html && !integration_registry.html_post_processors().is_empty() { + return false; + } + true +} +``` + +- [ ] **Step 3: Restructure `handle_publisher_request` to support streaming** + +Split the function into: +1. Pre-processing: request info, cookies, synthetic ID, consent, backend + request — everything before `response.take_body()` +2. Header finalization: synthetic ID/cookie headers, `finalize_response()` + headers, Content-Length removal +3. 
Body processing: either buffered (`Vec<u8>`) or streaming
+   (`StreamingBody`)
+
+The streaming path in the fastly adapter:
+```rust
+// After header finalization, before body processing:
+if should_stream {
+    let body = response.take_body();
+    response.remove_header(header::CONTENT_LENGTH);
+    let mut streaming_body = response.stream_to_client();
+
+    match process_response_streaming(body, &mut streaming_body, &params) {
+        Ok(()) => {
+            streaming_body.finish()
+                .expect("should finish streaming body");
+        }
+        Err(e) => {
+            log::error!("Streaming processing failed: {:?}", e);
+            // StreamingBody dropped → client sees abort
+        }
+    }
+} else {
+    // Existing buffered path
+}
+```
+
+- [ ] **Step 4: Handle binary pass-through in streaming path**
+
+For non-text content when streaming is enabled:
+
+```rust
+if !should_process {
+    let body = response.take_body();
+    response.remove_header(header::CONTENT_LENGTH);
+    let mut streaming_body = response.stream_to_client();
+    io::copy(&mut body, &mut streaming_body)
+        .expect("should copy body to streaming output");
+    streaming_body.finish()
+        .expect("should finish streaming body");
+}
+```
+
+- [ ] **Step 5: Run all tests**
+
+Run: `cargo test --workspace`
+
+Expected: All tests pass.
+
+- [ ] **Step 6: Build for WASM target**
+
+Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1`
+
+Expected: Builds successfully.
+
+- [ ] **Step 7: Commit**
+
+```
+git add crates/trusted-server-core/src/publisher.rs \
+    crates/trusted-server-adapter-fastly/src/main.rs
+git commit -m "Add streaming response path for publisher proxy"
+```
+
+---
+
+### Task 9: Phase 2 full verification
+
+- [ ] **Step 1: Run full test suite**
+
+Run: `cargo test --workspace`
+
+Expected: All tests pass.
+
+- [ ] **Step 2: Run clippy, fmt, JS tests**
+
+```bash
+cargo clippy --workspace --all-targets --all-features -- -D warnings
+cargo fmt --all -- --check
+cd crates/js/lib && npx vitest run
+```
+
+Expected: All clean. 
+ +- [ ] **Step 3: Build for WASM target** + +Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1` + +Expected: Builds. + +- [ ] **Step 4: Manual verification with Viceroy** + +Run: `fastly compute serve` + +Test: +- `curl -s http://localhost:7676/ | sha256sum` — compare with baseline +- `curl -sI http://localhost:7676/` — verify headers present (geo, version, + synthetic ID cookie if consent configured) +- `curl -s http://localhost:7676/static/tsjs=tsjs-unified.min.js` — verify + static routes still work via `send_to_client` + +- [ ] **Step 5: Chrome DevTools MCP performance capture** + +Follow the measurement protocol in the spec's "Performance measurement via +Chrome DevTools MCP" section. Compare against baseline captured on `main`. + +--- + +### Task 10: Chrome DevTools MCP baseline + comparison + +- [ ] **Step 1: Capture baseline on `main`** + +Follow spec section "Baseline capture" — use `navigate_page`, +`list_network_requests`, `lighthouse_audit`, `performance_start_trace` / +`performance_stop_trace`, `performance_analyze_insight`, +`take_memory_snapshot`. Record median TTFB, TTLB, LCP, Speed Index across +5 runs. + +- [ ] **Step 2: Capture metrics on feature branch** + +Repeat the same measurements after building the feature branch. + +- [ ] **Step 3: Compare and document results** + +Create a comparison table and save to PR description or a results file. 
+Check for: +- TTLB improvement (primary goal) +- No TTFB regression +- Identical response body hash (correctness) +- LCP/Speed Index improvement (secondary) diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md new file mode 100644 index 00000000..034624b5 --- /dev/null +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -0,0 +1,367 @@ +# Streaming Response Optimization (Next.js Disabled) + +## Problem + +When Next.js is disabled, the publisher proxy buffers the entire response body +in memory before sending any bytes to the client. This creates two costs: + +1. **Latency** — The client receives zero bytes until the full response is + decompressed, rewritten, and recompressed. For a 222KB HTML page, this adds + hundreds of milliseconds to time-to-last-byte. +2. **Memory** — Peak memory holds ~4x the response size simultaneously + (compressed input + decompressed + processed output + recompressed output). + With WASM's ~16MB heap, this limits the size of pages we can proxy. + +## Scope + +**In scope**: All content types flowing through the publisher proxy path — HTML, +text/JSON, RSC Flight (`text/x-component`), and binary pass-through. Only when +Next.js is disabled (no post-processor requiring the full document). + +**Out of scope**: Concurrent origin+auction fetch, Next.js-enabled paths (these +require full-document post-processing by design), non-publisher routes (static +JS, auction, discovery). + +## Streaming Gate + +Before committing to `stream_to_client()`, check: + +1. Backend status is success (2xx). +2. For HTML content: `html_post_processors()` is empty — no registered + post-processors. Non-HTML content types (text/JSON, RSC Flight, binary) can + always stream regardless of post-processor registration, since + post-processors only apply to HTML. + +If either check fails for the given content type, fall back to the current +buffered path. 
This keeps the optimization transparent: same behavior for all
+existing configurations, streaming only activates when safe.
+
+## Architecture
+
+Two implementation steps, each independently valuable and testable.
+
+### Step 1: Make the pipeline chunk-emitting
+
+Three changes to existing processors:
+
+#### A) `HtmlRewriterAdapter` — incremental streaming
+
+The current implementation accumulates the entire HTML document and processes it
+on `is_last`. This is unnecessary — `lol_html::HtmlRewriter` supports
+incremental `write()` calls and emits output via its `OutputSink` callback after
+each chunk.
+
+Fix: create the rewriter eagerly in the constructor, use
+`Rc<RefCell<Vec<u8>>>` to share the output buffer between the sink and
+`process_chunk()`, drain the buffer on every call instead of only on `is_last`.
+The output buffer is drained _after_ each `rewriter.write()` returns, so the
+`RefCell` borrow in the sink closure never overlaps with the drain borrow.
+
+Note: this makes `HtmlRewriterAdapter` single-use — `reset()` becomes a no-op
+since the `Settings` are consumed by the rewriter constructor. This matches
+actual usage (one adapter per request).
+
+#### B) Chunk-based decompression for all compression paths
+
+`process_gzip_to_gzip` calls `read_to_end()` to decompress the entire body into
+memory. The deflate and brotli keep-compression paths already use chunk-based
+`process_through_compression()`.
+
+Fix: use the same `process_through_compression` pattern for gzip.
+
+Additionally, `decompress_and_process()` (used by `process_gzip_to_none`,
+`process_deflate_to_none`, `process_brotli_to_none`) also calls
+`read_to_end()`. These strip-compression paths must be converted to chunk-based
+processing too — read decompressed chunks, process each, write uncompressed
+output directly.
+
+Reference: `process_uncompressed` already implements the correct chunk-based
+pattern (read loop → `process_chunk()` per chunk → `write_all()` → flush). 
The +compressed paths should follow the same structure. + +#### C) `process_through_compression` finalization — prerequisite for B + +`process_through_compression` currently calls `flush()` (with error +propagation) then `drop(encoder)` for finalization. The `flush()` only flushes +buffered data but does not write compression trailers/footers — `drop()` +handles finalization but silently swallows errors. Today this affects deflate +and brotli (which already use this path). The current `process_gzip_to_gzip` calls `encoder.finish()` explicitly — +but Step 1B moves gzip to `process_through_compression`, which would **regress** +gzip from working `finish()` to broken `drop()`. This fix prevents that +regression and also fixes the pre-existing issue for deflate/brotli. + +Fix: call `encoder.finish()` explicitly and propagate errors. This must land +before or with Step 1B. + +### Step 2: Stream response to client + +Change the publisher proxy path to use Fastly's `StreamingBody` API: + +1. Fetch from origin, receive response headers. +2. Validate status — if backend error, return buffered error response via + `send_to_client()`. +3. Check streaming gate — if `html_post_processors()` is non-empty, fall back + to buffered path. +4. Finalize all response headers. This requires reordering two things: + - **Synthetic ID/cookie headers**: today set _after_ body processing in + `handle_publisher_request`. Since they are body-independent (computed from + request cookies and consent context), move them _before_ streaming. + - **`finalize_response()`** (main.rs): today called _after_ `route_request` + returns, adding geo, version, staging, and operator headers. In the + streaming path, this must run _before_ `stream_to_client()` since the + publisher handler sends the response directly instead of returning it. +5. Remove `Content-Length` header — the final size is unknown after processing. 
+ Fastly's `StreamingBody` sends the response using chunked transfer encoding + automatically. +6. Call `response.stream_to_client()` — headers sent to client immediately. +7. Pipe origin body through the streaming pipeline, writing chunks directly to + `StreamingBody`. +8. Call `finish()` on success; on error, log and drop (client sees truncated + response). + +For binary/non-text content: call `response.take_body()` then stream via +`io::copy(&mut body, &mut streaming_body)`. The `Body` type implements `Read` +and `StreamingBody` implements `Write`, so this streams the backend body to the +client without buffering the full content. Today binary responses skip +`take_body()` and return the response as-is — the streaming path needs to +explicitly take the body to pipe it through. + +#### Entry point change + +Migrate `main.rs` from `#[fastly::main]` to an undecorated `main()` with +`Request::from_client()`. No separate initialization call is needed — +`#[fastly::main]` is just syntactic sugar for `Request::from_client()` + +`Response::send_to_client()`. The migration is required because +`stream_to_client()` / `send_to_client()` are incompatible with +`#[fastly::main]`'s return-based model. + +```rust +fn main() { + let req = Request::from_client(); + match handle(req) { + Ok(()) => {} + Err(e) => to_error_response(&e).send_to_client(), + } +} +``` + +Note: the return type changes from `Result` to `()` (or +`Result<(), Error>`). Errors that currently propagate to `main`'s `Result` must +now be caught explicitly and sent via `send_to_client()` with +`to_error_response()`. + +Non-streaming routes (static, auction, discovery) use `send_to_client()` as +before. 
+ +## Data Flow + +### Streaming path (HTML, text/JSON with processing) + +``` +Origin body (gzip) + → Read 8KB chunk from GzDecoder + → StreamProcessor::process_chunk(chunk, is_last) + → HtmlRewriterAdapter: lol_html.write(chunk) → sink emits rewritten bytes + → OR StreamingReplacer: URL replacement with overlap buffer + → GzEncoder::write(processed_chunk) → compressed bytes + → StreamingBody::write(compressed) → chunk sent to client + → repeat until EOF + → StreamingBody::finish() +``` + +Memory at steady state: ~8KB input chunk buffer, lol_html internal parser state, +gzip encoder window, and overlap buffer for replacer. Roughly constant regardless +of document size, versus the current ~4x document size. + +### Pass-through path (binary, images, fonts, etc.) + +``` +Origin body (via take_body()) + → io::copy(&mut body, &mut streaming_body) → streamed transfer + → StreamingBody::finish() +``` + +No decompression, no processing. Body streams through as read. + +### Buffered fallback path (error responses or post-processors present) + +``` +Origin returns 4xx/5xx OR html_post_processors() is non-empty + → Current buffered path unchanged + → send_to_client() with proper status and full body +``` + +## Error Handling + +**Backend returns error status**: Detected before calling `stream_to_client()`. +Return the backend response as-is via `send_to_client()`. Client sees the +correct error status code. No change from current behavior. + +**Processor creation fails**: `create_html_stream_processor()` or pipeline +construction errors happen _before_ `stream_to_client()` is called. Since +headers have not been sent yet, return a proper error response via +`send_to_client()`. Same as current behavior. + +**Processing fails mid-stream**: `lol_html` parse error, decompression +corruption, I/O error during chunk processing. Headers (200 OK) are already +sent. Log the error server-side, drop the `StreamingBody`. 
Per the Fastly SDK, +`StreamingBody` automatically aborts the response if dropped without calling +`finish()` — the client sees a connection reset / truncated response. This is +standard reverse proxy behavior. + +**Compression finalization fails**: The gzip trailer CRC32 write fails. With the +fix, `encoder.finish()` is called explicitly and errors propagate. Same +mid-stream handling — log and truncate. + +No retry logic. No fallback to buffered after streaming has started — once +headers are sent, we are committed. + +## Files Changed + +| File | Change | Risk | +| ------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | +| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); convert all compression paths to chunk-based processing (`process_gzip_to_gzip` and `decompress_and_process`); fix `process_through_compression` to call `finish()` explicitly | High | +| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium | +| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium | + +**Minimal changes**: `html_processor.rs` now selects `HtmlRewriterAdapter` mode +based on script rewriter presence (see [Text Node Fragmentation](#text-node-fragmentation-phase-3)), +but is otherwise unchanged. 
Integration registration, JS build pipeline, tsjs +module serving, auction handler, cookie/synthetic ID logic are not changed. + +Note: `HtmlWithPostProcessing` wraps `HtmlRewriterAdapter` and applies +post-processors on `is_last`. In the streaming path the post-processor list is +empty (that's the gate condition), so the wrapper is a no-op passthrough. It +remains in place — no need to bypass it. + +Clarification: `script_rewriters` (used by Next.js and GTM) are distinct from +`html_post_processors`. Script rewriters run inside `lol_html` element handlers +and currently require buffered mode because `lol_html` fragments text nodes +across chunk boundaries (see [Phase 3](#text-node-fragmentation-phase-3)). +`html_post_processors` require the full document for post-processing. +The streaming gate checks `html_post_processors().is_empty()` for the +post-processor path; `create_html_processor` separately gates the adapter mode +on `script_rewriters`. Currently only Next.js registers a post-processor. + +## Text Node Fragmentation (Phase 3) + +`lol_html` fragments text nodes across input chunk boundaries when processing +HTML incrementally. Script rewriters (`NextJsNextDataRewriter`, +`GoogleTagManagerIntegration`) expect complete text content — if a domain string +is split across chunks, the rewrite silently fails. + +**Phase 1 workaround**: `HtmlRewriterAdapter` has two modes. `new()` streams +per chunk (no script rewriters). `new_buffered()` accumulates input and +processes in one `write()` call (script rewriters registered). +`create_html_processor` selects the mode automatically. + +**Phase 3** will make each script rewriter fragment-safe by accumulating text +fragments internally via `is_last_in_text_node`. This removes the buffered +fallback and enables streaming for all configurations. See #584. + +## Rollback Strategy + +The `#[fastly::main]` to raw `main()` migration is a structural change. 
If +streaming causes issues in production, the fastest rollback is reverting the +`main.rs` change — the buffered path still exists and the pipeline improvements +(Step 1) are safe to keep regardless. No feature flag needed; a git revert of +the Step 2 commit restores buffered behavior while retaining Step 1 memory +improvements. + +## Testing Strategy + +### Unit tests (streaming_processor.rs) + +- `HtmlRewriterAdapter` emits output on every `process_chunk()` call, not just + `is_last`. +- `process_gzip_to_gzip` produces correct output without `read_to_end`. +- `encoder.finish()` errors propagate (not swallowed by `drop`). +- Multi-chunk HTML produces identical output to single-chunk processing. + +### Integration tests (publisher.rs) + +- Streaming gate: when `html_post_processors()` is non-empty, response is + buffered. +- Streaming gate: when `html_post_processors()` is empty, response streams. +- Backend error (4xx/5xx) returns buffered error response with correct status. +- Binary content passes through without processing. + +### End-to-end validation (Viceroy) + +- `cargo test --workspace` — all existing tests pass. +- Manual verification via `fastly compute serve` against a real origin. +- Compare response bodies before/after to confirm byte-identical output for + HTML, text, and binary. + +### Performance measurement via Chrome DevTools MCP + +Capture before/after metrics using Chrome DevTools MCP against Viceroy locally +and staging. Run each measurement set on `main` (baseline) and the feature +branch, then compare. + +#### Baseline capture (before — on `main`) + +1. Start local server: `fastly compute serve` +2. Navigate to publisher proxy URL via `navigate_page` +3. Capture network timing: + - `list_network_requests` — record TTFB (`responseStart - requestStart`), + total time (`responseEnd - requestStart`), and transfer size for the + document request + - Filter for the main document (`resourceType: Document`) +4. 
Run Lighthouse audit:
+   - `lighthouse_audit` with categories `["performance"]`
+   - Record TTFB, LCP, Speed Index, Total Blocking Time
+5. Capture performance trace:
+   - `performance_start_trace` → load page → `performance_stop_trace`
+   - `performance_analyze_insight` — extract "Time to First Byte" and
+     "Network requests" insights
+6. Take memory snapshot:
+   - `take_memory_snapshot` — record JS heap size as a secondary check
+     (WASM heap is measured separately via Fastly dashboard)
+7. Repeat 3-5 times for stable medians
+
+#### Post-implementation capture (after — on feature branch)
+
+Repeat the same steps on the feature branch. Compare:
+
+| Metric | Source | Expected change |
+|--------|--------|-----------------|
+| TTFB (document) | Network timing | Minimal change (gated by backend response time) |
+| Time to last byte | Network timing (`responseEnd`) | Reduced — body streams incrementally |
+| LCP | Lighthouse | Improved — browser receives `<head>` resources sooner |
+| Speed Index | Lighthouse | Improved — progressive rendering starts earlier |
+| Transfer size | Network timing | Unchanged (same content, same compression) |
+| Response body hash | `evaluate_script` with hash | Identical — correctness check |
+
+#### Automated comparison script
+
+Use `evaluate_script` to compute a response body hash in the browser for
+correctness verification:
+
+```js
+// Run via evaluate_script after page load
+const response = await fetch(location.href);
+const buffer = await response.arrayBuffer();
+const hash = await crypto.subtle.digest('SHA-256', buffer);
+const hex = [...new Uint8Array(hash)].map(b => b.toString(16).padStart(2, '0')).join('');
+hex; // compare this between baseline and feature branch
+```
+
+#### What to watch for
+
+- **TTFB regression**: If TTFB increases, the header finalization reordering
+  may be adding latency. Investigate `finalize_response()` and synthetic ID
+  computation timing. 
+- **Body mismatch**: If response body hashes differ between baseline and
+  feature branch, the streaming pipeline is producing different output.
+  Bisect between Step 1 and Step 2 to isolate.
+- **LCP unchanged**: If LCP doesn't improve, the `<head>` content may not be
+  reaching the browser earlier. Check whether `lol_html` emits the `<head>`
+  injection in the first chunk or buffers until more input arrives.
+
+### Measurement (post-deploy to staging)
+
+- Repeat Chrome DevTools MCP measurements against staging URL.
+- Compare against Viceroy results to account for real network conditions.
+- Monitor WASM heap usage via Fastly dashboard.
+- Verify no regressions on static endpoints or auction.