diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index 079681db..3b9e882f 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -455,7 +455,6 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso }), ]; - let has_script_rewriters = !script_rewriters.is_empty(); for script_rewriter in script_rewriters { let selector = script_rewriter.selector(); let rewriter = script_rewriter.clone(); @@ -493,16 +492,7 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso ..RewriterSettings::default() }; - // Use buffered mode when script rewriters are registered. lol_html fragments - // text nodes across input chunk boundaries, breaking rewriters that expect - // complete text (e.g., __NEXT_DATA__, GTM). Buffered mode feeds the entire - // document in one write() call, preserving text node integrity. - // Phase 3 will make rewriters fragment-safe, enabling streaming for all configs. - let inner = if has_script_rewriters { - HtmlRewriterAdapter::new_buffered(rewriter_settings) - } else { - HtmlRewriterAdapter::new(rewriter_settings) - }; + let inner = HtmlRewriterAdapter::new(rewriter_settings); HtmlWithPostProcessing { inner, diff --git a/crates/trusted-server-core/src/integrations/google_tag_manager.rs b/crates/trusted-server-core/src/integrations/google_tag_manager.rs index 0f3b5f29..9c7159e2 100644 --- a/crates/trusted-server-core/src/integrations/google_tag_manager.rs +++ b/crates/trusted-server-core/src/integrations/google_tag_manager.rs @@ -12,7 +12,7 @@ //! | `GET/POST` | `.../collect` | Proxies GA analytics beacons | //! 
| `GET/POST` | `.../g/collect` | Proxies GA4 analytics beacons | -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; use error_stack::{Report, ResultExt}; @@ -133,11 +133,17 @@ fn validate_container_id(container_id: &str) -> Result<(), validator::Validation pub struct GoogleTagManagerIntegration { config: GoogleTagManagerConfig, + /// Accumulates text fragments when `lol_html` splits a text node across + /// chunk boundaries. Drained on `is_last_in_text_node`. + accumulated_text: Mutex, } impl GoogleTagManagerIntegration { fn new(config: GoogleTagManagerConfig) -> Arc { - Arc::new(Self { config }) + Arc::new(Self { + config, + accumulated_text: Mutex::new(String::new()), + }) } fn error(message: impl Into) -> TrustedServerError { @@ -481,14 +487,40 @@ impl IntegrationScriptRewriter for GoogleTagManagerIntegration { "script" // Match all scripts to find inline GTM snippets } - fn rewrite(&self, content: &str, _ctx: &IntegrationScriptContext<'_>) -> ScriptRewriteAction { + fn rewrite(&self, content: &str, ctx: &IntegrationScriptContext<'_>) -> ScriptRewriteAction { + let mut buf = self + .accumulated_text + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + + if !ctx.is_last_in_text_node { + // Intermediate fragment — accumulate and suppress output. + buf.push_str(content); + return ScriptRewriteAction::RemoveNode; + } + + // Last fragment. Determine the full content to inspect. + let full_content; + let text = if buf.is_empty() { + content + } else { + buf.push_str(content); + full_content = std::mem::take(&mut *buf); + &full_content + }; + // Look for the GTM snippet pattern. // Standard snippet contains: "googletagmanager.com/gtm.js" // Note: analytics.google.com is intentionally excluded — gtag.js stores // that domain as a bare string and constructs URLs dynamically, so // rewriting it in scripts produces broken URLs. 
- if content.contains("googletagmanager.com") || content.contains("google-analytics.com") { - return ScriptRewriteAction::replace(Self::rewrite_gtm_urls(content)); + if text.contains("googletagmanager.com") || text.contains("google-analytics.com") { + return ScriptRewriteAction::replace(Self::rewrite_gtm_urls(text)); + } + + // No GTM content — if we accumulated fragments, emit them unchanged. + if text.len() != content.len() { + return ScriptRewriteAction::replace(text.to_string()); } ScriptRewriteAction::keep() @@ -1631,4 +1663,157 @@ container_id = "GTM-DEFAULT" other => panic!("Expected Integration error, got {:?}", other), } } + + #[test] + fn fragmented_gtm_snippet_is_accumulated_and_rewritten() { + let config = GoogleTagManagerConfig { + enabled: true, + container_id: "GTM-FRAG1".to_string(), + upstream_url: "https://www.googletagmanager.com".to_string(), + cache_max_age: default_cache_max_age(), + max_beacon_body_size: default_max_beacon_body_size(), + }; + let integration = GoogleTagManagerIntegration::new(config); + + let document_state = IntegrationDocumentState::default(); + + // Simulate lol_html splitting the GTM snippet mid-domain. + let fragment1 = r#"(function(w,d,s,l,i){j.src='https://www.google"#; + let fragment2 = r#"tagmanager.com/gtm.js?id='+i;f.parentNode.insertBefore(j,f);})(window,document,'script','dataLayer','GTM-FRAG1');"#; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script", + request_host: "publisher.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + // Intermediate fragment: should be suppressed. 
+ let action1 = + IntegrationScriptRewriter::rewrite(&*integration, fragment1, &ctx_intermediate); + assert_eq!( + action1, + ScriptRewriteAction::RemoveNode, + "should suppress intermediate fragment" + ); + + // Last fragment: should emit full rewritten content. + let action2 = IntegrationScriptRewriter::rewrite(&*integration, fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(rewritten) => { + assert!( + rewritten.contains("/integrations/google_tag_manager/gtm.js"), + "should rewrite GTM URL. Got: {rewritten}" + ); + assert!( + !rewritten.contains("googletagmanager.com"), + "should not contain original GTM domain. Got: {rewritten}" + ); + } + other => panic!("expected Replace for fragmented GTM, got {other:?}"), + } + } + + #[test] + fn non_gtm_fragmented_script_is_passed_through() { + let config = GoogleTagManagerConfig { + enabled: true, + container_id: "GTM-PASS1".to_string(), + upstream_url: "https://www.googletagmanager.com".to_string(), + cache_max_age: default_cache_max_age(), + max_beacon_body_size: default_max_beacon_body_size(), + }; + let integration = GoogleTagManagerIntegration::new(config); + + let document_state = IntegrationDocumentState::default(); + + let fragment1 = "console.log('hel"; + let fragment2 = "lo world');"; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script", + request_host: "publisher.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + let action1 = + IntegrationScriptRewriter::rewrite(&*integration, fragment1, &ctx_intermediate); + assert_eq!( + action1, + ScriptRewriteAction::RemoveNode, + "should suppress intermediate" + ); + + // Last fragment: should emit full unchanged content since it's not GTM. 
+ let action2 = IntegrationScriptRewriter::rewrite(&*integration, fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(content) => { + assert_eq!( + content, "console.log('hello world');", + "should emit full accumulated non-GTM content" + ); + } + other => panic!("expected Replace with passthrough, got {other:?}"), + } + } + + /// Regression test: with a small chunk size, `lol_html` fragments the + /// inline GTM script text node. The rewriter must accumulate fragments + /// and produce correct output through the full HTML pipeline. + #[test] + fn small_chunk_gtm_rewrite_survives_fragmentation() { + let mut settings = make_settings(); + settings + .integrations + .insert_config( + "google_tag_manager", + &serde_json::json!({ + "enabled": true, + "container_id": "GTM-SMALL1" + }), + ) + .expect("should update config"); + + let registry = IntegrationRegistry::new(&settings).expect("should create registry"); + let config = config_from_settings(&settings, ®istry); + let processor = create_html_processor(config); + + // Use a very small chunk size to force fragmentation mid-domain. + let pipeline_config = PipelineConfig { + input_compression: Compression::None, + output_compression: Compression::None, + chunk_size: 32, + }; + let mut pipeline = StreamingPipeline::new(pipeline_config, processor); + + let html_input = r#""#; + + let mut output = Vec::new(); + pipeline + .process(Cursor::new(html_input.as_bytes()), &mut output) + .expect("should process with small chunks"); + let processed = String::from_utf8_lossy(&output); + + assert!( + processed.contains("/integrations/google_tag_manager/gtm.js"), + "should rewrite fragmented GTM URL. Got: {processed}" + ); + assert!( + !processed.contains("googletagmanager.com"), + "should not contain original GTM domain. 
Got: {processed}" + ); + } } diff --git a/crates/trusted-server-core/src/integrations/nextjs/mod.rs b/crates/trusted-server-core/src/integrations/nextjs/mod.rs index 7904c781..cb78fbb5 100644 --- a/crates/trusted-server-core/src/integrations/nextjs/mod.rs +++ b/crates/trusted-server-core/src/integrations/nextjs/mod.rs @@ -494,4 +494,55 @@ mod tests { final_html ); } + + /// Regression test: with a small chunk size, `lol_html` fragments the + /// `__NEXT_DATA__` text node across chunks. The rewriter must accumulate + /// fragments and produce correct output. + #[test] + fn small_chunk_next_data_rewrite_survives_fragmentation() { + // Build a __NEXT_DATA__ payload large enough to cross a 32-byte chunk boundary. + let html = r#""#; + + let mut settings = create_test_settings(); + settings + .integrations + .insert_config( + "nextjs", + &json!({ + "enabled": true, + "rewrite_attributes": ["href", "link", "url"], + }), + ) + .expect("should update nextjs config"); + let registry = IntegrationRegistry::new(&settings).expect("should create registry"); + let config = config_from_settings(&settings, ®istry); + let processor = create_html_processor(config); + + // Use a very small chunk size to force fragmentation. + let pipeline_config = PipelineConfig { + input_compression: Compression::None, + output_compression: Compression::None, + chunk_size: 32, + }; + let mut pipeline = StreamingPipeline::new(pipeline_config, processor); + + let mut output = Vec::new(); + pipeline + .process(Cursor::new(html.as_bytes()), &mut output) + .expect("should process with small chunks"); + + let processed = String::from_utf8_lossy(&output); + assert!( + processed.contains("test.example.com") && processed.contains("/reviews"), + "should rewrite fragmented __NEXT_DATA__ href. Got: {processed}" + ); + assert!( + !processed.contains("origin.example.com/reviews"), + "should not contain original origin href. 
Got: {processed}" + ); + assert!( + processed.contains("Hello World"), + "should preserve non-URL content. Got: {processed}" + ); + } } diff --git a/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs b/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs index 4df34935..9199f830 100644 --- a/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs +++ b/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use regex::{escape, Regex}; @@ -10,11 +10,17 @@ use super::{NextJsIntegrationConfig, NEXTJS_INTEGRATION_ID}; pub(super) struct NextJsNextDataRewriter { config: Arc, + /// Accumulates text fragments when `lol_html` splits a text node across + /// chunk boundaries. Drained on `is_last_in_text_node`. + accumulated_text: Mutex, } impl NextJsNextDataRewriter { pub(super) fn new(config: Arc) -> Self { - Self { config } + Self { + config, + accumulated_text: Mutex::new(String::new()), + } } fn rewrite_structured( @@ -58,7 +64,33 @@ impl IntegrationScriptRewriter for NextJsNextDataRewriter { return ScriptRewriteAction::keep(); } - self.rewrite_structured(content, ctx) + let mut buf = self + .accumulated_text + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + + if !ctx.is_last_in_text_node { + // Intermediate fragment — accumulate and suppress output. + buf.push_str(content); + return ScriptRewriteAction::RemoveNode; + } + + // Last fragment. If nothing was accumulated, process directly. + if buf.is_empty() { + return self.rewrite_structured(content, ctx); + } + + // Complete the accumulated text and process the full content. + // If rewrite_structured returns Keep, we must still emit the full + // accumulated text via Replace — intermediate fragments were already + // removed from lol_html's output via RemoveNode. 
+ buf.push_str(content); + let full_content = std::mem::take(&mut *buf); + let action = self.rewrite_structured(&full_content, ctx); + if matches!(action, ScriptRewriteAction::Keep) { + return ScriptRewriteAction::replace(full_content); + } + action } } @@ -422,4 +454,122 @@ mod tests { .expect("URL should be rewritten"); assert_eq!(new_url, "http://proxy.example.com/news"); } + + #[test] + fn fragmented_next_data_is_accumulated_and_rewritten() { + let rewriter = NextJsNextDataRewriter::new(test_config()); + let document_state = IntegrationDocumentState::default(); + + // Simulate lol_html splitting the text node mid-URL. + let fragment1 = r#"{"props":{"pageProps":{"href":"https://origin."#; + let fragment2 = r#"example.com/reviews"}}}"#; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script#__NEXT_DATA__", + request_host: "ts.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + // Intermediate fragment: should be suppressed (RemoveNode). + let action1 = rewriter.rewrite(fragment1, &ctx_intermediate); + assert_eq!( + action1, + ScriptRewriteAction::RemoveNode, + "should suppress intermediate fragment" + ); + + // Last fragment: should emit full rewritten content. + let action2 = rewriter.rewrite(fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(rewritten) => { + assert!( + rewritten.contains("ts.example.com"), + "should rewrite origin to proxy host. Got: {rewritten}" + ); + assert!( + rewritten.contains("/reviews"), + "should preserve path. Got: {rewritten}" + ); + assert!( + !rewritten.contains("origin.example.com"), + "should not contain original host. 
Got: {rewritten}" + ); + } + other => panic!("expected Replace, got {other:?}"), + } + } + + #[test] + fn unfragmented_next_data_works_without_accumulation() { + let rewriter = NextJsNextDataRewriter::new(test_config()); + let document_state = IntegrationDocumentState::default(); + let payload = r#"{"props":{"pageProps":{"href":"https://origin.example.com/page"}}}"#; + + let ctx_single = IntegrationScriptContext { + selector: "script#__NEXT_DATA__", + request_host: "ts.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: true, + document_state: &document_state, + }; + + let action = rewriter.rewrite(payload, &ctx_single); + match action { + ScriptRewriteAction::Replace(rewritten) => { + assert!( + rewritten.contains("ts.example.com"), + "should rewrite. Got: {rewritten}" + ); + } + other => panic!("expected Replace, got {other:?}"), + } + } + + #[test] + fn fragmented_next_data_without_rewritable_urls_preserves_content() { + let rewriter = NextJsNextDataRewriter::new(test_config()); + let document_state = IntegrationDocumentState::default(); + + // __NEXT_DATA__ JSON with no origin URLs — rewrite_structured returns Keep. + let fragment1 = r#"{"props":{"pageProps":{"title":"Hello"#; + let fragment2 = r#" World","count":42}}}"#; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script#__NEXT_DATA__", + request_host: "ts.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + let action1 = rewriter.rewrite(fragment1, &ctx_intermediate); + assert_eq!(action1, ScriptRewriteAction::RemoveNode); + + // Last fragment: even though no URLs to rewrite, must emit full content + // because intermediate fragments were removed. 
+ let action2 = rewriter.rewrite(fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(content) => { + let expected = format!("{fragment1}{fragment2}"); + assert_eq!( + content, expected, + "should emit full accumulated content unchanged" + ); + } + other => panic!("expected Replace with passthrough, got {other:?}"), + } + } } diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index 6a450623..c6350e5c 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -370,11 +370,14 @@ pub fn handle_publisher_request( .to_lowercase(); // Streaming gate: can we stream this response? + // - Backend returned success (2xx). Error pages stay buffered so the + // client receives a complete error response, not a truncated stream. // - No HTML post-processors registered (they need the full document) // - Non-HTML content always streams (post-processors only apply to HTML) let is_html = content_type.contains("text/html"); let has_post_processors = !integration_registry.html_post_processors().is_empty(); - let can_stream = !is_html || !has_post_processors; + let is_success = response.get_status().is_success(); + let can_stream = is_success && (!is_html || !has_post_processors); if can_stream { log::debug!( @@ -549,11 +552,64 @@ mod tests { } } - // Note: test_streaming_compressed_content removed as it directly tested private function - // process_response_streaming. The functionality is tested through handle_publisher_request. + /// Test the streaming gate logic in isolation. 
The gate decides whether + /// a response can be streamed or must be buffered based on: + /// - Backend status (2xx only) + /// - Content type (processable text types) + /// - Post-processors (none registered for streaming) + #[test] + fn streaming_gate_allows_2xx_html_without_post_processors() { + let is_success = true; + let is_html = true; + let has_post_processors = false; + let can_stream = is_success && (!is_html || !has_post_processors); + assert!(can_stream, "should stream 2xx HTML without post-processors"); + } + + #[test] + fn streaming_gate_blocks_non_2xx_responses() { + let is_success = false; + let is_html = true; + let has_post_processors = false; + let can_stream = is_success && (!is_html || !has_post_processors); + assert!( + !can_stream, + "should not stream error responses even without post-processors" + ); + } + + #[test] + fn streaming_gate_blocks_html_with_post_processors() { + let is_success = true; + let is_html = true; + let has_post_processors = true; + let can_stream = is_success && (!is_html || !has_post_processors); + assert!( + !can_stream, + "should not stream HTML when post-processors are registered" + ); + } + + #[test] + fn streaming_gate_allows_non_html_with_post_processors() { + let is_success = true; + let is_html = false; + let has_post_processors = true; + let can_stream = is_success && (!is_html || !has_post_processors); + assert!( + can_stream, + "should stream non-HTML even with post-processors (they only apply to HTML)" + ); + } - // Note: test_streaming_brotli_content removed as it directly tested private function - // process_response_streaming. The functionality is tested through handle_publisher_request. 
+ #[test] + fn streaming_gate_blocks_non_2xx_json() { + let is_success = false; + let is_html = false; + let has_post_processors = false; + let can_stream = is_success && (!is_html || !has_post_processors); + assert!(!can_stream, "should not stream 4xx/5xx JSON responses"); + } #[test] fn test_content_encoding_detection() { @@ -734,4 +790,56 @@ mod tests { "should reject unknown module names" ); } + + #[test] + fn stream_publisher_body_preserves_gzip_round_trip() { + use flate2::write::GzEncoder; + use std::io::Write; + + let settings = create_test_settings(); + let registry = + IntegrationRegistry::new(&settings).expect("should create integration registry"); + + // Compress CSS containing an origin URL that should be rewritten. + // CSS uses the text URL replacer (not lol_html), so inline URLs are rewritten. + let html = b"body { background: url('https://origin.example.com/page'); }"; + let mut compressed = Vec::new(); + { + let mut encoder = GzEncoder::new(&mut compressed, flate2::Compression::default()); + encoder.write_all(html).expect("should compress"); + encoder.finish().expect("should finish compression"); + } + + let body = Body::from(compressed); + let params = OwnedProcessResponseParams { + content_encoding: "gzip".to_string(), + origin_host: "origin.example.com".to_string(), + origin_url: "https://origin.example.com".to_string(), + request_host: "proxy.example.com".to_string(), + request_scheme: "https".to_string(), + content_type: "text/css".to_string(), + }; + + let mut output = Vec::new(); + stream_publisher_body(body, &mut output, ¶ms, &settings, ®istry) + .expect("should process gzip CSS"); + + // Decompress output + use flate2::read::GzDecoder; + use std::io::Read; + let mut decoder = GzDecoder::new(&output[..]); + let mut decompressed = String::new(); + decoder + .read_to_string(&mut decompressed) + .expect("should decompress output"); + + assert!( + decompressed.contains("proxy.example.com"), + "should rewrite origin to proxy. 
Got: {decompressed}" + ); + assert!( + !decompressed.contains("origin.example.com"), + "should not contain original host. Got: {decompressed}" + ); + } } diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 5a4ea290..20665d7a 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -275,33 +275,19 @@ impl lol_html::OutputSink for RcVecSink { /// Adapter to use `lol_html` [`HtmlRewriter`](lol_html::HtmlRewriter) as a [`StreamProcessor`]. /// -/// Operates in one of two modes: -/// -/// - **Streaming** ([`new`](Self::new)): output is emitted incrementally on every -/// [`process_chunk`](StreamProcessor::process_chunk) call. Use when no script -/// rewriters are registered. -/// - **Buffered** ([`new_buffered`](Self::new_buffered)): input is accumulated and -/// processed in a single `write()` call on `is_last`. Use when script rewriters -/// are registered, because `lol_html` fragments text nodes across chunk boundaries -/// and rewriters that expect complete text content would silently miss rewrites on -/// split fragments. (See Phase 3 plan for making rewriters fragment-safe.) +/// Output is emitted incrementally on every [`process_chunk`](StreamProcessor::process_chunk) +/// call. Script rewriters that receive text from `lol_html` must be fragment-safe — +/// they accumulate text fragments internally until `is_last_in_text_node` is true. /// /// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`] /// is a no-op because the rewriter consumes its settings on construction. pub struct HtmlRewriterAdapter { rewriter: Option>, output: Rc>>, - /// When true, input is accumulated and fed to `lol_html` in one pass on `is_last`. - buffered: bool, - /// Accumulated input for the buffered path. 
- accumulated_input: Vec, } impl HtmlRewriterAdapter { /// Create a new HTML rewriter adapter that streams output per chunk. - /// - /// Use [`Self::new_buffered`] when script rewriters are registered to - /// avoid text node fragmentation. #[must_use] pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self { let output = Rc::new(RefCell::new(Vec::new())); @@ -310,75 +296,28 @@ impl HtmlRewriterAdapter { Self { rewriter: Some(rewriter), output, - buffered: false, - accumulated_input: Vec::new(), - } - } - - /// Create a new HTML rewriter adapter that buffers all input before processing. - /// - /// This avoids `lol_html` text node fragmentation that breaks script rewriters - /// expecting complete text content. The entire document is fed to the rewriter - /// in a single `write()` call when `is_last` is true. - #[must_use] - pub fn new_buffered(settings: lol_html::Settings<'static, 'static>) -> Self { - let output = Rc::new(RefCell::new(Vec::new())); - let sink = RcVecSink(Rc::clone(&output)); - let rewriter = lol_html::HtmlRewriter::new(settings, sink); - Self { - rewriter: Some(rewriter), - output, - buffered: true, - accumulated_input: Vec::new(), } } } impl StreamProcessor for HtmlRewriterAdapter { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { - if self.buffered { - // Buffered mode: accumulate input, process all at once on is_last. 
- if !chunk.is_empty() { - if self.rewriter.is_none() { - log::warn!( - "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", - chunk.len() - ); - } else { - self.accumulated_input.extend_from_slice(chunk); - } - } - if !is_last { - return Ok(Vec::new()); - } - if let Some(rewriter) = &mut self.rewriter { - if !self.accumulated_input.is_empty() { - let input = std::mem::take(&mut self.accumulated_input); - rewriter.write(&input).map_err(|e| { - log::error!("Failed to process HTML: {e}"); + match &mut self.rewriter { + Some(rewriter) => { + if !chunk.is_empty() { + rewriter.write(chunk).map_err(|e| { + log::error!("Failed to process HTML chunk: {e}"); io::Error::other(format!("HTML processing failed: {e}")) })?; } } - } else { - // Streaming mode: feed chunks to `lol_html` incrementally. - match &mut self.rewriter { - Some(rewriter) => { - if !chunk.is_empty() { - rewriter.write(chunk).map_err(|e| { - log::error!("Failed to process HTML chunk: {e}"); - io::Error::other(format!("HTML processing failed: {e}")) - })?; - } - } - None if !chunk.is_empty() => { - log::warn!( - "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", - chunk.len() - ); - } - None => {} + None if !chunk.is_empty() => { + log::warn!( + "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", + chunk.len() + ); } + None => {} } if is_last { @@ -417,10 +356,8 @@ mod tests { use crate::streaming_replacer::{Replacement, StreamingReplacer}; /// Verify that `lol_html` fragments text nodes when input chunks split - /// mid-text-node. This is critical: if `lol_html` does fragment, then - /// script rewriters (`NextJS` `__NEXT_DATA__`, `GTM`) that expect full - /// text content will silently miss rewrites when the streaming adapter - /// feeds chunks incrementally. + /// mid-text-node. Script rewriters must be fragment-safe — they accumulate + /// text fragments internally until `is_last_in_text_node` is true. 
#[test] fn lol_html_fragments_text_across_chunk_boundaries() { use std::cell::RefCell; @@ -469,57 +406,6 @@ mod tests { ); } - /// Companion to [`lol_html_fragments_text_across_chunk_boundaries`]: - /// proves that `new_buffered()` prevents fragmentation by feeding the - /// entire document to `lol_html` in one `write()` call. - #[test] - fn buffered_adapter_prevents_text_fragmentation() { - use std::cell::RefCell; - use std::rc::Rc; - - let fragments: Rc>> = Rc::new(RefCell::new(Vec::new())); - let fragments_clone = Rc::clone(&fragments); - - let settings = lol_html::Settings { - element_content_handlers: vec![lol_html::text!("script", move |text| { - fragments_clone - .borrow_mut() - .push((text.as_str().to_string(), text.last_in_text_node())); - Ok(()) - })], - ..lol_html::Settings::default() - }; - - let mut adapter = HtmlRewriterAdapter::new_buffered(settings); - - // Feed the same split chunks as the fragmentation test - let r1 = adapter - .process_chunk(b"", true) - .expect("should process chunk2"); - assert!( - !r2.is_empty(), - "buffered adapter should emit output on is_last" - ); - - let frags = fragments.borrow(); - // With buffered mode, the text handler should see the complete string - assert!( - frags - .iter() - .any(|(text, _)| text.contains("googletagmanager.com")), - "buffered adapter should deliver complete text to handler, got: {:?}", - *frags - ); - } - #[test] fn test_uncompressed_pipeline() { let replacer = StreamingReplacer::new(vec![Replacement { diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md index 4afca7fe..061a3979 100644 --- a/docs/superpowers/plans/2026-03-25-streaming-response.md +++ b/docs/superpowers/plans/2026-03-25-streaming-response.md @@ -25,11 +25,11 @@ rewriting), `flate2` (gzip/deflate), `brotli` (brotli compression). 
## File Map -| File | Role | Phase | -|------|------|-------| -| `crates/trusted-server-core/src/streaming_processor.rs` | `HtmlRewriterAdapter` rewrite, compression path fixes, encoder finalization | 1 | -| `crates/trusted-server-core/src/publisher.rs` | `process_response_streaming` refactor to `W: Write`, streaming gate, header reordering | 2 | -| `crates/trusted-server-adapter-fastly/src/main.rs` | Entry point migration from `#[fastly::main]` to raw `main()`, response routing | 2 | +| File | Role | Phase | +| ------------------------------------------------------- | -------------------------------------------------------------------------------------- | ----- | +| `crates/trusted-server-core/src/streaming_processor.rs` | `HtmlRewriterAdapter` rewrite, compression path fixes, encoder finalization | 1 | +| `crates/trusted-server-core/src/publisher.rs` | `process_response_streaming` refactor to `W: Write`, streaming gate, header reordering | 2 | +| `crates/trusted-server-adapter-fastly/src/main.rs` | Entry point migration from `#[fastly::main]` to raw `main()`, response routing | 2 | --- @@ -50,6 +50,7 @@ This is the prerequisite for Task 2. The current code calls `flush()` then moving gzip to this path. 
**Files:** + - Modify: `crates/trusted-server-core/src/streaming_processor.rs:334-393` - Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) @@ -210,6 +211,7 @@ git commit -m "Fix encoder finalization: explicit finish instead of drop" ### Task 2: Convert `process_gzip_to_gzip` to chunk-based processing **Files:** + - Modify: `crates/trusted-server-core/src/streaming_processor.rs:183-225` - Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) @@ -311,6 +313,7 @@ git commit -m "Convert process_gzip_to_gzip to chunk-based processing" ### Task 3: Convert `decompress_and_process` to chunk-based processing **Files:** + - Modify: `crates/trusted-server-core/src/streaming_processor.rs:227-262` - Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) @@ -449,6 +452,7 @@ git commit -m "Convert decompress_and_process to chunk-based processing" ### Task 4: Rewrite `HtmlRewriterAdapter` for incremental streaming **Files:** + - Modify: `crates/trusted-server-core/src/streaming_processor.rs:396-472` - Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) @@ -663,6 +667,7 @@ Expected: Builds successfully. 
### Task 6: Migrate entry point from `#[fastly::main]` to raw `main()` **Files:** + - Modify: `crates/trusted-server-adapter-fastly/src/main.rs:32-68` - [ ] **Step 1: Rewrite `main` function** @@ -742,6 +747,7 @@ git commit -m "Migrate entry point from #[fastly::main] to raw main()" ### Task 7: Refactor `process_response_streaming` to accept `W: Write` **Files:** + - Modify: `crates/trusted-server-core/src/publisher.rs:97-180` - [ ] **Step 1: Change signature to accept generic writer** @@ -797,6 +803,7 @@ git commit -m "Refactor process_response_streaming to accept generic writer" ### Task 8: Add streaming path to publisher proxy **Files:** + - Modify: `crates/trusted-server-core/src/publisher.rs` - Modify: `crates/trusted-server-adapter-fastly/src/main.rs` @@ -857,6 +864,7 @@ fn should_stream( - [ ] **Step 3: Restructure `handle_publisher_request` to support streaming** Split the function into: + 1. Pre-processing: request info, cookies, synthetic ID, consent, backend request — everything before `response.take_body()` 2. Header finalization: synthetic ID/cookie headers, `finalize_response()` @@ -865,6 +873,7 @@ Split the function into: (`StreamingBody`) The streaming path in the fastly adapter: + ```rust // After header finalization, before body processing: if should_stream { @@ -954,6 +963,7 @@ Expected: Builds. Run: `fastly compute serve` Test: + - `curl -s http://localhost:7676/ | sha256sum` — compare with baseline - `curl -sI http://localhost:7676/` — verify headers present (geo, version, synthetic ID cookie if consent configured) @@ -985,7 +995,108 @@ Repeat the same measurements after building the feature branch. Create a comparison table and save to PR description or a results file. 
Check for: + - TTLB improvement (primary goal) - No TTFB regression - Identical response body hash (correctness) - LCP/Speed Index improvement (secondary) + +--- + +## Phase 3: Make Script Rewriters Fragment-Safe (PR #591) + +> **Implementation note (2026-03-27):** All tasks completed. Script rewriters +> accumulate text fragments via `Mutex<String>` until `is_last_in_text_node` is +> true. Buffered mode removed from `HtmlRewriterAdapter`. 2xx streaming gate +> added. Small-chunk (32 byte) pipeline regression tests added for both +> NextJS `__NEXT_DATA__` and GTM inline scripts. + +### Task 11: Make `NextJsNextDataRewriter` fragment-safe + +**Files:** `crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs` + +- [x] Add `accumulated_text: Mutex<String>` field +- [x] Accumulate intermediate fragments, return `RemoveNode` +- [x] On last fragment, process full accumulated text +- [x] Handle Keep-after-accumulation (emit `Replace(full_content)`) +- [x] Add regression tests + +### Task 12: Make `GoogleTagManagerIntegration` rewrite fragment-safe + +**Files:** `crates/trusted-server-core/src/integrations/google_tag_manager.rs` + +- [x] Add `accumulated_text: Mutex<String>` field +- [x] Accumulate intermediate fragments, return `RemoveNode` +- [x] On last fragment, match and rewrite on complete text +- [x] Non-GTM accumulated scripts emitted unchanged via `Replace` +- [x] Add regression tests + +### Task 13: Remove buffered mode from `HtmlRewriterAdapter` + +**Files:** `crates/trusted-server-core/src/streaming_processor.rs` + +- [x] Delete `new_buffered()`, `buffered` flag, `accumulated_input` +- [x] Simplify `process_chunk` to streaming-only path +- [x] Remove `buffered_adapter_prevents_text_fragmentation` test +- [x] Update doc comments + +### Task 14: Always use streaming adapter in `create_html_processor` + +**Files:** `crates/trusted-server-core/src/html_processor.rs` + +- [x] Remove `has_script_rewriters` check +- [x] Always call `HtmlRewriterAdapter::new(settings)` + +### Task 
15: Full verification, regression tests, and performance measurement + +- [x] Add 2xx streaming gate (`response.get_status().is_success()`) +- [x] Add streaming gate unit tests (5 tests) +- [x] Add `stream_publisher_body` gzip round-trip test +- [x] Add small-chunk (32 byte) pipeline tests for NextJS and GTM +- [x] `cargo test --workspace` — 766 passed +- [x] `cargo clippy` — clean +- [x] `cargo fmt --check` — clean +- [x] WASM release build — success +- [x] Staging performance comparison (see results below) + +### Performance Results (getpurpose.ai, median over 5 runs, Chrome 1440x900) + +| Metric | Production (v135, buffered) | Staging (v136, streaming) | Delta | +| -------------------------- | --------------------------- | ------------------------- | ------------------ | +| **TTFB** | 54 ms | 35 ms | **-19 ms (-35%)** | +| **First Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **First Contentful Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **DOM Content Loaded** | 286 ms | 282 ms | -4 ms (~same) | +| **DOM Complete** | 1060 ms | 663 ms | **-397 ms (-37%)** | + +--- + +## Phase 4: Stream Binary Pass-Through Responses + +Non-processable content (images, fonts, video, `application/octet-stream`) +currently passes through `handle_publisher_request` unchanged via the +`Buffered` path. This buffers the entire response body in memory — wasteful +for large binaries that need no processing. Phase 4 adds a `PassThrough` +variant that streams the body directly via `io::copy` into `StreamingBody`. 
+ +### Task 16: Stream binary pass-through responses via `io::copy` + +**Files:** + +- `crates/trusted-server-core/src/publisher.rs` +- `crates/trusted-server-adapter-fastly/src/main.rs` + +- [ ] Add `PublisherResponse::PassThrough { response, body }` variant +- [ ] Return `PassThrough` when `!should_process` and backend returned 2xx +- [ ] Handle in `main.rs`: `stream_to_client()` + `io::copy(body, &mut streaming_body)` +- [ ] Keep `Buffered` for non-2xx responses and `request_host.is_empty()` +- [ ] Preserve `Content-Length` for pass-through (body is unmodified) + +### Task 17: Binary pass-through tests and verification + +- [ ] Publisher-level test: image content type returns `PassThrough` +- [ ] Publisher-level test: 4xx image stays `Buffered` +- [ ] `cargo test --workspace` +- [ ] `cargo clippy` + `cargo fmt --check` +- [ ] WASM release build +- [ ] Staging performance comparison (DOM Complete for image-heavy pages) diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md index 034624b5..eb633be5 100644 --- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -218,11 +218,11 @@ headers are sent, we are committed. 
## Files Changed -| File | Change | Risk | -| ------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | -| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); convert all compression paths to chunk-based processing (`process_gzip_to_gzip` and `decompress_and_process`); fix `process_through_compression` to call `finish()` explicitly | High | -| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium | -| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium | +| File | Change | Risk | +| ------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | +| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); convert all compression paths to chunk-based processing (`process_gzip_to_gzip` and `decompress_and_process`); fix `process_through_compression` to call `finish()` explicitly | High | +| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split 
`handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium | +| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium | **Minimal changes**: `html_processor.rs` now selects `HtmlRewriterAdapter` mode based on script rewriter presence (see [Text Node Fragmentation](#text-node-fragmentation-phase-3)), @@ -250,14 +250,16 @@ HTML incrementally. Script rewriters (`NextJsNextDataRewriter`, `GoogleTagManagerIntegration`) expect complete text content — if a domain string is split across chunks, the rewrite silently fails. -**Phase 1 workaround**: `HtmlRewriterAdapter` has two modes. `new()` streams -per chunk (no script rewriters). `new_buffered()` accumulates input and -processes in one `write()` call (script rewriters registered). -`create_html_processor` selects the mode automatically. +**Resolved in Phase 3**: Each script rewriter is now fragment-safe. They +accumulate text fragments internally via `Mutex` until +`is_last_in_text_node` is true, then process the complete text. Intermediate +fragments return `RemoveNode` (suppressed from output); the final fragment +emits the full rewritten content via `Replace`. If no rewrite is needed, +the full accumulated content is still emitted via `Replace` (since +intermediate fragments were already removed from the output). -**Phase 3** will make each script rewriter fragment-safe by accumulating text -fragments internally via `is_last_in_text_node`. This removes the buffered -fallback and enables streaming for all configurations. See #584. +The `HtmlRewriterAdapter` buffered mode (`new_buffered()`) has been removed. +`create_html_processor` always uses the streaming adapter. ## Rollback Strategy @@ -324,14 +326,14 @@ branch, then compare. 
Repeat the same steps on the feature branch. Compare: -| Metric | Source | Expected change | -|--------|--------|-----------------| -| TTFB (document) | Network timing | Minimal change (gated by backend response time) | -| Time to last byte | Network timing (`responseEnd`) | Reduced — body streams incrementally | -| LCP | Lighthouse | Improved — browser receives `` resources sooner | -| Speed Index | Lighthouse | Improved — progressive rendering starts earlier | -| Transfer size | Network timing | Unchanged (same content, same compression) | -| Response body hash | `evaluate_script` with hash | Identical — correctness check | +| Metric | Source | Expected change | +| ------------------ | ------------------------------ | ----------------------------------------------------- | +| TTFB (document) | Network timing | Minimal change (gated by backend response time) | +| Time to last byte | Network timing (`responseEnd`) | Reduced — body streams incrementally | +| LCP | Lighthouse | Improved — browser receives `` resources sooner | +| Speed Index | Lighthouse | Improved — progressive rendering starts earlier | +| Transfer size | Network timing | Unchanged (same content, same compression) | +| Response body hash | `evaluate_script` with hash | Identical — correctness check | #### Automated comparison script @@ -340,11 +342,13 @@ correctness verification: ```js // Run via evaluate_script after page load -const response = await fetch(location.href); -const buffer = await response.arrayBuffer(); -const hash = await crypto.subtle.digest('SHA-256', buffer); -const hex = [...new Uint8Array(hash)].map(b => b.toString(16).padStart(2, '0')).join(''); -hex; // compare this between baseline and feature branch +const response = await fetch(location.href) +const buffer = await response.arrayBuffer() +const hash = await crypto.subtle.digest('SHA-256', buffer) +const hex = [...new Uint8Array(hash)] + .map((b) => b.toString(16).padStart(2, '0')) + .join('') +hex // compare this between 
baseline and feature branch ``` #### What to watch for @@ -365,3 +369,54 @@ hex; // compare this between baseline and feature branch - Compare against Viceroy results to account for real network conditions. - Monitor WASM heap usage via Fastly dashboard. - Verify no regressions on static endpoints or auction. + +### Results (getpurpose.ai, median over 5 runs, Chrome 1440x900) + +Measured via Chrome DevTools Protocol against prod (v135, buffered) and +staging (v136, streaming). Chrome `--host-resolver-rules` used to route +`getpurpose.ai` to the staging Fastly edge (167.82.83.52). + +| Metric | Production (v135, buffered) | Staging (v136, streaming) | Delta | +| -------------------------- | --------------------------- | ------------------------- | ------------------ | +| **TTFB** | 54 ms | 35 ms | **-19 ms (-35%)** | +| **First Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **First Contentful Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **DOM Content Loaded** | 286 ms | 282 ms | -4 ms (~same) | +| **DOM Complete** | 1060 ms | 663 ms | **-397 ms (-37%)** | + +## Phase 4: Binary Pass-Through Streaming + +Non-processable content (images, fonts, video, `application/octet-stream`) +currently passes through `handle_publisher_request` unchanged via the +`Buffered` path, buffering the entire body in memory before sending. For +large binaries (1-10 MB images), this is wasteful. + +Phase 4 adds a `PublisherResponse::PassThrough` variant that signals the +adapter to stream the body directly via `io::copy` into `StreamingBody` +with no processing pipeline. This eliminates peak memory for binary +responses and improves DOM Complete for image-heavy pages. 
+ +### Streaming gate (updated) + +``` +is_success (2xx) +├── should_process && (!is_html || !has_post_processors) → Stream (pipeline) +├── should_process && is_html && has_post_processors → Buffered (post-processors) +└── !should_process → PassThrough (io::copy) + +!is_success +└── any content type → Buffered (error page) +``` + +### `PublisherResponse` enum (updated) + +```rust +pub enum PublisherResponse { + Buffered(Response), + Stream { response, body, params }, + PassThrough { response, body }, +} +``` + +`Content-Length` is preserved for `PassThrough` since the body is +unmodified — no need for chunked transfer encoding.