diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md new file mode 100644 index 00000000..268517b8 --- /dev/null +++ b/docs/superpowers/plans/2026-03-25-streaming-response.md @@ -0,0 +1,983 @@ +# Streaming Response Optimization — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use +> superpowers:subagent-driven-development (recommended) or +> superpowers:executing-plans to implement this plan task-by-task. Steps use +> checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Stream HTTP responses through the publisher proxy instead of buffering +them, reducing peak memory from ~4x response size to constant and improving +time-to-last-byte. + +**Architecture:** Two independent phases. Phase 1 makes the internal streaming +pipeline truly chunk-emitting (HtmlRewriterAdapter, compression paths, encoder +finalization). Phase 2 wires up Fastly's `StreamingBody` API so processed chunks +flow directly to the client. Each phase is shippable independently. + +**Tech Stack:** Rust 1.91.1, Fastly Compute SDK 0.11.12 +(`stream_to_client`/`send_to_client`/`StreamingBody`), `lol_html` (HTML +rewriting), `flate2` (gzip/deflate), `brotli` (brotli compression). 
+ +**Spec:** `docs/superpowers/specs/2026-03-25-streaming-response-design.md` +**Issue:** #563 + +--- + +## File Map + +| File | Role | Phase | +|------|------|-------| +| `crates/trusted-server-core/src/streaming_processor.rs` | `HtmlRewriterAdapter` rewrite, compression path fixes, encoder finalization | 1 | +| `crates/trusted-server-core/src/publisher.rs` | `process_response_streaming` refactor to `W: Write`, streaming gate, header reordering | 2 | +| `crates/trusted-server-adapter-fastly/src/main.rs` | Entry point migration from `#[fastly::main]` to raw `main()`, response routing | 2 | + +--- + +## Phase 1: Make the Pipeline Chunk-Emitting + +### Task 1: Fix encoder finalization in `process_through_compression` + +This is the prerequisite for Task 2. The current code calls `flush()` then +`drop(encoder)`, silently swallowing finalization errors. Must fix before +moving gzip to this path. + +**Files:** +- Modify: `crates/trusted-server-core/src/streaming_processor.rs:334-393` +- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) + +- [ ] **Step 1: Write a test verifying deflate round-trip correctness** + +Add to the `#[cfg(test)]` module at the bottom of +`streaming_processor.rs`: + +```rust +#[test] +fn test_deflate_round_trip_produces_valid_output() { + // Verify that deflate-to-deflate (which uses process_through_compression) + // produces valid output that decompresses correctly. This establishes the + // correctness contract before we change the finalization path. 
+    use flate2::read::ZlibDecoder;
+    use flate2::write::ZlibEncoder;
+
+    let input_data = b"hello world";
+
+    // Compress input
+    let mut compressed_input = Vec::new();
+    {
+        let mut enc = ZlibEncoder::new(&mut compressed_input, flate2::Compression::default());
+        enc.write_all(input_data)
+            .expect("should compress test input");
+        enc.finish().expect("should finish compression");
+    }
+
+    let replacer = StreamingReplacer::new(vec![Replacement {
+        find: "hello".to_string(),
+        replace_with: "hi".to_string(),
+    }]);
+
+    let config = PipelineConfig {
+        input_compression: Compression::Deflate,
+        output_compression: Compression::Deflate,
+        chunk_size: 8192,
+    };
+
+    let mut pipeline = StreamingPipeline::new(config, replacer);
+    let mut output = Vec::new();
+
+    pipeline
+        .process(&compressed_input[..], &mut output)
+        .expect("should process deflate-to-deflate");
+
+    // Decompress output and verify correctness
+    let mut decompressed = Vec::new();
+    ZlibDecoder::new(&output[..])
+        .read_to_end(&mut decompressed)
+        .expect("should decompress output — implies encoder was finalized correctly");
+
+    assert_eq!(
+        String::from_utf8(decompressed).expect("should be valid UTF-8"),
+        "hi world",
+        "should have replaced content through deflate round-trip"
+    );
+}
+```
+
+- [ ] **Step 2: Run test to verify it passes (baseline)**
+
+Run: `cargo test --package trusted-server-core test_deflate_round_trip_produces_valid_output`
+
+Expected: PASS (current code happens to work for this case since
+`ZlibEncoder::drop` calls `finish` internally — the test establishes the
+contract).
+
+- [ ] **Step 3: Change `process_through_compression` to take `&mut W` and remove `drop(encoder)`**
+
+`finish()` is not on the `Write` trait — each encoder type has its own
+finalization API (`finish()` on flate2's `GzEncoder`/`ZlibEncoder`;
+brotli's `CompressorWriter` finalizes via `flush()` plus `into_inner()`).
+The fix: change the signature to take `&mut W` so the caller retains
+ownership and finalizes explicitly.
+
+Change signature (line 335-338):
+
+```rust
+    fn process_through_compression<R: Read, W: Write>(
+        &mut self,
+        mut decoder: R,
+        encoder: &mut W,
+    ) -> Result<(), Report> {
+```
+
+Replace lines 383-393 (the `flush` + `drop` block):
+
+```rust
+        encoder.flush().change_context(TrustedServerError::Proxy {
+            message: "Failed to flush encoder".to_string(),
+        })?;
+
+        // Caller owns encoder and must call finish() after this returns.
+        Ok(())
+    }
+```
+
+Then update `process_deflate_to_deflate` (lines 276-289):
+
+```rust
+    fn process_deflate_to_deflate<R: Read, W: Write>(
+        &mut self,
+        input: R,
+        output: W,
+    ) -> Result<(), Report> {
+        use flate2::read::ZlibDecoder;
+        use flate2::write::ZlibEncoder;
+
+        let decoder = ZlibDecoder::new(input);
+        let mut encoder = ZlibEncoder::new(output, flate2::Compression::default());
+        self.process_through_compression(decoder, &mut encoder)?;
+        encoder.finish().change_context(TrustedServerError::Proxy {
+            message: "Failed to finalize deflate encoder".to_string(),
+        })?;
+        Ok(())
+    }
+```
+
+And update `process_brotli_to_brotli` (lines 303-321):
+
+```rust
+    fn process_brotli_to_brotli<R: Read, W: Write>(
+        &mut self,
+        input: R,
+        output: W,
+    ) -> Result<(), Report> {
+        use brotli::enc::writer::CompressorWriter;
+        use brotli::enc::BrotliEncoderParams;
+        use brotli::Decompressor;
+
+        let decoder = Decompressor::new(input, 4096);
+        let mut params = BrotliEncoderParams::default();
+        params.quality = 4;
+        params.lgwin = 22;
+        let mut encoder = CompressorWriter::with_params(output, 4096, &params);
+        self.process_through_compression(decoder, &mut encoder)?;
+        // CompressorWriter finalizes on flush (already called) and into_inner
+        encoder.into_inner();
+        Ok(())
+    }
+```
+
+- [ ] **Step 4: Run all tests**
+
+Run: `cargo test --package trusted-server-core`
+
+Expected: All existing tests pass plus the new one.
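The ownership pattern this task establishes can be illustrated with std-only types — a minimal sketch, where `MockEncoder` and `run_stage` are hypothetical stand-ins for a flate2-style encoder and a pipeline stage, not the crate's real API:

```rust
use std::io::{self, Write};

// Hypothetical stand-in for a flate2-style encoder: output is only
// complete once an explicit finish() writes the trailer. Dropping it
// without finish() would lose or silently swallow trailer errors —
// the bug this task fixes.
struct MockEncoder<W: Write> {
    inner: W,
}

impl<W: Write> MockEncoder<W> {
    fn new(inner: W) -> Self {
        Self { inner }
    }

    // Explicit finalization: trailer-write errors surface to the caller
    // instead of being swallowed the way a `drop(encoder)` would.
    fn finish(mut self) -> io::Result<W> {
        self.inner.write_all(b"[trailer]")?;
        Ok(self.inner)
    }
}

impl<W: Write> Write for MockEncoder<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.inner.write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

// The stage borrows `&mut W`, so the caller keeps ownership...
fn run_stage<W: Write>(encoder: &mut W) -> io::Result<()> {
    encoder.write_all(b"payload")?;
    encoder.flush()
}

fn main() -> io::Result<()> {
    let mut encoder = MockEncoder::new(Vec::new());
    run_stage(&mut encoder)?;
    // ...and finalizes explicitly after the stage returns.
    let out = encoder.finish()?;
    assert_eq!(out, b"payload[trailer]");
    println!("{}", String::from_utf8_lossy(&out));
    Ok(())
}
```

The key design point mirrors Step 3: because `run_stage` takes `&mut W` rather than `W` by value, `finish()` remains callable at the call site and its `Result` is propagated.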
+ +- [ ] **Step 5: Commit** + +``` +git add crates/trusted-server-core/src/streaming_processor.rs +git commit -m "Fix encoder finalization: explicit finish instead of drop" +``` + +--- + +### Task 2: Convert `process_gzip_to_gzip` to chunk-based processing + +**Files:** +- Modify: `crates/trusted-server-core/src/streaming_processor.rs:183-225` +- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) + +- [ ] **Step 1: Write a test for gzip chunk-based round-trip** + +```rust +#[test] +fn test_gzip_to_gzip_produces_correct_output() { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + + let input_data = b"hello world"; + + // Compress input as gzip + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::Gzip, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-gzip"); + + // Decompress and verify + let mut decompressed = Vec::new(); + GzDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress gzip output"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through gzip round-trip" + ); +} +``` + +- [ ] **Step 2: Run test to verify it passes (baseline)** + +Run: `cargo test --package trusted-server-core test_gzip_to_gzip_produces_correct_output` + +Expected: PASS (current code works, just buffers everything). 
+
+- [ ] **Step 3: Rewrite `process_gzip_to_gzip` to use `process_through_compression`**
+
+Replace `process_gzip_to_gzip` (lines 183-225):
+
+```rust
+    fn process_gzip_to_gzip<R: Read, W: Write>(
+        &mut self,
+        input: R,
+        output: W,
+    ) -> Result<(), Report> {
+        use flate2::read::GzDecoder;
+        use flate2::write::GzEncoder;
+
+        let decoder = GzDecoder::new(input);
+        let mut encoder = GzEncoder::new(output, flate2::Compression::default());
+        self.process_through_compression(decoder, &mut encoder)?;
+        encoder.finish().change_context(TrustedServerError::Proxy {
+            message: "Failed to finalize gzip encoder".to_string(),
+        })?;
+        Ok(())
+    }
+```
+
+- [ ] **Step 4: Run all tests**
+
+Run: `cargo test --package trusted-server-core`
+
+Expected: All tests pass.
+
+- [ ] **Step 5: Commit**
+
+```
+git add crates/trusted-server-core/src/streaming_processor.rs
+git commit -m "Convert process_gzip_to_gzip to chunk-based processing"
+```
+
+---
+
+### Task 3: Convert `decompress_and_process` to chunk-based processing
+
+**Files:**
+- Modify: `crates/trusted-server-core/src/streaming_processor.rs:227-262`
+- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
+
+Note: the `*_to_none` callers (`process_gzip_to_none`,
+`process_deflate_to_none`, `process_brotli_to_none` at lines 264-332) do
+not need changes — they call `decompress_and_process` with the same
+signature.
+ +- [ ] **Step 1: Write a test for gzip-to-none chunk-based processing** + +```rust +#[test] +fn test_gzip_to_none_produces_correct_output() { + use flate2::write::GzEncoder; + + let input_data = b"hello world"; + + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::None, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-none"); + + assert_eq!( + String::from_utf8(output).expect("should be valid UTF-8"), + "hi world", + "should have replaced content and output uncompressed" + ); +} +``` + +- [ ] **Step 2: Run test to verify baseline** + +Run: `cargo test --package trusted-server-core test_gzip_to_none_produces_correct_output` + +Expected: PASS. 
+
+- [ ] **Step 3: Rewrite `decompress_and_process` to use chunk loop**
+
+Replace `decompress_and_process` (lines 227-262) with a chunk-based
+version that mirrors `process_uncompressed`:
+
+```rust
+    fn decompress_and_process<R: Read, W: Write>(
+        &mut self,
+        mut decoder: R,
+        mut output: W,
+        _codec_name: &str,
+    ) -> Result<(), Report> {
+        let mut buffer = vec![0u8; self.config.chunk_size];
+
+        loop {
+            match decoder.read(&mut buffer) {
+                Ok(0) => {
+                    let final_chunk =
+                        self.processor.process_chunk(&[], true).change_context(
+                            TrustedServerError::Proxy {
+                                message: "Failed to process final chunk".to_string(),
+                            },
+                        )?;
+                    if !final_chunk.is_empty() {
+                        output.write_all(&final_chunk).change_context(
+                            TrustedServerError::Proxy {
+                                message: "Failed to write final chunk".to_string(),
+                            },
+                        )?;
+                    }
+                    break;
+                }
+                Ok(n) => {
+                    let processed = self
+                        .processor
+                        .process_chunk(&buffer[..n], false)
+                        .change_context(TrustedServerError::Proxy {
+                            message: "Failed to process chunk".to_string(),
+                        })?;
+                    if !processed.is_empty() {
+                        output.write_all(&processed).change_context(
+                            TrustedServerError::Proxy {
+                                message: "Failed to write processed chunk".to_string(),
+                            },
+                        )?;
+                    }
+                }
+                Err(e) => {
+                    return Err(Report::new(TrustedServerError::Proxy {
+                        message: format!("Failed to read from decoder: {e}"),
+                    }));
+                }
+            }
+        }
+
+        output.flush().change_context(TrustedServerError::Proxy {
+            message: "Failed to flush output".to_string(),
+        })?;
+
+        Ok(())
+    }
+```
+
+- [ ] **Step 4: Run all tests**
+
+Run: `cargo test --package trusted-server-core`
+
+Expected: All tests pass.
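The loop shape above can be distilled into a self-contained, std-only sketch — `ChunkProcessor`, `Upper`, and `pump` below are hypothetical names standing in for the crate's `StreamProcessor` and its read loop, with no flate2 involved:

```rust
use std::io::{self, Read, Write};

// Hypothetical stand-in for the crate's StreamProcessor trait.
trait ChunkProcessor {
    fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> io::Result<Vec<u8>>;
}

// Trivial processor: uppercase each chunk (stands in for HTML rewriting).
struct Upper;

impl ChunkProcessor for Upper {
    fn process_chunk(&mut self, chunk: &[u8], _is_last: bool) -> io::Result<Vec<u8>> {
        Ok(chunk.to_ascii_uppercase())
    }
}

// The loop shape: Ok(0) flushes the processor with is_last = true,
// Ok(n) processes and forwards, errors propagate. Peak memory is
// bounded by chunk_size regardless of input length.
fn pump<R: Read, W: Write, P: ChunkProcessor>(
    mut input: R,
    output: &mut W,
    processor: &mut P,
    chunk_size: usize,
) -> io::Result<()> {
    let mut buffer = vec![0u8; chunk_size];
    loop {
        match input.read(&mut buffer) {
            Ok(0) => {
                let last = processor.process_chunk(&[], true)?;
                output.write_all(&last)?;
                break;
            }
            Ok(n) => {
                let processed = processor.process_chunk(&buffer[..n], false)?;
                output.write_all(&processed)?;
            }
            Err(e) => return Err(e),
        }
    }
    output.flush()
}

fn main() -> io::Result<()> {
    let mut out = Vec::new();
    pump(&b"hello world"[..], &mut out, &mut Upper, 4)?;
    assert_eq!(out, b"HELLO WORLD");
    println!("{}", String::from_utf8_lossy(&out));
    Ok(())
}
```

The same skeleton serves both the strip-compression paths (decoder as `R`, plain output as `W`) and, after Phase 2, a streaming client body as `W`.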
+
+- [ ] **Step 5: Commit**
+
+```
+git add crates/trusted-server-core/src/streaming_processor.rs
+git commit -m "Convert decompress_and_process to chunk-based processing"
+```
+
+---
+
+### Task 4: Rewrite `HtmlRewriterAdapter` for incremental streaming
+
+**Files:**
+- Modify: `crates/trusted-server-core/src/streaming_processor.rs:396-472`
+- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
+
+Important context: `create_html_processor` in `html_processor.rs` returns
+`HtmlWithPostProcessing`, which wraps `HtmlRewriterAdapter`. The wrapper's
+`process_chunk` (line 31-34 of `html_processor.rs`) returns intermediate
+output immediately for `!is_last` chunks — it passes through, not
+swallows. When the post-processor list is empty (streaming gate condition),
+the wrapper is a no-op passthrough. No changes needed to
+`html_processor.rs`.
+
+- [ ] **Step 1: Write a test proving incremental output**
+
+```rust
+#[test]
+fn test_html_rewriter_adapter_emits_output_per_chunk() {
+    use lol_html::Settings;
+
+    let settings = Settings::default();
+    let mut adapter = HtmlRewriterAdapter::new(settings);
+
+    // First chunk should produce output (not empty)
+    let result1 = adapter
+        .process_chunk(b"<html><head></head>", false)
+        .expect("should process chunk 1");
+    assert!(
+        !result1.is_empty(),
+        "should emit output for non-last chunk, got empty"
+    );
+
+    // Second chunk should also produce output
+    let result2 = adapter
+        .process_chunk(b"
<body>hello</body>
", false)
+        .expect("should process chunk 2");
+    assert!(
+        !result2.is_empty(),
+        "should emit output for second non-last chunk, got empty"
+    );
+
+    // Final chunk
+    let result3 = adapter
+        .process_chunk(b"</html>", true)
+        .expect("should process final chunk");
+
+    // Concatenated output should be the full document
+    let mut full_output = result1;
+    full_output.extend_from_slice(&result2);
+    full_output.extend_from_slice(&result3);
+    let output_str = String::from_utf8(full_output).expect("should be valid UTF-8");
+    assert!(
+        output_str.contains("<html>") && output_str.contains("hello"),
+        "should contain complete document, got: {output_str}"
+    );
+}
+```
+
+- [ ] **Step 2: Run test to verify it fails (current code returns empty for non-last chunks)**
+
+Run: `cargo test --package trusted-server-core test_html_rewriter_adapter_emits_output_per_chunk`
+
+Expected: FAIL — assertion `should emit output for non-last chunk` fails.
+
+- [ ] **Step 3: Rewrite `HtmlRewriterAdapter` to stream incrementally**
+
+Replace the struct and impl (lines 396-472):
+
+```rust
+/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor`.
+///
+/// Creates the rewriter eagerly and emits output on every `process_chunk`
+/// call. Single-use: `reset()` is a no-op since `Settings` are consumed
+/// by the rewriter constructor.
+pub struct HtmlRewriterAdapter {
+    rewriter: Option<lol_html::HtmlRewriter<'static, RcVecSink>>,
+    output: Rc<RefCell<Vec<u8>>>,
+}
+
+/// Output sink that appends to a shared `Vec<u8>`.
+struct RcVecSink(Rc<RefCell<Vec<u8>>>);
+
+impl lol_html::OutputSink for RcVecSink {
+    fn handle_chunk(&mut self, chunk: &[u8]) {
+        self.0.borrow_mut().extend_from_slice(chunk);
+    }
+}
+
+impl HtmlRewriterAdapter {
+    /// Create a new HTML rewriter adapter.
+    ///
+    /// The rewriter is created immediately, consuming the settings.
+    #[must_use]
+    pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self {
+        let output = Rc::new(RefCell::new(Vec::new()));
+        let sink = RcVecSink(Rc::clone(&output));
+        let rewriter = lol_html::HtmlRewriter::new(settings, sink);
+        Self {
+            rewriter: Some(rewriter),
+            output,
+        }
+    }
+}
+
+impl StreamProcessor for HtmlRewriterAdapter {
+    fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result<Vec<u8>, io::Error> {
+        if let Some(rewriter) = &mut self.rewriter {
+            if !chunk.is_empty() {
+                rewriter.write(chunk).map_err(|e| {
+                    log::error!("Failed to process HTML chunk: {e}");
+                    io::Error::other(format!("HTML processing failed: {e}"))
+                })?;
+            }
+        }
+
+        if is_last {
+            if let Some(rewriter) = self.rewriter.take() {
+                rewriter.end().map_err(|e| {
+                    log::error!("Failed to finalize HTML: {e}");
+                    io::Error::other(format!("HTML finalization failed: {e}"))
+                })?;
+            }
+        }
+
+        // Drain whatever lol_html produced since last call.
+        // Safe: sink borrow released before we borrow here.
+        Ok(std::mem::take(&mut *self.output.borrow_mut()))
+    }
+
+    fn reset(&mut self) {
+        // No-op: rewriter consumed Settings on construction.
+        // Single-use by design (one per request).
+    }
+}
+```
+
+Add these imports at the top of `streaming_processor.rs`:
+
+```rust
+use std::cell::RefCell;
+use std::rc::Rc;
+```
+
+- [ ] **Step 4: Run all tests**
+
+Run: `cargo test --package trusted-server-core`
+
+Expected: The new per-chunk test passes. Some existing tests that assert
+"intermediate chunks return empty" will now fail and need updating.
+
+- [ ] **Step 5: Update existing tests for new behavior**
+
+Update `test_html_rewriter_adapter_accumulates_until_last` — it currently
+asserts empty output for non-last chunks. Change assertions to expect
+non-empty intermediate output and verify the concatenated result.
+
+Update `test_html_rewriter_adapter_handles_large_input` — same: remove
+assertions that intermediate chunks are empty.
+ +Update `test_html_rewriter_adapter_reset` — `reset()` is now a no-op. +Remove or update this test since the adapter is single-use. + +- [ ] **Step 6: Run all tests again** + +Run: `cargo test --package trusted-server-core` + +Expected: All tests pass. + +- [ ] **Step 7: Run clippy** + +Run: `cargo clippy --workspace --all-targets --all-features -- -D warnings` + +Expected: No warnings. + +- [ ] **Step 8: Commit** + +``` +git add crates/trusted-server-core/src/streaming_processor.rs +git commit -m "Rewrite HtmlRewriterAdapter for incremental lol_html streaming" +``` + +--- + +### Task 5: Phase 1 full verification + +- [ ] **Step 1: Run full test suite** + +Run: `cargo test --workspace` + +Expected: All tests pass. + +- [ ] **Step 2: Run JS tests** + +Run: `cd crates/js/lib && npx vitest run` + +Expected: All tests pass. + +- [ ] **Step 3: Run clippy and fmt** + +Run: `cargo clippy --workspace --all-targets --all-features -- -D warnings && cargo fmt --all -- --check` + +Expected: Clean. + +- [ ] **Step 4: Build for WASM target** + +Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1` + +Expected: Builds successfully. + +--- + +## Phase 2: Stream Response to Client + +### Task 6: Migrate entry point from `#[fastly::main]` to raw `main()` + +**Files:** +- Modify: `crates/trusted-server-adapter-fastly/src/main.rs:32-68` + +- [ ] **Step 1: Rewrite `main` function** + +Replace lines 32-68: + +```rust +fn main() { + init_logger(); + + let req = Request::from_client(); + + // Health probe: independent from settings/routing. 
+    if req.get_method() == Method::GET && req.get_path() == "/health" {
+        Response::from_status(200)
+            .with_body_text_plain("ok")
+            .send_to_client();
+        return;
+    }
+
+    let settings = match get_settings() {
+        Ok(s) => s,
+        Err(e) => {
+            log::error!("Failed to load settings: {:?}", e);
+            to_error_response(&e).send_to_client();
+            return;
+        }
+    };
+    log::debug!("Settings {settings:?}");
+
+    let orchestrator = build_orchestrator(&settings);
+
+    let integration_registry = match IntegrationRegistry::new(&settings) {
+        Ok(r) => r,
+        Err(e) => {
+            log::error!("Failed to create integration registry: {:?}", e);
+            to_error_response(&e).send_to_client();
+            return;
+        }
+    };
+
+    let response = futures::executor::block_on(route_request(
+        &settings,
+        &orchestrator,
+        &integration_registry,
+        req,
+    ));
+
+    match response {
+        Ok(resp) => resp.send_to_client(),
+        Err(e) => to_error_response(&e).send_to_client(),
+    }
+}
+```
+
+- [ ] **Step 2: Run all tests**
+
+Run: `cargo test --workspace`
+
+Expected: All tests pass.
+
+- [ ] **Step 3: Build for WASM target**
+
+Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1`
+
+Expected: Builds successfully.
+
+- [ ] **Step 4: Commit**
+
+```
+git add crates/trusted-server-adapter-fastly/src/main.rs
+git commit -m "Migrate entry point from #[fastly::main] to raw main()"
+```
+
+---
+
+### Task 7: Refactor `process_response_streaming` to accept `W: Write`
+
+**Files:**
+- Modify: `crates/trusted-server-core/src/publisher.rs:97-180`
+
+- [ ] **Step 1: Change signature to accept generic writer**
+
+Change `process_response_streaming` from returning `Body` to writing into
+a generic `W: Write`:
+
+```rust
+fn process_response_streaming<W: Write>(
+    body: Body,
+    output: &mut W,
+    params: &ProcessResponseParams,
+) -> Result<(), Report> {
+```
+
+Remove `let mut output = Vec::new();` (line 117) and
+`Ok(Body::from(output))` (line 179). The caller passes the output writer.
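The returns-a-buffer to writes-into-a-writer refactor can be sketched with std-only types — `process_buffered` and `process_into` below are hypothetical names illustrating the pattern, not the crate's functions:

```rust
use std::io::{self, Write};

// Old shape (hypothetical): always allocates and returns a buffer.
fn process_buffered(input: &[u8]) -> Vec<u8> {
    input.to_ascii_lowercase() // stands in for the real processing
}

// New shape: same work, but the caller supplies any `W: Write`.
// Passing a `Vec<u8>` preserves today's buffered behavior; Phase 2
// can pass a streaming sink through the identical signature.
fn process_into<W: Write>(input: &[u8], output: &mut W) -> io::Result<()> {
    output.write_all(&input.to_ascii_lowercase())
}

fn main() -> io::Result<()> {
    // Buffered call site: behavior identical to the old signature.
    let mut buffered = Vec::new();
    process_into(b"HELLO", &mut buffered)?;
    assert_eq!(buffered, process_buffered(b"HELLO"));

    // "Streaming" call site: any writer works, e.g. stdout.
    process_into(b"HELLO", &mut io::stdout())?;
    Ok(())
}
```

The design point: the generic bound moves the buffered-vs-streaming decision to the call site, so Task 8 can choose the sink without touching the processing code.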
+
+- [ ] **Step 2: Update the call site in `handle_publisher_request`**
+
+In `handle_publisher_request`, replace the current call (lines 338-341):
+
+```rust
+// Before:
+match process_response_streaming(body, &params) {
+    Ok(processed_body) => {
+        response.set_body(processed_body);
+
+// After:
+let mut output = Vec::new();
+match process_response_streaming(body, &mut output, &params) {
+    Ok(()) => {
+        response.set_body(Body::from(output));
+```
+
+This preserves existing behavior — the buffered path still works.
+
+- [ ] **Step 3: Run all tests**
+
+Run: `cargo test --workspace`
+
+Expected: All tests pass (behavior unchanged).
+
+- [ ] **Step 4: Commit**
+
+```
+git add crates/trusted-server-core/src/publisher.rs
+git commit -m "Refactor process_response_streaming to accept generic writer"
+```
+
+---
+
+### Task 8: Add streaming path to publisher proxy
+
+**Files:**
+- Modify: `crates/trusted-server-core/src/publisher.rs`
+- Modify: `crates/trusted-server-adapter-fastly/src/main.rs`
+
+This is the core change. `handle_publisher_request` needs to support two
+modes: buffered (returns `Response`) and streaming (sends response directly
+via `StreamingBody`). The streaming path requires access to Fastly-specific
+types (`StreamingBody`, `send_to_client`), but `publisher.rs` lives in
+`trusted-server-core` which is platform-agnostic.
+
+**Approach:** Add a `ResponseMode` enum or callback that
+`handle_publisher_request` uses to decide how to send the response. The
+simplest approach: split into a preparation phase (returns headers + body
+stream + processing params) and a send phase (in the fastly adapter).
+
+Alternatively, since `StreamingPipeline::process` already takes `W: Write`,
+the adapter can call `process_response_streaming` with a `StreamingBody`
+directly. The key is that the adapter needs to:
+
+1. Call `handle_publisher_request` logic up to the point of body processing
+2. Decide buffered vs streaming
+3.
Either buffer or stream + +This task is complex — the implementer should read the spec's Step 2 +carefully and adapt the approach to minimize changes. The plan provides the +structure; exact code depends on how the publisher function is decomposed. + +- [ ] **Step 1: Export `finalize_response` or its logic for use before streaming** + +In `main.rs`, make `finalize_response` callable from the publisher path. +Either make it `pub` and move to `trusted-server-core`, or pass a +pre-finalized response to the streaming path. + +- [ ] **Step 2: Add streaming gate check** + +Add a helper in `publisher.rs`: + +```rust +fn should_stream( + status: u16, + content_type: &str, + integration_registry: &IntegrationRegistry, +) -> bool { + if !(200..300).contains(&status) { + return false; + } + // Only html_post_processors gate streaming — NOT script_rewriters. + // Script rewriters (Next.js, GTM) run inside lol_html element handlers + // during streaming and do not require full-document buffering. + // Currently only Next.js registers a post-processor. + let is_html = content_type.contains("text/html"); + if is_html && !integration_registry.html_post_processors().is_empty() { + return false; + } + true +} +``` + +- [ ] **Step 3: Restructure `handle_publisher_request` to support streaming** + +Split the function into: +1. Pre-processing: request info, cookies, synthetic ID, consent, backend + request — everything before `response.take_body()` +2. Header finalization: synthetic ID/cookie headers, `finalize_response()` + headers, Content-Length removal +3. 
Body processing: either buffered (`Vec<u8>`) or streaming
+   (`StreamingBody`)
+
+The streaming path in the fastly adapter:
+```rust
+// After header finalization, before body processing:
+if should_stream {
+    let body = response.take_body();
+    response.remove_header(header::CONTENT_LENGTH);
+    let mut streaming_body = response.stream_to_client();
+
+    match process_response_streaming(body, &mut streaming_body, &params) {
+        Ok(()) => {
+            streaming_body.finish()
+                .expect("should finish streaming body");
+        }
+        Err(e) => {
+            log::error!("Streaming processing failed: {:?}", e);
+            // StreamingBody dropped → client sees abort
+        }
+    }
+} else {
+    // Existing buffered path
+}
+```
+
+- [ ] **Step 4: Handle binary pass-through in streaming path**
+
+For non-text content when streaming is enabled:
+
+```rust
+if !should_process {
+    let mut body = response.take_body();
+    response.remove_header(header::CONTENT_LENGTH);
+    let mut streaming_body = response.stream_to_client();
+    io::copy(&mut body, &mut streaming_body)
+        .expect("should copy body to streaming output");
+    streaming_body.finish()
+        .expect("should finish streaming body");
+}
+```
+
+- [ ] **Step 5: Run all tests**
+
+Run: `cargo test --workspace`
+
+Expected: All tests pass.
+
+- [ ] **Step 6: Build for WASM target**
+
+Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1`
+
+Expected: Builds successfully.
+
+- [ ] **Step 7: Commit**
+
+```
+git add crates/trusted-server-core/src/publisher.rs \
+    crates/trusted-server-adapter-fastly/src/main.rs
+git commit -m "Add streaming response path for publisher proxy"
+```
+
+---
+
+### Task 9: Phase 2 full verification
+
+- [ ] **Step 1: Run full test suite**
+
+Run: `cargo test --workspace`
+
+Expected: All tests pass.
+
+- [ ] **Step 2: Run clippy, fmt, JS tests**
+
+```bash
+cargo clippy --workspace --all-targets --all-features -- -D warnings
+cargo fmt --all -- --check
+cd crates/js/lib && npx vitest run
+```
+
+Expected: All clean.
+ +- [ ] **Step 3: Build for WASM target** + +Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1` + +Expected: Builds. + +- [ ] **Step 4: Manual verification with Viceroy** + +Run: `fastly compute serve` + +Test: +- `curl -s http://localhost:7676/ | sha256sum` — compare with baseline +- `curl -sI http://localhost:7676/` — verify headers present (geo, version, + synthetic ID cookie if consent configured) +- `curl -s http://localhost:7676/static/tsjs=tsjs-unified.min.js` — verify + static routes still work via `send_to_client` + +- [ ] **Step 5: Chrome DevTools MCP performance capture** + +Follow the measurement protocol in the spec's "Performance measurement via +Chrome DevTools MCP" section. Compare against baseline captured on `main`. + +--- + +### Task 10: Chrome DevTools MCP baseline + comparison + +- [ ] **Step 1: Capture baseline on `main`** + +Follow spec section "Baseline capture" — use `navigate_page`, +`list_network_requests`, `lighthouse_audit`, `performance_start_trace` / +`performance_stop_trace`, `performance_analyze_insight`, +`take_memory_snapshot`. Record median TTFB, TTLB, LCP, Speed Index across +5 runs. + +- [ ] **Step 2: Capture metrics on feature branch** + +Repeat the same measurements after building the feature branch. + +- [ ] **Step 3: Compare and document results** + +Create a comparison table and save to PR description or a results file. 
+Check for: +- TTLB improvement (primary goal) +- No TTFB regression +- Identical response body hash (correctness) +- LCP/Speed Index improvement (secondary) diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md new file mode 100644 index 00000000..72716b73 --- /dev/null +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -0,0 +1,347 @@ +# Streaming Response Optimization (Next.js Disabled) + +## Problem + +When Next.js is disabled, the publisher proxy buffers the entire response body +in memory before sending any bytes to the client. This creates two costs: + +1. **Latency** — The client receives zero bytes until the full response is + decompressed, rewritten, and recompressed. For a 222KB HTML page, this adds + hundreds of milliseconds to time-to-last-byte. +2. **Memory** — Peak memory holds ~4x the response size simultaneously + (compressed input + decompressed + processed output + recompressed output). + With WASM's ~16MB heap, this limits the size of pages we can proxy. + +## Scope + +**In scope**: All content types flowing through the publisher proxy path — HTML, +text/JSON, RSC Flight (`text/x-component`), and binary pass-through. Only when +Next.js is disabled (no post-processor requiring the full document). + +**Out of scope**: Concurrent origin+auction fetch, Next.js-enabled paths (these +require full-document post-processing by design), non-publisher routes (static +JS, auction, discovery). + +## Streaming Gate + +Before committing to `stream_to_client()`, check: + +1. Backend status is success (2xx). +2. For HTML content: `html_post_processors()` is empty — no registered + post-processors. Non-HTML content types (text/JSON, RSC Flight, binary) can + always stream regardless of post-processor registration, since + post-processors only apply to HTML. + +If either check fails for the given content type, fall back to the current +buffered path. 
This keeps the optimization transparent: same behavior for all
+existing configurations, streaming only activates when safe.
+
+## Architecture
+
+Two implementation steps, each independently valuable and testable.
+
+### Step 1: Make the pipeline chunk-emitting
+
+Three changes to existing processors:
+
+#### A) `HtmlRewriterAdapter` — incremental streaming
+
+The current implementation accumulates the entire HTML document and processes it
+on `is_last`. This is unnecessary — `lol_html::HtmlRewriter` supports
+incremental `write()` calls and emits output via its `OutputSink` callback after
+each chunk.
+
+Fix: create the rewriter eagerly in the constructor, use
+`Rc<RefCell<Vec<u8>>>` to share the output buffer between the sink and
+`process_chunk()`, drain the buffer on every call instead of only on `is_last`.
+The output buffer is drained _after_ each `rewriter.write()` returns, so the
+`RefCell` borrow in the sink closure never overlaps with the drain borrow.
+
+Note: this makes `HtmlRewriterAdapter` single-use — `reset()` becomes a no-op
+since the `Settings` are consumed by the rewriter constructor. This matches
+actual usage (one adapter per request).
+
+#### B) Chunk-based decompression for all compression paths
+
+`process_gzip_to_gzip` calls `read_to_end()` to decompress the entire body into
+memory. The deflate and brotli keep-compression paths already use chunk-based
+`process_through_compression()`.
+
+Fix: use the same `process_through_compression` pattern for gzip.
+
+Additionally, `decompress_and_process()` (used by `process_gzip_to_none`,
+`process_deflate_to_none`, `process_brotli_to_none`) also calls
+`read_to_end()`. These strip-compression paths must be converted to chunk-based
+processing too — read decompressed chunks, process each, write uncompressed
+output directly.
+
+Reference: `process_uncompressed` already implements the correct chunk-based
+pattern (read loop → `process_chunk()` per chunk → `write_all()` → flush).
The compressed paths should follow the same structure.

#### C) `process_through_compression` finalization — prerequisite for B

`process_through_compression` currently calls `flush()` (with error
propagation) then `drop(encoder)` for finalization. The `flush()` only flushes
buffered data and does not write compression trailers/footers — `drop()`
handles finalization but silently swallows errors. Today this affects deflate
and brotli (which already use this path). The current `process_gzip_to_gzip`
calls `encoder.finish()` explicitly — but Step 1B moves gzip to
`process_through_compression`, which would **regress** gzip from working
`finish()` to broken `drop()`. This fix prevents that regression and also fixes
the pre-existing issue for deflate/brotli.

Fix: call `encoder.finish()` explicitly and propagate errors. This must land
before or with Step 1B.

### Step 2: Stream response to client

Change the publisher proxy path to use Fastly's `StreamingBody` API:

1. Fetch from origin, receive response headers.
2. Validate status — if the backend errored, return a buffered error response
   via `send_to_client()`.
3. Check the streaming gate — if `html_post_processors()` is non-empty, fall
   back to the buffered path.
4. Finalize all response headers. This requires reordering two things:
   - **Synthetic ID/cookie headers**: today set _after_ body processing in
     `handle_publisher_request`. Since they are body-independent (computed from
     request cookies and consent context), move them _before_ streaming.
   - **`finalize_response()`** (main.rs): today called _after_ `route_request`
     returns, adding geo, version, staging, and operator headers. In the
     streaming path, this must run _before_ `stream_to_client()` since the
     publisher handler sends the response directly instead of returning it.
5. Remove the `Content-Length` header — the final size is unknown after
   processing.
   Fastly's `StreamingBody` sends the response using chunked transfer encoding
   automatically.
6. Call `response.stream_to_client()` — headers are sent to the client
   immediately.
7. Pipe the origin body through the streaming pipeline, writing chunks directly
   to `StreamingBody`.
8. Call `finish()` on success; on error, log and drop (the client sees a
   truncated response).

For binary/non-text content: call `response.take_body()` then stream via
`io::copy(&mut body, &mut streaming_body)`. The `Body` type implements `Read`
and `StreamingBody` implements `Write`, so this streams the backend body to the
client without buffering the full content. Today binary responses skip
`take_body()` and return the response as-is — the streaming path needs to take
the body explicitly to pipe it through.

#### Entry point change

Migrate `main.rs` from `#[fastly::main]` to an undecorated `main()` with
`Request::from_client()`. No separate initialization call is needed —
`#[fastly::main]` is just syntactic sugar for `Request::from_client()` +
`Response::send_to_client()`. The migration is required because
`stream_to_client()` / `send_to_client()` are incompatible with
`#[fastly::main]`'s return-based model.

```rust
fn main() {
    let req = Request::from_client();
    match handle(req) {
        Ok(()) => {}
        Err(e) => to_error_response(&e).send_to_client(),
    }
}
```

Note: the return type changes from `Result<Response, Error>` to `()` (or
`Result<(), Error>`). Errors that currently propagate to `main`'s `Result` must
now be caught explicitly and sent via `send_to_client()` with
`to_error_response()`.

Non-streaming routes (static, auction, discovery) use `send_to_client()` as
before.
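
Steps 6-8 reduce to a single generic read/process/write loop. Below is a
minimal std-only sketch, assuming the `W: Write` refactor of
`process_response_streaming` described later in this spec; `ChunkProcessor` and
`drive_stream` are illustrative names, not the crate's actual API. In
production, `W` would be Fastly's `StreamingBody` and the caller would invoke
`finish()` after the loop returns `Ok`.

```rust
use std::io::{Read, Write};

/// Illustrative stand-in for the pipeline's chunk interface.
trait ChunkProcessor {
    fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> std::io::Result<Vec<u8>>;
}

/// Read 8KB chunks, process each, and write the result to any `W: Write`.
/// With `W = Vec<u8>` this is unit-testable; with `W = StreamingBody` each
/// processed chunk reaches the client immediately.
fn drive_stream<R: Read, W: Write, P: ChunkProcessor>(
    mut input: R,
    out: &mut W,
    processor: &mut P,
) -> std::io::Result<()> {
    let mut buf = [0u8; 8192];
    loop {
        let n = input.read(&mut buf)?;
        let is_last = n == 0; // EOF: give the processor its final call
        let processed = processor.process_chunk(&buf[..n], is_last)?;
        out.write_all(&processed)?;
        if is_last {
            out.flush()?; // caller then invokes StreamingBody::finish()
            return Ok(());
        }
    }
}
```

Buffered and streaming paths can then share this one function, differing only
in the writer they pass in.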

## Data Flow

### Streaming path (HTML, text/JSON with processing)

```
Origin body (gzip)
  → Read 8KB chunk from GzDecoder
  → StreamProcessor::process_chunk(chunk, is_last)
      → HtmlRewriterAdapter: lol_html.write(chunk) → sink emits rewritten bytes
      → OR StreamingReplacer: URL replacement with overlap buffer
  → GzEncoder::write(processed_chunk) → compressed bytes
  → StreamingBody::write(compressed) → chunk sent to client
  → repeat until EOF
  → StreamingBody::finish()
```

Memory at steady state: ~8KB input chunk buffer, lol_html internal parser
state, the gzip encoder window, and the replacer's overlap buffer. Roughly
constant regardless of document size, versus the current ~4x document size.

### Pass-through path (binary, images, fonts, etc.)

```
Origin body (via take_body())
  → io::copy(&mut body, &mut streaming_body) → streamed transfer
  → StreamingBody::finish()
```

No decompression, no processing. The body streams through as read.

### Buffered fallback path (error responses or post-processors present)

```
Origin returns 4xx/5xx OR html_post_processors() is non-empty
  → Current buffered path unchanged
  → send_to_client() with proper status and full body
```

## Error Handling

**Backend returns error status**: Detected before calling `stream_to_client()`.
Return the backend response as-is via `send_to_client()`. The client sees the
correct error status code. No change from current behavior.

**Processor creation fails**: `create_html_stream_processor()` or pipeline
construction errors happen _before_ `stream_to_client()` is called. Since
headers have not been sent yet, return a proper error response via
`send_to_client()`. Same as current behavior.

**Processing fails mid-stream**: a `lol_html` parse error, decompression
corruption, or an I/O error during chunk processing. Headers (200 OK) are
already sent. Log the error server-side and drop the `StreamingBody`.
Per the Fastly SDK,
`StreamingBody` automatically aborts the response if dropped without calling
`finish()` — the client sees a connection reset / truncated response. This is
standard reverse proxy behavior.

**Compression finalization fails**: the gzip trailer CRC32 write fails. With
the fix, `encoder.finish()` is called explicitly and errors propagate. Same
mid-stream handling — log and truncate.

No retry logic. No fallback to buffered after streaming has started — once
headers are sent, we are committed.

## Files Changed

| File | Change | Risk |
| --- | --- | --- |
| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); convert all compression paths to chunk-based processing (`process_gzip_to_gzip` and `decompress_and_process`); fix `process_through_compression` to call `finish()` explicitly | High |
| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec<u8>`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium |
| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium |

**Not changed**: `html_processor.rs` (builds the lol_html `Settings` passed to
`HtmlRewriterAdapter`, works as-is), integration registration, the JS build
pipeline, tsjs module serving, the auction handler, and cookie/synthetic ID
logic.
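
As background for the `StreamingReplacer` overlap buffer named in the data-flow
section, here is a self-contained sketch of the technique: hold back up to
`needle.len() - 1` trailing bytes of each chunk so a URL split across chunk
boundaries is still matched. The type name, single-needle scan, and byte-level
replace are illustrative assumptions, not the actual implementation (which
presumably handles multiple URLs and a faster search).

```rust
/// Illustrative streaming find-and-replace with an overlap (carry) buffer.
/// Assumes a non-empty needle.
struct OverlapReplacer {
    needle: Vec<u8>,
    replacement: Vec<u8>,
    carry: Vec<u8>, // unmatched tail carried into the next chunk
}

impl OverlapReplacer {
    fn new(needle: &[u8], replacement: &[u8]) -> Self {
        Self {
            needle: needle.to_vec(),
            replacement: replacement.to_vec(),
            carry: Vec::new(),
        }
    }

    fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Vec<u8> {
        // Prepend whatever was held back from the previous chunk.
        let mut data = std::mem::take(&mut self.carry);
        data.extend_from_slice(chunk);
        let mut out = Vec::with_capacity(data.len());
        let mut i = 0;
        while i < data.len() {
            if data[i..].starts_with(&self.needle) {
                out.extend_from_slice(&self.replacement);
                i += self.needle.len();
            } else if !is_last && data.len() - i < self.needle.len() {
                // Possible partial match at the boundary: defer to next chunk.
                self.carry = data[i..].to_vec();
                return out;
            } else {
                out.push(data[i]);
                i += 1;
            }
        }
        out
    }
}
```

The carry is always shorter than the needle, so the replacer's memory stays
bounded regardless of document size, matching the steady-state memory claim
above.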

Note: `HtmlWithPostProcessing` wraps `HtmlRewriterAdapter` and applies
post-processors on `is_last`. In the streaming path the post-processor list is
empty (that's the gate condition), so the wrapper is a no-op passthrough. It
remains in place — no need to bypass it.

Clarification: `script_rewriters` (used by Next.js and GTM) are distinct from
`html_post_processors`. Script rewriters run inside `lol_html` element handlers
during streaming — they do not require buffering and are unaffected by this
change. The streaming gate checks only `html_post_processors().is_empty()`, not
script rewriters. Currently only Next.js registers a post-processor.

## Rollback Strategy

The `#[fastly::main]` to raw `main()` migration is a structural change. If
streaming causes issues in production, the fastest rollback is reverting the
`main.rs` change — the buffered path still exists, and the pipeline
improvements (Step 1) are safe to keep regardless. No feature flag is needed; a
git revert of the Step 2 commit restores buffered behavior while retaining
Step 1's memory improvements.

## Testing Strategy

### Unit tests (streaming_processor.rs)

- `HtmlRewriterAdapter` emits output on every `process_chunk()` call, not just
  on `is_last`.
- `process_gzip_to_gzip` produces correct output without `read_to_end`.
- `encoder.finish()` errors propagate (not swallowed by `drop`).
- Multi-chunk HTML produces output identical to single-chunk processing.

### Integration tests (publisher.rs)

- Streaming gate: when `html_post_processors()` is non-empty, the response is
  buffered.
- Streaming gate: when `html_post_processors()` is empty, the response streams.
- Backend error (4xx/5xx) returns a buffered error response with the correct
  status.
- Binary content passes through without processing.

### End-to-end validation (Viceroy)

- `cargo test --workspace` — all existing tests pass.
- Manual verification via `fastly compute serve` against a real origin.
- Compare response bodies before/after to confirm byte-identical output for
  HTML, text, and binary.

### Performance measurement via Chrome DevTools MCP

Capture before/after metrics using Chrome DevTools MCP against Viceroy locally
and against staging. Run each measurement set on `main` (baseline) and on the
feature branch, then compare.

#### Baseline capture (before — on `main`)

1. Start the local server: `fastly compute serve`
2. Navigate to the publisher proxy URL via `navigate_page`
3. Capture network timing:
   - `list_network_requests` — record TTFB (`responseStart - requestStart`),
     total time (`responseEnd - requestStart`), and transfer size for the
     document request
   - Filter for the main document (`resourceType: Document`)
4. Run a Lighthouse audit:
   - `lighthouse_audit` with categories `["performance"]`
   - Record TTFB, LCP, Speed Index, and Total Blocking Time
5. Capture a performance trace:
   - `performance_start_trace` → load page → `performance_stop_trace`
   - `performance_analyze_insight` — extract the "Time to First Byte" and
     "Network requests" insights
6. Take a memory snapshot:
   - `take_memory_snapshot` — record JS heap size as a secondary check
     (WASM heap is measured separately via the Fastly dashboard)
7. Repeat 3-5 times for stable medians

#### Post-implementation capture (after — on the feature branch)

Repeat the same steps on the feature branch.
Compare:

| Metric | Source | Expected change |
|--------|--------|-----------------|
| TTFB (document) | Network timing | Minimal change (gated by backend response time) |
| Time to last byte | Network timing (`responseEnd`) | Reduced — body streams incrementally |
| LCP | Lighthouse | Improved — browser receives `<head>` resources sooner |
| Speed Index | Lighthouse | Improved — progressive rendering starts earlier |
| Transfer size | Network timing | Unchanged (same content, same compression) |
| Response body hash | `evaluate_script` with hash | Identical — correctness check |

#### Automated comparison script

Use `evaluate_script` to compute a response body hash in the browser for
correctness verification:

```js
// Run via evaluate_script after page load
const response = await fetch(location.href);
const buffer = await response.arrayBuffer();
const hash = await crypto.subtle.digest('SHA-256', buffer);
const hex = [...new Uint8Array(hash)].map(b => b.toString(16).padStart(2, '0')).join('');
hex; // compare this between baseline and feature branch
```

#### What to watch for

- **TTFB regression**: If TTFB increases, the header finalization reordering
  may be adding latency. Investigate `finalize_response()` and synthetic ID
  computation timing.
- **Body mismatch**: If response body hashes differ between baseline and
  feature branch, the streaming pipeline is producing different output.
  Bisect between Step 1 and Step 2 to isolate.
- **LCP unchanged**: If LCP doesn't improve, the `<head>` content may not be
  reaching the browser earlier. Check whether `lol_html` emits the `<head>`
  injection in the first chunk or buffers until more input arrives.

### Measurement (post-deploy to staging)

- Repeat the Chrome DevTools MCP measurements against the staging URL.
- Compare against Viceroy results to account for real network conditions.
- Monitor WASM heap usage via the Fastly dashboard.
- Verify no regressions on static endpoints or auction.