diff --git a/Cargo.lock b/Cargo.lock index b015eb6d..80000dd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2764,6 +2764,7 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.9", + "subtle", "temp-env", "tokio", "tokio-test", diff --git a/Cargo.toml b/Cargo.toml index 424c357d..00655e1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,6 +82,7 @@ regex = "1.12.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.149" sha2 = "0.10.9" +subtle = "2.6" temp-env = "0.3.6" tokio = { version = "1.49", features = ["sync", "macros", "io-util", "rt", "time"] } tokio-test = "0.4" diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index d97c8402..4bbd9c78 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -10,6 +10,8 @@ use trusted_server_core::constants::{ ENV_FASTLY_IS_STAGING, ENV_FASTLY_SERVICE_VERSION, HEADER_X_GEO_INFO_AVAILABLE, HEADER_X_TS_ENV, HEADER_X_TS_VERSION, }; +use trusted_server_core::ec::admin::handle_register_partner; +use trusted_server_core::ec::partner::PartnerStore; use trusted_server_core::error::TrustedServerError; use trusted_server_core::geo::GeoInfo; use trusted_server_core::http_util::sanitize_forwarded_headers; @@ -105,10 +107,13 @@ async fn route_request( // Signature verification endpoint (Method::POST, "/verify-signature") => handle_verify_signature(settings, req), - // Key rotation admin endpoints + // Admin endpoints // Keep in sync with Settings::ADMIN_ENDPOINTS in crates/trusted-server-core/src/settings.rs (Method::POST, "/admin/keys/rotate") => handle_rotate_key(settings, req), (Method::POST, "/admin/keys/deactivate") => handle_deactivate_key(settings, req), + (Method::POST, "/admin/partners/register") => { + require_partner_store(settings).and_then(|store| handle_register_partner(&store, req)) + } // Unified auction endpoint (returns creative HTML inline) (Method::POST, "/auction") => handle_auction(settings, orchestrator, req).await, @@ -209,3 +214,15 @@ fn init_logger() { .apply() .expect("should initialize logger"); } + +/// Constructs a `PartnerStore` from settings, or returns 503 if the +/// `partner_store` config is not set. +fn require_partner_store(settings: &Settings) -> Result> { + let store_name = settings.ec.partner_store.as_deref().ok_or_else(|| { + Report::new(TrustedServerError::KvStore { + store_name: "ec.partner_store".to_owned(), + message: "ec.partner_store is not configured".to_owned(), + }) + })?; + Ok(PartnerStore::new(store_name)) +} diff --git a/crates/trusted-server-core/Cargo.toml b/crates/trusted-server-core/Cargo.toml index 5ba2d814..bdbb2b01 100644 --- a/crates/trusted-server-core/Cargo.toml +++ b/crates/trusted-server-core/Cargo.toml @@ -40,6 +40,7 @@ regex = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sha2 = { workspace = true } +subtle = { workspace = true } tokio = { workspace = true } toml = { workspace = true } trusted-server-js = { path = "../js" } diff --git a/crates/trusted-server-core/src/ec/admin.rs b/crates/trusted-server-core/src/ec/admin.rs new file mode 100644 index 00000000..db32a3cf --- /dev/null +++ b/crates/trusted-server-core/src/ec/admin.rs @@ -0,0 +1,380 @@ +//! Admin endpoints for partner management. +//! +//! Provides `POST /admin/partners/register` for registering and updating +//! partner configurations. Authentication is handled by the `[[handlers]]` +//! basic-auth layer before this code runs. + +use error_stack::{Report, ResultExt}; +use fastly::{Request, Response}; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use url::Host; + +use crate::error::TrustedServerError; + +use super::partner::{ + hash_api_key, validate_partner_id, validate_pull_sync_config, PartnerRecord, PartnerStore, +}; + +/// Request body for `POST /admin/partners/register`. +/// +/// Accepts `api_key` as plaintext — it is hashed before storage and +/// never persisted in cleartext. +#[derive(Debug, Deserialize)] +pub struct RegisterPartnerRequest { + pub id: String, + pub name: String, + pub allowed_return_domains: Vec, + /// Raw API key — will be SHA-256 hashed before storage. + pub api_key: String, + #[serde(default)] + pub bidstream_enabled: bool, + pub source_domain: String, + #[serde(default = "default_openrtb_atype")] + pub openrtb_atype: u8, + #[serde(default = "default_sync_rate_limit")] + pub sync_rate_limit: u32, + #[serde(default = "default_batch_rate_limit")] + pub batch_rate_limit: u32, + #[serde(default)] + pub pull_sync_enabled: bool, + #[serde(default)] + pub pull_sync_url: Option, + #[serde(default)] + pub pull_sync_allowed_domains: Vec, + #[serde(default = "default_pull_sync_ttl_sec")] + pub pull_sync_ttl_sec: u64, + #[serde(default = "default_pull_sync_rate_limit")] + pub pull_sync_rate_limit: u32, + #[serde(default)] + pub ts_pull_token: Option, +} + +fn default_openrtb_atype() -> u8 { + 3 +} +fn default_sync_rate_limit() -> u32 { + 100 +} +fn default_batch_rate_limit() -> u32 { + 60 +} +fn default_pull_sync_ttl_sec() -> u64 { + 86400 +} +fn default_pull_sync_rate_limit() -> u32 { + 10 +} + +fn bad_request(message: impl Into) -> Report { + Report::new(TrustedServerError::BadRequest { + message: message.into(), + }) +} + +fn normalize_required_text( + value: &str, + field_name: &str, +) -> Result> { + let trimmed = value.trim(); + if trimmed.is_empty() { + return Err(bad_request(format!("{field_name} is required"))); + } + Ok(trimmed.to_owned()) +} + +fn normalize_hostname(value: &str, field_name: &str) -> Result> { + let trimmed = value.trim().trim_end_matches('.'); + if trimmed.is_empty() { + return Err(bad_request(format!("{field_name} is required"))); + } + + let normalized = trimmed.to_ascii_lowercase(); + Host::parse(&normalized) + .map_err(|_| bad_request(format!("{field_name} must be a valid hostname")))?; + + Ok(normalized) +} + +fn normalize_hostname_list( + values: Vec, + field_name: &str, +) -> Result, Report> { + let mut normalized_values = Vec::with_capacity(values.len()); + let mut seen = HashSet::with_capacity(values.len()); + + for value in values { + let trimmed = value.trim().trim_end_matches('.'); + if trimmed.is_empty() { + return Err(bad_request(format!( + "{field_name} entries must not be empty" + ))); + } + + let normalized = trimmed.to_ascii_lowercase(); + Host::parse(&normalized).map_err(|_| { + bad_request(format!("{field_name} contains invalid hostname '{value}'")) + })?; + + if seen.insert(normalized.clone()) { + normalized_values.push(normalized); + } + } + + Ok(normalized_values) +} + +/// Response body for `POST /admin/partners/register`. +/// +/// Echoes key fields without exposing sensitive data (`api_key_hash`, +/// `ts_pull_token`). +#[derive(Debug, Serialize)] +pub struct RegisterPartnerResponse { + pub id: String, + pub name: String, + pub pull_sync_enabled: bool, + pub bidstream_enabled: bool, + pub created: bool, +} + +/// Handles `POST /admin/partners/register`. +/// +/// Registers a new partner or updates an existing one. Authentication is +/// handled upstream by the `[[handlers]]` basic-auth layer. +/// +/// # Errors +/// +/// Returns `Report` on validation failure (400), +/// KV store failure (503), or JSON parse failure (400). +pub fn handle_register_partner( + partner_store: &PartnerStore, + mut req: Request, +) -> Result> { + // Parse request body. + let body_bytes = req.take_body_bytes(); + let request: RegisterPartnerRequest = + serde_json::from_slice(&body_bytes).change_context(TrustedServerError::BadRequest { + message: "Invalid JSON in request body".to_owned(), + })?; + + let RegisterPartnerRequest { + id, + name, + allowed_return_domains, + api_key, + bidstream_enabled, + source_domain, + openrtb_atype, + sync_rate_limit, + batch_rate_limit, + pull_sync_enabled, + pull_sync_url, + pull_sync_allowed_domains, + pull_sync_ttl_sec, + pull_sync_rate_limit, + ts_pull_token, + } = request; + + // Validate partner ID. + validate_partner_id(&id).map_err(bad_request)?; + + // Validate and normalize required fields. + let name = normalize_required_text(&name, "name")?; + if api_key.trim().is_empty() { + return Err(bad_request("api_key is required")); + } + let source_domain = normalize_hostname(&source_domain, "source_domain")?; + + if allowed_return_domains.is_empty() { + return Err(bad_request( + "allowed_return_domains must have at least one entry", + )); + } + let allowed_return_domains = + normalize_hostname_list(allowed_return_domains, "allowed_return_domains")?; + let pull_sync_allowed_domains = + normalize_hostname_list(pull_sync_allowed_domains, "pull_sync_allowed_domains")?; + + // Build the PartnerRecord with hashed API key. + let record = PartnerRecord { + id, + name, + allowed_return_domains, + api_key_hash: hash_api_key(&api_key), + bidstream_enabled, + source_domain, + openrtb_atype, + sync_rate_limit, + batch_rate_limit, + pull_sync_enabled, + pull_sync_url, + pull_sync_allowed_domains, + pull_sync_ttl_sec, + pull_sync_rate_limit, + ts_pull_token, + }; + + // Validate pull sync configuration. + validate_pull_sync_config(&record).map_err(bad_request)?; + + // Persist to KV store. + let created = partner_store.upsert(&record)?; + + let status = if created { + log::info!("Registered new partner '{}'", record.id); + fastly::http::StatusCode::CREATED + } else { + log::info!("Updated existing partner '{}'", record.id); + fastly::http::StatusCode::OK + }; + + let response_body = RegisterPartnerResponse { + id: record.id, + name: record.name, + pull_sync_enabled: record.pull_sync_enabled, + bidstream_enabled: record.bidstream_enabled, + created, + }; + + let body = serde_json::to_string(&response_body).change_context( + TrustedServerError::Configuration { + message: "Failed to serialize registration response".to_owned(), + }, + )?; + + Ok(Response::from_status(status) + .with_content_type(fastly::mime::APPLICATION_JSON) + .with_body(body)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn request_deserializes_with_defaults() { + let json = r#"{ + "id": "ssp_x", + "name": "SSP Example", + "allowed_return_domains": ["sync.example-ssp.com"], + "api_key": "raw-secret-key", + "source_domain": "example-ssp.com" + }"#; + + let req: RegisterPartnerRequest = + serde_json::from_str(json).expect("should deserialize with defaults"); + + assert_eq!(req.id, "ssp_x"); + assert_eq!(req.openrtb_atype, 3, "should default to 3"); + assert_eq!(req.sync_rate_limit, 100, "should default to 100"); + assert_eq!(req.batch_rate_limit, 60, "should default to 60"); + assert_eq!(req.pull_sync_ttl_sec, 86400, "should default to 86400"); + assert_eq!(req.pull_sync_rate_limit, 10, "should default to 10"); + assert!(!req.bidstream_enabled, "should default to false"); + assert!(!req.pull_sync_enabled, "should default to false"); + assert!(req.pull_sync_url.is_none()); + assert!(req.ts_pull_token.is_none()); + } + + #[test] + fn response_does_not_contain_sensitive_fields() { + let response = RegisterPartnerResponse { + id: "ssp_x".to_owned(), + name: "SSP Example".to_owned(), + pull_sync_enabled: false, + bidstream_enabled: true, + created: true, + }; + + let json = serde_json::to_string(&response).expect("should serialize"); + assert!(!json.contains("api_key"), "should not contain api_key"); + assert!( + !json.contains("api_key_hash"), + "should not contain api_key_hash" + ); + assert!( + !json.contains("ts_pull_token"), + "should not contain ts_pull_token" + ); + } + + #[test] + fn request_deserializes_full_payload() { + let json = r#"{ + "id": "ssp_x", + "name": "SSP Example", + "allowed_return_domains": ["sync.example-ssp.com"], + "api_key": "raw-secret-key", + "bidstream_enabled": true, + "source_domain": "example-ssp.com", + "openrtb_atype": 3, + "sync_rate_limit": 200, + "batch_rate_limit": 120, + "pull_sync_enabled": true, + "pull_sync_url": "https://sync.example-ssp.com/pull", + "pull_sync_allowed_domains": ["sync.example-ssp.com"], + "pull_sync_ttl_sec": 43200, + "pull_sync_rate_limit": 5, + "ts_pull_token": "bearer-token-123" + }"#; + + let req: RegisterPartnerRequest = + serde_json::from_str(json).expect("should deserialize full payload"); + + assert_eq!(req.sync_rate_limit, 200); + assert_eq!(req.batch_rate_limit, 120); + assert!(req.pull_sync_enabled); + assert_eq!( + req.pull_sync_url.as_deref(), + Some("https://sync.example-ssp.com/pull") + ); + } + + #[test] + fn normalize_required_text_rejects_whitespace_only() { + let err = normalize_required_text(" ", "name") + .expect_err("should reject whitespace-only required field"); + assert!( + err.to_string().contains("name is required"), + "should mention required field" + ); + } + + #[test] + fn normalize_hostname_normalizes_case_and_trailing_dot() { + let normalized = normalize_hostname(" Sync.Example.COM. ", "source_domain") + .expect("should parse host"); + assert_eq!(normalized, "sync.example.com"); + } + + #[test] + fn normalize_hostname_list_rejects_empty_entry() { + let err = normalize_hostname_list( + vec!["sync.example.com".to_owned(), " ".to_owned()], + "allowed_return_domains", + ) + .expect_err("should reject empty domain entries"); + assert!( + err.to_string() + .contains("allowed_return_domains entries must not be empty"), + "should surface empty-entry error" + ); + } + + #[test] + fn normalize_hostname_list_deduplicates_normalized_values() { + let normalized = normalize_hostname_list( + vec![ + "Sync.Example.com".to_owned(), + "sync.example.com.".to_owned(), + "cdn.example.com".to_owned(), + ], + "allowed_return_domains", + ) + .expect("should normalize hostnames"); + assert_eq!( + normalized, + vec!["sync.example.com".to_owned(), "cdn.example.com".to_owned()] + ); + } +} diff --git a/crates/trusted-server-core/src/ec/mod.rs b/crates/trusted-server-core/src/ec/mod.rs index 97116239..df5e6289 100644 --- a/crates/trusted-server-core/src/ec/mod.rs +++ b/crates/trusted-server-core/src/ec/mod.rs @@ -18,12 +18,16 @@ //! - [`cookies`] — `Set-Cookie` header creation and expiration helpers //! - [`kv`] — KV Store identity graph operations (CAS, tombstones, debounce) //! - [`kv_types`] — Schema types for KV identity graph entries +//! - [`partner`] — Partner registry (`PartnerRecord`, `PartnerStore`) +//! - [`admin`] — Admin endpoints for partner management +pub mod admin; pub mod consent; pub mod cookies; pub mod generation; pub mod kv; pub mod kv_types; +pub mod partner; use cookie::CookieJar; use error_stack::Report; diff --git a/crates/trusted-server-core/src/ec/partner.rs b/crates/trusted-server-core/src/ec/partner.rs new file mode 100644 index 00000000..7e1566a8 --- /dev/null +++ b/crates/trusted-server-core/src/ec/partner.rs @@ -0,0 +1,732 @@ +//! Partner registry — `PartnerRecord` schema and `PartnerStore` operations. +//! +//! Each partner (SSP, DSP, identity vendor) is stored as a JSON record in +//! the Fastly KV Store keyed by `partner_id`. A secondary index +//! `apikey:{sha256_hex}` provides O(1) API key lookups for batch sync auth. + +use std::{collections::HashSet, sync::OnceLock}; + +use error_stack::{Report, ResultExt}; +use fastly::kv_store::KVStore; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use subtle::ConstantTimeEq; + +use crate::error::TrustedServerError; + +/// Regex pattern for valid partner IDs. +/// Lowercase alphanumeric, hyphens, underscores; 1-32 characters. +const PARTNER_ID_PATTERN: &str = r"^[a-z0-9_-]{1,32}$"; + +/// Reserved partner IDs that would collide with managed `X-ts-*` headers. +const RESERVED_PARTNER_IDS: &[&str] = &[ + "ec", + "eids", + "ec-consent", + "eids-truncated", + "synthetic", + "ts", + "version", + "env", +]; + +/// Prefix for the API key hash secondary index keys. +const APIKEY_INDEX_PREFIX: &str = "apikey:"; + +/// Cached compiled regex for partner ID validation. +static PARTNER_ID_REGEX: OnceLock> = OnceLock::new(); + +fn partner_id_regex() -> Result<&'static Regex, String> { + PARTNER_ID_REGEX + .get_or_init(|| { + Regex::new(PARTNER_ID_PATTERN) + .map_err(|e| format!("internal error compiling partner ID regex: {e}")) + }) + .as_ref() + .map_err(Clone::clone) +} + +/// A registered partner configuration stored in the partner KV store. +/// +/// Created via `POST /admin/partners/register`. Used by pixel sync, batch +/// sync, pull sync, and auction bidstream decoration. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct PartnerRecord { + /// Unique partner identifier. Must match [`PARTNER_ID_PATTERN`] and + /// not be in [`RESERVED_PARTNER_IDS`]. Used to build `X-ts-` + /// response headers. + pub id: String, + /// Human-readable partner name. + pub name: String, + /// Exact hostnames allowed as `return` URL domains in pixel sync. + pub allowed_return_domains: Vec, + /// SHA-256 hex of the partner's API key. Plaintext is never stored. + pub api_key_hash: String, + /// Whether this partner's UIDs appear in auction `user.eids`. + pub bidstream_enabled: bool, + /// `OpenRTB` `source.domain` for EID entries (e.g. `"liveramp.com"`). + pub source_domain: String, + /// `OpenRTB` `atype` value (typically 3). + pub openrtb_atype: u8, + /// Max pixel sync writes per EC hash per partner per hour. + pub sync_rate_limit: u32, + /// Max batch sync API requests per partner per minute. + pub batch_rate_limit: u32, + /// Whether server-to-server pull sync is enabled for this partner. + pub pull_sync_enabled: bool, + /// URL to call for pull sync. Required when `pull_sync_enabled`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pull_sync_url: Option, + /// Allowlist of domains TS may call for this partner's pull sync. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub pull_sync_allowed_domains: Vec, + /// Seconds between pull sync refreshes (default 86400). + pub pull_sync_ttl_sec: u64, + /// Max pull sync calls per EC hash per partner per hour. + pub pull_sync_rate_limit: u32, + /// Outbound bearer token for pull sync requests. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub ts_pull_token: Option, +} + +/// Validates a partner ID format and checks against reserved names. +/// +/// # Errors +/// +/// Returns a descriptive error string on validation failure. +pub fn validate_partner_id(id: &str) -> Result<(), String> { + let re = partner_id_regex()?; + if !re.is_match(id) { + return Err(format!( + "partner ID must match {PARTNER_ID_PATTERN}, got: '{id}'" + )); + } + if RESERVED_PARTNER_IDS.contains(&id) { + return Err(format!("partner ID '{id}' is reserved")); + } + Ok(()) +} + +/// Validates pull sync configuration consistency. +/// +/// When `pull_sync_enabled` is true, both `pull_sync_url` and +/// `ts_pull_token` must be present, and the URL's hostname must +/// appear in `pull_sync_allowed_domains`. +/// +/// # Errors +/// +/// Returns a descriptive error string on validation failure. +pub fn validate_pull_sync_config(record: &PartnerRecord) -> Result<(), String> { + if !record.pull_sync_enabled { + return Ok(()); + } + + let url_str = record.pull_sync_url.as_deref().unwrap_or(""); + if url_str.is_empty() { + return Err( + "pull_sync_url and ts_pull_token are required when pull_sync_enabled is true" + .to_owned(), + ); + } + + if record.ts_pull_token.as_deref().unwrap_or("").is_empty() { + return Err( + "pull_sync_url and ts_pull_token are required when pull_sync_enabled is true" + .to_owned(), + ); + } + + // Validate that the pull sync URL hostname is in the allowed domains. + let parsed = + url::Url::parse(url_str).map_err(|e| format!("pull_sync_url is not a valid URL: {e}"))?; + let host = parsed + .host_str() + .ok_or("pull_sync_url has no hostname")? + .trim_end_matches('.') + .to_ascii_lowercase(); + + let allowed: HashSet = record + .pull_sync_allowed_domains + .iter() + .map(|domain| domain.trim().trim_end_matches('.').to_ascii_lowercase()) + .collect(); + + if !allowed.contains(&host) { + return Err("pull_sync_url domain must be in pull_sync_allowed_domains".to_owned()); + } + + Ok(()) +} + +/// Computes the SHA-256 hex digest of an API key. +#[must_use] +pub fn hash_api_key(api_key: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(api_key.as_bytes()); + hex::encode(hasher.finalize()) +} + +/// Wraps a Fastly KV Store for partner registry operations. +/// +/// Partner records are keyed by `partner_id`. A secondary index +/// `apikey:{sha256_hex}` maps API key hashes to partner IDs for +/// O(1) auth lookups during batch sync. +pub struct PartnerStore { + store_name: String, +} + +impl PartnerStore { + /// Creates a new partner store backed by the named KV store. + #[must_use] + pub fn new(store_name: impl Into) -> Self { + Self { + store_name: store_name.into(), + } + } + + /// Returns the configured store name. + #[must_use] + pub fn store_name(&self) -> &str { + &self.store_name + } + + /// Opens the underlying Fastly KV store. + fn open_store(&self) -> Result> { + KVStore::open(&self.store_name) + .change_context(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: "Failed to open partner store".to_owned(), + })? + .ok_or_else(|| { + Report::new(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: "Partner store not found".to_owned(), + }) + }) + } + + /// Reads a partner record by ID. + /// + /// Returns `Ok(None)` when the partner is not registered. + /// + /// # Errors + /// + /// Returns [`TrustedServerError::KvStore`] on store or deserialization failure. + pub fn get( + &self, + partner_id: &str, + ) -> Result, Report> { + let store = self.open_store()?; + let mut response = match store.lookup(partner_id) { + Ok(resp) => resp, + Err(fastly::kv_store::KVStoreError::ItemNotFound) => return Ok(None), + Err(err) => { + return Err( + Report::new(err).change_context(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!("Failed to read partner '{partner_id}'"), + }), + ); + } + }; + + let body_bytes = response.take_body_bytes(); + let record: PartnerRecord = + serde_json::from_slice(&body_bytes).change_context(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!("Failed to deserialize partner '{partner_id}'"), + })?; + + Ok(Some(record)) + } + + /// Writes or updates a partner record and maintains the API key index. + /// + /// Returns `true` if this was a new partner (create), `false` if an + /// existing partner was updated. + /// + /// Index maintenance order: + /// 1. Read existing `apikey:` index value for rollback + /// 2. Write new `apikey:` index + /// 3. Write primary record + /// 4. Delete old `apikey:` index (if key rotated) + /// + /// Writes are still **not fully atomic**, but this order ensures + /// registration does not return success after a failed index write and + /// performs best-effort rollback when primary write fails. + /// + /// # Errors + /// + /// Returns [`TrustedServerError::KvStore`] on store failure. + pub fn upsert(&self, record: &PartnerRecord) -> Result> { + let store = self.open_store()?; + + // Read existing record to detect API key rotation. + let existing = match store.lookup(&record.id) { + Ok(mut resp) => { + let bytes = resp.take_body_bytes(); + serde_json::from_slice::(&bytes).ok() + } + Err(_) => None, + }; + + let is_create = existing.is_none(); + let old_api_key_hash = existing.as_ref().map(|r| r.api_key_hash.clone()); + + let index_key = format!("{APIKEY_INDEX_PREFIX}{}", record.api_key_hash); + let previous_index_partner_id = self.read_index_partner_id(&store, &index_key)?; + + // 1. Write new API key index. + store + .build_insert() + .execute(&index_key, record.id.as_str()) + .change_context(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!( + "Failed to write API key index for partner '{}' (hash '{}')", + record.id, record.api_key_hash + ), + })?; + + // 2. Write primary record. + let body = serde_json::to_string(record).change_context(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!("Failed to serialize partner '{}'", record.id), + })?; + + if let Err(err) = store.build_insert().execute(&record.id, body) { + self.restore_previous_index_mapping( + &store, + &index_key, + previous_index_partner_id.as_deref(), + &record.id, + ); + return Err( + Report::new(err).change_context(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!("Failed to write partner '{}'", record.id), + }), + ); + } + + // 3. Delete old API key index if key rotated. + if let Some(ref old_hash) = old_api_key_hash { + if *old_hash != record.api_key_hash { + let old_key = format!("{APIKEY_INDEX_PREFIX}{old_hash}"); + if let Err(err) = store.delete(&old_key) { + log::warn!( + "Failed to delete old API key index for partner '{}': {err:?}", + record.id, + ); + } + } + } + + Ok(is_create) + } + + fn read_index_partner_id( + &self, + store: &KVStore, + index_key: &str, + ) -> Result, Report> { + let mut response = match store.lookup(index_key) { + Ok(resp) => resp, + Err(fastly::kv_store::KVStoreError::ItemNotFound) => return Ok(None), + Err(err) => { + return Err( + Report::new(err).change_context(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!( + "Failed to read existing API key index before upsert ('{index_key}')" + ), + }), + ); + } + }; + + let partner_id = String::from_utf8(response.take_body_bytes()).map_err(|_| { + Report::new(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!( + "Existing API key index value is not valid UTF-8 before upsert ('{index_key}')" + ), + }) + })?; + + Ok(Some(partner_id)) + } + + fn restore_previous_index_mapping( + &self, + store: &KVStore, + index_key: &str, + previous_partner_id: Option<&str>, + partner_id: &str, + ) { + if let Some(previous_partner_id) = previous_partner_id { + if previous_partner_id == partner_id { + return; + } + + if let Err(err) = store.build_insert().execute(index_key, previous_partner_id) { + log::warn!( + "Failed to restore previous API key index mapping after write failure for partner '{}': {err:?}", + partner_id, + ); + } + return; + } + + match store.delete(index_key) { + Ok(()) | Err(fastly::kv_store::KVStoreError::ItemNotFound) => {} + Err(err) => { + log::warn!( + "Failed to roll back API key index after write failure for partner '{}': {err:?}", + partner_id, + ); + } + } + } + + /// Looks up a partner by API key hash using the `apikey:` secondary index. + /// + /// After resolving the index to a partner ID, re-verifies that the + /// stored `api_key_hash` matches the lookup hash (guards against stale + /// index entries from key rotation). + /// + /// # Errors + /// + /// Returns [`TrustedServerError::KvStore`] on store failure. + pub fn find_by_api_key_hash( + &self, + hash: &str, + ) -> Result, Report> { + let store = self.open_store()?; + + // Look up the secondary index. + let index_key = format!("{APIKEY_INDEX_PREFIX}{hash}"); + let mut index_response = match store.lookup(&index_key) { + Ok(resp) => resp, + Err(fastly::kv_store::KVStoreError::ItemNotFound) => return Ok(None), + Err(err) => { + return Err( + Report::new(err).change_context(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!("Failed to read API key index for hash '{hash}'"), + }), + ); + } + }; + + let partner_id = String::from_utf8(index_response.take_body_bytes()).map_err(|_| { + Report::new(TrustedServerError::KvStore { + store_name: self.store_name.clone(), + message: format!("API key index value for hash '{hash}' is not valid UTF-8"), + }) + })?; + + // Fetch the actual partner record. + let record = match self.get(&partner_id)? { + Some(r) => r, + None => { + // Stale index — partner was deleted. + log::warn!( + "API key index points to non-existent partner '{partner_id}' (stale index)" + ); + return Ok(None); + } + }; + + // Verify the stored hash matches — guards against stale index from + // key rotation. + if record.api_key_hash != hash { + log::warn!( + "API key hash mismatch for partner '{}' (stale index after key rotation)", + record.id, + ); + return Ok(None); + } + + Ok(Some(record)) + } + + /// Verifies an API key against the stored hash for a given partner. + /// + /// Uses SHA-256 hashing and constant-time comparison to prevent + /// timing attacks. + /// + /// # Errors + /// + /// Returns [`TrustedServerError::KvStore`] if the partner lookup fails. + pub fn verify_api_key( + &self, + partner_id: &str, + api_key: &str, + ) -> Result> { + let record = match self.get(partner_id)? { + Some(r) => r, + None => return Ok(false), + }; + + let incoming_hash = hash_api_key(api_key); + let stored_bytes = record.api_key_hash.as_bytes(); + let incoming_bytes = incoming_hash.as_bytes(); + + Ok(stored_bytes.ct_eq(incoming_bytes).into()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn partner_record_serialization_roundtrip() { + let record = PartnerRecord { + id: "ssp_x".to_owned(), + name: "SSP Example".to_owned(), + allowed_return_domains: vec!["sync.example-ssp.com".to_owned()], + api_key_hash: hash_api_key("test-api-key"), + bidstream_enabled: true, + source_domain: "example-ssp.com".to_owned(), + openrtb_atype: 3, + sync_rate_limit: 100, + batch_rate_limit: 60, + pull_sync_enabled: false, + pull_sync_url: None, + pull_sync_allowed_domains: vec![], + pull_sync_ttl_sec: 86400, + pull_sync_rate_limit: 10, + ts_pull_token: None, + }; + + let json = serde_json::to_string(&record).expect("should serialize"); + let deserialized: PartnerRecord = serde_json::from_str(&json).expect("should deserialize"); + + assert_eq!(deserialized, record); + } + + #[test] + fn hash_api_key_is_deterministic() { + let h1 = hash_api_key("my-secret-key"); + let h2 = hash_api_key("my-secret-key"); + assert_eq!(h1, h2); + assert_eq!(h1.len(), 64, "should be 64-char hex SHA-256"); + } + + #[test] + fn hash_api_key_differs_for_different_keys() { + let h1 = hash_api_key("key-a"); + let h2 = hash_api_key("key-b"); + assert_ne!(h1, h2); + } + + #[test] + fn validate_partner_id_accepts_valid() { + assert!(validate_partner_id("ssp_x").is_ok()); + assert!(validate_partner_id("liveramp").is_ok()); + assert!(validate_partner_id("a-b_c-1").is_ok()); + assert!(validate_partner_id("a").is_ok()); + } + + #[test] + fn validate_partner_id_rejects_uppercase() { + let err = validate_partner_id("SSP_X").unwrap_err(); + assert!( + err.contains("must match"), + "should reject uppercase, got: {err}" + ); + } + + #[test] + fn validate_partner_id_rejects_too_long() { + let long = "a".repeat(33); + let err = validate_partner_id(&long).unwrap_err(); + assert!( + err.contains("must match"), + "should reject >32 chars, got: {err}" + ); + } + + #[test] + fn validate_partner_id_rejects_empty() { + let err = validate_partner_id("").unwrap_err(); + assert!( + err.contains("must match"), + "should reject empty, got: {err}" + ); + } + + #[test] + fn validate_partner_id_rejects_reserved() { + for reserved in RESERVED_PARTNER_IDS { + let err = validate_partner_id(reserved).unwrap_err(); + assert!( + err.contains("reserved"), + "should reject '{reserved}', got: {err}" + ); + } + } + + #[test] + fn validate_partner_id_rejects_special_chars() { + assert!(validate_partner_id("ssp.x").is_err(), "should reject dots"); + assert!( + validate_partner_id("ssp x").is_err(), + "should reject spaces" + ); + assert!( + validate_partner_id("ssp/x").is_err(), + "should reject slashes" + ); + } + + #[test] + fn validate_pull_sync_ok_when_disabled() { + let record = PartnerRecord { + id: "test".to_owned(), + name: "Test".to_owned(), + allowed_return_domains: vec![], + api_key_hash: String::new(), + bidstream_enabled: false, + source_domain: "test.com".to_owned(), + openrtb_atype: 3, + sync_rate_limit: 100, + batch_rate_limit: 60, + pull_sync_enabled: false, + pull_sync_url: None, + pull_sync_allowed_domains: vec![], + pull_sync_ttl_sec: 86400, + pull_sync_rate_limit: 10, + ts_pull_token: None, + }; + assert!(validate_pull_sync_config(&record).is_ok()); + } + + #[test] + fn validate_pull_sync_rejects_missing_url() { + let record = PartnerRecord { + id: "test".to_owned(), + name: "Test".to_owned(), + allowed_return_domains: vec![], + api_key_hash: String::new(), + bidstream_enabled: false, + source_domain: "test.com".to_owned(), + openrtb_atype: 3, + sync_rate_limit: 100, + batch_rate_limit: 60, + pull_sync_enabled: true, + pull_sync_url: None, + pull_sync_allowed_domains: vec![], + pull_sync_ttl_sec: 86400, + pull_sync_rate_limit: 10, + ts_pull_token: Some("token".to_owned()), + }; + let err = validate_pull_sync_config(&record).unwrap_err(); + assert!(err.contains("pull_sync_url"), "got: {err}"); + } + + #[test] + fn validate_pull_sync_rejects_missing_token() { + let record = PartnerRecord { + id: "test".to_owned(), + name: "Test".to_owned(), + allowed_return_domains: vec![], + api_key_hash: String::new(), + bidstream_enabled: false, + source_domain: "test.com".to_owned(), + openrtb_atype: 3, + sync_rate_limit: 100, + batch_rate_limit: 60, + pull_sync_enabled: true, + pull_sync_url: Some("https://sync.test.com/pull".to_owned()), + pull_sync_allowed_domains: vec!["sync.test.com".to_owned()], + pull_sync_ttl_sec: 86400, + pull_sync_rate_limit: 10, + ts_pull_token: None, + }; + let err = validate_pull_sync_config(&record).unwrap_err(); + assert!(err.contains("ts_pull_token"), "got: {err}"); + } + + #[test] + fn validate_pull_sync_rejects_url_not_in_allowed_domains() { + let record = PartnerRecord { + id: "test".to_owned(), + name: "Test".to_owned(), + allowed_return_domains: vec![], + api_key_hash: String::new(), + bidstream_enabled: false, + source_domain: "test.com".to_owned(), + openrtb_atype: 3, + sync_rate_limit: 100, + batch_rate_limit: 60, + pull_sync_enabled: true, + pull_sync_url: Some("https://evil.com/pull".to_owned()), + pull_sync_allowed_domains: vec!["sync.test.com".to_owned()], + pull_sync_ttl_sec: 86400, + pull_sync_rate_limit: 10, + ts_pull_token: Some("token".to_owned()), + }; + let err = validate_pull_sync_config(&record).unwrap_err(); + assert!(err.contains("pull_sync_allowed_domains"), "got: {err}"); + } + + #[test] + fn validate_pull_sync_accepts_valid_config() { + let record = PartnerRecord { + id: "test".to_owned(), + name: "Test".to_owned(), + allowed_return_domains: vec![], + api_key_hash: String::new(), + bidstream_enabled: false, + source_domain: "test.com".to_owned(), + openrtb_atype: 3, + sync_rate_limit: 100, + batch_rate_limit: 60, + pull_sync_enabled: true, + pull_sync_url: Some("https://sync.test.com/pull".to_owned()), + pull_sync_allowed_domains: vec!["sync.test.com".to_owned()], + pull_sync_ttl_sec: 86400, + pull_sync_rate_limit: 10, + ts_pull_token: Some("token".to_owned()), + }; + assert!(validate_pull_sync_config(&record).is_ok()); + } + + #[test] + fn optional_fields_omitted_from_json() { + let record = PartnerRecord { + id: "test".to_owned(), + name: "Test".to_owned(), + allowed_return_domains: vec![], + api_key_hash: String::new(), + bidstream_enabled: false, + source_domain: "test.com".to_owned(), + openrtb_atype: 3, + sync_rate_limit: 100, + batch_rate_limit: 60, + pull_sync_enabled: false, + pull_sync_url: None, + pull_sync_allowed_domains: vec![], + pull_sync_ttl_sec: 86400, + pull_sync_rate_limit: 10, + ts_pull_token: None, + }; + let json = serde_json::to_string(&record).expect("should serialize"); + assert!( + !json.contains("pull_sync_url"), + "None pull_sync_url should be omitted" + ); + assert!( + !json.contains("ts_pull_token"), + "None ts_pull_token should be omitted" + ); + assert!( + !json.contains("pull_sync_allowed_domains"), + "empty pull_sync_allowed_domains should be omitted" + ); + } +} diff --git a/crates/trusted-server-core/src/settings.rs b/crates/trusted-server-core/src/settings.rs index e2f65c2c..5f4486de 100644 --- a/crates/trusted-server-core/src/settings.rs +++ b/crates/trusted-server-core/src/settings.rs @@ -483,7 +483,11 @@ impl Settings { /// endpoints are always protected by authentication. /// Update [`ADMIN_ENDPOINTS`](Self::ADMIN_ENDPOINTS) when adding new /// admin routes to `crates/trusted-server-adapter-fastly/src/main.rs`. - pub(crate) const ADMIN_ENDPOINTS: &[&str] = &["/admin/keys/rotate", "/admin/keys/deactivate"]; + pub(crate) const ADMIN_ENDPOINTS: &[&str] = &[ + "/admin/keys/rotate", + "/admin/keys/deactivate", + "/admin/partners/register", + ]; /// Returns admin endpoint paths that no configured handler covers. /// @@ -1431,8 +1435,12 @@ mod tests { let uncovered = settings.uncovered_admin_endpoints(); assert_eq!( uncovered, - vec!["/admin/keys/rotate", "/admin/keys/deactivate"], - "should report both admin endpoints as uncovered" + vec![ + "/admin/keys/rotate", + "/admin/keys/deactivate", + "/admin/partners/register", + ], + "should report all admin endpoints as uncovered" ); } @@ -1461,8 +1469,8 @@ mod tests { let uncovered = settings.uncovered_admin_endpoints(); assert_eq!( uncovered, - vec!["/admin/keys/deactivate"], - "should detect that only deactivate is uncovered" + vec!["/admin/keys/deactivate", "/admin/partners/register"], + "should detect endpoints not covered by the rotate-only handler" ); }