diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs
index 079681db..3b9e882f 100644
--- a/crates/trusted-server-core/src/html_processor.rs
+++ b/crates/trusted-server-core/src/html_processor.rs
@@ -455,7 +455,6 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso
}),
];
- let has_script_rewriters = !script_rewriters.is_empty();
for script_rewriter in script_rewriters {
let selector = script_rewriter.selector();
let rewriter = script_rewriter.clone();
@@ -493,16 +492,7 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso
..RewriterSettings::default()
};
- // Use buffered mode when script rewriters are registered. lol_html fragments
- // text nodes across input chunk boundaries, breaking rewriters that expect
- // complete text (e.g., __NEXT_DATA__, GTM). Buffered mode feeds the entire
- // document in one write() call, preserving text node integrity.
- // Phase 3 will make rewriters fragment-safe, enabling streaming for all configs.
- let inner = if has_script_rewriters {
- HtmlRewriterAdapter::new_buffered(rewriter_settings)
- } else {
- HtmlRewriterAdapter::new(rewriter_settings)
- };
+ let inner = HtmlRewriterAdapter::new(rewriter_settings);
HtmlWithPostProcessing {
inner,
diff --git a/crates/trusted-server-core/src/integrations/google_tag_manager.rs b/crates/trusted-server-core/src/integrations/google_tag_manager.rs
index 0f3b5f29..9c7159e2 100644
--- a/crates/trusted-server-core/src/integrations/google_tag_manager.rs
+++ b/crates/trusted-server-core/src/integrations/google_tag_manager.rs
@@ -12,7 +12,7 @@
//! | `GET/POST` | `.../collect` | Proxies GA analytics beacons |
//! | `GET/POST` | `.../g/collect` | Proxies GA4 analytics beacons |
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use error_stack::{Report, ResultExt};
@@ -133,11 +133,17 @@ fn validate_container_id(container_id: &str) -> Result<(), validator::Validation
pub struct GoogleTagManagerIntegration {
config: GoogleTagManagerConfig,
+ /// Accumulates text fragments when `lol_html` splits a text node across
+ /// chunk boundaries. Drained on `is_last_in_text_node`.
+ accumulated_text: Mutex<String>,
}
impl GoogleTagManagerIntegration {
fn new(config: GoogleTagManagerConfig) -> Arc<Self> {
- Arc::new(Self { config })
+ Arc::new(Self {
+ config,
+ accumulated_text: Mutex::new(String::new()),
+ })
}
fn error(message: impl Into<String>) -> TrustedServerError {
@@ -481,14 +487,40 @@ impl IntegrationScriptRewriter for GoogleTagManagerIntegration {
"script" // Match all scripts to find inline GTM snippets
}
- fn rewrite(&self, content: &str, _ctx: &IntegrationScriptContext<'_>) -> ScriptRewriteAction {
+ fn rewrite(&self, content: &str, ctx: &IntegrationScriptContext<'_>) -> ScriptRewriteAction {
+ let mut buf = self
+ .accumulated_text
+ .lock()
+ .unwrap_or_else(std::sync::PoisonError::into_inner);
+
+ if !ctx.is_last_in_text_node {
+ // Intermediate fragment — accumulate and suppress output.
+ buf.push_str(content);
+ return ScriptRewriteAction::RemoveNode;
+ }
+
+ // Last fragment. Determine the full content to inspect.
+ let full_content;
+ let text = if buf.is_empty() {
+ content
+ } else {
+ buf.push_str(content);
+ full_content = std::mem::take(&mut *buf);
+ &full_content
+ };
+
// Look for the GTM snippet pattern.
// Standard snippet contains: "googletagmanager.com/gtm.js"
// Note: analytics.google.com is intentionally excluded — gtag.js stores
// that domain as a bare string and constructs URLs dynamically, so
// rewriting it in scripts produces broken URLs.
- if content.contains("googletagmanager.com") || content.contains("google-analytics.com") {
- return ScriptRewriteAction::replace(Self::rewrite_gtm_urls(content));
+ if text.contains("googletagmanager.com") || text.contains("google-analytics.com") {
+ return ScriptRewriteAction::replace(Self::rewrite_gtm_urls(text));
+ }
+
+ // No GTM content — if we accumulated fragments, emit them unchanged.
+ if text.len() != content.len() {
+ return ScriptRewriteAction::replace(text.to_string());
}
ScriptRewriteAction::keep()
@@ -1631,4 +1663,157 @@ container_id = "GTM-DEFAULT"
other => panic!("Expected Integration error, got {:?}", other),
}
}
+
+ #[test]
+ fn fragmented_gtm_snippet_is_accumulated_and_rewritten() {
+ let config = GoogleTagManagerConfig {
+ enabled: true,
+ container_id: "GTM-FRAG1".to_string(),
+ upstream_url: "https://www.googletagmanager.com".to_string(),
+ cache_max_age: default_cache_max_age(),
+ max_beacon_body_size: default_max_beacon_body_size(),
+ };
+ let integration = GoogleTagManagerIntegration::new(config);
+
+ let document_state = IntegrationDocumentState::default();
+
+ // Simulate lol_html splitting the GTM snippet mid-domain.
+ let fragment1 = r#"(function(w,d,s,l,i){j.src='https://www.google"#;
+ let fragment2 = r#"tagmanager.com/gtm.js?id='+i;f.parentNode.insertBefore(j,f);})(window,document,'script','dataLayer','GTM-FRAG1');"#;
+
+ let ctx_intermediate = IntegrationScriptContext {
+ selector: "script",
+ request_host: "publisher.example.com",
+ request_scheme: "https",
+ origin_host: "origin.example.com",
+ is_last_in_text_node: false,
+ document_state: &document_state,
+ };
+ let ctx_last = IntegrationScriptContext {
+ is_last_in_text_node: true,
+ ..ctx_intermediate
+ };
+
+ // Intermediate fragment: should be suppressed.
+ let action1 =
+ IntegrationScriptRewriter::rewrite(&*integration, fragment1, &ctx_intermediate);
+ assert_eq!(
+ action1,
+ ScriptRewriteAction::RemoveNode,
+ "should suppress intermediate fragment"
+ );
+
+ // Last fragment: should emit full rewritten content.
+ let action2 = IntegrationScriptRewriter::rewrite(&*integration, fragment2, &ctx_last);
+ match action2 {
+ ScriptRewriteAction::Replace(rewritten) => {
+ assert!(
+ rewritten.contains("/integrations/google_tag_manager/gtm.js"),
+ "should rewrite GTM URL. Got: {rewritten}"
+ );
+ assert!(
+ !rewritten.contains("googletagmanager.com"),
+ "should not contain original GTM domain. Got: {rewritten}"
+ );
+ }
+ other => panic!("expected Replace for fragmented GTM, got {other:?}"),
+ }
+ }
+
+ #[test]
+ fn non_gtm_fragmented_script_is_passed_through() {
+ let config = GoogleTagManagerConfig {
+ enabled: true,
+ container_id: "GTM-PASS1".to_string(),
+ upstream_url: "https://www.googletagmanager.com".to_string(),
+ cache_max_age: default_cache_max_age(),
+ max_beacon_body_size: default_max_beacon_body_size(),
+ };
+ let integration = GoogleTagManagerIntegration::new(config);
+
+ let document_state = IntegrationDocumentState::default();
+
+ let fragment1 = "console.log('hel";
+ let fragment2 = "lo world');";
+
+ let ctx_intermediate = IntegrationScriptContext {
+ selector: "script",
+ request_host: "publisher.example.com",
+ request_scheme: "https",
+ origin_host: "origin.example.com",
+ is_last_in_text_node: false,
+ document_state: &document_state,
+ };
+ let ctx_last = IntegrationScriptContext {
+ is_last_in_text_node: true,
+ ..ctx_intermediate
+ };
+
+ let action1 =
+ IntegrationScriptRewriter::rewrite(&*integration, fragment1, &ctx_intermediate);
+ assert_eq!(
+ action1,
+ ScriptRewriteAction::RemoveNode,
+ "should suppress intermediate"
+ );
+
+ // Last fragment: should emit full unchanged content since it's not GTM.
+ let action2 = IntegrationScriptRewriter::rewrite(&*integration, fragment2, &ctx_last);
+ match action2 {
+ ScriptRewriteAction::Replace(content) => {
+ assert_eq!(
+ content, "console.log('hello world');",
+ "should emit full accumulated non-GTM content"
+ );
+ }
+ other => panic!("expected Replace with passthrough, got {other:?}"),
+ }
+ }
+
+ /// Regression test: with a small chunk size, `lol_html` fragments the
+ /// inline GTM script text node. The rewriter must accumulate fragments
+ /// and produce correct output through the full HTML pipeline.
+ #[test]
+ fn small_chunk_gtm_rewrite_survives_fragmentation() {
+ let mut settings = make_settings();
+ settings
+ .integrations
+ .insert_config(
+ "google_tag_manager",
+ &serde_json::json!({
+ "enabled": true,
+ "container_id": "GTM-SMALL1"
+ }),
+ )
+ .expect("should update config");
+
+ let registry = IntegrationRegistry::new(&settings).expect("should create registry");
+ let config = config_from_settings(&settings, ®istry);
+ let processor = create_html_processor(config);
+
+ // Use a very small chunk size to force fragmentation mid-domain.
+ let pipeline_config = PipelineConfig {
+ input_compression: Compression::None,
+ output_compression: Compression::None,
+ chunk_size: 32,
+ };
+ let mut pipeline = StreamingPipeline::new(pipeline_config, processor);
+
+ let html_input = r#""#;
+
+ let mut output = Vec::new();
+ pipeline
+ .process(Cursor::new(html_input.as_bytes()), &mut output)
+ .expect("should process with small chunks");
+ let processed = String::from_utf8_lossy(&output);
+
+ assert!(
+ processed.contains("/integrations/google_tag_manager/gtm.js"),
+ "should rewrite fragmented GTM URL. Got: {processed}"
+ );
+ assert!(
+ !processed.contains("googletagmanager.com"),
+ "should not contain original GTM domain. Got: {processed}"
+ );
+ }
}
diff --git a/crates/trusted-server-core/src/integrations/nextjs/mod.rs b/crates/trusted-server-core/src/integrations/nextjs/mod.rs
index 7904c781..cb78fbb5 100644
--- a/crates/trusted-server-core/src/integrations/nextjs/mod.rs
+++ b/crates/trusted-server-core/src/integrations/nextjs/mod.rs
@@ -494,4 +494,55 @@ mod tests {
final_html
);
}
+
+ /// Regression test: with a small chunk size, `lol_html` fragments the
+ /// `__NEXT_DATA__` text node across chunks. The rewriter must accumulate
+ /// fragments and produce correct output.
+ #[test]
+ fn small_chunk_next_data_rewrite_survives_fragmentation() {
+ // Build a __NEXT_DATA__ payload large enough to cross a 32-byte chunk boundary.
+ let html = r#""#;
+
+ let mut settings = create_test_settings();
+ settings
+ .integrations
+ .insert_config(
+ "nextjs",
+ &json!({
+ "enabled": true,
+ "rewrite_attributes": ["href", "link", "url"],
+ }),
+ )
+ .expect("should update nextjs config");
+ let registry = IntegrationRegistry::new(&settings).expect("should create registry");
+ let config = config_from_settings(&settings, ®istry);
+ let processor = create_html_processor(config);
+
+ // Use a very small chunk size to force fragmentation.
+ let pipeline_config = PipelineConfig {
+ input_compression: Compression::None,
+ output_compression: Compression::None,
+ chunk_size: 32,
+ };
+ let mut pipeline = StreamingPipeline::new(pipeline_config, processor);
+
+ let mut output = Vec::new();
+ pipeline
+ .process(Cursor::new(html.as_bytes()), &mut output)
+ .expect("should process with small chunks");
+
+ let processed = String::from_utf8_lossy(&output);
+ assert!(
+ processed.contains("test.example.com") && processed.contains("/reviews"),
+ "should rewrite fragmented __NEXT_DATA__ href. Got: {processed}"
+ );
+ assert!(
+ !processed.contains("origin.example.com/reviews"),
+ "should not contain original origin href. Got: {processed}"
+ );
+ assert!(
+ processed.contains("Hello World"),
+ "should preserve non-URL content. Got: {processed}"
+ );
+ }
}
diff --git a/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs b/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs
index 4df34935..9199f830 100644
--- a/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs
+++ b/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use regex::{escape, Regex};
@@ -10,11 +10,17 @@ use super::{NextJsIntegrationConfig, NEXTJS_INTEGRATION_ID};
pub(super) struct NextJsNextDataRewriter {
config: Arc<NextJsIntegrationConfig>,
+ /// Accumulates text fragments when `lol_html` splits a text node across
+ /// chunk boundaries. Drained on `is_last_in_text_node`.
+ accumulated_text: Mutex<String>,
}
impl NextJsNextDataRewriter {
pub(super) fn new(config: Arc<NextJsIntegrationConfig>) -> Self {
- Self { config }
+ Self {
+ config,
+ accumulated_text: Mutex::new(String::new()),
+ }
}
fn rewrite_structured(
@@ -58,7 +64,33 @@ impl IntegrationScriptRewriter for NextJsNextDataRewriter {
return ScriptRewriteAction::keep();
}
- self.rewrite_structured(content, ctx)
+ let mut buf = self
+ .accumulated_text
+ .lock()
+ .unwrap_or_else(std::sync::PoisonError::into_inner);
+
+ if !ctx.is_last_in_text_node {
+ // Intermediate fragment — accumulate and suppress output.
+ buf.push_str(content);
+ return ScriptRewriteAction::RemoveNode;
+ }
+
+ // Last fragment. If nothing was accumulated, process directly.
+ if buf.is_empty() {
+ return self.rewrite_structured(content, ctx);
+ }
+
+ // Complete the accumulated text and process the full content.
+ // If rewrite_structured returns Keep, we must still emit the full
+ // accumulated text via Replace — intermediate fragments were already
+ // removed from lol_html's output via RemoveNode.
+ buf.push_str(content);
+ let full_content = std::mem::take(&mut *buf);
+ let action = self.rewrite_structured(&full_content, ctx);
+ if matches!(action, ScriptRewriteAction::Keep) {
+ return ScriptRewriteAction::replace(full_content);
+ }
+ action
}
}
@@ -422,4 +454,122 @@ mod tests {
.expect("URL should be rewritten");
assert_eq!(new_url, "http://proxy.example.com/news");
}
+
+ #[test]
+ fn fragmented_next_data_is_accumulated_and_rewritten() {
+ let rewriter = NextJsNextDataRewriter::new(test_config());
+ let document_state = IntegrationDocumentState::default();
+
+ // Simulate lol_html splitting the text node mid-URL.
+ let fragment1 = r#"{"props":{"pageProps":{"href":"https://origin."#;
+ let fragment2 = r#"example.com/reviews"}}}"#;
+
+ let ctx_intermediate = IntegrationScriptContext {
+ selector: "script#__NEXT_DATA__",
+ request_host: "ts.example.com",
+ request_scheme: "https",
+ origin_host: "origin.example.com",
+ is_last_in_text_node: false,
+ document_state: &document_state,
+ };
+ let ctx_last = IntegrationScriptContext {
+ is_last_in_text_node: true,
+ ..ctx_intermediate
+ };
+
+ // Intermediate fragment: should be suppressed (RemoveNode).
+ let action1 = rewriter.rewrite(fragment1, &ctx_intermediate);
+ assert_eq!(
+ action1,
+ ScriptRewriteAction::RemoveNode,
+ "should suppress intermediate fragment"
+ );
+
+ // Last fragment: should emit full rewritten content.
+ let action2 = rewriter.rewrite(fragment2, &ctx_last);
+ match action2 {
+ ScriptRewriteAction::Replace(rewritten) => {
+ assert!(
+ rewritten.contains("ts.example.com"),
+ "should rewrite origin to proxy host. Got: {rewritten}"
+ );
+ assert!(
+ rewritten.contains("/reviews"),
+ "should preserve path. Got: {rewritten}"
+ );
+ assert!(
+ !rewritten.contains("origin.example.com"),
+ "should not contain original host. Got: {rewritten}"
+ );
+ }
+ other => panic!("expected Replace, got {other:?}"),
+ }
+ }
+
+ #[test]
+ fn unfragmented_next_data_works_without_accumulation() {
+ let rewriter = NextJsNextDataRewriter::new(test_config());
+ let document_state = IntegrationDocumentState::default();
+ let payload = r#"{"props":{"pageProps":{"href":"https://origin.example.com/page"}}}"#;
+
+ let ctx_single = IntegrationScriptContext {
+ selector: "script#__NEXT_DATA__",
+ request_host: "ts.example.com",
+ request_scheme: "https",
+ origin_host: "origin.example.com",
+ is_last_in_text_node: true,
+ document_state: &document_state,
+ };
+
+ let action = rewriter.rewrite(payload, &ctx_single);
+ match action {
+ ScriptRewriteAction::Replace(rewritten) => {
+ assert!(
+ rewritten.contains("ts.example.com"),
+ "should rewrite. Got: {rewritten}"
+ );
+ }
+ other => panic!("expected Replace, got {other:?}"),
+ }
+ }
+
+ #[test]
+ fn fragmented_next_data_without_rewritable_urls_preserves_content() {
+ let rewriter = NextJsNextDataRewriter::new(test_config());
+ let document_state = IntegrationDocumentState::default();
+
+ // __NEXT_DATA__ JSON with no origin URLs — rewrite_structured returns Keep.
+ let fragment1 = r#"{"props":{"pageProps":{"title":"Hello"#;
+ let fragment2 = r#" World","count":42}}}"#;
+
+ let ctx_intermediate = IntegrationScriptContext {
+ selector: "script#__NEXT_DATA__",
+ request_host: "ts.example.com",
+ request_scheme: "https",
+ origin_host: "origin.example.com",
+ is_last_in_text_node: false,
+ document_state: &document_state,
+ };
+ let ctx_last = IntegrationScriptContext {
+ is_last_in_text_node: true,
+ ..ctx_intermediate
+ };
+
+ let action1 = rewriter.rewrite(fragment1, &ctx_intermediate);
+ assert_eq!(action1, ScriptRewriteAction::RemoveNode);
+
+ // Last fragment: even though no URLs to rewrite, must emit full content
+ // because intermediate fragments were removed.
+ let action2 = rewriter.rewrite(fragment2, &ctx_last);
+ match action2 {
+ ScriptRewriteAction::Replace(content) => {
+ let expected = format!("{fragment1}{fragment2}");
+ assert_eq!(
+ content, expected,
+ "should emit full accumulated content unchanged"
+ );
+ }
+ other => panic!("expected Replace with passthrough, got {other:?}"),
+ }
+ }
}
diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs
index 6a450623..c6350e5c 100644
--- a/crates/trusted-server-core/src/publisher.rs
+++ b/crates/trusted-server-core/src/publisher.rs
@@ -370,11 +370,14 @@ pub fn handle_publisher_request(
.to_lowercase();
// Streaming gate: can we stream this response?
+ // - Backend returned success (2xx). Error pages stay buffered so the
+ // client receives a complete error response, not a truncated stream.
// - No HTML post-processors registered (they need the full document)
// - Non-HTML content always streams (post-processors only apply to HTML)
let is_html = content_type.contains("text/html");
let has_post_processors = !integration_registry.html_post_processors().is_empty();
- let can_stream = !is_html || !has_post_processors;
+ let is_success = response.get_status().is_success();
+ let can_stream = is_success && (!is_html || !has_post_processors);
if can_stream {
log::debug!(
@@ -549,11 +552,64 @@ mod tests {
}
}
- // Note: test_streaming_compressed_content removed as it directly tested private function
- // process_response_streaming. The functionality is tested through handle_publisher_request.
+ /// Test the streaming gate logic in isolation. The gate decides whether
+ /// a response can be streamed or must be buffered based on:
+ /// - Backend status (2xx only)
+ /// - Content type (processable text types)
+ /// - Post-processors (none registered for streaming)
+ #[test]
+ fn streaming_gate_allows_2xx_html_without_post_processors() {
+ let is_success = true;
+ let is_html = true;
+ let has_post_processors = false;
+ let can_stream = is_success && (!is_html || !has_post_processors);
+ assert!(can_stream, "should stream 2xx HTML without post-processors");
+ }
+
+ #[test]
+ fn streaming_gate_blocks_non_2xx_responses() {
+ let is_success = false;
+ let is_html = true;
+ let has_post_processors = false;
+ let can_stream = is_success && (!is_html || !has_post_processors);
+ assert!(
+ !can_stream,
+ "should not stream error responses even without post-processors"
+ );
+ }
+
+ #[test]
+ fn streaming_gate_blocks_html_with_post_processors() {
+ let is_success = true;
+ let is_html = true;
+ let has_post_processors = true;
+ let can_stream = is_success && (!is_html || !has_post_processors);
+ assert!(
+ !can_stream,
+ "should not stream HTML when post-processors are registered"
+ );
+ }
+
+ #[test]
+ fn streaming_gate_allows_non_html_with_post_processors() {
+ let is_success = true;
+ let is_html = false;
+ let has_post_processors = true;
+ let can_stream = is_success && (!is_html || !has_post_processors);
+ assert!(
+ can_stream,
+ "should stream non-HTML even with post-processors (they only apply to HTML)"
+ );
+ }
- // Note: test_streaming_brotli_content removed as it directly tested private function
- // process_response_streaming. The functionality is tested through handle_publisher_request.
+ #[test]
+ fn streaming_gate_blocks_non_2xx_json() {
+ let is_success = false;
+ let is_html = false;
+ let has_post_processors = false;
+ let can_stream = is_success && (!is_html || !has_post_processors);
+ assert!(!can_stream, "should not stream 4xx/5xx JSON responses");
+ }
#[test]
fn test_content_encoding_detection() {
@@ -734,4 +790,56 @@ mod tests {
"should reject unknown module names"
);
}
+
+ #[test]
+ fn stream_publisher_body_preserves_gzip_round_trip() {
+ use flate2::write::GzEncoder;
+ use std::io::Write;
+
+ let settings = create_test_settings();
+ let registry =
+ IntegrationRegistry::new(&settings).expect("should create integration registry");
+
+ // Compress CSS containing an origin URL that should be rewritten.
+ // CSS uses the text URL replacer (not lol_html), so inline URLs are rewritten.
+ let html = b"body { background: url('https://origin.example.com/page'); }";
+ let mut compressed = Vec::new();
+ {
+ let mut encoder = GzEncoder::new(&mut compressed, flate2::Compression::default());
+ encoder.write_all(html).expect("should compress");
+ encoder.finish().expect("should finish compression");
+ }
+
+ let body = Body::from(compressed);
+ let params = OwnedProcessResponseParams {
+ content_encoding: "gzip".to_string(),
+ origin_host: "origin.example.com".to_string(),
+ origin_url: "https://origin.example.com".to_string(),
+ request_host: "proxy.example.com".to_string(),
+ request_scheme: "https".to_string(),
+ content_type: "text/css".to_string(),
+ };
+
+ let mut output = Vec::new();
+ stream_publisher_body(body, &mut output, ¶ms, &settings, ®istry)
+ .expect("should process gzip CSS");
+
+ // Decompress output
+ use flate2::read::GzDecoder;
+ use std::io::Read;
+ let mut decoder = GzDecoder::new(&output[..]);
+ let mut decompressed = String::new();
+ decoder
+ .read_to_string(&mut decompressed)
+ .expect("should decompress output");
+
+ assert!(
+ decompressed.contains("proxy.example.com"),
+ "should rewrite origin to proxy. Got: {decompressed}"
+ );
+ assert!(
+ !decompressed.contains("origin.example.com"),
+ "should not contain original host. Got: {decompressed}"
+ );
+ }
}
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index 5a4ea290..20665d7a 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -275,33 +275,19 @@ impl lol_html::OutputSink for RcVecSink {
/// Adapter to use `lol_html` [`HtmlRewriter`](lol_html::HtmlRewriter) as a [`StreamProcessor`].
///
-/// Operates in one of two modes:
-///
-/// - **Streaming** ([`new`](Self::new)): output is emitted incrementally on every
-/// [`process_chunk`](StreamProcessor::process_chunk) call. Use when no script
-/// rewriters are registered.
-/// - **Buffered** ([`new_buffered`](Self::new_buffered)): input is accumulated and
-/// processed in a single `write()` call on `is_last`. Use when script rewriters
-/// are registered, because `lol_html` fragments text nodes across chunk boundaries
-/// and rewriters that expect complete text content would silently miss rewrites on
-/// split fragments. (See Phase 3 plan for making rewriters fragment-safe.)
+/// Output is emitted incrementally on every [`process_chunk`](StreamProcessor::process_chunk)
+/// call. Script rewriters that receive text from `lol_html` must be fragment-safe —
+/// they accumulate text fragments internally until `is_last_in_text_node` is true.
///
/// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`]
/// is a no-op because the rewriter consumes its settings on construction.
pub struct HtmlRewriterAdapter {
rewriter: Option<lol_html::HtmlRewriter<'static, RcVecSink>>,
output: Rc<RefCell<Vec<u8>>>,
- /// When true, input is accumulated and fed to `lol_html` in one pass on `is_last`.
- buffered: bool,
- /// Accumulated input for the buffered path.
- accumulated_input: Vec,
}
impl HtmlRewriterAdapter {
/// Create a new HTML rewriter adapter that streams output per chunk.
- ///
- /// Use [`Self::new_buffered`] when script rewriters are registered to
- /// avoid text node fragmentation.
#[must_use]
pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self {
let output = Rc::new(RefCell::new(Vec::new()));
@@ -310,75 +296,28 @@ impl HtmlRewriterAdapter {
Self {
rewriter: Some(rewriter),
output,
- buffered: false,
- accumulated_input: Vec::new(),
- }
- }
-
- /// Create a new HTML rewriter adapter that buffers all input before processing.
- ///
- /// This avoids `lol_html` text node fragmentation that breaks script rewriters
- /// expecting complete text content. The entire document is fed to the rewriter
- /// in a single `write()` call when `is_last` is true.
- #[must_use]
- pub fn new_buffered(settings: lol_html::Settings<'static, 'static>) -> Self {
- let output = Rc::new(RefCell::new(Vec::new()));
- let sink = RcVecSink(Rc::clone(&output));
- let rewriter = lol_html::HtmlRewriter::new(settings, sink);
- Self {
- rewriter: Some(rewriter),
- output,
- buffered: true,
- accumulated_input: Vec::new(),
}
}
}
impl StreamProcessor for HtmlRewriterAdapter {
fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> {
- if self.buffered {
- // Buffered mode: accumulate input, process all at once on is_last.
- if !chunk.is_empty() {
- if self.rewriter.is_none() {
- log::warn!(
- "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost",
- chunk.len()
- );
- } else {
- self.accumulated_input.extend_from_slice(chunk);
- }
- }
- if !is_last {
- return Ok(Vec::new());
- }
- if let Some(rewriter) = &mut self.rewriter {
- if !self.accumulated_input.is_empty() {
- let input = std::mem::take(&mut self.accumulated_input);
- rewriter.write(&input).map_err(|e| {
- log::error!("Failed to process HTML: {e}");
+ match &mut self.rewriter {
+ Some(rewriter) => {
+ if !chunk.is_empty() {
+ rewriter.write(chunk).map_err(|e| {
+ log::error!("Failed to process HTML chunk: {e}");
io::Error::other(format!("HTML processing failed: {e}"))
})?;
}
}
- } else {
- // Streaming mode: feed chunks to `lol_html` incrementally.
- match &mut self.rewriter {
- Some(rewriter) => {
- if !chunk.is_empty() {
- rewriter.write(chunk).map_err(|e| {
- log::error!("Failed to process HTML chunk: {e}");
- io::Error::other(format!("HTML processing failed: {e}"))
- })?;
- }
- }
- None if !chunk.is_empty() => {
- log::warn!(
- "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost",
- chunk.len()
- );
- }
- None => {}
+ None if !chunk.is_empty() => {
+ log::warn!(
+ "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost",
+ chunk.len()
+ );
}
+ None => {}
}
if is_last {
@@ -417,10 +356,8 @@ mod tests {
use crate::streaming_replacer::{Replacement, StreamingReplacer};
/// Verify that `lol_html` fragments text nodes when input chunks split
- /// mid-text-node. This is critical: if `lol_html` does fragment, then
- /// script rewriters (`NextJS` `__NEXT_DATA__`, `GTM`) that expect full
- /// text content will silently miss rewrites when the streaming adapter
- /// feeds chunks incrementally.
+ /// mid-text-node. Script rewriters must be fragment-safe — they accumulate
+ /// text fragments internally until `is_last_in_text_node` is true.
#[test]
fn lol_html_fragments_text_across_chunk_boundaries() {
use std::cell::RefCell;
@@ -469,57 +406,6 @@ mod tests {
);
}
- /// Companion to [`lol_html_fragments_text_across_chunk_boundaries`]:
- /// proves that `new_buffered()` prevents fragmentation by feeding the
- /// entire document to `lol_html` in one `write()` call.
- #[test]
- fn buffered_adapter_prevents_text_fragmentation() {
- use std::cell::RefCell;
- use std::rc::Rc;
-
- let fragments: Rc<RefCell<Vec<(String, bool)>>> = Rc::new(RefCell::new(Vec::new()));
- let fragments_clone = Rc::clone(&fragments);
-
- let settings = lol_html::Settings {
- element_content_handlers: vec![lol_html::text!("script", move |text| {
- fragments_clone
- .borrow_mut()
- .push((text.as_str().to_string(), text.last_in_text_node()));
- Ok(())
- })],
- ..lol_html::Settings::default()
- };
-
- let mut adapter = HtmlRewriterAdapter::new_buffered(settings);
-
- // Feed the same split chunks as the fragmentation test
- let r1 = adapter
- .process_chunk(b"", true)
- .expect("should process chunk2");
- assert!(
- !r2.is_empty(),
- "buffered adapter should emit output on is_last"
- );
-
- let frags = fragments.borrow();
- // With buffered mode, the text handler should see the complete string
- assert!(
- frags
- .iter()
- .any(|(text, _)| text.contains("googletagmanager.com")),
- "buffered adapter should deliver complete text to handler, got: {:?}",
- *frags
- );
- }
-
#[test]
fn test_uncompressed_pipeline() {
let replacer = StreamingReplacer::new(vec![Replacement {
diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md
index 4afca7fe..061a3979 100644
--- a/docs/superpowers/plans/2026-03-25-streaming-response.md
+++ b/docs/superpowers/plans/2026-03-25-streaming-response.md
@@ -25,11 +25,11 @@ rewriting), `flate2` (gzip/deflate), `brotli` (brotli compression).
## File Map
-| File | Role | Phase |
-|------|------|-------|
-| `crates/trusted-server-core/src/streaming_processor.rs` | `HtmlRewriterAdapter` rewrite, compression path fixes, encoder finalization | 1 |
-| `crates/trusted-server-core/src/publisher.rs` | `process_response_streaming` refactor to `W: Write`, streaming gate, header reordering | 2 |
-| `crates/trusted-server-adapter-fastly/src/main.rs` | Entry point migration from `#[fastly::main]` to raw `main()`, response routing | 2 |
+| File | Role | Phase |
+| ------------------------------------------------------- | -------------------------------------------------------------------------------------- | ----- |
+| `crates/trusted-server-core/src/streaming_processor.rs` | `HtmlRewriterAdapter` rewrite, compression path fixes, encoder finalization | 1 |
+| `crates/trusted-server-core/src/publisher.rs` | `process_response_streaming` refactor to `W: Write`, streaming gate, header reordering | 2 |
+| `crates/trusted-server-adapter-fastly/src/main.rs` | Entry point migration from `#[fastly::main]` to raw `main()`, response routing | 2 |
---
@@ -50,6 +50,7 @@ This is the prerequisite for Task 2. The current code calls `flush()` then
moving gzip to this path.
**Files:**
+
- Modify: `crates/trusted-server-core/src/streaming_processor.rs:334-393`
- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
@@ -210,6 +211,7 @@ git commit -m "Fix encoder finalization: explicit finish instead of drop"
### Task 2: Convert `process_gzip_to_gzip` to chunk-based processing
**Files:**
+
- Modify: `crates/trusted-server-core/src/streaming_processor.rs:183-225`
- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
@@ -311,6 +313,7 @@ git commit -m "Convert process_gzip_to_gzip to chunk-based processing"
### Task 3: Convert `decompress_and_process` to chunk-based processing
**Files:**
+
- Modify: `crates/trusted-server-core/src/streaming_processor.rs:227-262`
- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
@@ -449,6 +452,7 @@ git commit -m "Convert decompress_and_process to chunk-based processing"
### Task 4: Rewrite `HtmlRewriterAdapter` for incremental streaming
**Files:**
+
- Modify: `crates/trusted-server-core/src/streaming_processor.rs:396-472`
- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
@@ -663,6 +667,7 @@ Expected: Builds successfully.
### Task 6: Migrate entry point from `#[fastly::main]` to raw `main()`
**Files:**
+
- Modify: `crates/trusted-server-adapter-fastly/src/main.rs:32-68`
- [ ] **Step 1: Rewrite `main` function**
@@ -742,6 +747,7 @@ git commit -m "Migrate entry point from #[fastly::main] to raw main()"
### Task 7: Refactor `process_response_streaming` to accept `W: Write`
**Files:**
+
- Modify: `crates/trusted-server-core/src/publisher.rs:97-180`
- [ ] **Step 1: Change signature to accept generic writer**
@@ -797,6 +803,7 @@ git commit -m "Refactor process_response_streaming to accept generic writer"
### Task 8: Add streaming path to publisher proxy
**Files:**
+
- Modify: `crates/trusted-server-core/src/publisher.rs`
- Modify: `crates/trusted-server-adapter-fastly/src/main.rs`
@@ -857,6 +864,7 @@ fn should_stream(
- [ ] **Step 3: Restructure `handle_publisher_request` to support streaming**
Split the function into:
+
1. Pre-processing: request info, cookies, synthetic ID, consent, backend
request — everything before `response.take_body()`
2. Header finalization: synthetic ID/cookie headers, `finalize_response()`
@@ -865,6 +873,7 @@ Split the function into:
(`StreamingBody`)
The streaming path in the fastly adapter:
+
```rust
// After header finalization, before body processing:
if should_stream {
@@ -954,6 +963,7 @@ Expected: Builds.
Run: `fastly compute serve`
Test:
+
- `curl -s http://localhost:7676/ | sha256sum` — compare with baseline
- `curl -sI http://localhost:7676/` — verify headers present (geo, version,
synthetic ID cookie if consent configured)
@@ -985,7 +995,108 @@ Repeat the same measurements after building the feature branch.
Create a comparison table and save to PR description or a results file.
Check for:
+
- TTLB improvement (primary goal)
- No TTFB regression
- Identical response body hash (correctness)
- LCP/Speed Index improvement (secondary)
+
+---
+
+## Phase 3: Make Script Rewriters Fragment-Safe (PR #591)
+
+> **Implementation note (2026-03-27):** All tasks completed. Script rewriters
+> accumulate text fragments via `Mutex` until `last_in_text_node` is
+> true. Buffered mode removed from `HtmlRewriterAdapter`. 2xx streaming gate
+> added. Small-chunk (32 byte) pipeline regression tests added for both
+> NextJS `__NEXT_DATA__` and GTM inline scripts.
+
+### Task 11: Make `NextJsNextDataRewriter` fragment-safe
+
+**Files:** `crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs`
+
+- [x] Add `accumulated_text: Mutex<String>` field
+- [x] Accumulate intermediate fragments, return `RemoveNode`
+- [x] On last fragment, process full accumulated text
+- [x] Handle Keep-after-accumulation (emit `Replace(full_content)`)
+- [x] Add regression tests
+
+### Task 12: Make `GoogleTagManagerIntegration` rewrite fragment-safe
+
+**Files:** `crates/trusted-server-core/src/integrations/google_tag_manager.rs`
+
+- [x] Add `accumulated_text: Mutex<String>` field
+- [x] Accumulate intermediate fragments, return `RemoveNode`
+- [x] On last fragment, match and rewrite on complete text
+- [x] Non-GTM accumulated scripts emitted unchanged via `Replace`
+- [x] Add regression tests
+
+### Task 13: Remove buffered mode from `HtmlRewriterAdapter`
+
+**Files:** `crates/trusted-server-core/src/streaming_processor.rs`
+
+- [x] Delete `new_buffered()`, `buffered` flag, `accumulated_input`
+- [x] Simplify `process_chunk` to streaming-only path
+- [x] Remove `buffered_adapter_prevents_text_fragmentation` test
+- [x] Update doc comments
+
+### Task 14: Always use streaming adapter in `create_html_processor`
+
+**Files:** `crates/trusted-server-core/src/html_processor.rs`
+
+- [x] Remove `has_script_rewriters` check
+- [x] Always call `HtmlRewriterAdapter::new(settings)`
+
+### Task 15: Full verification, regression tests, and performance measurement
+
+- [x] Add 2xx streaming gate (`response.get_status().is_success()`)
+- [x] Add streaming gate unit tests (5 tests)
+- [x] Add `stream_publisher_body` gzip round-trip test
+- [x] Add small-chunk (32 byte) pipeline tests for NextJS and GTM
+- [x] `cargo test --workspace` — 766 passed
+- [x] `cargo clippy` — clean
+- [x] `cargo fmt --check` — clean
+- [x] WASM release build — success
+- [x] Staging performance comparison (see results below)
+
+### Performance Results (getpurpose.ai, median over 5 runs, Chrome 1440x900)
+
+| Metric | Production (v135, buffered) | Staging (v136, streaming) | Delta |
+| -------------------------- | --------------------------- | ------------------------- | ------------------ |
+| **TTFB** | 54 ms | 35 ms | **-19 ms (-35%)** |
+| **First Paint** | 186 ms | 160 ms | -26 ms (-14%) |
+| **First Contentful Paint** | 186 ms | 160 ms | -26 ms (-14%) |
+| **DOM Content Loaded** | 286 ms | 282 ms | -4 ms (~same) |
+| **DOM Complete** | 1060 ms | 663 ms | **-397 ms (-37%)** |
+
+---
+
+## Phase 4: Stream Binary Pass-Through Responses
+
+Non-processable content (images, fonts, video, `application/octet-stream`)
+currently passes through `handle_publisher_request` unchanged via the
+`Buffered` path. This buffers the entire response body in memory — wasteful
+for large binaries that need no processing. Phase 4 adds a `PassThrough`
+variant that streams the body directly via `io::copy` into `StreamingBody`.
+
+### Task 16: Stream binary pass-through responses via `io::copy`
+
+**Files:**
+
+- `crates/trusted-server-core/src/publisher.rs`
+- `crates/trusted-server-adapter-fastly/src/main.rs`
+
+- [ ] Add `PublisherResponse::PassThrough { response, body }` variant
+- [ ] Return `PassThrough` when `!should_process` and backend returned 2xx
+- [ ] Handle in `main.rs`: `stream_to_client()` + `io::copy(body, &mut streaming_body)`
+- [ ] Keep `Buffered` for non-2xx responses and `request_host.is_empty()`
+- [ ] Preserve `Content-Length` for pass-through (body is unmodified)
+
+### Task 17: Binary pass-through tests and verification
+
+- [ ] Publisher-level test: image content type returns `PassThrough`
+- [ ] Publisher-level test: 4xx image stays `Buffered`
+- [ ] `cargo test --workspace`
+- [ ] `cargo clippy` + `cargo fmt --check`
+- [ ] WASM release build
+- [ ] Staging performance comparison (DOM Complete for image-heavy pages)
diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
index 034624b5..eb633be5 100644
--- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md
+++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
@@ -218,11 +218,11 @@ headers are sent, we are committed.
## Files Changed
-| File | Change | Risk |
-| ------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ |
-| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); convert all compression paths to chunk-based processing (`process_gzip_to_gzip` and `decompress_and_process`); fix `process_through_compression` to call `finish()` explicitly | High |
-| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium |
-| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium |
+| File | Change | Risk |
+| ------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ |
+| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); convert all compression paths to chunk-based processing (`process_gzip_to_gzip` and `decompress_and_process`); fix `process_through_compression` to call `finish()` explicitly | High |
+| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium |
+| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium |
**Minimal changes**: `html_processor.rs` now selects `HtmlRewriterAdapter` mode
based on script rewriter presence (see [Text Node Fragmentation](#text-node-fragmentation-phase-3)),
@@ -250,14 +250,16 @@ HTML incrementally. Script rewriters (`NextJsNextDataRewriter`,
`GoogleTagManagerIntegration`) expect complete text content — if a domain string
is split across chunks, the rewrite silently fails.
-**Phase 1 workaround**: `HtmlRewriterAdapter` has two modes. `new()` streams
-per chunk (no script rewriters). `new_buffered()` accumulates input and
-processes in one `write()` call (script rewriters registered).
-`create_html_processor` selects the mode automatically.
+**Resolved in Phase 3**: Each script rewriter is now fragment-safe. They
+accumulate text fragments internally via `Mutex` until
+`is_last_in_text_node` is true, then process the complete text. Intermediate
+fragments return `RemoveNode` (suppressed from output); the final fragment
+emits the full rewritten content via `Replace`. If no rewrite is needed,
+the full accumulated content is still emitted via `Replace` (since
+intermediate fragments were already removed from the output).
-**Phase 3** will make each script rewriter fragment-safe by accumulating text
-fragments internally via `is_last_in_text_node`. This removes the buffered
-fallback and enables streaming for all configurations. See #584.
+The `HtmlRewriterAdapter` buffered mode (`new_buffered()`) has been removed.
+`create_html_processor` always uses the streaming adapter.
## Rollback Strategy
@@ -324,14 +326,14 @@ branch, then compare.
Repeat the same steps on the feature branch. Compare:
-| Metric | Source | Expected change |
-|--------|--------|-----------------|
-| TTFB (document) | Network timing | Minimal change (gated by backend response time) |
-| Time to last byte | Network timing (`responseEnd`) | Reduced — body streams incrementally |
-| LCP | Lighthouse | Improved — browser receives `` resources sooner |
-| Speed Index | Lighthouse | Improved — progressive rendering starts earlier |
-| Transfer size | Network timing | Unchanged (same content, same compression) |
-| Response body hash | `evaluate_script` with hash | Identical — correctness check |
+| Metric | Source | Expected change |
+| ------------------ | ------------------------------ | ----------------------------------------------------- |
+| TTFB (document) | Network timing | Minimal change (gated by backend response time) |
+| Time to last byte | Network timing (`responseEnd`) | Reduced — body streams incrementally |
+| LCP                | Lighthouse                     | Improved — browser receives resources sooner          |
+| Speed Index | Lighthouse | Improved — progressive rendering starts earlier |
+| Transfer size | Network timing | Unchanged (same content, same compression) |
+| Response body hash | `evaluate_script` with hash | Identical — correctness check |
#### Automated comparison script
@@ -340,11 +342,13 @@ correctness verification:
```js
// Run via evaluate_script after page load
-const response = await fetch(location.href);
-const buffer = await response.arrayBuffer();
-const hash = await crypto.subtle.digest('SHA-256', buffer);
-const hex = [...new Uint8Array(hash)].map(b => b.toString(16).padStart(2, '0')).join('');
-hex; // compare this between baseline and feature branch
+const response = await fetch(location.href)
+const buffer = await response.arrayBuffer()
+const hash = await crypto.subtle.digest('SHA-256', buffer)
+const hex = [...new Uint8Array(hash)]
+ .map((b) => b.toString(16).padStart(2, '0'))
+ .join('')
+hex // compare this between baseline and feature branch
```
#### What to watch for
@@ -365,3 +369,54 @@ hex; // compare this between baseline and feature branch
- Compare against Viceroy results to account for real network conditions.
- Monitor WASM heap usage via Fastly dashboard.
- Verify no regressions on static endpoints or auction.
+
+### Results (getpurpose.ai, median over 5 runs, Chrome 1440x900)
+
+Measured via Chrome DevTools Protocol against prod (v135, buffered) and
+staging (v136, streaming). Chrome `--host-resolver-rules` used to route
+`getpurpose.ai` to the staging Fastly edge (167.82.83.52).
+
+| Metric | Production (v135, buffered) | Staging (v136, streaming) | Delta |
+| -------------------------- | --------------------------- | ------------------------- | ------------------ |
+| **TTFB** | 54 ms | 35 ms | **-19 ms (-35%)** |
+| **First Paint** | 186 ms | 160 ms | -26 ms (-14%) |
+| **First Contentful Paint** | 186 ms | 160 ms | -26 ms (-14%) |
+| **DOM Content Loaded** | 286 ms | 282 ms | -4 ms (~same) |
+| **DOM Complete** | 1060 ms | 663 ms | **-397 ms (-37%)** |
+
+## Phase 4: Binary Pass-Through Streaming
+
+Non-processable content (images, fonts, video, `application/octet-stream`)
+currently passes through `handle_publisher_request` unchanged via the
+`Buffered` path, buffering the entire body in memory before sending. For
+large binaries (1-10 MB images), this is wasteful.
+
+Phase 4 adds a `PublisherResponse::PassThrough` variant that signals the
+adapter to stream the body directly via `io::copy` into `StreamingBody`
+with no processing pipeline. This eliminates peak memory for binary
+responses and improves DOM Complete for image-heavy pages.
+
+### Streaming gate (updated)
+
+```
+is_success (2xx)
+├── should_process && (!is_html || !has_post_processors) → Stream (pipeline)
+├── should_process && is_html && has_post_processors → Buffered (post-processors)
+└── !should_process → PassThrough (io::copy)
+
+!is_success
+└── any content type → Buffered (error page)
+```
+
+### `PublisherResponse` enum (updated)
+
+```rust
+pub enum PublisherResponse {
+ Buffered(Response),
+ Stream { response, body, params },
+ PassThrough { response, body },
+}
+```
+
+`Content-Length` is preserved for `PassThrough` since the body is
+unmodified — no need for chunked transfer encoding.