From 56f8b36eb76e45e9c5e243d428cd4dbabef4e46e Mon Sep 17 00:00:00 2001 From: Matias Date: Tue, 17 Mar 2026 12:01:10 -0300 Subject: [PATCH 1/2] feat(ampsync): add configurable index creation on table setup --- Cargo.lock | 2 + crates/bin/ampsync/Cargo.toml | 2 + crates/bin/ampsync/README.md | 48 +++ crates/bin/ampsync/src/commands/sync.rs | 34 +- crates/bin/ampsync/src/config.rs | 11 + crates/bin/ampsync/src/engine.rs | 249 +++++++++++-- crates/bin/ampsync/src/index.rs | 451 ++++++++++++++++++++++++ crates/bin/ampsync/src/lib.rs | 1 + crates/bin/ampsync/src/manager.rs | 8 +- crates/bin/ampsync/src/sql.rs | 204 +++++++++++ crates/bin/ampsync/src/task.rs | 8 +- 11 files changed, 991 insertions(+), 27 deletions(-) create mode 100644 crates/bin/ampsync/src/index.rs diff --git a/Cargo.lock b/Cargo.lock index b43ff87c4..e63c83ce2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1432,11 +1432,13 @@ dependencies = [ "pg_escape", "pgtemp", "rustls", + "serde", "sqlparser", "sqlx", "thiserror 2.0.18", "tokio", "tokio-util", + "toml", "tracing", "uuid", ] diff --git a/crates/bin/ampsync/Cargo.toml b/crates/bin/ampsync/Cargo.toml index f5f0053c2..d127c3ced 100644 --- a/crates/bin/ampsync/Cargo.toml +++ b/crates/bin/ampsync/Cargo.toml @@ -32,6 +32,8 @@ datasets-common = { path = "../../core/datasets-common" } futures.workspace = true monitoring = { path = "../../core/monitoring" } pg_escape.workspace = true +serde.workspace = true +toml.workspace = true rustls.workspace = true sqlparser.workspace = true sqlx = { version = "0.8.6", features = [ diff --git a/crates/bin/ampsync/README.md b/crates/bin/ampsync/README.md index e6e48d5c3..c252807d8 100644 --- a/crates/bin/ampsync/README.md +++ b/crates/bin/ampsync/README.md @@ -25,6 +25,7 @@ Options: --grpc-max-decode-mb Max gRPC decode size in MiB (default: 32, range: 1-512) --max-db-connections Max connections (default: 10, range: 1-1000) --retention-blocks Retention blocks (default: 128, min: 64) + --config TOML config file for index 
definitions --auth-token Authentication token for Arrow Flight -h, --help Print help -V, --version Print version @@ -55,6 +56,7 @@ All CLI arguments can also be set via environment variables: - **`MAX_DB_CONNECTIONS`** (default: `10`): Database connection pool size (valid range: 1-1000) - **`RETENTION_BLOCKS`** (default: `128`): Watermark retention window (must be >= 64) - **`AMP_AUTH_TOKEN`** (optional): Bearer token for authenticating requests to the Arrow Flight server +- **`AMPSYNC_CONFIG`** (optional): Path to TOML config file for index definitions (see [Index Configuration](#index-configuration)) ## Running @@ -93,6 +95,52 @@ services: restart: unless-stopped ``` +## Index Configuration + +Ampsync can create PostgreSQL indexes on synced tables via a TOML config file +passed with `--config` (or `AMPSYNC_CONFIG` env var). Indexes are created +atomically alongside tables using `IF NOT EXISTS`, so adding new indexes and +restarting is safe and idempotent. + +### Config file format + +Each `[[index]]` entry defines one index: + +```toml +[[index]] +table = "transfers" # required — must be in --tables list +name = "idx_transfers_sender" # required — valid SQL identifier +columns = ["sender", "block_number desc"] # required — key columns, optional sort +method = "btree" # optional — btree (default), hash, gin, gist, brin +unique = false # optional — default false +include = ["amount"] # optional — INCLUDE columns (covering index) +where = "amount > 1000" # optional — partial index WHERE clause +``` + +### Example + +```toml +# Covering index for lookups by seller with timestamp ordering +[[index]] +table = "usdc_transfers" +name = "idx_usdc_transfers_seller_timestamp_covering" +columns = ["seller_address", "timestamp desc"] +include = ["buyer_address", "value_usdc"] + +# Simple index on buyer address +[[index]] +table = "usdc_transfers" +name = "idx_usdc_transfers_buyer_address" +columns = ["buyer_address"] + +# Partial index for large transfers only +[[index]] 
+table = "usdc_transfers" +name = "idx_usdc_transfers_large" +columns = ["seller_address"] +where = "value_usdc > 10000" +``` + ## Database Schema ### System Columns diff --git a/crates/bin/ampsync/src/commands/sync.rs b/crates/bin/ampsync/src/commands/sync.rs index 79dbe1784..2c99fde60 100644 --- a/crates/bin/ampsync/src/commands/sync.rs +++ b/crates/bin/ampsync/src/commands/sync.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use amp_client::{AmpClient, PostgresStateStore}; use anyhow::{Context, Result}; use sqlx::postgres::PgPoolOptions; @@ -6,7 +8,7 @@ use tracing::info; use crate::{ config::{self, SyncConfig}, engine::Engine, - health, + health, index, manager::StreamManager, }; @@ -73,8 +75,36 @@ pub async fn run(config: SyncConfig) -> Result<()> { info!("Amp client initialized"); + // Load index configuration from config file (if provided) + let indexes_by_table = match &config.config { + Some(config_path) => { + info!(path = %config_path.display(), "Loading index configuration"); + index::load_config(config_path, &config.tables) + .await + .context("Failed to load index configuration")? 
+ } + None => HashMap::new(), + }; + + if !indexes_by_table.is_empty() { + let total: usize = indexes_by_table.values().map(|v| v.len()).sum(); + info!( + total_indexes = total, + tables_with_indexes = indexes_by_table.len(), + "Index configuration loaded" + ); + } + // Spawn streaming tasks (table creation happens in StreamTask::new) - let manager = StreamManager::new(&mappings, dataset, &config, engine, client, pool); + let manager = StreamManager::new( + &mappings, + dataset, + &config, + engine, + client, + pool, + &indexes_by_table, + ); // Start health server if configured if let Some(port) = config.health_port { diff --git a/crates/bin/ampsync/src/config.rs b/crates/bin/ampsync/src/config.rs index 4ba345a60..bb1b3f4ef 100644 --- a/crates/bin/ampsync/src/config.rs +++ b/crates/bin/ampsync/src/config.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + use clap::{Args, Parser, Subcommand}; use datasets_common::{partial_reference::PartialReference, reference::Reference}; @@ -123,6 +125,15 @@ pub struct SyncConfig { #[arg(long, env = "AMP_AUTH_TOKEN")] pub auth_token: Option, + /// Path to TOML config file for index definitions and other settings + /// + /// The config file can define indexes to create on synced tables. + /// See the "Index Configuration" section in the crate README for format details. 
+ /// + /// Can also be set via AMPSYNC_CONFIG environment variable + #[arg(long, env = "AMPSYNC_CONFIG")] + pub config: Option, + /// Health check server port (optional) /// /// When provided, starts an HTTP server on 0.0.0.0 that exposes a /healthz diff --git a/crates/bin/ampsync/src/engine.rs b/crates/bin/ampsync/src/engine.rs index 80b44d333..aaa190a61 100644 --- a/crates/bin/ampsync/src/engine.rs +++ b/crates/bin/ampsync/src/engine.rs @@ -193,11 +193,20 @@ pub enum CreateTableError { source: arrow_to_postgres::error::Error, }, - /// Failed to execute CREATE TABLE DDL statement - #[error("Failed to execute CREATE TABLE for '{table_name}' with {num_columns} columns")] - ExecuteDdl { + /// Index references a column not present in the table schema + #[error( + "Index '{index_name}' on table '{table_name}' references column '{column}' which does not exist in the schema" + )] + IndexColumnNotFound { + table_name: String, + index_name: String, + column: String, + }, + + /// Failed to set up table (CREATE TABLE + CREATE INDEX in transaction) + #[error("Failed to set up table '{table_name}'")] + SetupTable { table_name: String, - num_columns: usize, #[source] source: sqlx::Error, }, @@ -377,7 +386,16 @@ impl Engine { } } - /// Creates a table with composite primary key `(_tx_id, _row_index)`. + /// Creates a table with composite primary key `(_tx_id, _row_index)` and optional indexes. + /// + /// Table and indexes are created atomically in a single transaction: + /// - `CREATE TABLE IF NOT EXISTS` + `CREATE INDEX IF NOT EXISTS` for each index + /// - If the table already exists, both statements are no-ops (fully idempotent) + /// - If any statement fails, the entire transaction rolls back (all-or-nothing) + /// - PostgreSQL serializes concurrent DDL via `AccessExclusiveLock` (race-safe) + /// + /// The whole transaction is retried on transient errors (connection issues, + /// deadlocks, etc.) with exponential backoff and circuit breaker. 
/// /// # Composite Primary Key Design /// @@ -405,7 +423,10 @@ impl Engine { &self, table_name: &str, stream_schema: SchemaRef, + indexes: &[crate::index::IndexConfig], ) -> Result<(), CreateTableError> { + let table_name_owned = table_name.to_string(); + // Build schema with system columns prepended // System columns: _tx_id, _row_index let mut fields = vec![ @@ -418,7 +439,7 @@ impl Engine { // Get PostgreSQL type mapping from arrow-to-postgres let encoder = ArrowToPostgresBinaryEncoder::try_new(&full_schema).map_err(|err| { CreateTableError::BuildEncoder { - table_name: table_name.to_string(), + table_name: table_name_owned.clone(), source: err, } })?; @@ -434,36 +455,106 @@ impl Engine { )); } - let num_columns = columns.len(); - // Build CREATE TABLE statement with safe identifier quoting let ddl = sql::create_table(table_name, &columns.join(", "), "_tx_id, _row_index"); - let table_name_owned = table_name.to_string(); - let ddl_clone = ddl.clone(); + // Validate that index columns exist in the table schema (includes system columns) + let schema_columns: std::collections::HashSet<&str> = pg_schema + .columns + .iter() + .map(|(name, _)| name.as_str()) + .collect(); + for idx in indexes { + for col in &idx.columns { + if !schema_columns.contains(col.name.as_str()) { + return Err(CreateTableError::IndexColumnNotFound { + table_name: table_name_owned, + index_name: idx.name.clone(), + column: col.name.clone(), + }); + } + } + for col in &idx.include { + if !schema_columns.contains(col.as_str()) { + return Err(CreateTableError::IndexColumnNotFound { + table_name: table_name_owned, + index_name: idx.name.clone(), + column: col.clone(), + }); + } + } + } - // Execute DDL with retry logic - let create_fn = || async { sqlx::query(&ddl_clone).execute(&self.pool).await }; + // Build CREATE INDEX statements + let index_statements: Vec = indexes + .iter() + .map(|idx| { + let cols: Vec<(&str, Option<&str>)> = idx + .columns + .iter() + .map(|c| (c.name.as_str(), 
c.order.map(|o| o.as_sql()))) + .collect(); + sql::create_index( + &idx.name, + table_name, + &cols, + idx.method.as_sql(), + idx.unique, + &idx.include, + idx.where_clause.as_deref(), + ) + }) + .collect(); - create_fn + for idx in indexes { + tracing::info!( + table = table_name, + index = idx.name.as_str(), + columns = ?idx.columns, + method = idx.method.as_sql(), + unique = idx.unique, + "will_create_index" + ); + } + + // Execute CREATE TABLE + CREATE INDEX in a single transaction with retry + let setup_fn = || { + let ddl = ddl.clone(); + let index_statements = index_statements.clone(); + async move { + let mut tx = self.pool.begin().await?; + + sqlx::query(&ddl).execute(&mut *tx).await?; + + for index_sql in &index_statements { + sqlx::query(index_sql).execute(&mut *tx).await?; + } + + tx.commit().await?; + Ok::<(), sqlx::Error>(()) + } + }; + + let table_name_for_retry = table_name_owned.clone(); + setup_fn .retry(Self::db_retry_policy()) .when(Self::create_retryable_with_circuit_breaker( Self::DB_OPERATION_MAX_RETRY_DURATION, )) .notify(|err: &sqlx::Error, duration: Duration| { tracing::warn!( - table = table_name_owned.as_str(), + table = table_name_for_retry.as_str(), error = %err, error_source = logging::error_source(&err), retry_after = ?duration, - "db_create_table_retry" + "db_setup_table_retry" ); }) .await - .map_err(|err| CreateTableError::ExecuteDdl { + .map_err(|err| CreateTableError::SetupTable { table_name: table_name_owned, - num_columns, source: err, })?; + Ok(()) } @@ -769,7 +860,10 @@ mod tests { Field::new("name", DataType::Utf8, false), ])); - engine.create_table("test_table", schema).await.unwrap(); + engine + .create_table("test_table", schema, &[]) + .await + .unwrap(); // Verify table exists and has correct schema let result: (i64,) = sqlx::query_as( @@ -782,6 +876,110 @@ mod tests { assert_eq!(result.0, 4); // _tx_id, _row_index, id, name } + #[tokio::test] + async fn test_create_table_with_indexes() { + use 
crate::index::{ColumnSpec, IndexConfig, IndexMethod, SortOrder}; + + let _pg = pgtemp::PgTempDB::new(); + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&_pg.connection_uri()) + .await + .unwrap(); + let engine = Engine::new(pool); + + let schema = Arc::new(Schema::new(vec![ + Field::new("sender", DataType::Utf8, false), + Field::new("receiver", DataType::Utf8, false), + Field::new("amount", DataType::Int64, false), + Field::new("block_number", DataType::Int64, false), + ])); + + let indexes = vec![ + IndexConfig { + table: "transfers".to_string(), + name: "idx_sender".to_string(), + columns: vec![ColumnSpec { + name: "sender".to_string(), + order: None, + }], + method: IndexMethod::Btree, + unique: false, + include: vec![], + where_clause: None, + }, + IndexConfig { + table: "transfers".to_string(), + name: "idx_sender_block".to_string(), + columns: vec![ + ColumnSpec { + name: "sender".to_string(), + order: None, + }, + ColumnSpec { + name: "block_number".to_string(), + order: Some(SortOrder::Desc), + }, + ], + method: IndexMethod::Btree, + unique: false, + include: vec!["amount".to_string()], + where_clause: None, + }, + IndexConfig { + table: "transfers".to_string(), + name: "idx_large_transfers".to_string(), + columns: vec![ColumnSpec { + name: "sender".to_string(), + order: None, + }], + method: IndexMethod::Btree, + unique: false, + include: vec![], + where_clause: Some("amount > 1000".to_string()), + }, + ]; + + engine + .create_table("transfers", schema, &indexes) + .await + .unwrap(); + + // Verify indexes exist in pg_catalog + let index_names: Vec<(String,)> = sqlx::query_as( + "SELECT indexname FROM pg_indexes WHERE tablename = 'transfers' ORDER BY indexname", + ) + .fetch_all(&engine.pool) + .await + .unwrap(); + + let names: Vec<&str> = index_names.iter().map(|r| r.0.as_str()).collect(); + assert!( + names.contains(&"idx_sender"), + "missing idx_sender: {names:?}" + ); + assert!( + names.contains(&"idx_sender_block"), 
+ "missing idx_sender_block: {names:?}" + ); + assert!( + names.contains(&"idx_large_transfers"), + "missing idx_large_transfers: {names:?}" + ); + + // Verify idempotency: calling again with same indexes succeeds + let schema2 = Arc::new(Schema::new(vec![ + Field::new("sender", DataType::Utf8, false), + Field::new("receiver", DataType::Utf8, false), + Field::new("amount", DataType::Int64, false), + Field::new("block_number", DataType::Int64, false), + ])); + engine + .create_table("transfers", schema2, &indexes) + .await + .unwrap(); + } + #[tokio::test] async fn test_insert_and_delete() { let _pg = pgtemp::PgTempDB::new(); @@ -798,7 +996,10 @@ mod tests { DataType::Int64, false, )])); - engine.create_table("test_table", schema).await.unwrap(); + engine + .create_table("test_table", schema, &[]) + .await + .unwrap(); // Create batch with system columns (_tx_id, _row_index) + user columns let num_rows = 3; @@ -859,7 +1060,10 @@ mod tests { DataType::Int64, false, )])); - engine.create_table("test_table", schema).await.unwrap(); + engine + .create_table("test_table", schema, &[]) + .await + .unwrap(); // Create batch with system columns (_tx_id, _row_index) + user columns let batch_with_meta = RecordBatch::try_new( @@ -1019,7 +1223,10 @@ mod tests { DataType::Int64, false, )])); - engine.create_table("test_chunking", schema).await.unwrap(); + engine + .create_table("test_chunking", schema, &[]) + .await + .unwrap(); // Create a large batch (2000 rows) with system columns let num_rows = 2000; diff --git a/crates/bin/ampsync/src/index.rs b/crates/bin/ampsync/src/index.rs new file mode 100644 index 000000000..1fd8ea3e3 --- /dev/null +++ b/crates/bin/ampsync/src/index.rs @@ -0,0 +1,451 @@ +//! Index configuration for ampsync-managed database indexes. +//! +//! Indexes are declared in a TOML config file and created alongside tables +//! in a single atomic transaction. Both use `IF NOT EXISTS` for idempotency, +//! 
so adding new indexes to the config and restarting will create them on +//! existing tables without affecting existing indexes. + +use std::{collections::HashMap, path::Path}; + +use serde::Deserialize; + +use crate::sql; + +/// Top-level config file structure. +/// +/// Loaded from a TOML file specified via `--config`. +#[derive(Debug, Clone, Deserialize)] +pub struct ConfigFile { + #[serde(default)] + pub index: Vec, +} + +/// Configuration for a single database index. +#[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct IndexConfig { + /// Which table this index belongs to. + pub table: String, + /// User-chosen index name. + pub name: String, + /// Columns to include in the index key. + /// Each entry can be a plain column name (`"sender"`) or a column with sort + /// direction (`"timestamp desc"`). + pub columns: Vec, + /// Index method (default: btree). + #[serde(default)] + pub method: IndexMethod, + /// Whether the index enforces uniqueness. + #[serde(default)] + pub unique: bool, + /// Non-key columns to include for covering index (INCLUDE clause). + #[serde(default)] + pub include: Vec, + /// Optional WHERE clause for partial indexes. + #[serde(rename = "where")] + pub where_clause: Option, +} + +/// A column reference with optional sort direction. +/// +/// Deserialized from a string like `"sender"`, `"timestamp desc"`, or `"block_number asc"`. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ColumnSpec { + pub name: String, + pub order: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SortOrder { + Asc, + Desc, +} + +impl SortOrder { + pub fn as_sql(&self) -> &'static str { + match self { + Self::Asc => "ASC", + Self::Desc => "DESC", + } + } +} + +impl<'de> Deserialize<'de> for ColumnSpec { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + let parts: Vec<&str> = s.split_whitespace().collect(); + match parts.len() { + 1 => Ok(ColumnSpec { + name: parts[0].to_string(), + order: None, + }), + 2 => { + let order = match parts[1].to_lowercase().as_str() { + "asc" => SortOrder::Asc, + "desc" => SortOrder::Desc, + other => { + return Err(serde::de::Error::custom(format!( + "invalid sort order '{other}', expected 'asc' or 'desc'" + ))); + } + }; + Ok(ColumnSpec { + name: parts[0].to_string(), + order: Some(order), + }) + } + _ => Err(serde::de::Error::custom(format!( + "invalid column spec '{s}', expected 'column' or 'column asc|desc'" + ))), + } + } +} + +/// Supported PostgreSQL index methods. +#[derive(Debug, Clone, Copy, Default, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum IndexMethod { + #[default] + Btree, + Hash, + Gin, + Gist, + Brin, +} + +impl IndexMethod { + pub fn as_sql(&self) -> &'static str { + match self { + Self::Btree => "btree", + Self::Hash => "hash", + Self::Gin => "gin", + Self::Gist => "gist", + Self::Brin => "brin", + } + } +} + +/// Errors from loading or validating index configuration. 
+#[derive(Debug, thiserror::Error)] +pub enum IndexConfigError { + #[error("Failed to read config file '{path}'")] + ReadFile { + path: String, + #[source] + source: std::io::Error, + }, + + #[error("Failed to parse config file '{path}'")] + ParseFile { + path: String, + #[source] + source: toml::de::Error, + }, + + #[error("Invalid index name '{name}': {source}")] + InvalidIndexName { + name: String, + #[source] + source: sql::ValidateIdentifierError, + }, + + #[error("Invalid column name '{column}' in index '{name}': {source}")] + InvalidColumnName { + name: String, + column: String, + #[source] + source: sql::ValidateIdentifierError, + }, + + #[error("Invalid INCLUDE column name '{column}' in index '{name}': {source}")] + InvalidIncludeColumnName { + name: String, + column: String, + #[source] + source: sql::ValidateIdentifierError, + }, + + #[error("Invalid table name '{table}' in index '{name}': {source}")] + InvalidTableName { + name: String, + table: String, + #[source] + source: sql::ValidateIdentifierError, + }, + + #[error("Index '{name}' has no columns")] + EmptyColumns { name: String }, + + #[error("Invalid WHERE clause in index '{name}': {reason}")] + InvalidWhereClause { name: String, reason: String }, + + #[error("Duplicate index name '{name}'")] + DuplicateName { name: String }, + + #[error("Index '{name}' references table '{table}' which is not in the sync table list")] + TableNotInSyncList { name: String, table: String }, +} + +/// Load and validate index configuration from a TOML file. +/// +/// Returns indexes grouped by table name for efficient lookup. 
+pub async fn load_config( + path: &Path, + sync_tables: &[String], +) -> Result>, IndexConfigError> { + let content = + tokio::fs::read_to_string(path) + .await + .map_err(|err| IndexConfigError::ReadFile { + path: path.display().to_string(), + source: err, + })?; + + let config: ConfigFile = + toml::from_str(&content).map_err(|err| IndexConfigError::ParseFile { + path: path.display().to_string(), + source: err, + })?; + + validate_indexes(&config.index, sync_tables)?; + + let mut by_table: HashMap> = HashMap::new(); + for idx in config.index { + by_table.entry(idx.table.clone()).or_default().push(idx); + } + + Ok(by_table) +} + +/// Validate all index configurations. +fn validate_indexes( + indexes: &[IndexConfig], + sync_tables: &[String], +) -> Result<(), IndexConfigError> { + let mut seen_names = std::collections::HashSet::new(); + + for idx in indexes { + // Validate index name + sql::validate_identifier(&idx.name).map_err(|err| IndexConfigError::InvalidIndexName { + name: idx.name.clone(), + source: err, + })?; + + // Check for duplicate names + if !seen_names.insert(&idx.name) { + return Err(IndexConfigError::DuplicateName { + name: idx.name.clone(), + }); + } + + // Validate table name + sql::validate_identifier(&idx.table).map_err(|err| IndexConfigError::InvalidTableName { + name: idx.name.clone(), + table: idx.table.clone(), + source: err, + })?; + + // Check table is in the sync list + if !sync_tables.iter().any(|t| t == &idx.table) { + return Err(IndexConfigError::TableNotInSyncList { + name: idx.name.clone(), + table: idx.table.clone(), + }); + } + + // Validate columns + if idx.columns.is_empty() { + return Err(IndexConfigError::EmptyColumns { + name: idx.name.clone(), + }); + } + + for col in &idx.columns { + sql::validate_identifier(&col.name).map_err(|err| { + IndexConfigError::InvalidColumnName { + name: idx.name.clone(), + column: col.name.clone(), + source: err, + } + })?; + } + + // Validate INCLUDE columns + for col in &idx.include { + 
sql::validate_identifier(col).map_err(|err| { + IndexConfigError::InvalidIncludeColumnName { + name: idx.name.clone(), + column: col.clone(), + source: err, + } + })?; + } + + // Validate WHERE clause if present + if let Some(where_clause) = &idx.where_clause { + sql::check_where_clause_syntax(where_clause).map_err(|reason| { + IndexConfigError::InvalidWhereClause { + name: idx.name.clone(), + reason, + } + })?; + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn col(name: &str) -> ColumnSpec { + ColumnSpec { + name: name.to_string(), + order: None, + } + } + + fn col_desc(name: &str) -> ColumnSpec { + ColumnSpec { + name: name.to_string(), + order: Some(SortOrder::Desc), + } + } + + fn idx(name: &str, table: &str, columns: Vec) -> IndexConfig { + IndexConfig { + table: table.to_string(), + name: name.to_string(), + columns, + method: IndexMethod::Btree, + unique: false, + include: vec![], + where_clause: None, + } + } + + #[test] + fn test_parse_minimal_index() { + let toml = r#" +[[index]] +table = "transfers" +name = "sender_idx" +columns = ["sender"] +"#; + let config: ConfigFile = toml::from_str(toml).unwrap(); + assert_eq!(config.index.len(), 1); + assert_eq!(config.index[0].name, "sender_idx"); + assert_eq!(config.index[0].columns, vec![col("sender")]); + assert_eq!(config.index[0].method, IndexMethod::Btree); + assert!(!config.index[0].unique); + assert!(config.index[0].include.is_empty()); + assert!(config.index[0].where_clause.is_none()); + } + + #[test] + fn test_parse_full_index() { + let toml = r#" +[[index]] +table = "transfers" +name = "large_transfers" +columns = ["sender", "block_number desc"] +method = "btree" +unique = true +include = ["value"] +where = "amount > 1000" +"#; + let config: ConfigFile = toml::from_str(toml).unwrap(); + let idx = &config.index[0]; + assert_eq!(idx.columns, vec![col("sender"), col_desc("block_number")]); + assert!(idx.unique); + assert_eq!(idx.method, IndexMethod::Btree); + 
assert_eq!(idx.include, vec!["value"]); + assert_eq!(idx.where_clause.as_deref(), Some("amount > 1000")); + } + + #[test] + fn test_parse_sort_orders() { + let toml = r#" +[[index]] +table = "t" +name = "idx" +columns = ["a asc", "b desc", "c"] +"#; + let config: ConfigFile = toml::from_str(toml).unwrap(); + assert_eq!( + config.index[0].columns, + vec![ + ColumnSpec { + name: "a".to_string(), + order: Some(SortOrder::Asc) + }, + ColumnSpec { + name: "b".to_string(), + order: Some(SortOrder::Desc) + }, + ColumnSpec { + name: "c".to_string(), + order: None + }, + ] + ); + } + + #[test] + fn test_parse_all_methods() { + for method in ["btree", "hash", "gin", "gist", "brin"] { + let toml = format!( + r#" +[[index]] +table = "t" +name = "idx" +columns = ["c"] +method = "{method}" +"# + ); + let config: ConfigFile = toml::from_str(&toml).unwrap(); + assert_eq!(config.index[0].method.as_sql(), method); + } + } + + #[test] + fn test_empty_config() { + let config: ConfigFile = toml::from_str("").unwrap(); + assert!(config.index.is_empty()); + } + + #[test] + fn test_validate_rejects_duplicate_names() { + let indexes = vec![ + idx("idx", "t", vec![col("a")]), + idx("idx", "t", vec![col("b")]), + ]; + let err = validate_indexes(&indexes, &["t".to_string()]).unwrap_err(); + assert!(matches!(err, IndexConfigError::DuplicateName { .. })); + } + + #[test] + fn test_validate_rejects_empty_columns() { + let indexes = vec![idx("idx", "t", vec![])]; + let err = validate_indexes(&indexes, &["t".to_string()]).unwrap_err(); + assert!(matches!(err, IndexConfigError::EmptyColumns { .. })); + } + + #[test] + fn test_validate_rejects_table_not_in_sync_list() { + let indexes = vec![idx("idx", "other", vec![col("a")])]; + let err = validate_indexes(&indexes, &["transfers".to_string()]).unwrap_err(); + assert!(matches!(err, IndexConfigError::TableNotInSyncList { .. 
})); + } + + #[test] + fn test_validate_rejects_invalid_index_name() { + let indexes = vec![idx("bad name", "t", vec![col("a")])]; + let err = validate_indexes(&indexes, &["t".to_string()]).unwrap_err(); + assert!(matches!(err, IndexConfigError::InvalidIndexName { .. })); + } +} diff --git a/crates/bin/ampsync/src/lib.rs b/crates/bin/ampsync/src/lib.rs index 6d4b43e0e..cd5031ca3 100644 --- a/crates/bin/ampsync/src/lib.rs +++ b/crates/bin/ampsync/src/lib.rs @@ -4,6 +4,7 @@ pub mod commands; pub mod config; pub mod engine; pub mod health; +pub mod index; pub mod manager; pub mod sql; pub mod task; diff --git a/crates/bin/ampsync/src/manager.rs b/crates/bin/ampsync/src/manager.rs index 1ce581d04..362e84bcb 100644 --- a/crates/bin/ampsync/src/manager.rs +++ b/crates/bin/ampsync/src/manager.rs @@ -4,7 +4,7 @@ //! managing restart logic with exponential backoff, and coordinating graceful //! shutdown of all tasks. -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; use amp_client::AmpClient; use datasets_common::reference::Reference; @@ -17,6 +17,7 @@ use tracing::{error, info, warn}; use crate::{ config::{SyncConfig, TableMapping}, engine::Engine, + index::IndexConfig, task::{StreamTask, StreamTaskConfig}, }; @@ -58,6 +59,7 @@ impl StreamManager { engine: Engine, client: AmpClient, pool: PgPool, + indexes_by_table: &HashMap>, ) -> Self { let shutdown = CancellationToken::new(); @@ -78,6 +80,10 @@ impl StreamManager { client: client.clone(), pool: pool.clone(), retention: config.retention_blocks, + indexes: indexes_by_table + .get(&mapping.source) + .cloned() + .unwrap_or_default(), shutdown: shutdown.clone(), }; diff --git a/crates/bin/ampsync/src/sql.rs b/crates/bin/ampsync/src/sql.rs index 3dafe10bf..148dd8eaf 100644 --- a/crates/bin/ampsync/src/sql.rs +++ b/crates/bin/ampsync/src/sql.rs @@ -208,6 +208,102 @@ pub fn delete_between(table_name: &str, column: &str) -> String { ) } +/// Check that a string is syntactically valid SQL for use in a 
WHERE clause. +/// +/// Uses sqlparser to verify the clause parses as a valid expression within a +/// SELECT statement. This is a **syntax check only** — it does not prevent +/// subqueries, function calls, or other valid-but-unexpected SQL constructs. +/// The config file is operator-controlled (same trust level as database +/// credentials), so syntax validation is sufficient. +/// +/// # Example +/// ```ignore +/// assert!(check_where_clause_syntax("amount > 1000").is_ok()); +/// assert!(check_where_clause_syntax("status = 'active' AND amount > 0").is_ok()); +/// assert!(check_where_clause_syntax("; DROP TABLE users").is_err()); +/// ``` +pub fn check_where_clause_syntax(clause: &str) -> Result<(), String> { + if clause.trim().is_empty() { + return Err("WHERE clause cannot be empty".to_string()); + } + + // Try to parse as a WHERE clause in a SELECT statement + let sql = format!("SELECT 1 WHERE {}", clause); + let dialect = PostgreSqlDialect {}; + + match Parser::parse_sql(&dialect, &sql) { + Ok(statements) => { + if statements.len() != 1 { + return Err("WHERE clause must not contain multiple statements".to_string()); + } + Ok(()) + } + Err(e) => Err(format!("Failed to parse: {}", e)), + } +} + +/// Safely format a CREATE INDEX IF NOT EXISTS statement. +/// +/// All identifiers (index name, table name, column names) are quoted. +/// Supports index method, uniqueness, column sort order, INCLUDE columns, +/// and partial index (WHERE clause). +/// +/// `columns` is a slice of `(column_name, optional_sort_direction)` pairs, +/// where sort direction is a SQL keyword like `"ASC"` or `"DESC"`. +/// +/// **Safety**: Assumes all identifiers have been validated via [`validate_identifier`] +/// and the WHERE clause (if any) via [`check_where_clause_syntax`]. +/// The WHERE clause is interpolated as raw SQL after sqlparser validation. +/// This is acceptable because the config file is operator-controlled (same trust +/// level as database credentials). 
+pub fn create_index( + index_name: &str, + table_name: &str, + columns: &[(&str, Option<&str>)], + method: &str, + unique: bool, + include: &[String], + where_clause: Option<&str>, +) -> String { + let quoted_index = quote_identifier(index_name); + let quoted_table = quote_identifier(table_name); + let quoted_columns: Vec = columns + .iter() + .map(|(name, order)| { + let quoted = quote_identifier(name).to_string(); + match order { + Some(dir) => format!("{} {}", quoted, dir), + None => quoted, + } + }) + .collect(); + + let unique_str = if unique { "UNIQUE " } else { "" }; + + let mut sql = format!( + "CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({})", + unique_str, + quoted_index, + quoted_table, + method, + quoted_columns.join(", "), + ); + + if !include.is_empty() { + let quoted_include: Vec = include + .iter() + .map(|c| quote_identifier(c).to_string()) + .collect(); + sql.push_str(&format!(" INCLUDE ({})", quoted_include.join(", "))); + } + + if let Some(clause) = where_clause { + sql.push_str(&format!(" WHERE {}", clause)); + } + + sql +} + /// Quote a column name for safe use in SQL. 
/// /// This ensures column names (especially reserved keywords like "to", "from", "select") @@ -350,4 +446,112 @@ mod tests { assert!(sql.contains("INCLUDING DEFAULTS")); assert!(sql.contains("ON COMMIT DROP")); } + + #[test] + fn test_create_index_basic() { + let sql = create_index( + "idx_sender", + "transfers", + &[("sender", None)], + "btree", + false, + &[], + None, + ); + assert!(sql.contains("CREATE INDEX IF NOT EXISTS")); + assert!(!sql.contains("UNIQUE")); + assert!(sql.contains("USING btree")); + assert!(!sql.contains("INCLUDE")); + assert!(sql.contains("idx_sender") || sql.contains("\"idx_sender\"")); + assert!(sql.contains("transfers") || sql.contains("\"transfers\"")); + assert!(sql.contains("sender") || sql.contains("\"sender\"")); + } + + #[test] + fn test_create_index_unique_with_method() { + let sql = create_index( + "idx_account", + "balances", + &[("account", None)], + "hash", + true, + &[], + None, + ); + assert!(sql.contains("CREATE UNIQUE INDEX IF NOT EXISTS")); + assert!(sql.contains("USING hash")); + } + + #[test] + fn test_create_index_with_sort_order() { + let sql = create_index( + "idx_ts", + "transfers", + &[("seller_address", None), ("timestamp", Some("DESC"))], + "btree", + false, + &[], + None, + ); + assert!(sql.contains("DESC")); + assert!(sql.contains("seller_address") || sql.contains("\"seller_address\"")); + } + + #[test] + fn test_create_index_with_include() { + let sql = create_index( + "idx_covering", + "transfers", + &[("sender", None)], + "btree", + false, + &["buyer_address".to_string(), "value_usdc".to_string()], + None, + ); + assert!(sql.contains("INCLUDE")); + assert!(sql.contains("buyer_address") || sql.contains("\"buyer_address\"")); + assert!(sql.contains("value_usdc") || sql.contains("\"value_usdc\"")); + } + + #[test] + fn test_create_index_composite_columns() { + let sql = create_index( + "idx_multi", + "transfers", + &[("sender", None), ("block_number", None)], + "btree", + false, + &[], + None, + ); + 
assert!(sql.contains("sender")); + assert!(sql.contains("block_number")); + } + + #[test] + fn test_create_index_with_where_clause() { + let sql = create_index( + "idx_large", + "transfers", + &[("sender", None)], + "btree", + false, + &[], + Some("amount > 1000"), + ); + assert!(sql.contains("WHERE amount > 1000")); + } + + #[test] + fn test_check_where_clause_syntax_valid() { + assert!(check_where_clause_syntax("amount > 1000").is_ok()); + assert!(check_where_clause_syntax("status = 'active' AND amount > 0").is_ok()); + assert!(check_where_clause_syntax("id IS NOT NULL").is_ok()); + } + + #[test] + fn test_check_where_clause_syntax_empty() { + assert!(check_where_clause_syntax("").is_err()); + assert!(check_where_clause_syntax(" ").is_err()); + } } diff --git a/crates/bin/ampsync/src/task.rs b/crates/bin/ampsync/src/task.rs index a0ccebd5c..f7f40e79c 100644 --- a/crates/bin/ampsync/src/task.rs +++ b/crates/bin/ampsync/src/task.rs @@ -9,7 +9,7 @@ use sqlx::PgPool; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn}; -use crate::engine::Engine; +use crate::{engine::Engine, index::IndexConfig}; /// Configuration for creating a [`StreamTask`]. 
 #[derive(Clone)]
@@ -23,6 +23,7 @@ pub struct StreamTaskConfig {
     pub client: AmpClient,
     pub pool: PgPool,
     pub retention: BlockNum,
+    pub indexes: Vec<IndexConfig>,
     pub shutdown: CancellationToken,
 }
 
@@ -152,6 +153,7 @@ impl StreamTask {
             client,
             pool,
             retention,
+            indexes,
             shutdown,
         } = config;
 
@@ -199,10 +201,10 @@ impl StreamTask {
             "retrieved_schema_from_stream"
         );
 
-        // Create destination table using stream schema (idempotent via IF NOT EXISTS)
+        // Create destination table using stream schema (with indexes, idempotent via IF NOT EXISTS)
         info!(destination_table = %destination_table, "creating_table");
         engine
-            .create_table(&destination_table, schema)
+            .create_table(&destination_table, schema, &indexes)
             .await
             .map_err(|err| StreamTaskError::CreateTable {
                 table_name: destination_table.clone(),

From d8338a42bacfc8dff850212147a17ef9432eb376 Mon Sep 17 00:00:00 2001
From: Matias
Date: Thu, 19 Mar 2026 11:03:20 -0300
Subject: [PATCH 2/2] fix(ampsync): namespace index names with destination
 table to prevent silent collisions

PostgreSQL index names are schema-global, so the same config used with
different --table-suffix/--table-version values would silently no-op on
CREATE INDEX IF NOT EXISTS. Index names are now auto-prefixed as
{destination_table}_{config_name}.

Also validates index method parameter in sql::create_index, adds tests
for error paths (nonexistent columns, name too long, multi-statement
WHERE injection), and documents additive-only index lifecycle in README.
---
 crates/bin/ampsync/README.md     |  21 +++-
 crates/bin/ampsync/src/engine.rs | 163 ++++++++++++++++++++++++++++---
 crates/bin/ampsync/src/sql.rs    |  20 ++++
 3 files changed, 189 insertions(+), 15 deletions(-)

diff --git a/crates/bin/ampsync/README.md b/crates/bin/ampsync/README.md
index c252807d8..e1f5ce9b5 100644
--- a/crates/bin/ampsync/README.md
+++ b/crates/bin/ampsync/README.md
@@ -102,6 +102,9 @@ passed with `--config` (or `AMPSYNC_CONFIG` env var).
Indexes are created atomically alongside tables using `IF NOT EXISTS`, so adding new indexes and restarting is safe and idempotent. +**Additive only**: Removing an index from the config file does **not** drop it +from PostgreSQL. To remove an index, run `DROP INDEX` manually. + ### Config file format Each `[[index]]` entry defines one index: @@ -109,7 +112,7 @@ Each `[[index]]` entry defines one index: ```toml [[index]] table = "transfers" # required — must be in --tables list -name = "idx_transfers_sender" # required — valid SQL identifier +name = "idx_sender" # required — valid SQL identifier columns = ["sender", "block_number desc"] # required — key columns, optional sort method = "btree" # optional — btree (default), hash, gin, gist, brin unique = false # optional — default false @@ -117,26 +120,36 @@ include = ["amount"] # optional — INCLUDE columns (cov where = "amount > 1000" # optional — partial index WHERE clause ``` +### Index naming + +The actual PostgreSQL index name is `{destination_table}_{name}`. This +automatic namespacing prevents silent collisions when the same config is used +with different `--table-suffix` or `--table-version` values (PostgreSQL index +names are schema-global, not scoped to a table). + +For example, with `--tables transfers --table-suffix green` and `name = "idx_sender"`, +the PostgreSQL index will be named `transfers_green_idx_sender`. 
+ ### Example ```toml # Covering index for lookups by seller with timestamp ordering [[index]] table = "usdc_transfers" -name = "idx_usdc_transfers_seller_timestamp_covering" +name = "idx_seller_ts" columns = ["seller_address", "timestamp desc"] include = ["buyer_address", "value_usdc"] # Simple index on buyer address [[index]] table = "usdc_transfers" -name = "idx_usdc_transfers_buyer_address" +name = "idx_buyer" columns = ["buyer_address"] # Partial index for large transfers only [[index]] table = "usdc_transfers" -name = "idx_usdc_transfers_large" +name = "idx_large" columns = ["seller_address"] where = "value_usdc > 10000" ``` diff --git a/crates/bin/ampsync/src/engine.rs b/crates/bin/ampsync/src/engine.rs index aaa190a61..1d1820132 100644 --- a/crates/bin/ampsync/src/engine.rs +++ b/crates/bin/ampsync/src/engine.rs @@ -203,6 +203,16 @@ pub enum CreateTableError { column: String, }, + /// Namespaced index name exceeds PostgreSQL's 63-byte identifier limit + #[error( + "Index name '{original_name}' combined with table '{table_name}' produces '{pg_name}' which exceeds PostgreSQL's 63-byte limit" + )] + IndexNameTooLong { + table_name: String, + original_name: String, + pg_name: String, + }, + /// Failed to set up table (CREATE TABLE + CREATE INDEX in transaction) #[error("Failed to set up table '{table_name}'")] SetupTable { @@ -485,17 +495,34 @@ impl Engine { } } + // Build namespaced index names: "{table_name}_{config_name}" + // This prevents silent collisions when the same config is used with + // different --table-suffix or --table-version values, since PostgreSQL + // index names are schema-global (not scoped to a table). 
+        let mut namespaced: Vec<(String, &crate::index::IndexConfig)> = Vec::new();
+        for idx in indexes {
+            let pg_name = format!("{}_{}", table_name, idx.name);
+            if pg_name.len() > 63 {
+                return Err(CreateTableError::IndexNameTooLong {
+                    table_name: table_name_owned,
+                    original_name: idx.name.clone(),
+                    pg_name,
+                });
+            }
+            namespaced.push((pg_name, idx));
+        }
+
         // Build CREATE INDEX statements
-        let index_statements: Vec<String> = indexes
+        let index_statements: Vec<String> = namespaced
             .iter()
-            .map(|idx| {
+            .map(|(pg_name, idx)| {
                 let cols: Vec<(&str, Option<&str>)> = idx
                     .columns
                     .iter()
                     .map(|c| (c.name.as_str(), c.order.map(|o| o.as_sql())))
                     .collect();
                 sql::create_index(
-                    &idx.name,
+                    pg_name,
                     table_name,
                     &cols,
                     idx.method.as_sql(),
@@ -506,10 +533,11 @@
             })
             .collect();
 
-        for idx in indexes {
+        for (pg_name, idx) in &namespaced {
             tracing::info!(
                 table = table_name,
-                index = idx.name.as_str(),
+                pg_index_name = pg_name.as_str(),
+                config_name = idx.name.as_str(),
                 columns = ?idx.columns,
                 method = idx.method.as_sql(),
                 unique = idx.unique,
@@ -953,18 +981,19 @@ mod tests {
             .await
             .unwrap();
 
+        // Index names are auto-prefixed with the destination table name
         let names: Vec<&str> = index_names.iter().map(|r| r.0.as_str()).collect();
         assert!(
-            names.contains(&"idx_sender"),
-            "missing idx_sender: {names:?}"
+            names.contains(&"transfers_idx_sender"),
+            "missing transfers_idx_sender: {names:?}"
         );
         assert!(
-            names.contains(&"idx_sender_block"),
-            "missing idx_sender_block: {names:?}"
+            names.contains(&"transfers_idx_sender_block"),
+            "missing transfers_idx_sender_block: {names:?}"
         );
         assert!(
-            names.contains(&"idx_large_transfers"),
-            "missing idx_large_transfers: {names:?}"
+            names.contains(&"transfers_idx_large_transfers"),
+            "missing transfers_idx_large_transfers: {names:?}"
         );
 
         // Verify idempotency: calling again with same indexes succeeds
@@ -980,6 +1009,118 @@
             .unwrap();
     }
 
+    #[tokio::test]
+    async fn
test_create_table_rejects_index_with_nonexistent_column() { + use crate::index::{ColumnSpec, IndexConfig, IndexMethod}; + + let _pg = pgtemp::PgTempDB::new(); + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&_pg.connection_uri()) + .await + .unwrap(); + let engine = Engine::new(pool); + + let schema = Arc::new(Schema::new(vec![Field::new( + "sender", + DataType::Utf8, + false, + )])); + + let indexes = vec![IndexConfig { + table: "t".to_string(), + name: "idx_bad".to_string(), + columns: vec![ColumnSpec { + name: "nonexistent".to_string(), + order: None, + }], + method: IndexMethod::Btree, + unique: false, + include: vec![], + where_clause: None, + }]; + + let err = engine + .create_table("t", schema, &indexes) + .await + .unwrap_err(); + assert!(matches!(err, CreateTableError::IndexColumnNotFound { .. })); + } + + #[tokio::test] + async fn test_create_table_rejects_index_with_nonexistent_include_column() { + use crate::index::{ColumnSpec, IndexConfig, IndexMethod}; + + let _pg = pgtemp::PgTempDB::new(); + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&_pg.connection_uri()) + .await + .unwrap(); + let engine = Engine::new(pool); + + let schema = Arc::new(Schema::new(vec![Field::new( + "sender", + DataType::Utf8, + false, + )])); + + let indexes = vec![IndexConfig { + table: "t".to_string(), + name: "idx_bad_include".to_string(), + columns: vec![ColumnSpec { + name: "sender".to_string(), + order: None, + }], + method: IndexMethod::Btree, + unique: false, + include: vec!["missing_col".to_string()], + where_clause: None, + }]; + + let err = engine + .create_table("t", schema, &indexes) + .await + .unwrap_err(); + assert!(matches!(err, CreateTableError::IndexColumnNotFound { .. 
})); + } + + #[tokio::test] + async fn test_create_table_rejects_index_name_too_long() { + use crate::index::{ColumnSpec, IndexConfig, IndexMethod}; + + let _pg = pgtemp::PgTempDB::new(); + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&_pg.connection_uri()) + .await + .unwrap(); + let engine = Engine::new(pool); + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + + // Table name + "_" + index name must exceed 63 bytes + let long_name = "a".repeat(60); + let indexes = vec![IndexConfig { + table: "long_table".to_string(), + name: long_name, + columns: vec![ColumnSpec { + name: "id".to_string(), + order: None, + }], + method: IndexMethod::Btree, + unique: false, + include: vec![], + where_clause: None, + }]; + + let err = engine + .create_table("long_table", schema, &indexes) + .await + .unwrap_err(); + assert!(matches!(err, CreateTableError::IndexNameTooLong { .. })); + } + #[tokio::test] async fn test_insert_and_delete() { let _pg = pgtemp::PgTempDB::new(); diff --git a/crates/bin/ampsync/src/sql.rs b/crates/bin/ampsync/src/sql.rs index 148dd8eaf..afb7c6e16 100644 --- a/crates/bin/ampsync/src/sql.rs +++ b/crates/bin/ampsync/src/sql.rs @@ -256,6 +256,9 @@ pub fn check_where_clause_syntax(clause: &str) -> Result<(), String> { /// The WHERE clause is interpolated as raw SQL after sqlparser validation. /// This is acceptable because the config file is operator-controlled (same trust /// level as database credentials). +/// Valid PostgreSQL index methods accepted by [`create_index`]. 
+const VALID_INDEX_METHODS: &[&str] = &["btree", "hash", "gin", "gist", "brin"];
+
 pub fn create_index(
     index_name: &str,
     table_name: &str,
@@ -265,6 +268,11 @@
     include: &[String],
     where_clause: Option<&str>,
 ) -> String {
+    assert!(
+        VALID_INDEX_METHODS.contains(&method),
+        "invalid index method '{method}', expected one of: {VALID_INDEX_METHODS:?}"
+    );
+
     let quoted_index = quote_identifier(index_name);
     let quoted_table = quote_identifier(table_name);
     let quoted_columns: Vec<String> = columns
@@ -554,4 +562,16 @@ mod tests {
         assert!(check_where_clause_syntax("").is_err());
         assert!(check_where_clause_syntax("   ").is_err());
     }
+
+    #[test]
+    fn test_check_where_clause_syntax_rejects_multi_statement() {
+        assert!(check_where_clause_syntax("1=1; DROP TABLE users").is_err());
+        assert!(check_where_clause_syntax("true; DELETE FROM users").is_err());
+    }
+
+    #[test]
+    #[should_panic(expected = "invalid index method")]
+    fn test_create_index_rejects_invalid_method() {
+        create_index("idx", "t", &[("c", None)], "evil", false, &[], None);
+    }
 }