diff --git a/packages/cipherstash-proxy-integration/src/common.rs b/packages/cipherstash-proxy-integration/src/common.rs index 20a9aacb..9c5f128c 100644 --- a/packages/cipherstash-proxy-integration/src/common.rs +++ b/packages/cipherstash-proxy-integration/src/common.rs @@ -1,5 +1,40 @@ #![allow(dead_code)] +//! # Connection Management for Integration Tests +//! +//! ## Preventing File Descriptor Leaks +//! +//! Integration tests should reuse database connections within each test to prevent +//! file descriptor exhaustion. Creating a new connection for every database operation +//! causes connections to accumulate faster than the proxy's 60-second timeout can clean them up. +//! +//! ### Pattern: Connection Reuse +//! +//! **Good** - Reuse single connection per test: +//! ```rust +//! #[tokio::test] +//! async fn my_test() { +//! let client = connect_with_tls(PROXY).await; +//! clear_with_client(&client).await; +//! insert_with_client(sql, params, &client).await; +//! query_by_with_client(sql, param, &client).await; +//! // Client drops and connection closes cleanly at test end +//! } +//! ``` +//! +//! **Bad** - Creates new connection per operation (4+ connections per test): +//! ```rust +//! #[tokio::test] +//! async fn my_test() { +//! clear().await; // Connection 1 +//! insert_jsonb().await; // Connection 2 +//! query_by(sql, p).await; // Connection 3 +//! simple_query(sql).await;// Connection 4 +//! } +//! ``` +//! +//! Use the `*_with_client()` variants of helper functions to reuse connections. + use rand::{distr::Alphanumeric, Rng}; use rustls::{ client::danger::ServerCertVerifier, crypto::aws_lc_rs::default_provider, @@ -42,8 +77,10 @@ pub fn random_string() -> String { } pub async fn clear() { - let client = connect_with_tls(PROXY).await; + clear_with_client(&connect_with_tls(PROXY).await).await; +} +pub async fn clear_with_client(client: &Client) { let sql = "TRUNCATE encrypted"; client.simple_query(sql).await.unwrap(); @@ -202,11 +239,33 @@ where query_by_params(sql, &[param]).await } +pub async fn query_by_with_client( + sql: &str, + param: &(dyn ToSql + Sync), + client: &Client, +) -> Vec +where + T: for<'a> tokio_postgres::types::FromSql<'a> + Send + Sync, +{ + query_by_params_with_client(sql, &[param], client).await +} + pub async fn query_by_params(sql: &str, params: &[&(dyn ToSql + Sync)]) -> Vec where T: for<'a> tokio_postgres::types::FromSql<'a> + Send + Sync, { let client = connect_with_tls(PROXY).await; + query_by_params_with_client(sql, params, &client).await +} + +pub async fn query_by_params_with_client( + sql: &str, + params: &[&(dyn ToSql + Sync)], + client: &Client, +) -> Vec +where + T: for<'a> tokio_postgres::types::FromSql<'a> + Send + Sync, +{ let rows = client.query(sql, params).await.unwrap(); rows.iter().map(|row| row.get(0)).collect::>() } @@ -236,7 +295,6 @@ where ::Err: std::fmt::Debug, { let client = connect_with_tls(PROXY).await; - simple_query_with_client(sql, &client).await } @@ -285,10 +343,19 @@ pub async fn simple_query_with_null(sql: &str) -> Vec> { pub async fn insert(sql: &str, params: &[&(dyn ToSql + Sync)]) { let client = connect_with_tls(PROXY).await; + insert_with_client(sql, params, &client).await; +} + +pub async fn insert_with_client(sql: &str, params: &[&(dyn ToSql + Sync)], client: &Client) { client.query(sql, params).await.unwrap(); } pub async fn insert_jsonb() -> Value { + let client = connect_with_tls(PROXY).await; + insert_jsonb_with_client(&client).await +} + +pub async fn insert_jsonb_with_client(client: &Client) -> Value { let id = random_id(); let encrypted_jsonb = serde_json::json!({ @@ -305,7 +372,7 @@ pub async fn insert_jsonb() -> Value { let sql = "INSERT INTO encrypted (id, encrypted_jsonb) VALUES ($1, $2)".to_string(); - insert(&sql, &[&id, &encrypted_jsonb]).await; + insert_with_client(&sql, &[&id, &encrypted_jsonb], client).await; // Verify encryption actually occurred assert_encrypted_jsonb(id, &encrypted_jsonb).await; diff --git a/packages/cipherstash-proxy-integration/src/select/jsonb_array_elements.rs b/packages/cipherstash-proxy-integration/src/select/jsonb_array_elements.rs index 655c6327..525fd548 100644 --- a/packages/cipherstash-proxy-integration/src/select/jsonb_array_elements.rs +++ b/packages/cipherstash-proxy-integration/src/select/jsonb_array_elements.rs @@ -1,21 +1,25 @@ #[cfg(test)] mod tests { - use crate::common::{clear, insert_jsonb, query_by, simple_query, trace}; + use crate::common::{ + clear_with_client, connect_with_tls, insert_jsonb_with_client, query_by_with_client, + simple_query_with_client, trace, PROXY, + }; use crate::support::assert::assert_expected; use crate::support::json_path::JsonPath; use serde_json::Value; + use tokio_postgres::Client; - async fn select_jsonb(selector: &str, expected: &[Value]) { + async fn select_jsonb(selector: &str, expected: &[Value], client: &Client) { let selector = JsonPath::new(selector); let sql = "SELECT jsonb_array_elements(jsonb_path_query(encrypted_jsonb, $1)) FROM encrypted"; - let actual = query_by::(sql, &selector).await; + let actual = query_by_with_client::(sql, &selector, client).await; assert_expected(expected, &actual); let sql = format!("SELECT jsonb_array_elements(jsonb_path_query(encrypted_jsonb, '{selector}')) FROM encrypted"); - let actual = simple_query::(&sql).await; + let actual = simple_query_with_client::(&sql, client).await; assert_expected(expected, &actual); } @@ -23,33 +27,36 @@ mod tests { #[tokio::test] async fn select_jsonb_array_elements_with_string() { trace(); + let client = connect_with_tls(PROXY).await; - clear().await; - insert_jsonb().await; + clear_with_client(&client).await; + insert_jsonb_with_client(&client).await; let expected = vec![Value::from("hello"), Value::from("world")]; - select_jsonb("$.array_string[@]", &expected).await; + select_jsonb("$.array_string[@]", &expected, &client).await; } #[tokio::test] async fn select_jsonb_array_elements_with_numeric() { trace(); + let client = connect_with_tls(PROXY).await; - clear().await; - insert_jsonb().await; + clear_with_client(&client).await; + insert_jsonb_with_client(&client).await; let expected = vec![Value::from(42), Value::from(84)]; - select_jsonb("$.array_number[@]", &expected).await; + select_jsonb("$.array_number[@]", &expected, &client).await; } #[tokio::test] async fn select_jsonb_array_elements_with_unknown_field() { trace(); + let client = connect_with_tls(PROXY).await; - clear().await; - insert_jsonb().await; + clear_with_client(&client).await; + insert_jsonb_with_client(&client).await; let expected = vec![]; - select_jsonb("$.blah", &expected).await; + select_jsonb("$.blah", &expected, &client).await; } } diff --git a/packages/cipherstash-proxy-integration/src/select/jsonb_path_query.rs b/packages/cipherstash-proxy-integration/src/select/jsonb_path_query.rs index 40020d5e..81bba357 100644 --- a/packages/cipherstash-proxy-integration/src/select/jsonb_path_query.rs +++ b/packages/cipherstash-proxy-integration/src/select/jsonb_path_query.rs @@ -1,12 +1,16 @@ #[cfg(test)] mod tests { - use crate::common::{clear, insert_jsonb, query_by, simple_query, trace}; + use crate::common::{ + clear_with_client, connect_with_tls, insert_jsonb_with_client, query_by_with_client, + simple_query_with_client, trace, PROXY, + }; use crate::support::assert::assert_expected; use crate::support::json_path::JsonPath; use serde::de::DeserializeOwned; use serde_json::Value; + use tokio_postgres::Client; - async fn select_jsonb(selector: &str, value: T) + async fn select_jsonb(selector: &str, value: T, client: &Client) where T: DeserializeOwned, serde_json::Value: From, @@ -17,12 +21,12 @@ mod tests { let expected = vec![value]; let sql = "SELECT jsonb_path_query(encrypted_jsonb, $1) FROM encrypted"; - let actual = query_by::(sql, &selector).await; + let actual = query_by_with_client::(sql, &selector, client).await; assert_expected(&expected, &actual); let sql = format!("SELECT jsonb_path_query(encrypted_jsonb, '{selector}') FROM encrypted"); - let actual = simple_query::(&sql).await; + let actual = simple_query_with_client::(&sql, client).await; assert_expected(&expected, &actual); } @@ -30,62 +34,62 @@ mod tests { #[tokio::test] async fn select_jsonb_path_query_number() { trace(); + let client = connect_with_tls(PROXY).await; - clear().await; + clear_with_client(&client).await; + insert_jsonb_with_client(&client).await; - insert_jsonb().await; - - select_jsonb("$.number", 42).await; + select_jsonb("$.number", 42, &client).await; } #[tokio::test] async fn select_jsonb_path_query_string() { trace(); + let client = connect_with_tls(PROXY).await; - clear().await; - - insert_jsonb().await; + clear_with_client(&client).await; + insert_jsonb_with_client(&client).await; - select_jsonb("$.nested.string", "world".to_string()).await; + select_jsonb("$.nested.string", "world".to_string(), &client).await; } #[tokio::test] async fn select_jsonb_path_query_value() { trace(); + let client = connect_with_tls(PROXY).await; - clear().await; - - insert_jsonb().await; + clear_with_client(&client).await; + insert_jsonb_with_client(&client).await; let v = serde_json::json!({ "number": 1815, "string": "world", }); - select_jsonb("$.nested", v).await; + select_jsonb("$.nested", v, &client).await; } #[tokio::test] async fn select_jsonb_path_query_with_unknown() { trace(); + let client = connect_with_tls(PROXY).await; - clear().await; - - insert_jsonb().await; + clear_with_client(&client).await; + insert_jsonb_with_client(&client).await; let selector = JsonPath::new("$.vtha"); let expected = vec![]; let sql = "SELECT jsonb_path_query(encrypted_jsonb, $1) as selected FROM encrypted"; - let actual = query_by::(sql, &selector).await; + let actual = query_by_with_client::(sql, &selector, &client).await; assert_expected(&expected, &actual); let sql = format!( "SELECT jsonb_path_query(encrypted_jsonb, '{selector}') as selected FROM encrypted" ); - let actual = simple_query::(&sql).await; + let actual = simple_query_with_client::(&sql, &client).await; assert_expected(&expected, &actual); } @@ -93,10 +97,10 @@ mod tests { #[tokio::test] async fn select_jsonb_path_query_with_alias() { trace(); + let client = connect_with_tls(PROXY).await; - clear().await; - - insert_jsonb().await; + clear_with_client(&client).await; + insert_jsonb_with_client(&client).await; let value = serde_json::json!({ "number": 1815, @@ -108,14 +112,14 @@ mod tests { let expected = vec![value]; let sql = "SELECT jsonb_path_query(encrypted_jsonb, $1) as selected FROM encrypted"; - let actual = query_by::(sql, &selector).await; + let actual = query_by_with_client::(sql, &selector, &client).await; assert_expected(&expected, &actual); let sql = format!( "SELECT jsonb_path_query(encrypted_jsonb, '{selector}') as selected FROM encrypted" ); - let actual = simple_query::(&sql).await; + let actual = simple_query_with_client::(&sql, &client).await; assert_expected(&expected, &actual); } diff --git a/packages/cipherstash-proxy/src/connect/channel_writer.rs b/packages/cipherstash-proxy/src/connect/channel_writer.rs index d7e4b11c..a6dc68ff 100644 --- a/packages/cipherstash-proxy/src/connect/channel_writer.rs +++ b/packages/cipherstash-proxy/src/connect/channel_writer.rs @@ -38,6 +38,16 @@ where } pub async fn receive(mut self) { + debug!(target: PROTOCOL, + client_id = self.client_id, + msg = "ChannelWriter task started", + ); + + // Drop our own sender so the channel can close when frontend/backend senders are dropped + // Without this, we have a circular dependency: receiver waits for all senders to drop, + // but we're holding one of them ourselves! + drop(self.sender); + while let Some(bytes) = self.receiver.recv().await { debug!(target: PROTOCOL, client_id = self.client_id, @@ -58,9 +68,39 @@ where msg = "Write error", error = ?err ); + break; } } } + + // Channel closed - shutdown the writer to properly close the connection + debug!(target: PROTOCOL, + client_id = self.client_id, + msg = "Recv loop exited - channel closed, beginning shutdown", + ); + + // Flush any pending writes before shutdown + if let Err(err) = self.writer.flush().await { + error!(target: PROTOCOL, + client_id = self.client_id, + msg = "Error flushing writer during shutdown", + error = ?err + ); + } + + // Shutdown the write half to send FIN and properly close the connection + if let Err(err) = self.writer.shutdown().await { + error!(target: PROTOCOL, + client_id = self.client_id, + msg = "Error shutting down writer", + error = ?err + ); + } + + debug!(target: PROTOCOL, + client_id = self.client_id, + msg = "Writer shutdown complete", + ); } pub fn sender(&self) -> Sender { diff --git a/packages/cipherstash-proxy/src/postgresql/handler.rs b/packages/cipherstash-proxy/src/postgresql/handler.rs index b13734b3..eb706edd 100644 --- a/packages/cipherstash-proxy/src/postgresql/handler.rs +++ b/packages/cipherstash-proxy/src/postgresql/handler.rs @@ -232,7 +232,7 @@ pub async fn handler(client_stream: AsyncStream, context: Context) -> R } } - tokio::spawn(channel_writer.receive()); + let channel_writer_task = tokio::spawn(channel_writer.receive()); let client_to_server = async { loop { @@ -258,8 +258,26 @@ pub async fn handler(client_stream: AsyncStream, context: Context) -> R Ok::<(), Error>(()) }; - tokio::try_join!(client_to_server, server_to_client)?; + // Run frontend and backend tasks + let result = tokio::try_join!(client_to_server, server_to_client); + + // Drop frontend and backend to drop their senders and close the channel + // The async blocks above captured frontend/backend by reference, so they're still alive + drop(frontend); + drop(backend); + + // Wait for channel writer to finish shutdown sequence + // The senders are now dropped, which closes the channel and allows + // the writer task to complete its shutdown + if let Err(err) = channel_writer_task.await { + error!( + client_id, + msg = "Channel writer task panicked", + error = ?err + ); + } + result?; Ok(()) }