diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index 3d704119..e5dedf5b 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -16,6 +16,9 @@ use trusted_server_core::ec::finalize::ec_finalize_response; use trusted_server_core::ec::identify::{cors_preflight_identify, handle_identify}; use trusted_server_core::ec::kv::KvIdentityGraph; use trusted_server_core::ec::partner::PartnerStore; +use trusted_server_core::ec::pull_sync::{ + build_pull_sync_context, dispatch_pull_sync, PullSyncContext, +}; use trusted_server_core::ec::sync_pixel::{handle_sync, FastlyRateLimiter, RATE_COUNTER_NAME}; use trusted_server_core::ec::EcContext; use trusted_server_core::error::TrustedServerError; @@ -37,21 +40,26 @@ use trusted_server_core::settings_data::get_settings; mod error; use crate::error::to_error_response; -#[fastly::main] -fn main(req: Request) -> Result { +fn main() -> Result<(), Error> { init_logger(); + let req = Request::from_client(); + // Keep the health probe independent from settings loading and routing so // readiness checks still get a cheap liveness response during startup. if req.get_method() == Method::GET && req.get_path() == "/health" { - return Ok(Response::from_status(200).with_body_text_plain("ok")); + Response::from_status(200) + .with_body_text_plain("ok") + .send_to_client(); + return Ok(()); } let settings = match get_settings() { Ok(s) => s, Err(e) => { log::error!("Failed to load settings: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return Ok(()); } }; log::debug!("Settings {settings:?}"); @@ -63,16 +71,36 @@ fn main(req: Request) -> Result { Ok(r) => r, Err(e) => { log::error!("Failed to create integration registry: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return Ok(()); } }; - futures::executor::block_on(route_request( + let outcome = futures::executor::block_on(route_request( &settings, &orchestrator, &integration_registry, req, - )) + ))?; + + let RouteOutcome { + response, + pull_sync_context, + } = outcome; + + response.send_to_client(); + + if let Some(context) = pull_sync_context { + run_pull_sync_after_send(&settings, &context); + } + + Ok(()) +} + +#[must_use] +struct RouteOutcome { + response: Response, + pull_sync_context: Option, } async fn route_request( @@ -80,7 +108,7 @@ async fn route_request( orchestrator: &AuctionOrchestrator, integration_registry: &IntegrationRegistry, mut req: Request, -) -> Result { +) -> Result { // Strip client-spoofable forwarded headers at the edge. // On Fastly this service IS the first proxy — these headers from // clients are untrusted and can hijack URL rewriting (see #409). @@ -101,7 +129,10 @@ async fn route_request( }) .unwrap_or_else(|e| to_error_response(&e)); finalize_response(settings, geo_info.as_ref(), &mut response); - return Ok(response); + return Ok(RouteOutcome { + response, + pull_sync_context: None, + }); } let mut ec_context = @@ -110,7 +141,10 @@ async fn route_request( Err(err) => { let mut response = to_error_response(&err); finalize_response(settings, geo_info.as_ref(), &mut response); - return Ok(response); + return Ok(RouteOutcome { + response, + pull_sync_context: None, + }); } }; @@ -125,7 +159,10 @@ async fn route_request( &mut response, ); finalize_response(settings, geo_info.as_ref(), &mut response); - return Ok(response); + return Ok(RouteOutcome { + response, + pull_sync_context: None, + }); } // Get path and method for routing @@ -133,71 +170,88 @@ async fn route_request( let method = req.get_method().clone(); // Match known routes and handle them - let result = match (method, path.as_str()) { + let (result, organic_route) = match (method, path.as_str()) { // Serve the tsjs library (Method::GET, path) if path.starts_with("/static/tsjs=") => { - handle_tsjs_dynamic(&req, integration_registry) + (handle_tsjs_dynamic(&req, integration_registry), false) } // Discovery endpoint for trusted-server capabilities and JWKS (Method::GET, "/.well-known/trusted-server.json") => { - handle_trusted_server_discovery(settings, req) + (handle_trusted_server_discovery(settings, req), false) } // Signature verification endpoint - (Method::POST, "/verify-signature") => handle_verify_signature(settings, req), + (Method::POST, "/verify-signature") => (handle_verify_signature(settings, req), false), // Admin endpoints // Keep in sync with Settings::ADMIN_ENDPOINTS in crates/trusted-server-core/src/settings.rs - (Method::POST, "/admin/keys/rotate") => handle_rotate_key(settings, req), - (Method::POST, "/admin/keys/deactivate") => handle_deactivate_key(settings, req), - (Method::POST, "/admin/partners/register") => { - require_partner_store(settings).and_then(|store| handle_register_partner(&store, req)) - } - - (Method::GET, "/sync") => require_identity_graph(settings).and_then(|kv| { - require_partner_store(settings).and_then(|partner_store| { - handle_sync(settings, &kv, &partner_store, &req, &mut ec_context) - }) - }), - (Method::GET, "/identify") => require_identity_graph(settings).and_then(|kv| { - require_partner_store(settings).and_then(|partner_store| { - handle_identify(settings, &kv, &partner_store, &req, &ec_context) - }) - }), - (Method::OPTIONS, "/identify") => cors_preflight_identify(settings, &req), + (Method::POST, "/admin/keys/rotate") => (handle_rotate_key(settings, req), false), + (Method::POST, "/admin/keys/deactivate") => (handle_deactivate_key(settings, req), false), + (Method::POST, "/admin/partners/register") => ( + require_partner_store(settings).and_then(|store| handle_register_partner(&store, req)), + false, + ), + + (Method::GET, "/sync") => ( + require_identity_graph(settings).and_then(|kv| { + require_partner_store(settings).and_then(|partner_store| { + handle_sync(settings, &kv, &partner_store, &req, &mut ec_context) + }) + }), + false, + ), + (Method::GET, "/identify") => ( + require_identity_graph(settings).and_then(|kv| { + require_partner_store(settings).and_then(|partner_store| { + handle_identify(settings, &kv, &partner_store, &req, &ec_context) + }) + }), + false, + ), + (Method::OPTIONS, "/identify") => (cors_preflight_identify(settings, &req), false), // Unified auction endpoint (returns creative HTML inline) (Method::POST, "/auction") => { let partner_store = require_partner_store(settings).ok(); - handle_auction( - settings, - orchestrator, - kv_graph.as_ref(), - partner_store.as_ref(), - &ec_context, - req, + ( + handle_auction( + settings, + orchestrator, + kv_graph.as_ref(), + partner_store.as_ref(), + &ec_context, + req, + ) + .await, + false, ) - .await } // tsjs endpoints - (Method::GET, "/first-party/proxy") => handle_first_party_proxy(settings, req).await, - (Method::GET, "/first-party/click") => handle_first_party_click(settings, req).await, + (Method::GET, "/first-party/proxy") => { + (handle_first_party_proxy(settings, req).await, false) + } + (Method::GET, "/first-party/click") => { + (handle_first_party_click(settings, req).await, false) + } (Method::GET, "/first-party/sign") | (Method::POST, "/first-party/sign") => { - handle_first_party_proxy_sign(settings, req).await + (handle_first_party_proxy_sign(settings, req).await, false) } (Method::POST, "/first-party/proxy-rebuild") => { - handle_first_party_proxy_rebuild(settings, req).await + (handle_first_party_proxy_rebuild(settings, req).await, false) + } + (m, path) if integration_registry.has_route(&m, path) => { + let result = integration_registry + .handle_proxy(&m, path, settings, kv_graph.as_ref(), &mut ec_context, req) + .await + .unwrap_or_else(|| { + Err(Report::new(TrustedServerError::BadRequest { + message: format!("Unknown integration route: {path}"), + })) + }); + (result, true) } - (m, path) if integration_registry.has_route(&m, path) => integration_registry - .handle_proxy(&m, path, settings, kv_graph.as_ref(), &mut ec_context, req) - .await - .unwrap_or_else(|| { - Err(Report::new(TrustedServerError::BadRequest { - message: format!("Unknown integration route: {path}"), - })) - }), // No known route matched, proxy to publisher origin as fallback _ => { @@ -206,7 +260,7 @@ async fn route_request( path ); - match handle_publisher_request( + let result = match handle_publisher_request( settings, integration_registry, kv_graph.as_ref(), @@ -218,10 +272,13 @@ async fn route_request( log::error!("Failed to proxy to publisher origin: {:?}", e); Err(e) } - } + }; + (result, true) } }; + let route_succeeded = result.is_ok(); + // Convert any errors to HTTP error responses let mut response = result.unwrap_or_else(|e| to_error_response(&e)); @@ -235,13 +292,43 @@ async fn route_request( finalize_response(settings, geo_info.as_ref(), &mut response); - Ok(response) + let pull_sync_context = if organic_route && route_succeeded { + build_pull_sync_context(&ec_context) + } else { + None + }; + + Ok(RouteOutcome { + response, + pull_sync_context, + }) } fn maybe_identity_graph(settings: &Settings) -> Option { settings.ec.ec_store.as_ref().map(KvIdentityGraph::new) } +fn run_pull_sync_after_send(settings: &Settings, context: &PullSyncContext) { + let kv = match require_identity_graph(settings) { + Ok(kv) => kv, + Err(err) => { + log::debug!("Pull sync: identity graph unavailable, skipping: {err:?}"); + return; + } + }; + + let partner_store = match require_partner_store(settings) { + Ok(store) => store, + Err(err) => { + log::debug!("Pull sync: partner store unavailable, skipping: {err:?}"); + return; + } + }; + + let limiter = FastlyRateLimiter::new(RATE_COUNTER_NAME); + dispatch_pull_sync(settings, &kv, &partner_store, &limiter, context); +} + /// Applies all standard response headers: geo, version, staging, and configured headers. /// /// Called from every response path (including auth early-returns) so that all diff --git a/crates/trusted-server-core/src/ec/mod.rs b/crates/trusted-server-core/src/ec/mod.rs index 65df9662..4723379d 100644 --- a/crates/trusted-server-core/src/ec/mod.rs +++ b/crates/trusted-server-core/src/ec/mod.rs @@ -24,6 +24,7 @@ //! - [`identify`] — Browser identity read endpoint (`GET /identify`) //! - [`eids`] — Shared EID resolution and formatting helpers //! - [`batch_sync`] — S2S batch sync endpoint (`POST /api/v1/sync`) +//! - [`pull_sync`] — Background pull-sync dispatcher for organic routes pub mod admin; pub mod batch_sync; @@ -36,6 +37,7 @@ pub mod identify; pub mod kv; pub mod kv_types; pub mod partner; +pub mod pull_sync; pub mod sync_pixel; use cookie::CookieJar; @@ -362,6 +364,25 @@ impl EcContext { geo_info: None, } } + + /// Creates a test-only [`EcContext`] with explicit client IP. + #[cfg(test)] + #[must_use] + pub fn new_for_test_with_ip( + ec_value: Option, + consent: ConsentContext, + client_ip: Option, + ) -> Self { + Self { + ec_was_present: ec_value.is_some(), + cookie_ec_value: ec_value.clone(), + ec_value, + ec_generated: false, + consent, + client_ip, + geo_info: None, + } + } } fn current_timestamp() -> u64 { diff --git a/crates/trusted-server-core/src/ec/partner.rs b/crates/trusted-server-core/src/ec/partner.rs index 7e1566a8..b83e8e41 100644 --- a/crates/trusted-server-core/src/ec/partner.rs +++ b/crates/trusted-server-core/src/ec/partner.rs @@ -137,9 +137,18 @@ pub fn validate_pull_sync_config(record: &PartnerRecord) -> Result<(), String> { ); } - // Validate that the pull sync URL hostname is in the allowed domains. + // Validate that the pull sync URL uses HTTPS (bearer tokens must not + // travel over plaintext). let parsed = url::Url::parse(url_str).map_err(|e| format!("pull_sync_url is not a valid URL: {e}"))?; + if parsed.scheme() != "https" { + return Err(format!( + "pull_sync_url must use HTTPS, got scheme '{}'", + parsed.scheme() + )); + } + + // Validate that the pull sync URL hostname is in the allowed domains. let host = parsed .host_str() .ok_or("pull_sync_url has no hostname")? @@ -241,6 +250,58 @@ impl PartnerStore { Ok(Some(record)) } + /// Lists all registered partner records. + /// + /// Scans the partner KV store and returns records for non-index keys. + /// Secondary index entries (e.g. `apikey:*`) are skipped. + /// + /// # Errors + /// + /// Returns [`TrustedServerError::KvStore`] on list, lookup, or + /// deserialization failure. + pub fn list_registered(&self) -> Result, Report> { + let store = self.open_store()?; + let mut records = Vec::new(); + + for page in store.build_list().limit(1000).iter() { + let page = page.change_context(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: "Failed to list partner keys".to_owned(), + })?; + + for key in page.keys() { + if key.starts_with(APIKEY_INDEX_PREFIX) { + continue; + } + + let mut response = match store.lookup(key) { + Ok(resp) => resp, + Err(fastly::kv_store::KVStoreError::ItemNotFound) => continue, + Err(err) => { + return Err( + Report::new(err).change_context(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!("Failed to read partner '{key}' while listing"), + }), + ); + } + }; + + let body_bytes = response.take_body_bytes(); + let record = serde_json::from_slice::(&body_bytes).change_context( + TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!("Failed to deserialize partner '{key}' while listing"), + }, + )?; + + records.push(record); + } + } + + Ok(records) + } + /// Writes or updates a partner record and maintains the API key index. /// /// Returns `true` if this was a new partner (create), `false` if an @@ -651,6 +712,32 @@ mod tests { assert!(err.contains("ts_pull_token"), "got: {err}"); } + #[test] + fn validate_pull_sync_rejects_http_scheme() { + let record = PartnerRecord { + id: "test".to_owned(), + name: "Test".to_owned(), + allowed_return_domains: vec![], + api_key_hash: String::new(), + bidstream_enabled: false, + source_domain: "test.com".to_owned(), + openrtb_atype: 3, + sync_rate_limit: 100, + batch_rate_limit: 60, + pull_sync_enabled: true, + pull_sync_url: Some("http://sync.test.com/pull".to_owned()), + pull_sync_allowed_domains: vec!["sync.test.com".to_owned()], + pull_sync_ttl_sec: 86400, + pull_sync_rate_limit: 10, + ts_pull_token: Some("token".to_owned()), + }; + let err = validate_pull_sync_config(&record).unwrap_err(); + assert!( + err.contains("HTTPS"), + "should reject HTTP scheme, got: {err}" + ); + } + #[test] fn validate_pull_sync_rejects_url_not_in_allowed_domains() { let record = PartnerRecord { diff --git a/crates/trusted-server-core/src/ec/pull_sync.rs b/crates/trusted-server-core/src/ec/pull_sync.rs new file mode 100644 index 00000000..d07c15e0 --- /dev/null +++ b/crates/trusted-server-core/src/ec/pull_sync.rs @@ -0,0 +1,539 @@ +//! Pull sync background dispatch. +//! +//! Launches partner pull-sync requests for organic traffic after the client +//! response has been sent. Dispatch is best-effort and never affects client +//! response status. + +use fastly::http::request::PendingRequest; +use fastly::http::{Method, StatusCode}; +use fastly::Request; +use serde::Deserialize; +use url::Url; + +use crate::backend::BackendConfig; +use crate::settings::Settings; + +use super::generation::is_valid_ec_id; +use super::kv::KvIdentityGraph; +use super::kv_types::KvEntry; +use super::partner::{PartnerRecord, PartnerStore}; +use super::sync_pixel::{current_timestamp, RateLimiter}; +use super::EcContext; + +/// Inputs needed to dispatch pull sync after response flush. +#[derive(Debug, Clone)] +pub struct PullSyncContext { + ec_hash: String, + client_ip: String, +} + +impl PullSyncContext { + /// Returns the stable EC hash for the request. + #[must_use] + pub fn ec_hash(&self) -> &str { + &self.ec_hash + } + + /// Returns the normalized client IP for pull endpoint query parameters. + #[must_use] + pub fn client_ip(&self) -> &str { + &self.client_ip + } +} + +struct InFlightPull { + partner_id: String, + pending: PendingRequest, +} + +#[derive(Debug, Deserialize)] +struct PullSyncResponse { + uid: Option, +} + +/// Builds post-send pull-sync context from the route EC context. +/// +/// Returns `None` when consent denies EC, there is no active EC hash, or no +/// client IP is available. +#[must_use] +pub fn build_pull_sync_context(ec_context: &EcContext) -> Option { + if !ec_context.ec_allowed() { + return None; + } + + let ec_id = ec_context.ec_value()?; + if !is_valid_ec_id(ec_id) { + log::debug!("Pull sync: skipping dispatch because active EC ID is invalid format"); + return None; + } + + let ec_hash = ec_context.ec_hash()?.to_owned(); + let client_ip = ec_context.client_ip()?.to_owned(); + Some(PullSyncContext { ec_hash, client_ip }) +} + +/// Dispatches partner pull-sync requests in the background. +/// +/// This function is best-effort: all errors are logged and swallowed. +pub fn dispatch_pull_sync( + settings: &Settings, + kv: &KvIdentityGraph, + partner_store: &PartnerStore, + rate_limiter: &dyn RateLimiter, + context: &PullSyncContext, +) { + let now = current_timestamp(); + let kv_entry = match kv.get(context.ec_hash()) { + Ok(entry) => entry.map(|(entry, _)| entry), + Err(err) => { + log::warn!( + "Pull sync: failed to read identity graph for '{}': {err:?}", + context.ec_hash() + ); + return; + } + }; + + let partners = match partner_store.list_registered() { + Ok(partners) => partners, + Err(err) => { + log::warn!("Pull sync: failed to list partners: {err:?}"); + return; + } + }; + + let pull_enabled_count = partners.iter().filter(|p| p.pull_sync_enabled).count(); + log::debug!( + "Pull sync: enumerated {} partners ({} pull-enabled)", + partners.len(), + pull_enabled_count + ); + + if pull_enabled_count == 0 { + return; + } + + let max_concurrency = settings.ec.pull_sync_concurrency.max(1); + let mut in_flight: Vec = Vec::new(); + + for partner in partners { + if !partner.pull_sync_enabled { + continue; + } + + if !is_partner_pull_eligible(&partner, kv_entry.as_ref(), now) { + continue; + } + + let Some(url) = validated_pull_sync_url(&partner) else { + continue; + }; + + let rate_key = format!("pull:{}:{}", partner.id, context.ec_hash()); + match rate_limiter.exceeded(&rate_key, partner.pull_sync_rate_limit) { + Ok(true) => { + log::debug!( + "Pull sync: rate-limited partner '{}' for hash '{}'", + partner.id, + context.ec_hash() + ); + continue; + } + Ok(false) => {} + Err(err) => { + log::warn!( + "Pull sync: failed to read rate limit for partner '{}': {err:?}", + partner.id + ); + continue; + } + } + + let Some(token) = partner.ts_pull_token.as_deref() else { + log::warn!( + "Pull sync: partner '{}' enabled but missing ts_pull_token", + partner.id + ); + continue; + }; + + let request_url = build_pull_request_url(url, context.ec_hash(), context.client_ip()); + let mut request = Request::new(Method::GET, request_url.as_str()); + request.set_header("authorization", format!("Bearer {token}")); + + let backend_name = + match BackendConfig::from_url(request_url.as_str(), settings.proxy.certificate_check) { + Ok(name) => name, + Err(err) => { + log::warn!( + "Pull sync: failed to resolve backend for partner '{}': {err:?}", + partner.id + ); + continue; + } + }; + + let pending = match request.send_async(backend_name) { + Ok(pending) => pending, + Err(err) => { + log::warn!( + "Pull sync: failed to dispatch partner '{}': {err:?}", + partner.id + ); + continue; + } + }; + + in_flight.push(InFlightPull { + partner_id: partner.id, + pending, + }); + + if in_flight.len() >= max_concurrency { + drain_pull_batch(kv, context.ec_hash(), &mut in_flight); + } + } + + drain_pull_batch(kv, context.ec_hash(), &mut in_flight); +} + +fn is_partner_pull_eligible(partner: &PartnerRecord, kv_entry: Option<&KvEntry>, now: u64) -> bool { + let Some(entry) = kv_entry else { + return true; + }; + + let Some(existing) = entry.ids.get(&partner.id) else { + return true; + }; + + now.saturating_sub(existing.synced) >= partner.pull_sync_ttl_sec +} + +fn validated_pull_sync_url(partner: &PartnerRecord) -> Option { + let pull_sync_url = partner.pull_sync_url.as_deref()?; + let parsed = match Url::parse(pull_sync_url) { + Ok(url) => url, + Err(err) => { + log::error!( + "Pull sync: partner '{}' has invalid pull_sync_url '{}': {err}", + partner.id, + pull_sync_url + ); + return None; + } + }; + + if parsed.scheme() != "https" { + log::error!( + "Pull sync: partner '{}' pull_sync_url must use HTTPS, got scheme '{}'", + partner.id, + parsed.scheme() + ); + return None; + } + + let Some(hostname) = parsed.host_str() else { + log::error!( + "Pull sync: partner '{}' pull_sync_url has no hostname: {}", + partner.id, + pull_sync_url + ); + return None; + }; + + let hostname = hostname.trim_end_matches('.').to_ascii_lowercase(); + if !partner.pull_sync_allowed_domains.iter().any(|domain| { + domain + .trim() + .trim_end_matches('.') + .eq_ignore_ascii_case(&hostname) + }) { + log::error!( + "Pull sync: partner '{}' URL host '{}' not in pull_sync_allowed_domains", + partner.id, + hostname + ); + return None; + } + + Some(parsed) +} + +fn build_pull_request_url(mut base_url: Url, ec_hash: &str, client_ip: &str) -> Url { + base_url + .query_pairs_mut() + .append_pair("ec_hash", ec_hash) + .append_pair("ip", client_ip); + base_url +} + +fn drain_pull_batch(kv: &KvIdentityGraph, ec_hash: &str, in_flight: &mut Vec) { + for pending in in_flight.drain(..) { + let partner_id = pending.partner_id; + let response = match pending.pending.wait() { + Ok(response) => response, + Err(err) => { + log::warn!( + "Pull sync: request failed for partner '{}': {err:?}", + partner_id + ); + continue; + } + }; + + let Some(uid) = extract_pull_uid(response, &partner_id) else { + continue; + }; + + if let Err(err) = kv.upsert_partner_id(ec_hash, &partner_id, &uid, current_timestamp()) { + log::warn!( + "Pull sync: failed to upsert partner '{}' for hash '{}': {err:?}", + partner_id, + ec_hash + ); + } + } +} + +fn extract_pull_uid(mut response: fastly::Response, partner_id: &str) -> Option { + let status = response.get_status(); + + if status == StatusCode::NOT_FOUND { + log::debug!( + "Pull sync: partner '{}' returned 404, treating as no-op", + partner_id + ); + return None; + } + + if !status.is_success() { + log::warn!( + "Pull sync: partner '{}' returned non-success status {}", + partner_id, + status + ); + return None; + } + + let body = response.take_body_bytes(); + let payload = match serde_json::from_slice::(&body) { + Ok(payload) => payload, + Err(err) => { + log::warn!( + "Pull sync: partner '{}' returned invalid JSON body: {err}", + partner_id + ); + return None; + } + }; + + const MAX_UID_LENGTH: usize = 256; + + let uid = payload.uid.filter(|value| !value.is_empty()); + match uid { + None => { + log::debug!( + "Pull sync: partner '{}' returned null/empty uid, treating as no-op", + partner_id + ); + None + } + Some(ref value) if value.len() > MAX_UID_LENGTH => { + log::warn!( + "Pull sync: partner '{}' returned uid exceeding {} bytes (got {}), rejecting", + partner_id, + MAX_UID_LENGTH, + value.len() + ); + None + } + _ => uid, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::consent::types::ConsentContext; + use crate::ec::kv_types::KvEntry; + + fn pull_partner(ttl_sec: u64) -> PartnerRecord { + PartnerRecord { + id: "ssp_x".to_owned(), + name: "SSP X".to_owned(), + allowed_return_domains: vec!["sync.example.com".to_owned()], + api_key_hash: "deadbeef".to_owned(), + bidstream_enabled: true, + source_domain: "ssp.example.com".to_owned(), + openrtb_atype: 3, + sync_rate_limit: 60, + batch_rate_limit: 60, + pull_sync_enabled: true, + pull_sync_url: Some("https://sync.partner.test/pull".to_owned()), + pull_sync_allowed_domains: vec!["sync.partner.test".to_owned()], + pull_sync_ttl_sec: ttl_sec, + pull_sync_rate_limit: 20, + ts_pull_token: Some("token".to_owned()), + } + } + + #[test] + fn build_pull_sync_context_requires_ec_and_ip() { + let consent = ConsentContext { + jurisdiction: crate::consent::jurisdiction::Jurisdiction::NonRegulated, + ..ConsentContext::default() + }; + let ec_context = + EcContext::new_for_test(Some(format!("{}.ABC123", "a".repeat(64))), consent); + + let context = build_pull_sync_context(&ec_context); + assert!( + context.is_none(), + "should require client_ip for pull sync dispatch context" + ); + } + + #[test] + fn build_pull_sync_context_returns_context_when_valid() { + let consent = ConsentContext { + jurisdiction: crate::consent::jurisdiction::Jurisdiction::NonRegulated, + ..ConsentContext::default() + }; + let ec_id = format!("{}.ABC123", "a".repeat(64)); + let ec_context = + EcContext::new_for_test_with_ip(Some(ec_id), consent, Some("1.2.3.4".to_owned())); + + let context = build_pull_sync_context(&ec_context) + .expect("should build pull sync context for valid EC with IP"); + assert_eq!(context.ec_hash(), "a".repeat(64).as_str()); + assert_eq!(context.client_ip(), "1.2.3.4"); + } + + #[test] + fn build_pull_sync_context_rejects_invalid_ec_id() { + let consent = ConsentContext { + jurisdiction: crate::consent::jurisdiction::Jurisdiction::NonRegulated, + ..ConsentContext::default() + }; + let ec_context = EcContext::new_for_test_with_ip( + Some("invalid-ec".to_owned()), + consent, + Some("1.2.3.4".to_owned()), + ); + + let context = build_pull_sync_context(&ec_context); + assert!( + context.is_none(), + "should reject pull sync context when EC ID format is invalid" + ); + } + + #[test] + fn partner_is_eligible_when_missing_from_entry() { + let partner = pull_partner(3600); + let entry = KvEntry::minimal("other_partner", "uid-1", 100); + + assert!( + is_partner_pull_eligible(&partner, Some(&entry), 200), + "should dispatch when partner has no stored sync" + ); + } + + #[test] + fn partner_is_not_eligible_when_not_stale() { + let partner = pull_partner(3600); + let entry = KvEntry::minimal("ssp_x", "uid-1", 1000); + + assert!( + !is_partner_pull_eligible(&partner, Some(&entry), 1500), + "should skip dispatch when sync is fresher than ttl" + ); + } + + #[test] + fn validated_pull_sync_url_rejects_http_scheme() { + let mut partner = pull_partner(3600); + partner.pull_sync_url = Some("http://sync.partner.test/pull".to_owned()); + + let validated = validated_pull_sync_url(&partner); + assert!( + validated.is_none(), + "should reject pull_sync_url with HTTP scheme" + ); + } + + #[test] + fn validated_pull_sync_url_rejects_non_allowlisted_host() { + let mut partner = pull_partner(3600); + partner.pull_sync_url = Some("https://evil.test/pull".to_owned()); + + let validated = validated_pull_sync_url(&partner); + assert!( + validated.is_none(), + "should reject runtime pull_sync_url host outside allowlist" + ); + } + + #[test] + fn validated_pull_sync_url_accepts_normalized_allowlist_match() { + let mut partner = pull_partner(3600); + partner.pull_sync_url = Some("https://SYNC.PARTNER.TEST./pull".to_owned()); + partner.pull_sync_allowed_domains = vec!["sync.partner.test".to_owned()]; + + let validated = validated_pull_sync_url(&partner); + assert!( + validated.is_some(), + "should accept allowlist match after hostname normalization" + ); + } + + #[test] + fn build_pull_request_url_appends_query_pairs() { + let url = Url::parse("https://sync.partner.test/pull?x=1").expect("should parse URL"); + let result = build_pull_request_url(url, "hash123", "1.2.3.4"); + + let query = result.query().expect("should have query string"); + assert!(query.contains("x=1"), "should preserve existing query"); + assert!(query.contains("ec_hash=hash123"), "should append ec_hash"); + assert!(query.contains("ip=1.2.3.4"), "should append ip"); + } + + #[test] + fn extract_pull_uid_treats_404_as_noop() { + let response = fastly::Response::from_status(StatusCode::NOT_FOUND); + + let uid = extract_pull_uid(response, "ssp_x"); + assert!(uid.is_none(), "should treat 404 as no-op"); + } + + #[test] + fn extract_pull_uid_treats_uid_null_as_noop() { + let response = fastly::Response::from_status(StatusCode::OK).with_body("{\"uid\":null}"); + + let uid = extract_pull_uid(response, "ssp_x"); + assert!(uid.is_none(), "should treat uid=null as no-op"); + } + + #[test] + fn extract_pull_uid_rejects_oversized_uid() { + let long_uid = "x".repeat(257); + let body = format!("{{\"uid\":\"{long_uid}\"}}"); + let response = fastly::Response::from_status(StatusCode::OK).with_body(body); + + let uid = extract_pull_uid(response, "ssp_x"); + assert!(uid.is_none(), "should reject uid exceeding 256 bytes"); + } + + #[test] + fn extract_pull_uid_reads_uid_from_success_body() { + let response = + fastly::Response::from_status(StatusCode::OK).with_body("{\"uid\":\"abc123\"}"); + + let uid = extract_pull_uid(response, "ssp_x"); + assert_eq!( + uid.as_deref(), + Some("abc123"), + "should parse uid from 200 body" + ); + } +} diff --git a/crates/trusted-server-core/src/settings.rs b/crates/trusted-server-core/src/settings.rs index 5f4486de..bbceaa8c 100644 --- a/crates/trusted-server-core/src/settings.rs +++ b/crates/trusted-server-core/src/settings.rs @@ -224,12 +224,22 @@ pub struct Ec { /// Required for Story 4+ (partner registry). #[serde(default)] pub partner_store: Option, + + /// Maximum number of concurrent pull-sync requests. + #[serde(default = "Ec::default_pull_sync_concurrency")] + pub pull_sync_concurrency: usize, } impl Ec { /// Known placeholder values that must not be used in production. pub const PASSPHRASE_PLACEHOLDERS: &[&str] = &["secret-key", "secret_key", "trusted-server"]; + /// Default maximum concurrent pull-sync requests. + #[must_use] + pub const fn default_pull_sync_concurrency() -> usize { + 3 + } + /// Returns `true` if `passphrase` matches a known placeholder value /// (case-insensitive). #[must_use] diff --git a/trusted-server.toml b/trusted-server.toml index 7953c4f9..4907a686 100644 --- a/trusted-server.toml +++ b/trusted-server.toml @@ -18,6 +18,7 @@ proxy_secret = "change-me-proxy-secret" passphrase = "trusted-server" ec_store = "ec_identity_store" partner_store = "ec_partner_store" +pull_sync_concurrency = 3 # Custom headers to be included in every response # Allows publishers to include tags such as X-Robots-Tag: noindex @@ -177,4 +178,3 @@ timeout_ms = 1000 [integrations.adserver_mock.context_query_params] permutive_segments = "permutive" -