From 6968201549cffceec314cb425575eea7344a35e9 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 09:08:05 -0700 Subject: [PATCH 01/20] Fix encoder finalization: explicit finish instead of drop --- .../src/streaming_processor.rs | 82 ++++++++++++++++--- 1 file changed, 69 insertions(+), 13 deletions(-) diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index cda62e6f..50c595d9 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -280,12 +280,14 @@ impl StreamingPipeline

{ ) -> Result<(), Report> { use flate2::read::ZlibDecoder; use flate2::write::ZlibEncoder; - use flate2::Compression; let decoder = ZlibDecoder::new(input); - let encoder = ZlibEncoder::new(output, Compression::default()); - - self.process_through_compression(decoder, encoder) + let mut encoder = ZlibEncoder::new(output, flate2::Compression::default()); + self.process_through_compression(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize deflate encoder".to_string(), + })?; + Ok(()) } /// Process deflate compressed input to uncompressed output (decompression only) @@ -315,9 +317,11 @@ impl StreamingPipeline

{ lgwin: 22, ..Default::default() }; - let encoder = CompressorWriter::with_params(output, 4096, ¶ms); - - self.process_through_compression(decoder, encoder) + let mut encoder = CompressorWriter::with_params(output, 4096, ¶ms); + self.process_through_compression(decoder, &mut encoder)?; + // CompressorWriter finalizes on flush (already called) and into_inner + encoder.into_inner(); + Ok(()) } /// Process brotli compressed input to uncompressed output (decompression only) @@ -332,10 +336,14 @@ impl StreamingPipeline

{ } /// Generic processing through compression layers + /// + /// The caller retains ownership of `encoder` and must call its + /// type-specific finalization method (e.g., `finish()` or `into_inner()`) + /// after this function returns successfully. fn process_through_compression( &mut self, mut decoder: R, - mut encoder: W, + encoder: &mut W, ) -> Result<(), Report> { let mut buffer = vec![0u8; self.config.chunk_size]; @@ -380,15 +388,11 @@ impl StreamingPipeline

{ } } - // Flush encoder (this also finishes compression) encoder.flush().change_context(TrustedServerError::Proxy { message: "Failed to flush encoder".to_string(), })?; - // For GzEncoder and similar, we need to finish() to properly close the stream - // The flush above might not be enough - drop(encoder); - + // Caller owns encoder and must call finish() after this returns. Ok(()) } } @@ -646,6 +650,58 @@ mod tests { ); } + #[test] + fn test_deflate_round_trip_produces_valid_output() { + // Verify that deflate-to-deflate (which uses process_through_compression) + // produces valid output that decompresses correctly. This establishes the + // correctness contract before we change the finalization path. + use flate2::read::ZlibDecoder; + use flate2::write::ZlibEncoder; + use std::io::{Read as _, Write as _}; + + let input_data = b"hello world"; + + // Compress input + let mut compressed_input = Vec::new(); + { + let mut enc = + ZlibEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Deflate, + output_compression: Compression::Deflate, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process deflate-to-deflate"); + + // Decompress output and verify correctness + let mut decompressed = Vec::new(); + ZlibDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through deflate 
round-trip" + ); + } + #[test] fn test_streaming_pipeline_with_html_rewriter() { use lol_html::{element, Settings}; From a4fd5c69568fd815286e7c3946efd97472b62424 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 09:13:06 -0700 Subject: [PATCH 02/20] Convert process_gzip_to_gzip to chunk-based processing --- .../src/streaming_processor.rs | 85 ++++++++++++------- 1 file changed, 54 insertions(+), 31 deletions(-) diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 50c595d9..accf80e2 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -187,40 +187,13 @@ impl StreamingPipeline

{ ) -> Result<(), Report> { use flate2::read::GzDecoder; use flate2::write::GzEncoder; - use flate2::Compression; - // Decompress input - let mut decoder = GzDecoder::new(input); - let mut decompressed = Vec::new(); - decoder - .read_to_end(&mut decompressed) - .change_context(TrustedServerError::Proxy { - message: "Failed to decompress gzip".to_string(), - })?; - - log::info!("Decompressed size: {} bytes", decompressed.len()); - - // Process the decompressed content - let processed = self - .processor - .process_chunk(&decompressed, true) - .change_context(TrustedServerError::Proxy { - message: "Failed to process content".to_string(), - })?; - - log::info!("Processed size: {} bytes", processed.len()); - - // Recompress the output - let mut encoder = GzEncoder::new(output, Compression::default()); - encoder - .write_all(&processed) - .change_context(TrustedServerError::Proxy { - message: "Failed to write to gzip encoder".to_string(), - })?; + let decoder = GzDecoder::new(input); + let mut encoder = GzEncoder::new(output, flate2::Compression::default()); + self.process_through_compression(decoder, &mut encoder)?; encoder.finish().change_context(TrustedServerError::Proxy { - message: "Failed to finish gzip encoder".to_string(), + message: "Failed to finalize gzip encoder".to_string(), })?; - Ok(()) } @@ -702,6 +675,56 @@ mod tests { ); } + #[test] + fn test_gzip_to_gzip_produces_correct_output() { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + use std::io::{Read as _, Write as _}; + + // Arrange + let input_data = b"hello world"; + + let mut compressed_input = Vec::new(); + { + let mut enc = + GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + 
input_compression: Compression::Gzip, + output_compression: Compression::Gzip, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + // Act + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-gzip"); + + // Assert + let mut decompressed = Vec::new(); + GzDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through gzip round-trip" + ); + } + #[test] fn test_streaming_pipeline_with_html_rewriter() { use lol_html::{element, Settings}; From a4f4a7c189eeeaa5a778eac958e4881c623aa8af Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 09:19:29 -0700 Subject: [PATCH 03/20] Convert decompress_and_process to chunk-based processing --- .../src/streaming_processor.rs | 114 ++++++++++++++---- 1 file changed, 89 insertions(+), 25 deletions(-) diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index accf80e2..5ea7aa5b 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -197,39 +197,58 @@ impl StreamingPipeline

{ Ok(()) } - /// Decompress input, process content, and write uncompressed output. + /// Decompress input, process content in chunks, and write uncompressed output. fn decompress_and_process( &mut self, mut decoder: R, mut output: W, codec_name: &str, ) -> Result<(), Report> { - let mut decompressed = Vec::new(); - decoder - .read_to_end(&mut decompressed) - .change_context(TrustedServerError::Proxy { - message: format!("Failed to decompress {codec_name}"), - })?; - - log::info!( - "{codec_name} decompressed size: {} bytes", - decompressed.len() - ); - - let processed = self - .processor - .process_chunk(&decompressed, true) - .change_context(TrustedServerError::Proxy { - message: "Failed to process content".to_string(), - })?; + let mut buffer = vec![0u8; self.config.chunk_size]; - log::info!("{codec_name} processed size: {} bytes", processed.len()); + loop { + match decoder.read(&mut buffer) { + Ok(0) => { + let final_chunk = self.processor.process_chunk(&[], true).change_context( + TrustedServerError::Proxy { + message: format!("Failed to process final {codec_name} chunk"), + }, + )?; + if !final_chunk.is_empty() { + output.write_all(&final_chunk).change_context( + TrustedServerError::Proxy { + message: format!("Failed to write final {codec_name} chunk"), + }, + )?; + } + break; + } + Ok(n) => { + let processed = self + .processor + .process_chunk(&buffer[..n], false) + .change_context(TrustedServerError::Proxy { + message: format!("Failed to process {codec_name} chunk"), + })?; + if !processed.is_empty() { + output.write_all(&processed).change_context( + TrustedServerError::Proxy { + message: format!("Failed to write {codec_name} chunk"), + }, + )?; + } + } + Err(e) => { + return Err(Report::new(TrustedServerError::Proxy { + message: format!("Failed to read from {codec_name} decoder: {e}"), + })); + } + } + } - output - .write_all(&processed) - .change_context(TrustedServerError::Proxy { - message: "Failed to write output".to_string(), - })?; + 
output.flush().change_context(TrustedServerError::Proxy { + message: format!("Failed to flush {codec_name} output"), + })?; Ok(()) } @@ -725,6 +744,51 @@ mod tests { ); } + #[test] + fn test_gzip_to_none_produces_correct_output() { + use flate2::write::GzEncoder; + use std::io::Write as _; + + // Arrange + let input_data = b"hello world"; + + let mut compressed_input = Vec::new(); + { + let mut enc = + GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::None, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + // Act + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-none"); + + // Assert + let result = + String::from_utf8(output).expect("should be valid UTF-8 uncompressed output"); + assert_eq!( + result, "hi world", + "should have replaced content after gzip decompression" + ); + } + #[test] fn test_streaming_pipeline_with_html_rewriter() { use lol_html::{element, Settings}; From 105244c1dab0468c4155c220adf81da04b8c3264 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 09:23:49 -0700 Subject: [PATCH 04/20] Rewrite HtmlRewriterAdapter for incremental lol_html streaming --- .../src/streaming_processor.rs | 241 +++++++++++------- 1 file changed, 144 insertions(+), 97 deletions(-) diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 5ea7aa5b..20171b6a 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ 
b/crates/trusted-server-core/src/streaming_processor.rs @@ -6,6 +6,9 @@ //! - Memory-efficient streaming //! - UTF-8 boundary handling +use std::cell::RefCell; +use std::rc::Rc; + use error_stack::{Report, ResultExt}; use std::io::{self, Read, Write}; @@ -389,81 +392,70 @@ impl StreamingPipeline

{ } } -/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor` -/// Important: Due to `lol_html`'s ownership model, we must accumulate input -/// and process it all at once when the stream ends. This is a limitation -/// of the `lol_html` library's API design. +/// Shared output buffer used as an [`lol_html::OutputSink`]. +/// +/// The `HtmlRewriter` invokes [`OutputSink::handle_chunk`] synchronously during +/// each [`HtmlRewriter::write`] call, so the buffer is drained after every +/// `process_chunk` invocation to emit output incrementally. +struct RcVecSink(Rc>>); + +impl lol_html::OutputSink for RcVecSink { + fn handle_chunk(&mut self, chunk: &[u8]) { + self.0.borrow_mut().extend_from_slice(chunk); + } +} + +/// Adapter to use `lol_html` [`HtmlRewriter`](lol_html::HtmlRewriter) as a [`StreamProcessor`]. +/// +/// Output is emitted incrementally on every [`StreamProcessor::process_chunk`] call. +/// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`] +/// is a no-op because the rewriter consumes its settings on construction. pub struct HtmlRewriterAdapter { - settings: lol_html::Settings<'static, 'static>, - accumulated_input: Vec, + rewriter: Option>, + output: Rc>>, } impl HtmlRewriterAdapter { - /// Create a new HTML rewriter adapter + /// Create a new HTML rewriter adapter that streams output per chunk. 
#[must_use] pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self { + let output = Rc::new(RefCell::new(Vec::new())); + let sink = RcVecSink(Rc::clone(&output)); + let rewriter = lol_html::HtmlRewriter::new(settings, sink); Self { - settings, - accumulated_input: Vec::new(), + rewriter: Some(rewriter), + output, } } } impl StreamProcessor for HtmlRewriterAdapter { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { - // Accumulate input chunks - self.accumulated_input.extend_from_slice(chunk); - - if !chunk.is_empty() { - log::debug!( - "Buffering chunk: {} bytes, total buffered: {} bytes", - chunk.len(), - self.accumulated_input.len() - ); + if let Some(rewriter) = &mut self.rewriter { + if !chunk.is_empty() { + rewriter.write(chunk).map_err(|e| { + log::error!("Failed to process HTML chunk: {e}"); + io::Error::other(format!("HTML processing failed: {e}")) + })?; + } } - // Only process when we have all the input if is_last { - log::info!( - "Processing complete document: {} bytes", - self.accumulated_input.len() - ); - - // Process all accumulated input at once - let mut output = Vec::new(); - - // Create rewriter with output sink - let mut rewriter = lol_html::HtmlRewriter::new( - std::mem::take(&mut self.settings), - |chunk: &[u8]| { - output.extend_from_slice(chunk); - }, - ); - - // Process the entire document - rewriter.write(&self.accumulated_input).map_err(|e| { - log::error!("Failed to process HTML: {}", e); - io::Error::other(format!("HTML processing failed: {}", e)) - })?; - - // Finalize the rewriter - rewriter.end().map_err(|e| { - log::error!("Failed to finalize: {}", e); - io::Error::other(format!("HTML finalization failed: {}", e)) - })?; - - log::debug!("Output size: {} bytes", output.len()); - self.accumulated_input.clear(); - Ok(output) - } else { - // Return empty until we have all input - // This is a limitation of lol_html's API - Ok(Vec::new()) + if let Some(rewriter) = self.rewriter.take() { + 
rewriter.end().map_err(|e| { + log::error!("Failed to finalize HTML: {e}"); + io::Error::other(format!("HTML finalization failed: {e}")) + })?; + } } + + // Drain whatever lol_html produced since the last call + Ok(std::mem::take(&mut *self.output.borrow_mut())) } fn reset(&mut self) { - self.accumulated_input.clear(); + // No-op: the rewriter consumed its Settings on construction. + // Single-use by design (one adapter per request). } } @@ -530,7 +522,7 @@ mod tests { } #[test] - fn test_html_rewriter_adapter_accumulates_until_last() { + fn test_html_rewriter_adapter_streams_incrementally() { use lol_html::{element, Settings}; // Create a simple HTML rewriter that replaces text @@ -544,32 +536,40 @@ mod tests { let mut adapter = HtmlRewriterAdapter::new(settings); - // Test that intermediate chunks return empty let chunk1 = b""; let result1 = adapter .process_chunk(chunk1, false) .expect("should process chunk1"); - assert_eq!(result1.len(), 0, "Should return empty for non-last chunk"); let chunk2 = b"

original

"; let result2 = adapter .process_chunk(chunk2, false) .expect("should process chunk2"); - assert_eq!(result2.len(), 0, "Should return empty for non-last chunk"); - // Test that last chunk processes everything let chunk3 = b""; let result3 = adapter .process_chunk(chunk3, true) .expect("should process final chunk"); + + // Concatenate all outputs and verify the final HTML is correct + let mut all_output = result1; + all_output.extend_from_slice(&result2); + all_output.extend_from_slice(&result3); + assert!( - !result3.is_empty(), - "Should return processed content for last chunk" + !all_output.is_empty(), + "should produce non-empty concatenated output" ); - let output = String::from_utf8(result3).expect("output should be valid UTF-8"); - assert!(output.contains("replaced"), "Should have replaced content"); - assert!(output.contains(""), "Should have complete HTML"); + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); + assert!( + output.contains("replaced"), + "should have replaced content in concatenated output" + ); + assert!( + output.contains(""), + "should have complete HTML in concatenated output" + ); } #[test] @@ -586,59 +586,59 @@ mod tests { } large_html.push_str(""); - // Process in chunks + // Process in chunks and collect all output let chunk_size = 1024; let bytes = large_html.as_bytes(); - let mut chunks = bytes.chunks(chunk_size); - let mut last_chunk = chunks.next().unwrap_or(&[]); + let mut chunks = bytes.chunks(chunk_size).peekable(); + let mut all_output = Vec::new(); - for chunk in chunks { + while let Some(chunk) = chunks.next() { + let is_last = chunks.peek().is_none(); let result = adapter - .process_chunk(last_chunk, false) - .expect("should process intermediate chunk"); - assert_eq!(result.len(), 0, "Intermediate chunks should return empty"); - last_chunk = chunk; + .process_chunk(chunk, is_last) + .expect("should process chunk"); + all_output.extend_from_slice(&result); } - // Process last chunk - let 
result = adapter - .process_chunk(last_chunk, true) - .expect("should process last chunk"); - assert!(!result.is_empty(), "Last chunk should return content"); + assert!( + !all_output.is_empty(), + "should produce non-empty output for large document" + ); - let output = String::from_utf8(result).expect("output should be valid UTF-8"); + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); assert!( output.contains("Paragraph 999"), - "Should contain all content" + "should contain all content from large document" ); } #[test] - fn test_html_rewriter_adapter_reset() { + fn test_html_rewriter_adapter_reset_is_noop() { use lol_html::Settings; let settings = Settings::default(); let mut adapter = HtmlRewriterAdapter::new(settings); // Process some content - adapter - .process_chunk(b"", false) - .expect("should process html tag"); - adapter - .process_chunk(b"test", false) - .expect("should process body"); - - // Reset should clear accumulated input + let result1 = adapter + .process_chunk(b"test", false) + .expect("should process html"); + + // Reset is a no-op — the adapter is single-use by design adapter.reset(); - // After reset, adapter should be ready for new input - let result = adapter - .process_chunk(b"

new

", true) - .expect("should process new content after reset"); - let output = String::from_utf8(result).expect("output should be valid UTF-8"); - assert_eq!( - output, "

new

", - "Should only contain new input after reset" + // The rewriter is still alive; finalize it + let result2 = adapter + .process_chunk(b"", true) + .expect("should finalize after reset"); + + let mut all_output = result1; + all_output.extend_from_slice(&result2); + + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); + assert!( + output.contains("test"), + "should still produce output after no-op reset" ); } @@ -789,6 +789,53 @@ mod tests { ); } + #[test] + fn test_html_rewriter_adapter_emits_output_per_chunk() { + use lol_html::Settings; + + let settings = Settings::default(); + let mut adapter = HtmlRewriterAdapter::new(settings); + + // Send three chunks + let chunk1 = b""; + let result1 = adapter + .process_chunk(chunk1, false) + .expect("should process chunk1"); + assert!( + !result1.is_empty(), + "should emit output for first chunk, got empty" + ); + + let chunk2 = b"

hello

"; + let result2 = adapter + .process_chunk(chunk2, false) + .expect("should process chunk2"); + + let chunk3 = b""; + let result3 = adapter + .process_chunk(chunk3, true) + .expect("should process final chunk"); + + // Concatenate all outputs and verify correctness + let mut all_output = result1; + all_output.extend_from_slice(&result2); + all_output.extend_from_slice(&result3); + + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); + assert!( + output.contains(""), + "should contain html tag in concatenated output" + ); + assert!( + output.contains("

hello

"), + "should contain paragraph in concatenated output" + ); + assert!( + output.contains(""), + "should contain closing html tag in concatenated output" + ); + } + #[test] fn test_streaming_pipeline_with_html_rewriter() { use lol_html::{element, Settings}; From d72669c6c8057c411177692d8f4be4e0ab3d95a4 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 09:28:08 -0700 Subject: [PATCH 05/20] Unify compression paths into single process_chunks method --- .../src/streaming_processor.rs | 300 +++++------------- 1 file changed, 73 insertions(+), 227 deletions(-) diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 20171b6a..7062df93 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -94,6 +94,10 @@ impl StreamingPipeline

{ /// Process a stream from input to output /// + /// Handles all supported compression transformations by wrapping the raw + /// reader/writer in the appropriate decoder/encoder, then delegating to + /// [`Self::process_chunks`]. + /// /// # Errors /// /// Returns an error if the compression transformation is unsupported or if reading/writing fails. @@ -106,253 +110,96 @@ impl StreamingPipeline

{ self.config.input_compression, self.config.output_compression, ) { - (Compression::None, Compression::None) => self.process_uncompressed(input, output), - (Compression::Gzip, Compression::Gzip) => self.process_gzip_to_gzip(input, output), - (Compression::Gzip, Compression::None) => self.process_gzip_to_none(input, output), + (Compression::None, Compression::None) => self.process_chunks(input, output), + (Compression::Gzip, Compression::Gzip) => { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + + let decoder = GzDecoder::new(input); + let mut encoder = GzEncoder::new(output, flate2::Compression::default()); + self.process_chunks(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize gzip encoder".to_string(), + })?; + Ok(()) + } + (Compression::Gzip, Compression::None) => { + use flate2::read::GzDecoder; + + self.process_chunks(GzDecoder::new(input), output) + } (Compression::Deflate, Compression::Deflate) => { - self.process_deflate_to_deflate(input, output) + use flate2::read::ZlibDecoder; + use flate2::write::ZlibEncoder; + + let decoder = ZlibDecoder::new(input); + let mut encoder = ZlibEncoder::new(output, flate2::Compression::default()); + self.process_chunks(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize deflate encoder".to_string(), + })?; + Ok(()) } (Compression::Deflate, Compression::None) => { - self.process_deflate_to_none(input, output) + use flate2::read::ZlibDecoder; + + self.process_chunks(ZlibDecoder::new(input), output) } (Compression::Brotli, Compression::Brotli) => { - self.process_brotli_to_brotli(input, output) + use brotli::enc::writer::CompressorWriter; + use brotli::enc::BrotliEncoderParams; + use brotli::Decompressor; + + let decoder = Decompressor::new(input, 4096); + let params = BrotliEncoderParams { + quality: 4, + lgwin: 22, + ..Default::default() + }; + let mut encoder = 
CompressorWriter::with_params(output, 4096, ¶ms); + self.process_chunks(decoder, &mut encoder)?; + // CompressorWriter finalizes on flush (already called) and into_inner + encoder.into_inner(); + Ok(()) + } + (Compression::Brotli, Compression::None) => { + use brotli::Decompressor; + + self.process_chunks(Decompressor::new(input, 4096), output) } - (Compression::Brotli, Compression::None) => self.process_brotli_to_none(input, output), _ => Err(Report::new(TrustedServerError::Proxy { message: "Unsupported compression transformation".to_string(), })), } } - /// Process uncompressed stream - fn process_uncompressed( - &mut self, - mut input: R, - mut output: W, - ) -> Result<(), Report> { - let mut buffer = vec![0u8; self.config.chunk_size]; - - loop { - match input.read(&mut buffer) { - Ok(0) => { - // End of stream - process any remaining data - let final_chunk = self.processor.process_chunk(&[], true).change_context( - TrustedServerError::Proxy { - message: "Failed to process final chunk".to_string(), - }, - )?; - if !final_chunk.is_empty() { - output.write_all(&final_chunk).change_context( - TrustedServerError::Proxy { - message: "Failed to write final chunk".to_string(), - }, - )?; - } - break; - } - Ok(n) => { - // Process this chunk - let processed = self - .processor - .process_chunk(&buffer[..n], false) - .change_context(TrustedServerError::Proxy { - message: "Failed to process chunk".to_string(), - })?; - if !processed.is_empty() { - output - .write_all(&processed) - .change_context(TrustedServerError::Proxy { - message: "Failed to write processed chunk".to_string(), - })?; - } - } - Err(e) => { - return Err(Report::new(TrustedServerError::Proxy { - message: format!("Failed to read from input: {}", e), - })); - } - } - } - - output.flush().change_context(TrustedServerError::Proxy { - message: "Failed to flush output".to_string(), - })?; - - Ok(()) - } - - /// Process gzip compressed stream - fn process_gzip_to_gzip( - &mut self, - input: R, - output: W, - ) 
-> Result<(), Report> { - use flate2::read::GzDecoder; - use flate2::write::GzEncoder; - - let decoder = GzDecoder::new(input); - let mut encoder = GzEncoder::new(output, flate2::Compression::default()); - self.process_through_compression(decoder, &mut encoder)?; - encoder.finish().change_context(TrustedServerError::Proxy { - message: "Failed to finalize gzip encoder".to_string(), - })?; - Ok(()) - } - - /// Decompress input, process content in chunks, and write uncompressed output. - fn decompress_and_process( - &mut self, - mut decoder: R, - mut output: W, - codec_name: &str, - ) -> Result<(), Report> { - let mut buffer = vec![0u8; self.config.chunk_size]; - - loop { - match decoder.read(&mut buffer) { - Ok(0) => { - let final_chunk = self.processor.process_chunk(&[], true).change_context( - TrustedServerError::Proxy { - message: format!("Failed to process final {codec_name} chunk"), - }, - )?; - if !final_chunk.is_empty() { - output.write_all(&final_chunk).change_context( - TrustedServerError::Proxy { - message: format!("Failed to write final {codec_name} chunk"), - }, - )?; - } - break; - } - Ok(n) => { - let processed = self - .processor - .process_chunk(&buffer[..n], false) - .change_context(TrustedServerError::Proxy { - message: format!("Failed to process {codec_name} chunk"), - })?; - if !processed.is_empty() { - output.write_all(&processed).change_context( - TrustedServerError::Proxy { - message: format!("Failed to write {codec_name} chunk"), - }, - )?; - } - } - Err(e) => { - return Err(Report::new(TrustedServerError::Proxy { - message: format!("Failed to read from {codec_name} decoder: {e}"), - })); - } - } - } - - output.flush().change_context(TrustedServerError::Proxy { - message: format!("Failed to flush {codec_name} output"), - })?; - - Ok(()) - } - - /// Process gzip compressed input to uncompressed output (decompression only) - fn process_gzip_to_none( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::GzDecoder; 
- - self.decompress_and_process(GzDecoder::new(input), output, "gzip") - } - - /// Process deflate compressed stream - fn process_deflate_to_deflate( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::ZlibDecoder; - use flate2::write::ZlibEncoder; - - let decoder = ZlibDecoder::new(input); - let mut encoder = ZlibEncoder::new(output, flate2::Compression::default()); - self.process_through_compression(decoder, &mut encoder)?; - encoder.finish().change_context(TrustedServerError::Proxy { - message: "Failed to finalize deflate encoder".to_string(), - })?; - Ok(()) - } - - /// Process deflate compressed input to uncompressed output (decompression only) - fn process_deflate_to_none( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::ZlibDecoder; - - self.decompress_and_process(ZlibDecoder::new(input), output, "deflate") - } - - /// Process brotli compressed stream - fn process_brotli_to_brotli( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use brotli::enc::writer::CompressorWriter; - use brotli::enc::BrotliEncoderParams; - use brotli::Decompressor; - - let decoder = Decompressor::new(input, 4096); - let params = BrotliEncoderParams { - quality: 4, - lgwin: 22, - ..Default::default() - }; - let mut encoder = CompressorWriter::with_params(output, 4096, ¶ms); - self.process_through_compression(decoder, &mut encoder)?; - // CompressorWriter finalizes on flush (already called) and into_inner - encoder.into_inner(); - Ok(()) - } - - /// Process brotli compressed input to uncompressed output (decompression only) - fn process_brotli_to_none( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use brotli::Decompressor; - - self.decompress_and_process(Decompressor::new(input, 4096), output, "brotli") - } - - /// Generic processing through compression layers + /// Read chunks from `reader`, pass each through the processor, and write output to `writer`. 
/// - /// The caller retains ownership of `encoder` and must call its - /// type-specific finalization method (e.g., `finish()` or `into_inner()`) - /// after this function returns successfully. - fn process_through_compression( + /// This is the single unified chunk loop used by all compression paths. + /// The caller is responsible for wrapping `reader`/`writer` in the appropriate + /// decoder/encoder and for finalizing the encoder (e.g., calling `finish()`) + /// after this method returns. + /// + /// # Errors + /// + /// Returns an error if reading, processing, or writing any chunk fails. + fn process_chunks( &mut self, - mut decoder: R, - encoder: &mut W, + mut reader: R, + mut writer: W, ) -> Result<(), Report> { let mut buffer = vec![0u8; self.config.chunk_size]; loop { - match decoder.read(&mut buffer) { + match reader.read(&mut buffer) { Ok(0) => { - // End of stream let final_chunk = self.processor.process_chunk(&[], true).change_context( TrustedServerError::Proxy { message: "Failed to process final chunk".to_string(), }, )?; if !final_chunk.is_empty() { - encoder.write_all(&final_chunk).change_context( + writer.write_all(&final_chunk).change_context( TrustedServerError::Proxy { message: "Failed to write final chunk".to_string(), }, @@ -368,7 +215,7 @@ impl StreamingPipeline

{ message: "Failed to process chunk".to_string(), })?; if !processed.is_empty() { - encoder.write_all(&processed).change_context( + writer.write_all(&processed).change_context( TrustedServerError::Proxy { message: "Failed to write processed chunk".to_string(), }, @@ -377,17 +224,16 @@ impl StreamingPipeline

{ } Err(e) => { return Err(Report::new(TrustedServerError::Proxy { - message: format!("Failed to read from decoder: {}", e), + message: format!("Failed to read: {e}"), })); } } } - encoder.flush().change_context(TrustedServerError::Proxy { - message: "Failed to flush encoder".to_string(), + writer.flush().change_context(TrustedServerError::Proxy { + message: "Failed to flush output".to_string(), })?; - // Caller owns encoder and must call finish() after this returns. Ok(()) } } From 80e51d4807411bc5d3bb77cc6a6971d2a5e4cebb Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 09:30:06 -0700 Subject: [PATCH 06/20] Update plan with compression refactor implementation note --- docs/superpowers/plans/2026-03-25-streaming-response.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md index 268517b8..4afca7fe 100644 --- a/docs/superpowers/plans/2026-03-25-streaming-response.md +++ b/docs/superpowers/plans/2026-03-25-streaming-response.md @@ -35,6 +35,14 @@ rewriting), `flate2` (gzip/deflate), `brotli` (brotli compression). ## Phase 1: Make the Pipeline Chunk-Emitting +> **Implementation note (2026-03-26):** Tasks 1-3 were implemented as planned, +> then followed by a refactor that unified all 9 `process_*_to_*` methods into +> a single `process_chunks` method with inline decoder/encoder creation in +> `process()`. This eliminated ~150 lines of duplication. The refactor was +> committed as "Unify compression paths into single process_chunks method". +> Tasks 1-3 descriptions below reflect the original plan; the final code is +> cleaner than described. + ### Task 1: Fix encoder finalization in `process_through_compression` This is the prerequisite for Task 2. 
The current code calls `flush()` then From c505c00395efb034ef2dce6047f7adc2dcb11948 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 09:33:45 -0700 Subject: [PATCH 07/20] Accumulate output for post-processors in HtmlWithPostProcessing --- .../trusted-server-core/src/html_processor.rs | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index 540ab29d..30550318 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -20,6 +20,9 @@ use crate::tsjs; struct HtmlWithPostProcessing { inner: HtmlRewriterAdapter, post_processors: Vec>, + /// Buffer that accumulates all intermediate output when post-processors + /// need the full document. Left empty on the streaming-only path. + accumulated_output: Vec, origin_host: String, request_host: String, request_scheme: String, @@ -29,12 +32,26 @@ struct HtmlWithPostProcessing { impl StreamProcessor for HtmlWithPostProcessing { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { let output = self.inner.process_chunk(chunk, is_last)?; - if !is_last || output.is_empty() || self.post_processors.is_empty() { + + // Streaming-optimized path: no post-processors, pass through immediately. + if self.post_processors.is_empty() { return Ok(output); } - let Ok(output_str) = std::str::from_utf8(&output) else { - return Ok(output); + // Post-processors need the full document. Accumulate until the last chunk. + self.accumulated_output.extend_from_slice(&output); + if !is_last { + return Ok(Vec::new()); + } + + // Final chunk: run post-processors on the full accumulated output. 
+ let full_output = std::mem::take(&mut self.accumulated_output); + if full_output.is_empty() { + return Ok(full_output); + } + + let Ok(output_str) = std::str::from_utf8(&full_output) else { + return Ok(full_output); }; let ctx = IntegrationHtmlContext { @@ -50,10 +67,10 @@ impl StreamProcessor for HtmlWithPostProcessing { .iter() .any(|p| p.should_process(output_str, &ctx)) { - return Ok(output); + return Ok(full_output); } - let mut html = String::from_utf8(output).map_err(|e| { + let mut html = String::from_utf8(full_output).map_err(|e| { io::Error::other(format!( "HTML post-processing expected valid UTF-8 output: {e}" )) @@ -79,6 +96,7 @@ impl StreamProcessor for HtmlWithPostProcessing { fn reset(&mut self) { self.inner.reset(); + self.accumulated_output.clear(); self.document_state.clear(); } } @@ -467,6 +485,7 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso HtmlWithPostProcessing { inner: HtmlRewriterAdapter::new(rewriter_settings), post_processors, + accumulated_output: Vec::new(), origin_host: config.origin_host, request_host: config.request_host, request_scheme: config.request_scheme, From 6cae7f9982c8a1a8d02793b58c1469b0e67f0d7b Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Wed, 25 Mar 2026 00:46:53 -0700 Subject: [PATCH 08/20] Add streaming response optimization spec for non-Next.js paths --- .../2026-03-25-streaming-response-design.md | 194 ++++++++++++++++++ 1 file changed, 194 insertions(+) create mode 100644 docs/superpowers/specs/2026-03-25-streaming-response-design.md diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md new file mode 100644 index 00000000..7011dea6 --- /dev/null +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -0,0 +1,194 @@ +# Streaming Response Optimization (Next.js Disabled) + +## Problem + +When Next.js is disabled, the publisher proxy 
buffers the entire response body
+in memory before sending any bytes to the client. This creates two costs:
+
+1. **Latency** — The client receives zero bytes until the full response is
+   decompressed, rewritten, and recompressed. For a 222KB HTML page, this adds
+   hundreds of milliseconds to time-to-last-byte.
+2. **Memory** — Peak memory holds ~4x the response size simultaneously
+   (compressed input + decompressed + processed output + recompressed output).
+   With WASM's ~16MB heap, this limits the size of pages we can proxy.
+
+## Scope
+
+**In scope**: All content types flowing through the publisher proxy path — HTML,
+text/JSON, and binary pass-through. Only when Next.js is disabled (no
+post-processor requiring the full document).
+
+**Out of scope**: Concurrent origin+auction fetch, Next.js-enabled paths (these
+require full-document post-processing by design), non-publisher routes (static
+JS, auction, discovery).
+
+## Streaming Gate
+
+Before committing to `stream_to_client()`, check:
+
+1. Backend status is success (2xx).
+2. `html_post_processors()` is empty — no registered post-processors.
+
+If either check fails, fall back to the current buffered path. This keeps the
+optimization transparent: same behavior for all existing configurations,
+streaming only activates when safe.
+
+## Architecture
+
+Two implementation steps, each independently valuable and testable.
+
+### Step 1: Make the pipeline chunk-emitting
+
+Three changes to existing processors:
+
+#### A) `HtmlRewriterAdapter` — incremental streaming
+
+The current implementation accumulates the entire HTML document and processes it
+on `is_last`. This is unnecessary — `lol_html::HtmlRewriter` supports
+incremental `write()` calls and emits output via its `OutputSink` callback after
+each chunk.
+
+Fix: create the rewriter eagerly in the constructor, use
+`Rc<RefCell<Vec<u8>>>` to share the output buffer between the sink and
+`process_chunk()`, drain the buffer on every call instead of only on `is_last`.
+ +#### B) `process_gzip_to_gzip` — chunk-based decompression + +Currently calls `read_to_end()` to decompress the entire body into memory. The +deflate and brotli paths already use the chunk-based +`process_through_compression()`. + +Fix: use the same `process_through_compression` pattern for gzip. + +#### C) `process_through_compression` finalization + +Currently uses `drop(encoder)` which silently swallows errors from the gzip +trailer CRC32 checksum. + +Fix: call `encoder.finish()` explicitly and propagate errors. + +### Step 2: Stream response to client + +Change the publisher proxy path to use Fastly's `StreamingBody` API: + +1. Fetch from origin, receive response headers. +2. Validate status — if backend error, return buffered error response via + `send_to_client()`. +3. Check streaming gate — if `html_post_processors()` is non-empty, fall back + to buffered path. +4. Finalize all response headers (cookies, synthetic ID, geo, version). +5. Call `response.stream_to_client()` — headers sent to client immediately. +6. Pipe origin body through the streaming pipeline, writing chunks directly to + `StreamingBody`. +7. Call `finish()` on success; on error, log and drop (client sees truncated + response). + +For binary/non-text content: use `StreamingBody::append(body)` for zero-copy +pass-through, bypassing the pipeline entirely. + +#### Entry point change + +Migrate `main.rs` from `#[fastly::main]` to raw `main()` with `fastly::init()` ++ `Request::from_client()`. This is required because `stream_to_client()` / +`send_to_client()` are incompatible with `#[fastly::main]`'s return-based model. + +Non-streaming routes (static, auction, discovery) use `send_to_client()` as +before. 
+ +## Data Flow + +### Streaming path (HTML, text/JSON with processing) + +``` +Origin body (gzip) + → Read 8KB chunk from GzDecoder + → StreamProcessor::process_chunk(chunk, is_last) + → HtmlRewriterAdapter: lol_html.write(chunk) → sink emits rewritten bytes + → OR StreamingReplacer: URL replacement with overlap buffer + → GzEncoder::write(processed_chunk) → compressed bytes + → StreamingBody::write(compressed) → chunk sent to client + → repeat until EOF + → StreamingBody::finish() +``` + +Memory at steady state: ~8KB input chunk buffer + lol_html internal parser state ++ gzip encoder window + overlap buffer for replacer. Roughly constant regardless +of document size, versus the current ~4x document size. + +### Pass-through path (binary, images, fonts, etc.) + +``` +Origin body + → StreamingBody::append(body) → zero-copy transfer +``` + +No decompression, no processing, no buffering. + +### Buffered fallback path (error responses or post-processors present) + +``` +Origin returns 4xx/5xx OR html_post_processors() is non-empty + → Current buffered path unchanged + → send_to_client() with proper status and full body +``` + +## Error Handling + +**Backend returns error status**: Detected before calling `stream_to_client()`. +Return the backend response as-is via `send_to_client()`. Client sees the +correct error status code. No change from current behavior. + +**Processing fails mid-stream**: `lol_html` parse error, decompression +corruption, I/O error. Headers (200 OK) are already sent. Log the error +server-side, drop the `StreamingBody`. Client sees a truncated response and the +connection closes. Standard reverse proxy behavior. + +**Compression finalization fails**: The gzip trailer CRC32 write fails. With the +fix, `encoder.finish()` is called explicitly and errors propagate. Same +mid-stream handling — log and truncate. + +No retry logic. No fallback to buffered after streaming has started — once +headers are sent, we are committed. 
+ +## Files Changed + +| File | Change | Risk | +|------|--------|------| +| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally; fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | Medium | +| `crates/trusted-server-core/src/publisher.rs` | Split `handle_publisher_request` into streaming vs buffered paths based on `html_post_processors().is_empty()` | Medium | +| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to raw `main()` with `fastly::init()` + `Request::from_client()`; route results to `send_to_client()` or let streaming path handle its own output | Medium | + +**Not changed**: `html_processor.rs` (builds lol_html `Settings` passed to +`HtmlRewriterAdapter`, works as-is), integration registration, JS build +pipeline, tsjs module serving, auction handler, cookie/synthetic ID logic. + +## Testing Strategy + +### Unit tests (streaming_processor.rs) + +- `HtmlRewriterAdapter` emits output on every `process_chunk()` call, not just + `is_last`. +- `process_gzip_to_gzip` produces correct output without `read_to_end`. +- `encoder.finish()` errors propagate (not swallowed by `drop`). +- Multi-chunk HTML produces identical output to single-chunk processing. + +### Integration tests (publisher.rs) + +- Streaming gate: when `html_post_processors()` is non-empty, response is + buffered. +- Streaming gate: when `html_post_processors()` is empty, response streams. +- Backend error (4xx/5xx) returns buffered error response with correct status. +- Binary content passes through without processing. + +### End-to-end validation (Viceroy) + +- `cargo test --workspace` — all existing tests pass. +- Manual verification via `fastly compute serve` against a real origin. +- Compare response bodies before/after to confirm byte-identical output for + HTML, text, and binary. 
+ +### Measurement (post-deploy) + +- Compare TTFB and time-to-last-byte on staging before and after. +- Monitor WASM heap usage via Fastly dashboard. +- Verify no regressions on static endpoints or auction. From 930a584e102692a55f1c0de9bcd84588f7d8955c Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Wed, 25 Mar 2026 00:50:17 -0700 Subject: [PATCH 09/20] Address spec review: Content-Length, streaming gate, finalization order, rollback --- .../2026-03-25-streaming-response-design.md | 58 ++++++++++++++----- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md index 7011dea6..f745f3dd 100644 --- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -15,8 +15,8 @@ in memory before sending any bytes to the client. This creates two costs: ## Scope **In scope**: All content types flowing through the publisher proxy path — HTML, -text/JSON, and binary pass-through. Only when Next.js is disabled (no -post-processor requiring the full document). +text/JSON, RSC Flight (`text/x-component`), and binary pass-through. Only when +Next.js is disabled (no post-processor requiring the full document). **Out of scope**: Concurrent origin+auction fetch, Next.js-enabled paths (these require full-document post-processing by design), non-publisher routes (static @@ -27,11 +27,14 @@ JS, auction, discovery). Before committing to `stream_to_client()`, check: 1. Backend status is success (2xx). -2. `html_post_processors()` is empty — no registered post-processors. +2. For HTML content: `html_post_processors()` is empty — no registered + post-processors. Non-HTML content types (text/JSON, RSC Flight, binary) can + always stream regardless of post-processor registration, since + post-processors only apply to HTML. 
-If either check fails, fall back to the current buffered path. This keeps the -optimization transparent: same behavior for all existing configurations, -streaming only activates when safe. +If either check fails for the given content type, fall back to the current +buffered path. This keeps the optimization transparent: same behavior for all +existing configurations, streaming only activates when safe. ## Architecture @@ -51,6 +54,12 @@ each chunk. Fix: create the rewriter eagerly in the constructor, use `Rc>>` to share the output buffer between the sink and `process_chunk()`, drain the buffer on every call instead of only on `is_last`. +The output buffer is drained *after* each `rewriter.write()` returns, so the +`RefCell` borrow in the sink closure never overlaps with the drain borrow. + +Note: this makes `HtmlRewriterAdapter` single-use — `reset()` becomes a no-op +since the `Settings` are consumed by the rewriter constructor. This matches +actual usage (one adapter per request). #### B) `process_gzip_to_gzip` — chunk-based decompression @@ -60,12 +69,16 @@ deflate and brotli paths already use the chunk-based Fix: use the same `process_through_compression` pattern for gzip. -#### C) `process_through_compression` finalization +#### C) `process_through_compression` finalization — prerequisite for B -Currently uses `drop(encoder)` which silently swallows errors from the gzip -trailer CRC32 checksum. +`process_through_compression` currently uses `drop(encoder)` which silently +swallows errors. For gzip specifically, the trailer contains a CRC32 checksum — +if `finish()` fails, corrupted responses are served silently. Today this affects +deflate and brotli (which already use `process_through_compression`); after Step +1B moves gzip to this path, it will affect gzip too. -Fix: call `encoder.finish()` explicitly and propagate errors. +Fix: call `encoder.finish()` explicitly and propagate errors. This must land +before or with Step 1B. 
### Step 2: Stream response to client @@ -77,10 +90,13 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API: 3. Check streaming gate — if `html_post_processors()` is non-empty, fall back to buffered path. 4. Finalize all response headers (cookies, synthetic ID, geo, version). -5. Call `response.stream_to_client()` — headers sent to client immediately. -6. Pipe origin body through the streaming pipeline, writing chunks directly to +5. Remove `Content-Length` header — the final size is unknown after processing. + Fastly's `StreamingBody` sends the response using chunked transfer encoding + automatically. +6. Call `response.stream_to_client()` — headers sent to client immediately. +7. Pipe origin body through the streaming pipeline, writing chunks directly to `StreamingBody`. -7. Call `finish()` on success; on error, log and drop (client sees truncated +8. Call `finish()` on success; on error, log and drop (client sees truncated response). For binary/non-text content: use `StreamingBody::append(body)` for zero-copy @@ -154,7 +170,7 @@ headers are sent, we are committed. 
| File | Change | Risk | |------|--------|------| -| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally; fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | Medium | +| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | High | | `crates/trusted-server-core/src/publisher.rs` | Split `handle_publisher_request` into streaming vs buffered paths based on `html_post_processors().is_empty()` | Medium | | `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to raw `main()` with `fastly::init()` + `Request::from_client()`; route results to `send_to_client()` or let streaming path handle its own output | Medium | @@ -162,6 +178,20 @@ headers are sent, we are committed. `HtmlRewriterAdapter`, works as-is), integration registration, JS build pipeline, tsjs module serving, auction handler, cookie/synthetic ID logic. +Note: `HtmlWithPostProcessing` wraps `HtmlRewriterAdapter` and applies +post-processors on `is_last`. In the streaming path the post-processor list is +empty (that's the gate condition), so the wrapper is a no-op passthrough. It +remains in place — no need to bypass it. + +## Rollback Strategy + +The `#[fastly::main]` to raw `main()` migration is a structural change. If +streaming causes issues in production, the fastest rollback is reverting the +`main.rs` change — the buffered path still exists and the pipeline improvements +(Step 1) are safe to keep regardless. No feature flag needed; a git revert of +the Step 2 commit restores buffered behavior while retaining Step 1 memory +improvements. 
+ ## Testing Strategy ### Unit tests (streaming_processor.rs) From a2b71bf53be89be5167eaa4a605767c50b3afb67 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Wed, 25 Mar 2026 01:13:06 -0700 Subject: [PATCH 10/20] Address deep review: header timing, error phases, process_response_streaming refactor --- .../2026-03-25-streaming-response-design.md | 38 ++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md index f745f3dd..dd31097d 100644 --- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -54,7 +54,7 @@ each chunk. Fix: create the rewriter eagerly in the constructor, use `Rc>>` to share the output buffer between the sink and `process_chunk()`, drain the buffer on every call instead of only on `is_last`. -The output buffer is drained *after* each `rewriter.write()` returns, so the +The output buffer is drained _after_ each `rewriter.write()` returns, so the `RefCell` borrow in the sink closure never overlaps with the drain borrow. Note: this makes `HtmlRewriterAdapter` single-use — `reset()` becomes a no-op @@ -90,6 +90,10 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API: 3. Check streaming gate — if `html_post_processors()` is non-empty, fall back to buffered path. 4. Finalize all response headers (cookies, synthetic ID, geo, version). + Today, synthetic ID/cookie headers are set _after_ body processing in + `handle_publisher_request`. Since they are body-independent (computed from + request cookies and consent context), they must be reordered to run _before_ + `stream_to_client()` so headers are complete before streaming begins. 5. Remove `Content-Length` header — the final size is unknown after processing. 
Fastly's `StreamingBody` sends the response using chunked transfer encoding automatically. @@ -99,13 +103,16 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API: 8. Call `finish()` on success; on error, log and drop (client sees truncated response). -For binary/non-text content: use `StreamingBody::append(body)` for zero-copy -pass-through, bypassing the pipeline entirely. +For binary/non-text content: call `response.take_body()` then +`StreamingBody::append(body)` for zero-copy pass-through, bypassing the pipeline +entirely. Today binary responses skip `take_body()` and return the response +as-is — the streaming path needs to explicitly take the body to hand it to +`append()`. #### Entry point change Migrate `main.rs` from `#[fastly::main]` to raw `main()` with `fastly::init()` -+ `Request::from_client()`. This is required because `stream_to_client()` / +\+ `Request::from_client()`. This is required because `stream_to_client()` / `send_to_client()` are incompatible with `#[fastly::main]`'s return-based model. Non-streaming routes (static, auction, discovery) use `send_to_client()` as @@ -128,7 +135,7 @@ Origin body (gzip) ``` Memory at steady state: ~8KB input chunk buffer + lol_html internal parser state -+ gzip encoder window + overlap buffer for replacer. Roughly constant regardless +\+ gzip encoder window + overlap buffer for replacer. Roughly constant regardless of document size, versus the current ~4x document size. ### Pass-through path (binary, images, fonts, etc.) @@ -154,10 +161,15 @@ Origin returns 4xx/5xx OR html_post_processors() is non-empty Return the backend response as-is via `send_to_client()`. Client sees the correct error status code. No change from current behavior. +**Processor creation fails**: `create_html_stream_processor()` or pipeline +construction errors happen _before_ `stream_to_client()` is called. Since +headers have not been sent yet, return a proper error response via +`send_to_client()`. 
Same as current behavior. + **Processing fails mid-stream**: `lol_html` parse error, decompression -corruption, I/O error. Headers (200 OK) are already sent. Log the error -server-side, drop the `StreamingBody`. Client sees a truncated response and the -connection closes. Standard reverse proxy behavior. +corruption, I/O error during chunk processing. Headers (200 OK) are already +sent. Log the error server-side, drop the `StreamingBody`. Client sees a +truncated response and the connection closes. Standard reverse proxy behavior. **Compression finalization fails**: The gzip trailer CRC32 write fails. With the fix, `encoder.finish()` is called explicitly and errors propagate. Same @@ -168,11 +180,11 @@ headers are sent, we are committed. ## Files Changed -| File | Change | Risk | -|------|--------|------| -| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | High | -| `crates/trusted-server-core/src/publisher.rs` | Split `handle_publisher_request` into streaming vs buffered paths based on `html_post_processors().is_empty()` | Medium | -| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to raw `main()` with `fastly::init()` + `Request::from_client()`; route results to `send_to_client()` or let streaming path handle its own output | Medium | +| File | Change | Risk | +| ------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | +| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); fix `process_gzip_to_gzip` to use chunk-based processing; fix 
`process_through_compression` to call `finish()` explicitly | High | +| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium | +| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to raw `main()` with `fastly::init()` + `Request::from_client()`; route results to `send_to_client()` or let streaming path handle its own output | Medium | **Not changed**: `html_processor.rs` (builds lol_html `Settings` passed to `HtmlRewriterAdapter`, works as-is), integration registration, JS build From b363e562ac105b114ad5562f04245507393d80e8 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Wed, 25 Mar 2026 07:59:42 -0700 Subject: [PATCH 11/20] Address deep review: remove fastly::init, fix API assumptions, add missing paths --- .../2026-03-25-streaming-response-design.md | 100 +++++++++++++----- 1 file changed, 71 insertions(+), 29 deletions(-) diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md index dd31097d..80c49ed8 100644 --- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -61,21 +61,32 @@ Note: this makes `HtmlRewriterAdapter` single-use — `reset()` becomes a no-op since the `Settings` are consumed by the rewriter constructor. This matches actual usage (one adapter per request). -#### B) `process_gzip_to_gzip` — chunk-based decompression +#### B) Chunk-based decompression for all compression paths -Currently calls `read_to_end()` to decompress the entire body into memory. The -deflate and brotli paths already use the chunk-based +`process_gzip_to_gzip` calls `read_to_end()` to decompress the entire body into +memory. 
The deflate and brotli keep-compression paths already use chunk-based `process_through_compression()`. Fix: use the same `process_through_compression` pattern for gzip. +Additionally, `decompress_and_process()` (used by `process_gzip_to_none`, +`process_deflate_to_none`, `process_brotli_to_none`) also calls +`read_to_end()`. These strip-compression paths must be converted to chunk-based +processing too — read decompressed chunks, process each, write uncompressed +output directly. + +Reference: `process_uncompressed` already implements the correct chunk-based +pattern (read loop → `process_chunk()` per chunk → `write_all()` → flush). The +compressed paths should follow the same structure. + #### C) `process_through_compression` finalization — prerequisite for B `process_through_compression` currently uses `drop(encoder)` which silently -swallows errors. For gzip specifically, the trailer contains a CRC32 checksum — -if `finish()` fails, corrupted responses are served silently. Today this affects -deflate and brotli (which already use `process_through_compression`); after Step -1B moves gzip to this path, it will affect gzip too. +swallows errors. Today this affects deflate and brotli (which already use this +path). The current `process_gzip_to_gzip` calls `encoder.finish()` explicitly — +but Step 1B moves gzip to `process_through_compression`, which would **regress** +gzip from working `finish()` to broken `drop()`. This fix prevents that +regression and also fixes the pre-existing issue for deflate/brotli. Fix: call `encoder.finish()` explicitly and propagate errors. This must land before or with Step 1B. @@ -89,11 +100,14 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API: `send_to_client()`. 3. Check streaming gate — if `html_post_processors()` is non-empty, fall back to buffered path. -4. Finalize all response headers (cookies, synthetic ID, geo, version). 
- Today, synthetic ID/cookie headers are set _after_ body processing in - `handle_publisher_request`. Since they are body-independent (computed from - request cookies and consent context), they must be reordered to run _before_ - `stream_to_client()` so headers are complete before streaming begins. +4. Finalize all response headers. This requires reordering two things: + - **Synthetic ID/cookie headers**: today set _after_ body processing in + `handle_publisher_request`. Since they are body-independent (computed from + request cookies and consent context), move them _before_ streaming. + - **`finalize_response()`** (main.rs): today called _after_ `route_request` + returns, adding geo, version, staging, and operator headers. In the + streaming path, this must run _before_ `stream_to_client()` since the + publisher handler sends the response directly instead of returning it. 5. Remove `Content-Length` header — the final size is unknown after processing. Fastly's `StreamingBody` sends the response using chunked transfer encoding automatically. @@ -103,17 +117,36 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API: 8. Call `finish()` on success; on error, log and drop (client sees truncated response). -For binary/non-text content: call `response.take_body()` then -`StreamingBody::append(body)` for zero-copy pass-through, bypassing the pipeline -entirely. Today binary responses skip `take_body()` and return the response -as-is — the streaming path needs to explicitly take the body to hand it to -`append()`. +For binary/non-text content: call `response.take_body()` then stream via +`io::copy(&mut body, &mut streaming_body)`. The `Body` type implements `Read` +and `StreamingBody` implements `Write`, so this streams the backend body to the +client without buffering the full content. Today binary responses skip +`take_body()` and return the response as-is — the streaming path needs to +explicitly take the body to pipe it through. 
#### Entry point change -Migrate `main.rs` from `#[fastly::main]` to raw `main()` with `fastly::init()` -\+ `Request::from_client()`. This is required because `stream_to_client()` / -`send_to_client()` are incompatible with `#[fastly::main]`'s return-based model. +Migrate `main.rs` from `#[fastly::main]` to an undecorated `main()` with +`Request::from_client()`. No separate initialization call is needed — +`#[fastly::main]` is just syntactic sugar for `Request::from_client()` + +`Response::send_to_client()`. The migration is required because +`stream_to_client()` / `send_to_client()` are incompatible with +`#[fastly::main]`'s return-based model. + +```rust +fn main() { + let req = Request::from_client(); + match handle(req) { + Ok(()) => {} + Err(e) => to_error_response(&e).send_to_client(), + } +} +``` + +Note: the return type changes from `Result` to `()` (or +`Result<(), Error>`). Errors that currently propagate to `main`'s `Result` must +now be caught explicitly and sent via `send_to_client()` with +`to_error_response()`. Non-streaming routes (static, auction, discovery) use `send_to_client()` as before. @@ -134,18 +167,19 @@ Origin body (gzip) → StreamingBody::finish() ``` -Memory at steady state: ~8KB input chunk buffer + lol_html internal parser state -\+ gzip encoder window + overlap buffer for replacer. Roughly constant regardless +Memory at steady state: ~8KB input chunk buffer, lol_html internal parser state, +gzip encoder window, and overlap buffer for replacer. Roughly constant regardless of document size, versus the current ~4x document size. ### Pass-through path (binary, images, fonts, etc.) ``` -Origin body - → StreamingBody::append(body) → zero-copy transfer +Origin body (via take_body()) + → io::copy(&mut body, &mut streaming_body) → streamed transfer + → StreamingBody::finish() ``` -No decompression, no processing, no buffering. +No decompression, no processing. Body streams through as read. 
### Buffered fallback path (error responses or post-processors present) @@ -168,8 +202,10 @@ headers have not been sent yet, return a proper error response via **Processing fails mid-stream**: `lol_html` parse error, decompression corruption, I/O error during chunk processing. Headers (200 OK) are already -sent. Log the error server-side, drop the `StreamingBody`. Client sees a -truncated response and the connection closes. Standard reverse proxy behavior. +sent. Log the error server-side, drop the `StreamingBody`. Per the Fastly SDK, +`StreamingBody` automatically aborts the response if dropped without calling +`finish()` — the client sees a connection reset / truncated response. This is +standard reverse proxy behavior. **Compression finalization fails**: The gzip trailer CRC32 write fails. With the fix, `encoder.finish()` is called explicitly and errors propagate. Same @@ -182,9 +218,9 @@ headers are sent, we are committed. | File | Change | Risk | | ------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | -| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | High | +| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); convert all compression paths to chunk-based processing (`process_gzip_to_gzip` and `decompress_and_process`); fix `process_through_compression` to call `finish()` explicitly | High | | `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into 
streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium | -| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to raw `main()` with `fastly::init()` + `Request::from_client()`; route results to `send_to_client()` or let streaming path handle its own output | Medium | +| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium | **Not changed**: `html_processor.rs` (builds lol_html `Settings` passed to `HtmlRewriterAdapter`, works as-is), integration registration, JS build @@ -195,6 +231,12 @@ post-processors on `is_last`. In the streaming path the post-processor list is empty (that's the gate condition), so the wrapper is a no-op passthrough. It remains in place — no need to bypass it. +Clarification: `script_rewriters` (used by Next.js and GTM) are distinct from +`html_post_processors`. Script rewriters run inside `lol_html` element handlers +during streaming — they do not require buffering and are unaffected by this +change. The streaming gate checks only `html_post_processors().is_empty()`, not +script rewriters. Currently only Next.js registers a post-processor. + ## Rollback Strategy The `#[fastly::main]` to raw `main()` migration is a structural change. 
If From b83f61c4a5c8eab14fc34e766d48d82b25e719aa Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 10:37:39 -0700 Subject: [PATCH 12/20] Apply rustfmt formatting to streaming_processor --- .../src/streaming_processor.rs | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 7062df93..40ec51cb 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -215,11 +215,11 @@ impl StreamingPipeline

{ message: "Failed to process chunk".to_string(), })?; if !processed.is_empty() { - writer.write_all(&processed).change_context( - TrustedServerError::Proxy { + writer + .write_all(&processed) + .change_context(TrustedServerError::Proxy { message: "Failed to write processed chunk".to_string(), - }, - )?; + })?; } } Err(e) => { @@ -502,8 +502,7 @@ mod tests { // Compress input let mut compressed_input = Vec::new(); { - let mut enc = - ZlibEncoder::new(&mut compressed_input, flate2::Compression::default()); + let mut enc = ZlibEncoder::new(&mut compressed_input, flate2::Compression::default()); enc.write_all(input_data) .expect("should compress test input"); enc.finish().expect("should finish compression"); @@ -551,8 +550,7 @@ mod tests { let mut compressed_input = Vec::new(); { - let mut enc = - GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); enc.write_all(input_data) .expect("should compress test input"); enc.finish().expect("should finish compression"); @@ -600,8 +598,7 @@ mod tests { let mut compressed_input = Vec::new(); { - let mut enc = - GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); enc.write_all(input_data) .expect("should compress test input"); enc.finish().expect("should finish compression"); @@ -627,8 +624,7 @@ mod tests { .expect("should process gzip-to-none"); // Assert - let result = - String::from_utf8(output).expect("should be valid UTF-8 uncompressed output"); + let result = String::from_utf8(output).expect("should be valid UTF-8 uncompressed output"); assert_eq!( result, "hi world", "should have replaced content after gzip decompression" From aeca9f6479c33d87263da7a24f61d37a04f72b64 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 10:47:24 -0700 Subject: [PATCH 13/20] 
Add debug logging, brotli round-trip test, and post-processor accumulation test - Add debug-level logging to process_chunks showing total bytes read and written per pipeline invocation - Add brotli-to-brotli round-trip test to cover the into_inner() finalization path - Add test proving HtmlWithPostProcessing accumulates output when post-processors are registered while streaming path passes through --- .../trusted-server-core/src/html_processor.rs | 85 +++++++++++++++++++ .../src/streaming_processor.rs | 57 +++++++++++++ 2 files changed, 142 insertions(+) diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index 30550318..95ccf9c3 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -1010,4 +1010,89 @@ mod tests { .collect::() ); } + + #[test] + fn post_processors_accumulate_while_streaming_path_passes_through() { + use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor}; + use lol_html::Settings; + + // --- Streaming path: no post-processors → output emitted per chunk --- + let mut streaming = HtmlWithPostProcessing { + inner: HtmlRewriterAdapter::new(Settings::default()), + post_processors: Vec::new(), + accumulated_output: Vec::new(), + origin_host: String::new(), + request_host: String::new(), + request_scheme: String::new(), + document_state: IntegrationDocumentState::default(), + }; + + let chunk1 = streaming + .process_chunk(b"", false) + .expect("should process chunk1"); + let chunk2 = streaming + .process_chunk(b"

hello

", false) + .expect("should process chunk2"); + let chunk3 = streaming + .process_chunk(b"", true) + .expect("should process final chunk"); + + assert!( + !chunk1.is_empty() || !chunk2.is_empty(), + "should emit intermediate output on streaming path" + ); + + let mut streaming_all = chunk1; + streaming_all.extend_from_slice(&chunk2); + streaming_all.extend_from_slice(&chunk3); + + // --- Buffered path: post-processor registered → accumulates until is_last --- + struct NoopPostProcessor; + impl IntegrationHtmlPostProcessor for NoopPostProcessor { + fn integration_id(&self) -> &'static str { + "test-noop" + } + fn post_process(&self, _html: &mut String, _ctx: &IntegrationHtmlContext<'_>) -> bool { + false + } + } + + let mut buffered = HtmlWithPostProcessing { + inner: HtmlRewriterAdapter::new(Settings::default()), + post_processors: vec![Arc::new(NoopPostProcessor)], + accumulated_output: Vec::new(), + origin_host: String::new(), + request_host: String::new(), + request_scheme: String::new(), + document_state: IntegrationDocumentState::default(), + }; + + let buf1 = buffered + .process_chunk(b"", false) + .expect("should process chunk1"); + let buf2 = buffered + .process_chunk(b"

hello

", false) + .expect("should process chunk2"); + let buf3 = buffered + .process_chunk(b"", true) + .expect("should process final chunk"); + + assert!( + buf1.is_empty() && buf2.is_empty(), + "should return empty for intermediate chunks when post-processors are registered" + ); + assert!( + !buf3.is_empty(), + "should emit all output in final chunk when post-processors are registered" + ); + + // Both paths should produce identical output + let streaming_str = + String::from_utf8(streaming_all).expect("streaming output should be valid UTF-8"); + let buffered_str = String::from_utf8(buf3).expect("buffered output should be valid UTF-8"); + assert_eq!( + streaming_str, buffered_str, + "streaming and buffered paths should produce identical output" + ); + } } diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 40ec51cb..4f189926 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -189,6 +189,8 @@ impl StreamingPipeline

{ mut writer: W, ) -> Result<(), Report> { let mut buffer = vec![0u8; self.config.chunk_size]; + let mut total_read: u64 = 0; + let mut total_written: u64 = 0; loop { match reader.read(&mut buffer) { @@ -199,6 +201,7 @@ impl StreamingPipeline

{ }, )?; if !final_chunk.is_empty() { + total_written += final_chunk.len() as u64; writer.write_all(&final_chunk).change_context( TrustedServerError::Proxy { message: "Failed to write final chunk".to_string(), @@ -208,6 +211,7 @@ impl StreamingPipeline

{ break; } Ok(n) => { + total_read += n as u64; let processed = self .processor .process_chunk(&buffer[..n], false) @@ -215,6 +219,7 @@ impl StreamingPipeline

{ message: "Failed to process chunk".to_string(), })?; if !processed.is_empty() { + total_written += processed.len() as u64; writer .write_all(&processed) .change_context(TrustedServerError::Proxy { @@ -234,6 +239,10 @@ impl StreamingPipeline

{ message: "Failed to flush output".to_string(), })?; + log::debug!( + "Streaming pipeline complete: read {total_read} bytes, wrote {total_written} bytes" + ); + Ok(()) } } @@ -631,6 +640,54 @@ mod tests { ); } + #[test] + fn test_brotli_round_trip_produces_valid_output() { + use brotli::enc::writer::CompressorWriter; + use brotli::Decompressor; + use std::io::{Read as _, Write as _}; + + let input_data = b"hello world"; + + // Compress input with brotli + let mut compressed_input = Vec::new(); + { + let mut enc = CompressorWriter::new(&mut compressed_input, 4096, 4, 22); + enc.write_all(input_data) + .expect("should compress test input"); + enc.flush().expect("should flush brotli encoder"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Brotli, + output_compression: Compression::Brotli, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process brotli-to-brotli"); + + // Decompress output and verify correctness + let mut decompressed = Vec::new(); + Decompressor::new(&output[..], 4096) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through brotli round-trip" + ); + } + #[test] fn test_html_rewriter_adapter_emits_output_per_chunk() { use lol_html::Settings; From e1c6cb81e3c95bbb757a9bba67fa818969ea8658 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 11:05:07 -0700 Subject: [PATCH 14/20] Address deep review: imports, stale comments, brotli finalization, tests - Group std imports together (cell, io, rc) before 
external crates - Document supported compression combinations on PipelineConfig - Remove dead-weight byte counters from process_chunks hot loop - Fix stale comment referencing removed process_through_compression - Fix brotli finalization: use drop(encoder) instead of into_inner() to make the intent clear (CompressorWriter writes trailer on drop) - Document reset() as no-op on HtmlRewriterAdapter (single-use) - Add brotli round-trip test covering into_inner finalization path - Add gzip HTML rewriter pipeline test (compressed round-trip with lol_html, not just StreamingReplacer) - Add HtmlWithPostProcessing accumulation vs streaming behavior test --- .../trusted-server-core/src/html_processor.rs | 3 +- .../src/streaming_processor.rs | 126 +++++++++++++----- 2 files changed, 94 insertions(+), 35 deletions(-) diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index 95ccf9c3..52fba915 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -94,8 +94,9 @@ impl StreamProcessor for HtmlWithPostProcessing { Ok(html.into_bytes()) } + /// No-op. `HtmlWithPostProcessing` wraps a single-use + /// [`HtmlRewriterAdapter`] and cannot be meaningfully reset. fn reset(&mut self) { - self.inner.reset(); self.accumulated_output.clear(); self.document_state.clear(); } diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 4f189926..6e915737 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -7,10 +7,10 @@ //! 
- UTF-8 boundary handling use std::cell::RefCell; +use std::io::{self, Read, Write}; use std::rc::Rc; use error_stack::{Report, ResultExt}; -use std::io::{self, Read, Write}; use crate::error::TrustedServerError; @@ -56,7 +56,21 @@ impl Compression { } } -/// Configuration for the streaming pipeline +/// Configuration for the streaming pipeline. +/// +/// # Supported compression combinations +/// +/// | Input | Output | Behavior | +/// |-------|--------|----------| +/// | None | None | Pass-through processing | +/// | Gzip | Gzip | Decompress → process → recompress | +/// | Gzip | None | Decompress → process | +/// | Deflate | Deflate | Decompress → process → recompress | +/// | Deflate | None | Decompress → process | +/// | Brotli | Brotli | Decompress → process → recompress | +/// | Brotli | None | Decompress → process | +/// +/// All other combinations return an error at runtime. pub struct PipelineConfig { /// Input compression type pub input_compression: Compression, @@ -158,8 +172,9 @@ impl StreamingPipeline

{ }; let mut encoder = CompressorWriter::with_params(output, 4096, ¶ms); self.process_chunks(decoder, &mut encoder)?; - // CompressorWriter finalizes on flush (already called) and into_inner - encoder.into_inner(); + // CompressorWriter writes the brotli stream trailer on drop. + // process_chunks already called flush(), so drop finalizes cleanly. + drop(encoder); Ok(()) } (Compression::Brotli, Compression::None) => { @@ -189,8 +204,6 @@ impl StreamingPipeline

{ mut writer: W, ) -> Result<(), Report> { let mut buffer = vec![0u8; self.config.chunk_size]; - let mut total_read: u64 = 0; - let mut total_written: u64 = 0; loop { match reader.read(&mut buffer) { @@ -201,7 +214,6 @@ impl StreamingPipeline

{ }, )?; if !final_chunk.is_empty() { - total_written += final_chunk.len() as u64; writer.write_all(&final_chunk).change_context( TrustedServerError::Proxy { message: "Failed to write final chunk".to_string(), @@ -211,7 +223,6 @@ impl StreamingPipeline

{ break; } Ok(n) => { - total_read += n as u64; let processed = self .processor .process_chunk(&buffer[..n], false) @@ -219,7 +230,6 @@ impl StreamingPipeline

{ message: "Failed to process chunk".to_string(), })?; if !processed.is_empty() { - total_written += processed.len() as u64; writer .write_all(&processed) .change_context(TrustedServerError::Proxy { @@ -239,10 +249,6 @@ impl StreamingPipeline

{ message: "Failed to flush output".to_string(), })?; - log::debug!( - "Streaming pipeline complete: read {total_read} bytes, wrote {total_written} bytes" - ); - Ok(()) } } @@ -308,10 +314,12 @@ impl StreamProcessor for HtmlRewriterAdapter { Ok(std::mem::take(&mut *self.output.borrow_mut())) } - fn reset(&mut self) { - // No-op: the rewriter consumed its Settings on construction. - // Single-use by design (one adapter per request). - } + /// No-op. `HtmlRewriterAdapter` is single-use: the rewriter consumes its + /// [`Settings`](lol_html::Settings) on construction and cannot be recreated. + /// Calling [`process_chunk`](StreamProcessor::process_chunk) after + /// [`process_chunk`](StreamProcessor::process_chunk) with `is_last = true` + /// will produce empty output. + fn reset(&mut self) {} } /// Adapter to use our existing `StreamingReplacer` as a `StreamProcessor` @@ -468,40 +476,33 @@ mod tests { } #[test] - fn test_html_rewriter_adapter_reset_is_noop() { + fn test_html_rewriter_adapter_reset_then_finalize() { use lol_html::Settings; let settings = Settings::default(); let mut adapter = HtmlRewriterAdapter::new(settings); - // Process some content - let result1 = adapter + adapter .process_chunk(b"test", false) .expect("should process html"); - // Reset is a no-op — the adapter is single-use by design + // reset() is a documented no-op — adapter is single-use adapter.reset(); - // The rewriter is still alive; finalize it - let result2 = adapter + // Finalize still works; the rewriter is still alive + let final_output = adapter .process_chunk(b"", true) .expect("should finalize after reset"); - let mut all_output = result1; - all_output.extend_from_slice(&result2); - - let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); - assert!( - output.contains("test"), - "should still produce output after no-op reset" - ); + // Output may or may not be empty depending on lol_html buffering, + // but it should not error + let _ = final_output; } 
#[test] fn test_deflate_round_trip_produces_valid_output() { - // Verify that deflate-to-deflate (which uses process_through_compression) - // produces valid output that decompresses correctly. This establishes the - // correctness contract before we change the finalization path. + // Verify that deflate-to-deflate produces valid output that decompresses + // correctly, confirming that encoder finalization works. use flate2::read::ZlibDecoder; use flate2::write::ZlibEncoder; use std::io::{Read as _, Write as _}; @@ -772,4 +773,61 @@ mod tests { "Should not contain original URL" ); } + + #[test] + fn test_gzip_pipeline_with_html_rewriter() { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + use lol_html::{element, Settings}; + use std::io::{Read as _, Write as _}; + + let settings = Settings { + element_content_handlers: vec![element!("a[href]", |el| { + if let Some(href) = el.get_attribute("href") { + if href.contains("example.com") { + el.set_attribute("href", &href.replace("example.com", "test.com"))?; + } + } + Ok(()) + })], + ..Settings::default() + }; + + let input = b"Link"; + + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input).expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let adapter = HtmlRewriterAdapter::new(settings); + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::Gzip, + chunk_size: 8192, + }; + let mut pipeline = StreamingPipeline::new(config, adapter); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("pipeline should process gzip HTML"); + + let mut decompressed = Vec::new(); + GzDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output"); + + let result = String::from_utf8(decompressed).expect("output should be valid UTF-8"); + assert!( 
+ result.contains("https://test.com"), + "should have replaced URL through gzip HTML pipeline" + ); + assert!( + !result.contains("example.com"), + "should not contain original URL after gzip HTML pipeline" + ); + } } From 9753026afc8a81e2f1ddee452bab727df08f05b5 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 11:12:42 -0700 Subject: [PATCH 15/20] Address second deep review: correctness, docs, and test robustness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Eq derive to Compression enum (all unit variants, trivially correct) - Brotli finalization: use into_inner() instead of drop() to skip redundant flush and make finalization explicit - Document process_chunks flush semantics: callers must still call encoder-specific finalization after this method returns - Warn when HtmlRewriterAdapter receives data after finalization (rewriter already consumed, data would be silently lost) - Make HtmlWithPostProcessing::reset() a true no-op matching its doc (clearing auxiliary state without resetting rewriter is inconsistent) - Document extra copying overhead on post-processor path vs streaming - Assert output content in reset-then-finalize test (was discarded) - Relax per-chunk emission test to not depend on lol_html internal buffering behavior — assert concatenated correctness + at least one intermediate chunk emitted --- .../trusted-server-core/src/html_processor.rs | 19 +++-- .../src/streaming_processor.rs | 82 +++++++++++-------- 2 files changed, 64 insertions(+), 37 deletions(-) diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index 52fba915..d9840cfb 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -17,6 +17,16 @@ use crate::settings::Settings; use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor}; use 
crate::tsjs; +/// Wraps [`HtmlRewriterAdapter`] with optional post-processing. +/// +/// When `post_processors` is empty (the common streaming path), chunks pass +/// through immediately with no extra copying. When post-processors are +/// registered, intermediate output is accumulated in `accumulated_output` +/// until `is_last`, then post-processors run on the full document. This adds +/// an extra copy per chunk compared to the pre-streaming adapter (which +/// accumulated raw input instead of rewriter output). The overhead is +/// acceptable because the post-processor path is already fully buffered — +/// the real streaming win comes from the empty-post-processor path in Phase 2. struct HtmlWithPostProcessing { inner: HtmlRewriterAdapter, post_processors: Vec>, @@ -95,11 +105,10 @@ impl StreamProcessor for HtmlWithPostProcessing { } /// No-op. `HtmlWithPostProcessing` wraps a single-use - /// [`HtmlRewriterAdapter`] and cannot be meaningfully reset. - fn reset(&mut self) { - self.accumulated_output.clear(); - self.document_state.clear(); - } + /// [`HtmlRewriterAdapter`] that cannot be reset. Clearing auxiliary + /// state without resetting the rewriter would leave the processor + /// in an inconsistent state, so this method intentionally does nothing. + fn reset(&mut self) {} } /// Configuration for HTML processing diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 6e915737..3915494c 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -35,7 +35,7 @@ pub trait StreamProcessor { } /// Compression type for the stream -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Compression { None, Gzip, @@ -172,9 +172,12 @@ impl StreamingPipeline

{ }; let mut encoder = CompressorWriter::with_params(output, 4096, ¶ms); self.process_chunks(decoder, &mut encoder)?; - // CompressorWriter writes the brotli stream trailer on drop. - // process_chunks already called flush(), so drop finalizes cleanly. - drop(encoder); + // CompressorWriter emits the brotli stream trailer via flush(), + // which process_chunks already called. into_inner() avoids a + // redundant flush on drop and makes finalization explicit. + // Note: unlike flate2's finish(), CompressorWriter has no + // fallible finalization method — flush() is the only option. + let _ = encoder.into_inner(); Ok(()) } (Compression::Brotli, Compression::None) => { @@ -191,9 +194,11 @@ impl StreamingPipeline

{ /// Read chunks from `reader`, pass each through the processor, and write output to `writer`. /// /// This is the single unified chunk loop used by all compression paths. - /// The caller is responsible for wrapping `reader`/`writer` in the appropriate - /// decoder/encoder and for finalizing the encoder (e.g., calling `finish()`) - /// after this method returns. + /// The method calls `writer.flush()` before returning. For the `None → None` + /// path this is the only finalization needed. For compressed paths, the caller + /// must still call the encoder's type-specific finalization (e.g., `finish()` + /// for flate2, `into_inner()` for brotli) — `flush()` alone does not write + /// compression trailers for all codecs. /// /// # Errors /// @@ -292,13 +297,22 @@ impl HtmlRewriterAdapter { impl StreamProcessor for HtmlRewriterAdapter { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { - if let Some(rewriter) = &mut self.rewriter { - if !chunk.is_empty() { - rewriter.write(chunk).map_err(|e| { - log::error!("Failed to process HTML chunk: {e}"); - io::Error::other(format!("HTML processing failed: {e}")) - })?; + match &mut self.rewriter { + Some(rewriter) => { + if !chunk.is_empty() { + rewriter.write(chunk).map_err(|e| { + log::error!("Failed to process HTML chunk: {e}"); + io::Error::other(format!("HTML processing failed: {e}")) + })?; + } + } + None if !chunk.is_empty() => { + log::warn!( + "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", + chunk.len() + ); } + None => {} } if is_last { @@ -482,7 +496,7 @@ mod tests { let settings = Settings::default(); let mut adapter = HtmlRewriterAdapter::new(settings); - adapter + let result1 = adapter .process_chunk(b"test", false) .expect("should process html"); @@ -490,13 +504,17 @@ mod tests { adapter.reset(); // Finalize still works; the rewriter is still alive - let final_output = adapter + let result2 = adapter .process_chunk(b"", true) .expect("should 
finalize after reset"); - // Output may or may not be empty depending on lol_html buffering, - // but it should not error - let _ = final_output; + let mut all_output = result1; + all_output.extend_from_slice(&result2); + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); + assert!( + output.contains("test"), + "should produce correct output despite no-op reset" + ); } #[test] @@ -696,27 +714,27 @@ mod tests { let settings = Settings::default(); let mut adapter = HtmlRewriterAdapter::new(settings); - // Send three chunks - let chunk1 = b""; + // Send three chunks — lol_html may buffer internally, so individual + // chunk outputs may vary by version. The contract is that concatenated + // output is correct, and that output is not deferred entirely to is_last. let result1 = adapter - .process_chunk(chunk1, false) + .process_chunk(b"", false) .expect("should process chunk1"); - assert!( - !result1.is_empty(), - "should emit output for first chunk, got empty" - ); - - let chunk2 = b"

hello

"; let result2 = adapter - .process_chunk(chunk2, false) + .process_chunk(b"

hello

", false) .expect("should process chunk2"); - - let chunk3 = b""; let result3 = adapter - .process_chunk(chunk3, true) + .process_chunk(b"", true) .expect("should process final chunk"); - // Concatenate all outputs and verify correctness + // At least one intermediate chunk should produce output (verifies + // we're not deferring everything to is_last like the old adapter). + assert!( + !result1.is_empty() || !result2.is_empty(), + "should emit some output before is_last" + ); + + // Concatenated output must be correct let mut all_output = result1; all_output.extend_from_slice(&result2); all_output.extend_from_slice(&result3); From 0a4ece7c82480df7e07ef6ace4ea5773dcd0ac02 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 11:32:43 -0700 Subject: [PATCH 16/20] Add active post-processor test and precise flush docs per codec - Add test that feeds multiple chunks through HtmlWithPostProcessing with an active post-processor (should_process returns true, mutates HTML). Verifies the post-processor receives the complete accumulated document and its mutations appear in the output. 
- Make flush semantics per-codec explicit in process_chunks doc: flate2 needs finish() after flush, brotli is finalized by flush --- .../trusted-server-core/src/html_processor.rs | 62 +++++++++++++++++++ .../src/streaming_processor.rs | 8 ++- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index d9840cfb..9e6efafb 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -1105,4 +1105,66 @@ mod tests { "streaming and buffered paths should produce identical output" ); } + + #[test] + fn active_post_processor_receives_full_document_and_mutates_output() { + use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor}; + use lol_html::Settings; + + struct AppendCommentProcessor; + impl IntegrationHtmlPostProcessor for AppendCommentProcessor { + fn integration_id(&self) -> &'static str { + "test-append" + } + fn should_process(&self, html: &str, _ctx: &IntegrationHtmlContext<'_>) -> bool { + html.contains("") + } + fn post_process(&self, html: &mut String, _ctx: &IntegrationHtmlContext<'_>) -> bool { + html.push_str(""); + true + } + } + + let mut processor = HtmlWithPostProcessing { + inner: HtmlRewriterAdapter::new(Settings::default()), + post_processors: vec![Arc::new(AppendCommentProcessor)], + accumulated_output: Vec::new(), + origin_host: String::new(), + request_host: String::new(), + request_scheme: String::new(), + document_state: IntegrationDocumentState::default(), + }; + + // Feed multiple chunks + let r1 = processor + .process_chunk(b"", false) + .expect("should process chunk1"); + let r2 = processor + .process_chunk(b"

content

", false) + .expect("should process chunk2"); + let r3 = processor + .process_chunk(b"", true) + .expect("should process final chunk"); + + // Intermediate chunks return empty (buffered for post-processor) + assert!( + r1.is_empty() && r2.is_empty(), + "should buffer intermediate chunks" + ); + + // Final chunk contains the full document with post-processor mutation + let output = String::from_utf8(r3).expect("should be valid UTF-8"); + assert!( + output.contains("

content

"), + "should contain original content" + ); + assert!( + output.contains(""), + "should contain complete document" + ); + assert!( + output.contains(""), + "should contain post-processor mutation" + ); + } } diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 3915494c..ac226d95 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -196,9 +196,11 @@ impl StreamingPipeline

{ /// This is the single unified chunk loop used by all compression paths. /// The method calls `writer.flush()` before returning. For the `None → None` /// path this is the only finalization needed. For compressed paths, the caller - /// must still call the encoder's type-specific finalization (e.g., `finish()` - /// for flate2, `into_inner()` for brotli) — `flush()` alone does not write - /// compression trailers for all codecs. + /// must still call the encoder's type-specific finalization after this returns: + /// - **flate2** (`GzEncoder`, `ZlibEncoder`): call `finish()` — `flush()` does + /// not write the gzip/deflate trailer. + /// - **brotli** (`CompressorWriter`): `flush()` does finalize the stream, so + /// the caller only needs `into_inner()` to reclaim the writer. /// /// # Errors /// From 68d11e875754623892bf66b1730f73716d9cea30 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 11:46:21 -0700 Subject: [PATCH 17/20] Fix text node fragmentation regression for script rewriters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit lol_html fragments text nodes across chunk boundaries when fed incrementally. This breaks script rewriters (NextJS __NEXT_DATA__, GTM) that expect complete text content — a split domain like "google" + "tagmanager.com" would silently miss the rewrite. Add dual-mode HtmlRewriterAdapter: - new(): streaming mode, emits output per chunk (no script rewriters) - new_buffered(): accumulates input, feeds lol_html in one write() call on is_last (script rewriters registered) create_html_processor selects the mode based on whether script_rewriters is non-empty. This preserves the old behavior (single-pass processing) when rewriters need it, while enabling streaming when they don't. Add regression test proving lol_html does fragment text across chunk boundaries, confirming the issue is real. 
--- .../trusted-server-core/src/html_processor.rs | 15 +- .../src/streaming_processor.rs | 135 ++++++++++++++++-- 2 files changed, 137 insertions(+), 13 deletions(-) diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index 9e6efafb..1839eb59 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -455,6 +455,7 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso }), ]; + let has_script_rewriters = !script_rewriters.is_empty(); for script_rewriter in script_rewriters { let selector = script_rewriter.selector(); let rewriter = script_rewriter.clone(); @@ -492,8 +493,20 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso ..RewriterSettings::default() }; + // Use buffered mode when script rewriters are registered. lol_html fragments + // text nodes across chunk boundaries during streaming, which breaks rewriters + // that expect complete text content (e.g., __NEXT_DATA__, GTM inline scripts). + // Buffered mode feeds the entire document to lol_html in one write() call, + // preserving text node integrity. When no script rewriters are active, + // streaming mode emits output incrementally per chunk. 
+ let inner = if has_script_rewriters { + HtmlRewriterAdapter::new_buffered(rewriter_settings) + } else { + HtmlRewriterAdapter::new(rewriter_settings) + }; + HtmlWithPostProcessing { - inner: HtmlRewriterAdapter::new(rewriter_settings), + inner, post_processors, accumulated_output: Vec::new(), origin_host: config.origin_host, diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index ac226d95..2ca71bc0 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -275,16 +275,33 @@ impl lol_html::OutputSink for RcVecSink { /// Adapter to use `lol_html` [`HtmlRewriter`](lol_html::HtmlRewriter) as a [`StreamProcessor`]. /// -/// Output is emitted incrementally on every [`StreamProcessor::process_chunk`] call. +/// Operates in one of two modes: +/// +/// - **Streaming** (`buffered = false`): output is emitted incrementally on every +/// [`StreamProcessor::process_chunk`] call. Use when no script rewriters are +/// registered. +/// - **Buffered** (`buffered = true`): input is accumulated and processed in a +/// single `write()` call on `is_last`. Use when script rewriters are registered, +/// because `lol_html` fragments text nodes across chunk boundaries and rewriters +/// that expect complete text content (e.g., `__NEXT_DATA__`, GTM) would silently +/// miss rewrites on split fragments. +/// /// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`] /// is a no-op because the rewriter consumes its settings on construction. pub struct HtmlRewriterAdapter { rewriter: Option>, output: Rc>>, + /// When true, input is accumulated and fed to `lol_html` in one pass on `is_last`. + buffered: bool, + /// Accumulated input for the buffered path. + accumulated_input: Vec, } impl HtmlRewriterAdapter { /// Create a new HTML rewriter adapter that streams output per chunk. 
+ /// + /// Use [`Self::new_buffered`] when script rewriters are registered to + /// avoid text node fragmentation. #[must_use] pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self { let output = Rc::new(RefCell::new(Vec::new())); @@ -293,28 +310,69 @@ impl HtmlRewriterAdapter { Self { rewriter: Some(rewriter), output, + buffered: false, + accumulated_input: Vec::new(), + } + } + + /// Create a new HTML rewriter adapter that buffers all input before processing. + /// + /// This avoids `lol_html` text node fragmentation that breaks script rewriters + /// expecting complete text content. The entire document is fed to the rewriter + /// in a single `write()` call when `is_last` is true. + #[must_use] + pub fn new_buffered(settings: lol_html::Settings<'static, 'static>) -> Self { + let output = Rc::new(RefCell::new(Vec::new())); + let sink = RcVecSink(Rc::clone(&output)); + let rewriter = lol_html::HtmlRewriter::new(settings, sink); + Self { + rewriter: Some(rewriter), + output, + buffered: true, + accumulated_input: Vec::new(), } } } impl StreamProcessor for HtmlRewriterAdapter { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { - match &mut self.rewriter { - Some(rewriter) => { - if !chunk.is_empty() { - rewriter.write(chunk).map_err(|e| { - log::error!("Failed to process HTML chunk: {e}"); + if self.buffered { + // Buffered mode: accumulate input, process all at once on is_last. 
+ if !chunk.is_empty() { + self.accumulated_input.extend_from_slice(chunk); + } + if !is_last { + return Ok(Vec::new()); + } + // Feed entire document to lol_html in one pass + if let Some(rewriter) = &mut self.rewriter { + if !self.accumulated_input.is_empty() { + let input = std::mem::take(&mut self.accumulated_input); + rewriter.write(&input).map_err(|e| { + log::error!("Failed to process HTML: {e}"); io::Error::other(format!("HTML processing failed: {e}")) })?; } } - None if !chunk.is_empty() => { - log::warn!( - "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", - chunk.len() - ); + } else { + // Streaming mode: feed chunks to lol_html incrementally. + match &mut self.rewriter { + Some(rewriter) => { + if !chunk.is_empty() { + rewriter.write(chunk).map_err(|e| { + log::error!("Failed to process HTML chunk: {e}"); + io::Error::other(format!("HTML processing failed: {e}")) + })?; + } + } + None if !chunk.is_empty() => { + log::warn!( + "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", + chunk.len() + ); + } + None => {} } - None => {} } if is_last { @@ -352,6 +410,59 @@ mod tests { use super::*; use crate::streaming_replacer::{Replacement, StreamingReplacer}; + /// Verify that `lol_html` fragments text nodes when input chunks split + /// mid-text-node. This is critical: if `lol_html` does fragment, then + /// script rewriters (`NextJS` `__NEXT_DATA__`, `GTM`) that expect full + /// text content will silently miss rewrites when the streaming adapter + /// feeds chunks incrementally. 
+ #[test] + fn lol_html_fragments_text_across_chunk_boundaries() { + use std::cell::RefCell; + use std::rc::Rc; + + let fragments: Rc>> = Rc::new(RefCell::new(Vec::new())); + let fragments_clone = Rc::clone(&fragments); + + let mut rewriter = lol_html::HtmlRewriter::new( + lol_html::Settings { + element_content_handlers: vec![lol_html::text!("script", move |text| { + fragments_clone + .borrow_mut() + .push((text.as_str().to_string(), text.last_in_text_node())); + Ok(()) + })], + ..lol_html::Settings::default() + }, + |_chunk: &[u8]| {}, + ); + + // Split "googletagmanager.com/gtm.js" across two chunks + rewriter + .write(b"") + .expect("should write chunk2"); + rewriter.end().expect("should end"); + + let frags = fragments.borrow(); + // lol_html should emit at least 2 text fragments since input was split + assert!( + frags.len() >= 2, + "should fragment text across chunk boundaries, got {} fragments: {:?}", + frags.len(), + *frags + ); + // No single fragment should contain the full domain + assert!( + !frags + .iter() + .any(|(text, _)| text.contains("googletagmanager.com")), + "no individual fragment should contain the full domain when split across chunks: {:?}", + *frags + ); + } + #[test] fn test_uncompressed_pipeline() { let replacer = StreamingReplacer::new(vec![Replacement { From 6faeea0190099e7c347b25dfd64727d9639e18cb Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 13:59:51 -0700 Subject: [PATCH 18/20] Gate streaming adapter on script rewriter presence lol_html fragments text nodes across input chunk boundaries. Script rewriters (NextJS __NEXT_DATA__, GTM) expect complete text content and would silently miss rewrites on split fragments. Add dual-mode HtmlRewriterAdapter: - new(): streaming, emits output per chunk (no script rewriters) - new_buffered(): accumulates input, single write() on is_last create_html_processor selects mode based on script_rewriters. 
This preserves correctness while enabling streaming for configs without script rewriters. Phase 3 will make rewriters fragment-safe. Add regression test proving lol_html does fragment text nodes. --- .../trusted-server-core/src/html_processor.rs | 9 ++++----- .../src/streaming_processor.rs | 19 +++++++++---------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index 1839eb59..079681db 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -494,11 +494,10 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso }; // Use buffered mode when script rewriters are registered. lol_html fragments - // text nodes across chunk boundaries during streaming, which breaks rewriters - // that expect complete text content (e.g., __NEXT_DATA__, GTM inline scripts). - // Buffered mode feeds the entire document to lol_html in one write() call, - // preserving text node integrity. When no script rewriters are active, - // streaming mode emits output incrementally per chunk. + // text nodes across input chunk boundaries, breaking rewriters that expect + // complete text (e.g., __NEXT_DATA__, GTM). Buffered mode feeds the entire + // document in one write() call, preserving text node integrity. + // Phase 3 will make rewriters fragment-safe, enabling streaming for all configs. 
let inner = if has_script_rewriters { HtmlRewriterAdapter::new_buffered(rewriter_settings) } else { diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 2ca71bc0..a65958dc 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -277,14 +277,14 @@ impl lol_html::OutputSink for RcVecSink { /// /// Operates in one of two modes: /// -/// - **Streaming** (`buffered = false`): output is emitted incrementally on every -/// [`StreamProcessor::process_chunk`] call. Use when no script rewriters are -/// registered. -/// - **Buffered** (`buffered = true`): input is accumulated and processed in a -/// single `write()` call on `is_last`. Use when script rewriters are registered, -/// because `lol_html` fragments text nodes across chunk boundaries and rewriters -/// that expect complete text content (e.g., `__NEXT_DATA__`, GTM) would silently -/// miss rewrites on split fragments. +/// - **Streaming** ([`new`](Self::new)): output is emitted incrementally on every +/// [`process_chunk`](StreamProcessor::process_chunk) call. Use when no script +/// rewriters are registered. +/// - **Buffered** ([`new_buffered`](Self::new_buffered)): input is accumulated and +/// processed in a single `write()` call on `is_last`. Use when script rewriters +/// are registered, because `lol_html` fragments text nodes across chunk boundaries +/// and rewriters that expect complete text content would silently miss rewrites on +/// split fragments. (See Phase 3 plan for making rewriters fragment-safe.) /// /// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`] /// is a no-op because the rewriter consumes its settings on construction. 
@@ -344,7 +344,6 @@ impl StreamProcessor for HtmlRewriterAdapter { if !is_last { return Ok(Vec::new()); } - // Feed entire document to lol_html in one pass if let Some(rewriter) = &mut self.rewriter { if !self.accumulated_input.is_empty() { let input = std::mem::take(&mut self.accumulated_input); @@ -355,7 +354,7 @@ impl StreamProcessor for HtmlRewriterAdapter { } } } else { - // Streaming mode: feed chunks to lol_html incrementally. + // Streaming mode: feed chunks to `lol_html` incrementally. match &mut self.rewriter { Some(rewriter) => { if !chunk.is_empty() { From 73c992e8b8b3fe13995858c140fabc570e624e32 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:01:49 -0700 Subject: [PATCH 19/20] Document text node fragmentation workaround and Phase 3 plan Add section to spec explaining the lol_html text fragmentation issue, the dual-mode HtmlRewriterAdapter workaround (Phase 1), and the planned fix to make script rewriters fragment-safe (Phase 3, #584). --- .../2026-03-25-streaming-response-design.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md index 72716b73..c42afd5c 100644 --- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -239,6 +239,22 @@ during streaming — they do not require buffering and are unaffected by this change. The streaming gate checks only `html_post_processors().is_empty()`, not script rewriters. Currently only Next.js registers a post-processor. +## Text Node Fragmentation (Phase 3) + +`lol_html` fragments text nodes across input chunk boundaries when processing +HTML incrementally. 
Script rewriters (`NextJsNextDataRewriter`, +`GoogleTagManagerIntegration`) expect complete text content — if a domain string +is split across chunks, the rewrite silently fails. + +**Phase 1 workaround**: `HtmlRewriterAdapter` has two modes. `new()` streams +per chunk (no script rewriters). `new_buffered()` accumulates input and +processes in one `write()` call (script rewriters registered). +`create_html_processor` selects the mode automatically. + +**Phase 3** will make each script rewriter fragment-safe by accumulating text +fragments internally via `is_last_in_text_node`. This removes the buffered +fallback and enables streaming for all configurations. See #584. + ## Rollback Strategy The `#[fastly::main]` to raw `main()` migration is a structural change. If From 75f455acc37f7aebd23c0ba67639f1bdba443faa Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:06:20 -0700 Subject: [PATCH 20/20] Add buffered mode guard, anti-fragmentation test, and fix stale spec - Add post-finalization warning to buffered path (was only in streaming) - Add buffered_adapter_prevents_text_fragmentation test proving new_buffered() delivers complete text to lol_html handlers - Fix spec: html_processor.rs is changed (selects adapter mode), and script_rewriters do require buffered mode (not "unaffected") --- .../src/streaming_processor.rs | 60 ++++++++++++++++++- .../2026-03-25-streaming-response-design.md | 16 +++-- 2 files changed, 69 insertions(+), 7 deletions(-) diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index a65958dc..5a4ea290 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -339,7 +339,14 @@ impl StreamProcessor for HtmlRewriterAdapter { if self.buffered { // Buffered mode: accumulate input, process all at once on is_last. 
if !chunk.is_empty() { - self.accumulated_input.extend_from_slice(chunk); + if self.rewriter.is_none() { + log::warn!( + "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", + chunk.len() + ); + } else { + self.accumulated_input.extend_from_slice(chunk); + } } if !is_last { return Ok(Vec::new()); @@ -462,6 +469,57 @@ mod tests { ); } + /// Companion to [`lol_html_fragments_text_across_chunk_boundaries`]: + /// proves that `new_buffered()` prevents fragmentation by feeding the + /// entire document to `lol_html` in one `write()` call. + #[test] + fn buffered_adapter_prevents_text_fragmentation() { + use std::cell::RefCell; + use std::rc::Rc; + + let fragments: Rc>> = Rc::new(RefCell::new(Vec::new())); + let fragments_clone = Rc::clone(&fragments); + + let settings = lol_html::Settings { + element_content_handlers: vec![lol_html::text!("script", move |text| { + fragments_clone + .borrow_mut() + .push((text.as_str().to_string(), text.last_in_text_node())); + Ok(()) + })], + ..lol_html::Settings::default() + }; + + let mut adapter = HtmlRewriterAdapter::new_buffered(settings); + + // Feed the same split chunks as the fragmentation test + let r1 = adapter + .process_chunk(b"", true) + .expect("should process chunk2"); + assert!( + !r2.is_empty(), + "buffered adapter should emit output on is_last" + ); + + let frags = fragments.borrow(); + // With buffered mode, the text handler should see the complete string + assert!( + frags + .iter() + .any(|(text, _)| text.contains("googletagmanager.com")), + "buffered adapter should deliver complete text to handler, got: {:?}", + *frags + ); + } + #[test] fn test_uncompressed_pipeline() { let replacer = StreamingReplacer::new(vec![Replacement { diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md index c42afd5c..034624b5 100644 --- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md +++ 
b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -224,9 +224,10 @@ headers are sent, we are committed. | `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium | | `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium | -**Not changed**: `html_processor.rs` (builds lol_html `Settings` passed to -`HtmlRewriterAdapter`, works as-is), integration registration, JS build -pipeline, tsjs module serving, auction handler, cookie/synthetic ID logic. +**Minimal changes**: `html_processor.rs` now selects `HtmlRewriterAdapter` mode +based on script rewriter presence (see [Text Node Fragmentation](#text-node-fragmentation-phase-3)), +but is otherwise unchanged. Integration registration, JS build pipeline, tsjs +module serving, auction handler, cookie/synthetic ID logic are not changed. Note: `HtmlWithPostProcessing` wraps `HtmlRewriterAdapter` and applies post-processors on `is_last`. In the streaming path the post-processor list is @@ -235,9 +236,12 @@ remains in place — no need to bypass it. Clarification: `script_rewriters` (used by Next.js and GTM) are distinct from `html_post_processors`. Script rewriters run inside `lol_html` element handlers -during streaming — they do not require buffering and are unaffected by this -change. The streaming gate checks only `html_post_processors().is_empty()`, not -script rewriters. Currently only Next.js registers a post-processor. +and currently require buffered mode because `lol_html` fragments text nodes +across chunk boundaries (see [Phase 3](#text-node-fragmentation-phase-3)). 
+`html_post_processors` require the full document for post-processing. +The streaming gate checks `html_post_processors().is_empty()` for the +post-processor path; `create_html_processor` separately gates the adapter mode +on `script_rewriters`. Currently only Next.js registers a post-processor. ## Text Node Fragmentation (Phase 3)