From 03b5356021c824e37c01c937c7481849d58e8a5f Mon Sep 17 00:00:00 2001 From: Aaron Hill Date: Thu, 5 Mar 2026 11:54:26 -0500 Subject: [PATCH] Register tasks in DurableBuilder rather than Durable client This enforces that no new tasks can be registered once a Durable client has been constructed. As a result, we can avoid using an RwLock around the TaskRegistry --- benches/checkpoint.rs | 20 +++-- benches/common/setup.rs | 20 ++--- benches/concurrency.rs | 28 +++++-- benches/throughput.rs | 11 ++- src/client.rs | 129 +++++++++++++---------------- src/lib.rs | 7 +- src/worker.rs | 4 +- tests/checkpoint_test.rs | 67 ++++++++++----- tests/concurrency_test.rs | 43 ++++++---- tests/crash_test.rs | 78 ++++++++++++------ tests/event_test.rs | 130 ++++++++++++++++++++--------- tests/execution_test.rs | 157 ++++++++++++++++++++++------------- tests/fanout_test.rs | 120 ++++++++++++++++++--------- tests/lease_test.rs | 59 ++++++++----- tests/lock_order_test.rs | 62 +++++++++----- tests/partition_test.rs | 27 +++--- tests/polling_test.rs | 12 ++- tests/queue_test.rs | 58 +++++++++---- tests/retry_test.rs | 51 ++++++++---- tests/spawn_test.rs | 169 ++++++++++++++++++++++++++------------ tests/telemetry_test.rs | 53 +++++++----- 21 files changed, 835 insertions(+), 470 deletions(-) diff --git a/benches/checkpoint.rs b/benches/checkpoint.rs index 6591512..b6e143d 100644 --- a/benches/checkpoint.rs +++ b/benches/checkpoint.rs @@ -30,8 +30,10 @@ fn bench_step_cache_miss(c: &mut Criterion) { let mut total_time = Duration::ZERO; for _ in 0..iters { - let ctx = BenchContext::new().await; - ctx.client.register::().await.unwrap(); + let ctx = BenchContext::with_builder(|b| { + b.register::().unwrap() + }) + .await; ctx.client .spawn::(MultiStepParams { num_steps }) @@ -82,8 +84,9 @@ fn bench_step_cache_hit(c: &mut Criterion) { let mut total_time = Duration::ZERO; for _ in 0..iters { - let ctx = BenchContext::new().await; - ctx.client.register::().await.unwrap(); + let ctx = 
BenchContext::with_builder(|b| { + b.register::().unwrap() + }).await; // First run to populate checkpoints let spawn_result = ctx @@ -170,11 +173,10 @@ fn bench_large_payload_checkpoint(c: &mut Criterion) { let mut total_time = Duration::ZERO; for _ in 0..iters { - let ctx = BenchContext::new().await; - ctx.client - .register::() - .await - .unwrap(); + let ctx = BenchContext::with_builder(|b| { + b.register::().unwrap() + }) + .await; ctx.client .spawn::(LargePayloadParams { payload_size }) diff --git a/benches/common/setup.rs b/benches/common/setup.rs index 7130e87..deaf6cd 100644 --- a/benches/common/setup.rs +++ b/benches/common/setup.rs @@ -17,9 +17,8 @@ pub struct BenchContext { } impl BenchContext { - /// Create a new benchmark context with a unique queue. - /// Uses DATABASE_URL environment variable (same as tests). - pub async fn new() -> Self { + /// Create a new benchmark context, allowing task registration on the builder. + pub async fn with_builder(f: impl FnOnce(DurableBuilder) -> DurableBuilder) -> Self { let database_url = std::env::var("DATABASE_URL") .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5436/test".to_string()); @@ -34,9 +33,11 @@ impl BenchContext { let counter = QUEUE_COUNTER.fetch_add(1, Ordering::SeqCst); let queue_name = format!("bench_{}", counter); - let client = DurableBuilder::new() + let builder = DurableBuilder::new() .pool(pool.clone()) - .queue_name(&queue_name) + .queue_name(&queue_name); + + let client = f(builder) .build() .await .expect("Failed to create Durable client"); @@ -53,16 +54,13 @@ impl BenchContext { } } - /// Create a new Durable client using the same pool and queue. - /// Useful for creating multiple workers. + /// Create a new DurableBuilder using the same pool and queue. + /// Useful for creating multiple workers with task registrations. 
#[allow(dead_code)] - pub async fn new_client(&self) -> Durable { + pub fn new_builder(&self) -> DurableBuilder { DurableBuilder::new() .pool(self.pool.clone()) .queue_name(&self.queue_name) - .build() - .await - .expect("Failed to create Durable client") } /// Clean up the queue after benchmark diff --git a/benches/concurrency.rs b/benches/concurrency.rs index 2637605..bd1022f 100644 --- a/benches/concurrency.rs +++ b/benches/concurrency.rs @@ -30,8 +30,9 @@ fn bench_concurrent_claims(c: &mut Criterion) { let mut total_time = Duration::ZERO; for _ in 0..iters { - let ctx = BenchContext::new().await; - ctx.client.register::().await.unwrap(); + let ctx = + BenchContext::with_builder(|b| b.register::().unwrap()) + .await; // Spawn all tasks for i in 0..num_tasks { @@ -49,8 +50,13 @@ fn bench_concurrent_claims(c: &mut Criterion) { // Spawn multiple worker processes for _ in 0..num_workers { - let client = ctx.new_client().await; - client.register::().await.unwrap(); + let client = ctx + .new_builder() + .register::() + .unwrap() + .build() + .await + .unwrap(); let barrier = barrier.clone(); let handle = tokio::spawn(async move { @@ -123,8 +129,9 @@ fn bench_claim_latency_distribution(c: &mut Criterion) { let mut total_time = Duration::ZERO; for _ in 0..iters { - let ctx = BenchContext::new().await; - ctx.client.register::().await.unwrap(); + let ctx = + BenchContext::with_builder(|b| b.register::().unwrap()) + .await; // Spawn tasks for i in 0..num_tasks { @@ -140,8 +147,13 @@ fn bench_claim_latency_distribution(c: &mut Criterion) { let start = std::time::Instant::now(); for _ in 0..num_workers { - let client = ctx.new_client().await; - client.register::().await.unwrap(); + let client = ctx + .new_builder() + .register::() + .unwrap() + .build() + .await + .unwrap(); let barrier = barrier.clone(); let handle = tokio::spawn(async move { diff --git a/benches/throughput.rs b/benches/throughput.rs index ef7d9a3..334396d 100644 --- a/benches/throughput.rs +++ 
b/benches/throughput.rs @@ -17,8 +17,7 @@ fn bench_spawn_latency(c: &mut Criterion) { group.bench_function("single_spawn", |b| { b.iter_custom(|iters| { rt.block_on(async { - let ctx = BenchContext::new().await; - ctx.client.register::().await.unwrap(); + let ctx = BenchContext::with_builder(|b| b.register::().unwrap()).await; let start = std::time::Instant::now(); for _ in 0..iters { @@ -57,8 +56,9 @@ fn bench_task_throughput(c: &mut Criterion) { let mut total_time = Duration::ZERO; for _ in 0..iters { - let ctx = BenchContext::new().await; - ctx.client.register::().await.unwrap(); + let ctx = + BenchContext::with_builder(|b| b.register::().unwrap()) + .await; // Spawn all tasks first for i in 0..num_tasks { @@ -113,8 +113,7 @@ fn bench_e2e_completion(c: &mut Criterion) { group.bench_function("single_task_roundtrip", |b| { b.iter_custom(|iters| { rt.block_on(async { - let ctx = BenchContext::new().await; - ctx.client.register::().await.unwrap(); + let ctx = BenchContext::with_builder(|b| b.register::().unwrap()).await; let worker = ctx .client diff --git a/src/client.rs b/src/client.rs index 0fae58d..be2ae85 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,7 +5,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; -use tokio::sync::RwLock; use uuid::Uuid; use crate::error::{DurableError, DurableResult}; @@ -69,13 +68,14 @@ pub(crate) fn validate_headers(headers: &Option>) -> /// The main client for interacting with durable workflows. 
/// /// Use this client to: -/// - Register task types with [`register`](Self::register) /// - Spawn tasks with [`spawn`](Self::spawn) or [`spawn_with_options`](Self::spawn_with_options) /// - Start workers with [`start_worker`](Self::start_worker) /// - Manage queues with [`create_queue`](Self::create_queue), [`drop_queue`](Self::drop_queue) /// - Emit events with [`emit_event`](Self::emit_event) /// - Cancel tasks with [`cancel_task`](Self::cancel_task) /// +/// Tasks are registered at build time via [`DurableBuilder::register`]. +/// /// # Type Parameter /// /// * `State` - Application state type passed to task handlers. Use `()` if you @@ -88,6 +88,7 @@ pub(crate) fn validate_headers(headers: &Option>) -> /// let client = Durable::builder() /// .database_url("postgres://localhost/myapp") /// .queue_name("tasks") +/// .register::()? /// .build() /// .await?; /// @@ -104,7 +105,6 @@ pub(crate) fn validate_headers(headers: &Option>) -> /// .build_with_state(app_state) /// .await?; /// -/// client.register::().await?; /// client.spawn::(params).await?; /// ``` pub struct Durable @@ -115,7 +115,7 @@ where owns_pool: AtomicBool, queue_name: String, spawn_defaults: SpawnDefaults, - registry: Arc>>, + registry: Arc>, state: State, } @@ -143,13 +143,16 @@ impl Durable { } } - pub(crate) fn registry(&self) -> &Arc>> { + pub(crate) fn registry(&self) -> &Arc> { &self.registry } } /// Builder for configuring a [`Durable`] client. /// +/// Tasks are registered at build time via [`register`](Self::register) or +/// [`register_instance`](Self::register_instance). 
+/// /// # Example /// /// ```ignore @@ -161,15 +164,7 @@ impl Durable { /// .database_url("postgres://localhost/myapp") /// .queue_name("orders") /// .default_max_attempts(3) -/// .default_retry_strategy(RetryStrategy::Exponential { -/// base_delay: Duration::from_secs(5), -/// factor: 2.0, -/// max_backoff: Duration::from_secs(300), -/// }) -/// .default_cancellation(CancellationPolicy { -/// max_pending_time: Some(Duration::from_secs(3600)), -/// max_running_time: None, -/// }) +/// .register::()? /// .build() /// .await?; /// @@ -179,14 +174,15 @@ impl Durable { /// .build_with_state(my_app_state) /// .await?; /// ``` -pub struct DurableBuilder { +pub struct DurableBuilder { database_url: Option, pool: Option, queue_name: String, spawn_defaults: SpawnDefaults, + registry: TaskRegistry, } -impl DurableBuilder { +impl DurableBuilder { pub fn new() -> Self { Self { database_url: None, @@ -197,9 +193,12 @@ impl DurableBuilder { retry_strategy: None, cancellation: None, }, + registry: HashMap::new(), } } +} +impl DurableBuilder { /// Set database URL (will create a new connection pool) pub fn database_url(mut self, url: impl Into) -> Self { self.database_url = Some(url.into()); @@ -236,43 +235,43 @@ impl DurableBuilder { self } - /// Build the Durable client without application state. + /// Register a task type. Required before spawning or processing. /// - /// Use this when your tasks don't need access to shared resources - /// like HTTP clients or database pools. - pub async fn build(self) -> DurableResult> { - self.build_with_state(()).await + /// Returns an error if a task with the same name is already registered. + pub fn register + Default>(self) -> DurableResult { + self.register_instance(T::default()) + } + + /// Register a task instance. Required before spawning or processing. + /// + /// Use this when you need to register a task with runtime-determined metadata + /// (e.g., a TypeScript tool loaded from a config file). 
+ /// + /// Returns an error if a task with the same name is already registered. + pub fn register_instance>(mut self, task: T) -> DurableResult { + let name = task.name(); + if self.registry.contains_key(name.as_ref()) { + return Err(DurableError::TaskAlreadyRegistered { + task_name: name.to_string(), + }); + } + self.registry.insert(name, Arc::new(TaskWrapper::new(task))); + Ok(self) } /// Build the Durable client with application state. /// /// The state will be cloned and passed to each task execution. - /// Use this to provide shared resources like HTTP clients, database pools, - /// or other application state to your tasks. /// /// # Example /// /// ```ignore - /// #[derive(Clone)] - /// struct AppState { - /// http_client: reqwest::Client, - /// db_pool: PgPool, - /// } - /// - /// let state = AppState { - /// http_client: reqwest::Client::new(), - /// db_pool: pool.clone(), - /// }; - /// /// let client = Durable::builder() /// .database_url("postgres://localhost/myapp") - /// .build_with_state(state) + /// .build_with_state(app_state) /// .await?; /// ``` - pub async fn build_with_state(self, state: State) -> DurableResult> - where - State: Clone + Send + Sync + 'static, - { + pub async fn build_with_state(self, state: State) -> DurableResult> { let (pool, owns_pool) = if let Some(pool) = self.pool { (pool, false) } else { @@ -288,18 +287,28 @@ impl DurableBuilder { owns_pool: AtomicBool::new(owns_pool), queue_name: self.queue_name, spawn_defaults: self.spawn_defaults, - registry: Arc::new(RwLock::new(HashMap::new())), + registry: Arc::new(self.registry), state, }) } } -impl Default for DurableBuilder { +impl Default for DurableBuilder<()> { fn default() -> Self { Self::new() } } +impl DurableBuilder<()> { + /// Build the Durable client without application state. + /// + /// Use this when your tasks don't need access to shared resources + /// like HTTP clients or database pools. 
+ pub async fn build(self) -> DurableResult> { + self.build_with_state(()).await + } +} + impl Durable<()> { /// Create a new client with default settings (no application state). pub async fn new(database_url: &str) -> DurableResult { @@ -308,17 +317,17 @@ impl Durable<()> { .build() .await } - - /// Access the builder for custom configuration - pub fn builder() -> DurableBuilder { - DurableBuilder::new() - } } impl Durable where State: Clone + Send + Sync + 'static, { + /// Access the builder for custom configuration + pub fn builder() -> DurableBuilder { + DurableBuilder::new() + } + /// Get a reference to the underlying connection pool pub fn pool(&self) -> &PgPool { &self.pool @@ -338,31 +347,6 @@ where &self.spawn_defaults } - /// Register a task type. Required before spawning or processing. - /// - /// Returns an error if a task with the same name is already registered. - pub async fn register + Default>(&self) -> DurableResult<&Self> { - self.register_instance(T::default()).await - } - - /// Register a task instance. Required before spawning or processing. - /// - /// Use this when you need to register a task with runtime-determined metadata - /// (e.g., a TypeScript tool loaded from a config file). - /// - /// Returns an error if a task with the same name is already registered. 
- pub async fn register_instance>(&self, task: T) -> DurableResult<&Self> { - let mut registry = self.registry.write().await; - let name = task.name(); - if registry.contains_key(name.as_ref()) { - return Err(DurableError::TaskAlreadyRegistered { - task_name: name.to_string(), - }); - } - registry.insert(name, Arc::new(TaskWrapper::new(task))); - Ok(self) - } - /// Spawn a task (type-safe version) pub async fn spawn + Default>( &self, @@ -471,8 +455,7 @@ where { // Validate that the task is registered { - let registry = self.registry.read().await; - let Some(task) = registry.get(task_name) else { + let Some(task) = self.registry.get(task_name) else { return Err(DurableError::TaskNotRegistered { task_name: task_name.to_string(), }); diff --git a/src/lib.rs b/src/lib.rs index fed9b2e..6bc5e89 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,8 +39,11 @@ //! //! #[tokio::main] //! async fn main() -> anyhow::Result<()> { -//! let client = Durable::new("postgres://localhost/myapp").await?; -//! client.register::().await?; +//! let client = Durable::builder() +//! .database_url("postgres://localhost/myapp") +//! .register::()? +//! .build() +//! .await?; //! //! client.spawn::(MyParams { value: 21 }).await?; //! 
diff --git a/src/worker.rs b/src/worker.rs index 1da7e58..bef8bc1 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -318,8 +318,7 @@ impl Worker { }; // Look up handler - let registry = durable.registry().read().await; - let handler = match registry.get(task.task_name.as_str()) { + let handler = match durable.registry().get(task.task_name.as_str()) { Some(h) => h.clone(), None => { tracing::error!("Unknown task: {}", task.task_name); @@ -336,7 +335,6 @@ impl Worker { return; } }; - drop(registry); // Execute task with timeout enforcement // We instrument the actual task execution itself, to continue the trace context diff --git a/tests/checkpoint_test.rs b/tests/checkpoint_test.rs index b05e890..3905a0b 100644 --- a/tests/checkpoint_test.rs +++ b/tests/checkpoint_test.rs @@ -8,17 +8,12 @@ use common::tasks::{ LargePayloadParams, LargePayloadTask, ManyStepsParams, ManyStepsTask, reset_deterministic_task_state, }; -use durable::{Durable, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; +use durable::{Durable, DurableBuilder, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; use sqlx::{AssertSqlSafe, PgPool}; use std::time::Duration; -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -31,9 +26,13 @@ async fn create_client(pool: PgPool, queue_name: &str) -> Durable { async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result<()> { use common::tasks::{StepCountingParams, StepCountingTask}; - let client = create_client(pool.clone(), "ckpt_replay").await; + let client = create_client(pool.clone(), "ckpt_replay") + .register::() + .unwrap() + .build() + .await + .unwrap(); 
client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // First, spawn a task that will fail after step2 let spawn_result = client @@ -85,9 +84,13 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result ); // Now test a successful task with multiple steps to verify checkpoints work - let client2 = create_client(pool.clone(), "ckpt_replay2").await; + let client2 = create_client(pool.clone(), "ckpt_replay2") + .register::() + .unwrap() + .build() + .await + .unwrap(); client2.create_queue(None).await.unwrap(); - client2.register::().await.unwrap(); let spawn_result2 = client2 .spawn::(StepCountingParams { @@ -130,9 +133,13 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result /// Test that ctx.rand() returns the same value after retry. #[sqlx::test(migrator = "MIGRATOR")] async fn test_deterministic_rand_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "ckpt_rand").await; + let client = create_client(pool.clone(), "ckpt_rand") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); reset_deterministic_task_state(); @@ -196,9 +203,13 @@ async fn test_deterministic_rand_preserved_on_retry(pool: PgPool) -> sqlx::Resul /// Test that ctx.now() returns the same timestamp after retry. #[sqlx::test(migrator = "MIGRATOR")] async fn test_deterministic_now_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "ckpt_now").await; + let client = create_client(pool.clone(), "ckpt_now") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); reset_deterministic_task_state(); @@ -258,9 +269,13 @@ async fn test_deterministic_now_preserved_on_retry(pool: PgPool) -> sqlx::Result /// Test that ctx.uuid7() returns the same UUID after retry. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_deterministic_uuid7_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "ckpt_uuid").await; + let client = create_client(pool.clone(), "ckpt_uuid") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); reset_deterministic_task_state(); @@ -320,9 +335,13 @@ async fn test_deterministic_uuid7_preserved_on_retry(pool: PgPool) -> sqlx::Resu /// Test that a task with 50+ steps completes correctly. #[sqlx::test(migrator = "MIGRATOR")] async fn test_long_workflow_many_steps(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "ckpt_long").await; + let client = create_client(pool.clone(), "ckpt_long") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let num_steps = 50; @@ -373,9 +392,13 @@ async fn test_long_workflow_many_steps(pool: PgPool) -> sqlx::Result<()> { /// Test that a step returning a large payload (1MB+) persists correctly. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_large_payload_checkpoint(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "ckpt_large").await; + let client = create_client(pool.clone(), "ckpt_large") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let size_bytes = 1_000_000; // 1MB diff --git a/tests/concurrency_test.rs b/tests/concurrency_test.rs index 5c430bf..43bf008 100644 --- a/tests/concurrency_test.rs +++ b/tests/concurrency_test.rs @@ -4,20 +4,15 @@ mod common; use common::helpers::wait_for_task_terminal; use common::tasks::{EchoParams, EchoTask}; -use durable::{Durable, MIGRATOR, WorkerOptions}; +use durable::{Durable, DurableBuilder, MIGRATOR, WorkerOptions}; use sqlx::{AssertSqlSafe, PgPool}; use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; use tokio::sync::Barrier; -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -27,9 +22,13 @@ async fn create_client(pool: PgPool, queue_name: &str) -> Durable { /// Test that a task is claimed by exactly one worker when multiple workers compete. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_task_claimed_by_exactly_one_worker(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "conc_claim").await; + let client = create_client(pool.clone(), "conc_claim") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn a single task let spawn_result = client @@ -49,8 +48,12 @@ async fn test_task_claimed_by_exactly_one_worker(pool: PgPool) -> sqlx::Result<( let barrier_clone = barrier.clone(); let worker_handle = tokio::spawn(async move { - let client = create_client(pool_clone, "conc_claim").await; - client.register::().await.unwrap(); + let client = create_client(pool_clone, "conc_claim") + .register::() + .unwrap() + .build() + .await + .unwrap(); // Synchronize all workers to start at the same time barrier_clone.wait().await; @@ -121,9 +124,13 @@ async fn test_task_claimed_by_exactly_one_worker(pool: PgPool) -> sqlx::Result<( /// Test that concurrent claims with SKIP LOCKED do not cause deadlocks. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_concurrent_claims_with_skip_locked(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "conc_skip").await; + let client = create_client(pool.clone(), "conc_skip") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn many tasks let num_tasks = 50; @@ -149,8 +156,12 @@ async fn test_concurrent_claims_with_skip_locked(pool: PgPool) -> sqlx::Result<( let barrier_clone = barrier.clone(); let handle = tokio::spawn(async move { - let client = create_client(pool_clone, "conc_skip").await; - client.register::().await.unwrap(); + let client = create_client(pool_clone, "conc_skip") + .register::() + .unwrap() + .build() + .await + .unwrap(); // Synchronize all workers to start at the same time barrier_clone.wait().await; diff --git a/tests/crash_test.rs b/tests/crash_test.rs index 7107f75..6d6e805 100644 --- a/tests/crash_test.rs +++ b/tests/crash_test.rs @@ -13,15 +13,10 @@ use common::tasks::{ CpuBoundParams, CpuBoundTask, LongRunningHeartbeatParams, LongRunningHeartbeatTask, SlowNoHeartbeatParams, SlowNoHeartbeatTask, StepCountingParams, StepCountingTask, }; -use durable::{Durable, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; +use durable::{Durable, DurableBuilder, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -31,9 +26,13 @@ async fn create_client(pool: PgPool, queue_name: &str) -> Durable { /// Test that a task resumes from checkpoint after a worker crash. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "crash_ckpt").await; + let client = create_client(pool.clone(), "crash_ckpt") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn a task that will fail after step 2 let spawn_result = client @@ -114,9 +113,13 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu /// Uses real time delays since fake time only affects database, not worker's tokio timing. #[sqlx::test(migrator = "MIGRATOR")] async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "crash_drop").await; + let client = create_client(pool.clone(), "crash_drop") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let claim_timeout = Duration::from_secs(2); // 2 second lease @@ -184,9 +187,13 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { /// Test that a new worker can claim a task after the original worker's lease expires. #[sqlx::test(migrator = "MIGRATOR")] async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "crash_lease").await; + let client = create_client(pool.clone(), "crash_lease") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; @@ -253,9 +260,13 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> /// Test that heartbeats prevent lease expiration for long-running tasks. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_heartbeat_prevents_lease_expiration(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "crash_hb").await; + let client = create_client(pool.clone(), "crash_hb") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let claim_timeout = Duration::from_secs(2); // 2 second lease @@ -307,10 +318,15 @@ async fn test_spawn_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { tracing_subscriber::fmt::init(); use common::tasks::{DoubleTask, SingleSpawnParams, SingleSpawnTask}; - let client = create_client(pool.clone(), "crash_spawn").await; + let client = create_client(pool.clone(), "crash_spawn") + .register::() + .unwrap() + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); // Child task type // Spawn parent task that spawns a child let spawn_result = client @@ -371,9 +387,13 @@ async fn test_spawn_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { /// Test that steps are idempotent after retry. #[sqlx::test(migrator = "MIGRATOR")] async fn test_step_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "crash_step").await; + let client = create_client(pool.clone(), "crash_step") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn task that fails after step 2, then succeeds on retry // But fail_after_step2 is always true, so it will fail on retries too @@ -437,9 +457,13 @@ async fn test_step_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { /// Test that a CPU-bound task that can't heartbeat gets reclaimed. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "crash_cpu").await; + let client = create_client(pool.clone(), "crash_cpu") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; @@ -501,9 +525,13 @@ async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { /// Uses real time delays since fake time only affects database, not worker's tokio timing. #[sqlx::test(migrator = "MIGRATOR")] async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "crash_slow").await; + let client = create_client(pool.clone(), "crash_slow") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let claim_timeout = Duration::from_secs(2); // 2 second lease diff --git a/tests/event_test.rs b/tests/event_test.rs index 5005b78..6c9dbad 100644 --- a/tests/event_test.rs +++ b/tests/event_test.rs @@ -4,20 +4,18 @@ mod common; use common::helpers::{get_task_state, wait_for_task_terminal}; use common::tasks::{EventEmitterParams, EventEmitterTask, EventWaitParams, EventWaitingTask}; -use durable::{Durable, DurableEventPayload, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; +use durable::{ + Durable, DurableBuilder, DurableEventPayload, MIGRATOR, RetryStrategy, SpawnOptions, + WorkerOptions, +}; use serde_json::json; use sqlx::postgres::PgConnectOptions; use sqlx::{AssertSqlSafe, Connection, PgConnection, PgPool}; use std::time::{Duration, Instant}; use uuid::Uuid; -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +fn 
create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -27,9 +25,13 @@ async fn create_client(pool: PgPool, queue_name: &str) -> Durable { /// Test that emit_event wakes a task blocked on await_event. #[sqlx::test(migrator = "MIGRATOR")] async fn test_emit_event_wakes_waiter(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "event_wake").await; + let client = create_client(pool.clone(), "event_wake") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn task that waits for an event let spawn_result = client @@ -95,9 +97,13 @@ async fn test_emit_event_wakes_waiter(pool: PgPool) -> sqlx::Result<()> { /// Test that await_event returns immediately if event already exists. #[sqlx::test(migrator = "MIGRATOR")] async fn test_event_already_emitted_returns_immediately(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "event_pre").await; + let client = create_client(pool.clone(), "event_pre") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Emit the event BEFORE spawning the task client @@ -152,9 +158,13 @@ async fn test_event_already_emitted_returns_immediately(pool: PgPool) -> sqlx::R /// Test that await_event times out correctly. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_event_timeout_triggers(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "event_timeout").await; + let client = create_client(pool.clone(), "event_timeout") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn task with short timeout, never emit event let spawn_result = client @@ -201,9 +211,13 @@ async fn test_event_timeout_triggers(pool: PgPool) -> sqlx::Result<()> { /// Test that multiple tasks waiting for the same event all wake up. #[sqlx::test(migrator = "MIGRATOR")] async fn test_multiple_waiters_same_event(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "event_multi").await; + let client = create_client(pool.clone(), "event_multi") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn multiple tasks waiting for the same event let task1 = client @@ -274,9 +288,13 @@ async fn test_event_payload_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> use common::tasks::{EventThenFailParams, EventThenFailTask, reset_event_then_fail_state}; - let client = create_client(pool.clone(), "event_retry").await; + let client = create_client(pool.clone(), "event_retry") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); reset_event_then_fail_state(); @@ -345,9 +363,13 @@ async fn test_event_payload_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> /// Subsequent emits for the same event are no-ops to maintain consistency with lost-wakeup prevention. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_event_first_writer_wins(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "event_dedup").await; + let client = create_client(pool.clone(), "event_dedup") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Emit the event twice with different payloads client @@ -408,9 +430,13 @@ async fn test_event_first_writer_wins(pool: PgPool) -> sqlx::Result<()> { async fn test_multiple_distinct_events(pool: PgPool) -> sqlx::Result<()> { use common::tasks::{MultiEventParams, MultiEventTask}; - let client = create_client(pool.clone(), "event_distinct").await; + let client = create_client(pool.clone(), "event_distinct") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn task that waits for two events let spawn_result = client @@ -479,9 +505,13 @@ async fn test_multiple_distinct_events(pool: PgPool) -> sqlx::Result<()> { async fn test_event_write_does_not_propagate_after_wake(pool: PgPool) -> sqlx::Result<()> { use common::tasks::{EventThenDelayParams, EventThenDelayTask}; - let client = create_client(pool.clone(), "event_no_propagate").await; + let client = create_client(pool.clone(), "event_no_propagate") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn task that waits for event, then delays before completing let spawn_result = client @@ -549,10 +579,15 @@ async fn test_event_write_does_not_propagate_after_wake(pool: PgPool) -> sqlx::R /// Test that one task can emit an event that another task awaits. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_emit_from_different_task(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "event_cross").await; + let client = create_client(pool.clone(), "event_cross") + .register::() + .unwrap() + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); // Spawn the waiter task first let waiter = client @@ -614,7 +649,10 @@ async fn test_emit_from_different_task(pool: PgPool) -> sqlx::Result<()> { /// Test that emit_event_with in a committed transaction persists the event. #[sqlx::test(migrator = "MIGRATOR")] async fn test_emit_event_with_transaction_commit(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "event_tx_commit").await; + let client = create_client(pool.clone(), "event_tx_commit") + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); // Create a test table @@ -663,7 +701,10 @@ async fn test_emit_event_with_transaction_commit(pool: PgPool) -> sqlx::Result<( /// Test that emit_event_with in a rolled back transaction does NOT persist the event. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_emit_event_with_transaction_rollback(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "event_tx_rollback").await; + let client = create_client(pool.clone(), "event_tx_rollback") + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); // Create a test table @@ -719,9 +760,13 @@ async fn test_emit_event_with_transaction_rollback(pool: PgPool) -> sqlx::Result async fn test_event_timeout_error_payload(pool: PgPool) -> sqlx::Result<()> { use common::helpers::get_failed_payload; - let client = create_client(pool.clone(), "event_timeout_payload").await; + let client = create_client(pool.clone(), "event_timeout_payload") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn task with short timeout, never emit event let spawn_result = client @@ -789,7 +834,10 @@ async fn test_event_timeout_error_payload(pool: PgPool) -> sqlx::Result<()> { async fn test_emit_event_with_empty_name_fails(pool: PgPool) -> sqlx::Result<()> { use durable::DurableError; - let client = create_client(pool.clone(), "event_empty_name").await; + let client = create_client(pool.clone(), "event_empty_name") + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); // Try to emit event with empty name @@ -833,9 +881,13 @@ async fn test_event_race_stress(pool: PgPool) -> sqlx::Result<()> { .and_then(|s| s.parse().ok()) .unwrap_or(8); - let client = create_client(pool.clone(), "event_race").await; + let client = create_client(pool.clone(), "event_race") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let worker = client .start_worker(WorkerOptions { @@ -880,7 +932,7 @@ async fn test_event_race_stress(pool: PgPool) -> sqlx::Result<()> { // Jitter: vary start times let jitter = Duration::from_micros((i as u64 
* 17) % (jitter_ms * 1000)); tokio::time::sleep(jitter).await; - let emit_client = create_client(pool, "event_race").await; + let emit_client = create_client(pool, "event_race").build().await.unwrap(); emit_client .emit_event::(&event_name, &json!({"idx": i}), None) .await @@ -1143,9 +1195,13 @@ async fn test_await_emit_event_race_does_not_lose_wakeup(pool: PgPool) -> sqlx:: /// joiners to see NULL and sleep forever. #[sqlx::test(migrator = "MIGRATOR")] async fn test_await_event_late_joiner_sees_payload(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "late_join").await; + let client = create_client(pool.clone(), "late_join") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let event_name = "late-joiner-event"; let payload = json!({"late": "joiner"}); diff --git a/tests/execution_test.rs b/tests/execution_test.rs index da4beb7..436469d 100644 --- a/tests/execution_test.rs +++ b/tests/execution_test.rs @@ -7,19 +7,14 @@ use common::tasks::{ MultiStepOutput, MultiStepTask, MultipleCallsOutput, MultipleConvenienceCallsTask, ResearchParams, ResearchResult, ResearchTask, ReservedPrefixTask, }; -use durable::{Durable, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; +use durable::{Durable, DurableBuilder, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; use sqlx::{AssertSqlSafe, PgPool}; use std::borrow::Cow; use std::time::Duration; -/// Helper to create a Durable client from the test pool. -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +/// Helper to create a DurableBuilder from the test pool. 
+fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } #[derive(sqlx::FromRow)] @@ -68,9 +63,13 @@ async fn get_task_result( #[sqlx::test(migrator = "MIGRATOR")] async fn test_simple_task_executes_and_completes(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_simple").await; + let client = create_client(pool.clone(), "exec_simple") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn a task let spawn_result = client @@ -111,9 +110,13 @@ async fn test_simple_task_executes_and_completes(pool: PgPool) -> sqlx::Result<( #[sqlx::test(migrator = "MIGRATOR")] async fn test_task_state_transitions(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_states").await; + let client = create_client(pool.clone(), "exec_states") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn a task let spawn_result = client @@ -150,9 +153,13 @@ async fn test_task_state_transitions(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_empty_params_task_executes(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_empty").await; + let client = create_client(pool.clone(), "exec_empty") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn::(()) @@ -188,9 +195,13 @@ async fn test_empty_params_task_executes(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_multi_step_task_completes_all_steps(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_steps").await; + let client = create_client(pool.clone(), "exec_steps") + 
.register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn::(()) @@ -231,9 +242,13 @@ async fn test_multi_step_task_completes_all_steps(pool: PgPool) -> sqlx::Result< #[sqlx::test(migrator = "MIGRATOR")] async fn test_multiple_tasks_execute_concurrently(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_concurrent").await; + let client = create_client(pool.clone(), "exec_concurrent") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn multiple tasks let mut task_ids = Vec::new(); @@ -273,9 +288,13 @@ async fn test_multiple_tasks_execute_concurrently(pool: PgPool) -> sqlx::Result< #[sqlx::test(migrator = "MIGRATOR")] async fn test_worker_concurrency_limit_respected(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_limit").await; + let client = create_client(pool.clone(), "exec_limit") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn more tasks than concurrency limit for i in 0..10 { @@ -316,9 +335,13 @@ async fn test_worker_concurrency_limit_respected(pool: PgPool) -> sqlx::Result<( #[sqlx::test(migrator = "MIGRATOR")] async fn test_worker_graceful_shutdown_waits(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_shutdown").await; + let client = create_client(pool.clone(), "exec_shutdown") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn::(EchoParams { @@ -353,7 +376,10 @@ async fn test_worker_graceful_shutdown_waits(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_unregistered_task_fails(pool: 
PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_unreg").await; + let client = create_client(pool.clone(), "exec_unreg") + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); // Note: We don't register any task handler @@ -386,9 +412,13 @@ async fn test_unregistered_task_fails(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_task_result_stored_correctly(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_result").await; + let client = create_client(pool.clone(), "exec_result") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let test_message = "This is a test message with special chars: <>&\"'"; @@ -425,9 +455,13 @@ async fn test_task_result_stored_correctly(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_research_task_readme_example(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_research").await; + let client = create_client(pool.clone(), "exec_research") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn::(ResearchParams { @@ -476,9 +510,13 @@ async fn test_research_task_readme_example(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_convenience_methods_execute(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_convenience").await; + let client = create_client(pool.clone(), "exec_convenience") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn::(()) @@ -525,12 +563,13 @@ async fn test_convenience_methods_execute(pool: PgPool) -> sqlx::Result<()> { async fn 
test_multiple_convenience_calls_produce_different_values( pool: PgPool, ) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_multi_convenience").await; - client.create_queue(None).await.unwrap(); - client + let client = create_client(pool.clone(), "exec_multi_convenience") .register::() + .unwrap() + .build() .await .unwrap(); + client.create_queue(None).await.unwrap(); let spawn_result = client .spawn::(()) @@ -572,9 +611,13 @@ async fn test_multiple_convenience_calls_produce_different_values( #[sqlx::test(migrator = "MIGRATOR")] async fn test_reserved_prefix_rejected(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "exec_reserved").await; + let client = create_client(pool.clone(), "exec_reserved") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn::(()) @@ -605,9 +648,13 @@ async fn test_reserved_prefix_rejected(pool: PgPool) -> sqlx::Result<()> { async fn test_reserved_prefix_error_payload(pool: PgPool) -> sqlx::Result<()> { use common::helpers::get_failed_payload; - let client = create_client(pool.clone(), "exec_reserved_payload").await; + let client = create_client(pool.clone(), "exec_reserved_payload") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn_with_options::((), { @@ -667,9 +714,13 @@ async fn test_reserved_prefix_error_payload(pool: PgPool) -> sqlx::Result<()> { async fn test_long_running_task_with_heartbeat_completes(pool: PgPool) -> sqlx::Result<()> { use common::tasks::{LongRunningHeartbeatParams, LongRunningHeartbeatTask}; - let client = create_client(pool.clone(), "exec_heartbeat_timer").await; + let client = create_client(pool.clone(), "exec_heartbeat_timer") + .register::() + .unwrap() + .build() + .await + .unwrap(); 
client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Run a task for 3 seconds with 1 second claim_timeout // Task heartbeats every 200ms, so it should stay alive @@ -762,19 +813,6 @@ impl durable::Task for WriteToDbTask { } } -/// Helper to create a Durable client with application state. -async fn create_client_with_state(pool: PgPool, queue_name: &str) -> durable::Durable { - let app_state = AppState { - db_pool: pool.clone(), - }; - durable::Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build_with_state(app_state) - .await - .expect("Failed to create Durable client with state") -} - #[sqlx::test(migrator = "MIGRATOR")] async fn test_task_uses_application_state(pool: PgPool) -> sqlx::Result<()> { // Create a test table for the task to write to @@ -788,10 +826,19 @@ async fn test_task_uses_application_state(pool: PgPool) -> sqlx::Result<()> { .execute(&pool) .await?; - // Create client with application state - let client = create_client_with_state(pool.clone(), "exec_state").await; + let app_state = AppState { + db_pool: pool.clone(), + }; + let client = durable::Durable::builder() + .pool(pool.clone()) + .queue_name("exec_state") + .register::() + .unwrap() + .build_with_state(app_state) + .await + .expect("Failed to create Durable client with state"); + client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn a task that will use the state to write to the database let spawn_result = client diff --git a/tests/fanout_test.rs b/tests/fanout_test.rs index 6312690..f8d1bd3 100644 --- a/tests/fanout_test.rs +++ b/tests/fanout_test.rs @@ -8,19 +8,16 @@ use common::tasks::{ SpawnByNameParams, SpawnByNameTask, SpawnFailingChildTask, SpawnSlowChildParams, SpawnSlowChildTask, }; -use durable::{CancellationPolicy, Durable, MIGRATOR, RetryStrategy, WorkerOptions}; +use durable::{ + CancellationPolicy, Durable, DurableBuilder, MIGRATOR, RetryStrategy, WorkerOptions, +}; use serde_json::Value as 
JsonValue; use sqlx::{AssertSqlSafe, PgPool}; use std::time::Duration; -/// Helper to create a Durable client from the test pool. -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +/// Helper to create a DurableBuilder from the test pool. +fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } #[derive(sqlx::FromRow)] @@ -91,10 +88,15 @@ async fn get_parent_task_id( #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_single_child_and_join(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "fanout_single").await; + let client = create_client(pool.clone(), "fanout_single") + .register::() + .unwrap() + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); // Spawn parent task let spawn_result = client @@ -138,10 +140,15 @@ async fn test_spawn_single_child_and_join(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_multiple_children_and_join(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "fanout_multi").await; + let client = create_client(pool.clone(), "fanout_multi") + .register::() + .unwrap() + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); // Spawn parent task with multiple values let spawn_result = client @@ -192,10 +199,15 @@ async fn test_spawn_multiple_children_and_join(pool: PgPool) -> sqlx::Result<()> #[sqlx::test(migrator = "MIGRATOR")] async fn test_child_has_parent_task_id(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "fanout_parent").await; + let client = 
create_client(pool.clone(), "fanout_parent") + .register::() + .unwrap() + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); // Spawn parent task let spawn_result = client @@ -249,10 +261,15 @@ async fn test_child_has_parent_task_id(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_child_failure_propagates_to_parent(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "fanout_fail").await; + let client = create_client(pool.clone(), "fanout_fail") + .register::() + .unwrap() + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); // Spawn parent task that will spawn a failing child // Use max_attempts=1 for both parent and child to avoid long retry waits @@ -292,10 +309,15 @@ async fn test_child_failure_propagates_to_parent(pool: PgPool) -> sqlx::Result<( #[sqlx::test(migrator = "MIGRATOR")] async fn test_cascade_cancel_when_parent_cancelled(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "fanout_cancel").await; + let client = create_client(pool.clone(), "fanout_cancel") + .register::() + .unwrap() + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); // Spawn parent task that will spawn a slow child (5 seconds) let spawn_result = client @@ -366,10 +388,15 @@ async fn test_cascade_cancel_when_parent_auto_cancelled_by_max_duration( // Use single-conn pool for fake_time let test_pool = single_conn_pool(&pool).await; - let client = create_client(test_pool.clone(), "fanout_auto_cancel").await; + let client = create_client(test_pool.clone(), "fanout_auto_cancel") + .register::() + .unwrap() + .register::() + 
.unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); let start_time = chrono::Utc::now(); set_fake_time(&test_pool, start_time).await?; @@ -511,13 +538,15 @@ async fn test_spawn_by_name_from_task_context(pool: PgPool) -> sqlx::Result<()> max_pending_time: Some(Duration::from_secs(3600)), max_running_time: None, }) + .register::() + .unwrap() + .register::() + .unwrap() .build() .await .expect("Failed to create client"); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); // Spawn parent task that will use spawn_by_name internally let spawn_result = client @@ -611,10 +640,15 @@ async fn test_join_cancelled_child_returns_child_cancelled_error(pool: PgPool) - use common::helpers::{get_failed_payload, wait_for_task_terminal}; use common::tasks::{JoinCancelledChildParams, JoinCancelledChildTask, SlowChildTask}; - let client = create_client(pool.clone(), "fanout_child_cancel").await; + let client = create_client(pool.clone(), "fanout_child_cancel") + .register::() + .unwrap() + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); // Spawn parent task with max_attempts=1 to fail quickly let spawn_result = client @@ -703,10 +737,15 @@ async fn test_join_cancelled_child_returns_child_cancelled_error(pool: PgPool) - async fn test_child_failed_error_contains_message(pool: PgPool) -> sqlx::Result<()> { use common::helpers::{get_failed_payload, wait_for_task_terminal}; - let client = create_client(pool.clone(), "fanout_child_msg").await; + let client = create_client(pool.clone(), "fanout_child_msg") + .register::() + .unwrap() + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - 
client.register::().await.unwrap(); // Spawn parent task with max_attempts=1 let spawn_result = client @@ -775,10 +814,15 @@ async fn test_join_timeout_when_parent_claim_expires(pool: PgPool) -> sqlx::Resu use common::helpers::{get_failed_payload, wait_for_task_terminal}; use common::tasks::{SlowChildTask, SpawnSlowChildParams, SpawnSlowChildTask}; - let client = create_client(pool.clone(), "fanout_join_timeout").await; + let client = create_client(pool.clone(), "fanout_join_timeout") + .register::() + .unwrap() + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); - client.register::().await.unwrap(); // Spawn parent with short claim_timeout that will expire while waiting for slow child // The parent doesn't heartbeat during join(), so its claim will expire diff --git a/tests/lease_test.rs b/tests/lease_test.rs index 7e0a36d..4c4c14b 100644 --- a/tests/lease_test.rs +++ b/tests/lease_test.rs @@ -10,17 +10,12 @@ use common::tasks::{ LongRunningHeartbeatParams, LongRunningHeartbeatTask, SleepThenCheckpointParams, SleepThenCheckpointTask, }; -use durable::{Durable, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; +use durable::{Durable, DurableBuilder, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; use sqlx::PgPool; use std::time::Duration; -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -30,9 +25,13 @@ async fn create_client(pool: PgPool, queue_name: &str) -> Durable { /// Test that claiming a task sets the correct expiry time. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_claim_sets_correct_expiry(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "lease_claim").await; + let client = create_client(pool.clone(), "lease_claim") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; @@ -89,9 +88,13 @@ async fn test_claim_sets_correct_expiry(pool: PgPool) -> sqlx::Result<()> { /// Test that heartbeat extends the lease. #[sqlx::test(migrator = "MIGRATOR")] async fn test_heartbeat_extends_lease(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "lease_hb").await; + let client = create_client(pool.clone(), "lease_hb") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; @@ -164,9 +167,13 @@ async fn test_heartbeat_extends_lease(pool: PgPool) -> sqlx::Result<()> { async fn test_checkpoint_extends_lease(pool: PgPool) -> sqlx::Result<()> { use common::tasks::{ManyStepsParams, ManyStepsTask}; - let client = create_client(pool.clone(), "lease_ckpt").await; + let client = create_client(pool.clone(), "lease_ckpt") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; @@ -235,9 +242,13 @@ async fn test_checkpoint_extends_lease(pool: PgPool) -> sqlx::Result<()> { /// Test that checkpoints are rejected once the lease has expired. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_checkpoint_rejected_after_lease_expired(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "lease_expired_ckpt").await; + let client = create_client(pool.clone(), "lease_expired_ckpt") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let claim_timeout = Duration::from_secs(1); @@ -317,9 +328,13 @@ async fn test_checkpoint_rejected_after_lease_expired(pool: PgPool) -> sqlx::Res async fn test_checkpoint_rejected_after_lease_expired_single_worker( pool: PgPool, ) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "lease_expired_single").await; + let client = create_client(pool.clone(), "lease_expired_single") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let claim_timeout = Duration::from_secs(1); @@ -386,9 +401,13 @@ async fn test_checkpoint_rejected_after_lease_expired_single_worker( /// Test that heartbeat detects if the task has been cancelled. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_heartbeat_detects_cancellation(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "lease_cancel").await; + let client = create_client(pool.clone(), "lease_cancel") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn a long-running task let spawn_result = client diff --git a/tests/lock_order_test.rs b/tests/lock_order_test.rs index 4048dea..c0912dc 100644 --- a/tests/lock_order_test.rs +++ b/tests/lock_order_test.rs @@ -21,20 +21,18 @@ use common::helpers::{get_task_state, single_conn_pool, wait_for_task_terminal}; use common::tasks::{ DoubleParams, DoubleTask, FailingParams, FailingTask, SleepParams, SleepingTask, }; -use durable::{Durable, DurableEventPayload, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; +use durable::{ + Durable, DurableBuilder, DurableEventPayload, MIGRATOR, RetryStrategy, SpawnOptions, + WorkerOptions, +}; use serde_json::json; use sqlx::postgres::{PgConnectOptions, PgConnection}; use sqlx::{AssertSqlSafe, Connection, PgPool}; use std::time::{Duration, Instant}; use uuid::Uuid; -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -45,9 +43,13 @@ async fn create_client(pool: PgPool, queue_name: &str) -> Durable { /// Completes a task and verifies the task reaches completed state. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_complete_run_with_lock_ordering(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "lock_complete").await; + let client = create_client(pool.clone(), "lock_complete") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn::(DoubleParams { value: 21 }) @@ -81,9 +83,13 @@ async fn test_complete_run_with_lock_ordering(pool: PgPool) -> sqlx::Result<()> /// Fails a task and verifies it eventually reaches failed state after retries. #[sqlx::test(migrator = "MIGRATOR")] async fn test_fail_run_with_lock_ordering(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "lock_fail").await; + let client = create_client(pool.clone(), "lock_fail") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn_with_options::( @@ -129,9 +135,13 @@ async fn test_fail_run_with_lock_ordering(pool: PgPool) -> sqlx::Result<()> { /// Sleeps and verifies the task suspends and then completes. #[sqlx::test(migrator = "MIGRATOR")] async fn test_sleep_for_with_lock_ordering(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "lock_sleep").await; + let client = create_client(pool.clone(), "lock_sleep") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn::(SleepParams { seconds: 1 }) @@ -166,9 +176,13 @@ async fn test_sleep_for_with_lock_ordering(pool: PgPool) -> sqlx::Result<()> { /// This would deadlock if lock ordering were inconsistent. 
#[sqlx::test(migrator = "MIGRATOR")] async fn test_concurrent_complete_and_cancel(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "lock_conc_cc").await; + let client = create_client(pool.clone(), "lock_conc_cc") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn several tasks let mut task_ids = Vec::new(); @@ -228,9 +242,13 @@ async fn test_concurrent_complete_and_cancel(pool: PgPool) -> sqlx::Result<()> { async fn test_emit_event_with_lock_ordering(pool: PgPool) -> sqlx::Result<()> { use common::tasks::{EventWaitParams, EventWaitingTask}; - let client = create_client(pool.clone(), "lock_emit").await; + let client = create_client(pool.clone(), "lock_emit") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn::(EventWaitParams { @@ -297,9 +315,13 @@ async fn test_concurrent_emit_and_cancel(pool: PgPool) -> sqlx::Result<()> { // Use single-conn pool to ensure deterministic event emission let test_pool = single_conn_pool(&pool).await; - let client = create_client(pool.clone(), "lock_emit_cancel").await; + let client = create_client(pool.clone(), "lock_emit_cancel") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn multiple tasks waiting for the same event let mut task_ids = Vec::new(); diff --git a/tests/partition_test.rs b/tests/partition_test.rs index 0a44298..67f9517 100644 --- a/tests/partition_test.rs +++ b/tests/partition_test.rs @@ -4,17 +4,12 @@ mod common; use common::helpers::{count_runs_for_task, get_checkpoint_count, wait_for_task_terminal}; use common::tasks::{StepCountingParams, StepCountingTask}; -use durable::{Durable, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; +use durable::{Durable, 
DurableBuilder, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; use sqlx::PgPool; use std::time::Duration; -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -25,9 +20,13 @@ async fn create_client(pool: PgPool, queue_name: &str) -> Durable { /// Simulates "connection lost during checkpoint" by using a task that fails after step 2. #[sqlx::test(migrator = "MIGRATOR")] async fn test_db_connection_lost_during_checkpoint(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "part_ckpt").await; + let client = create_client(pool.clone(), "part_ckpt") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn a task that will fail after step 2 (simulating checkpoint failure) let spawn_result = client @@ -89,9 +88,13 @@ async fn test_db_connection_lost_during_checkpoint(pool: PgPool) -> sqlx::Result async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()> { use common::tasks::{SlowNoHeartbeatParams, SlowNoHeartbeatTask}; - let client = create_client(pool.clone(), "part_stale").await; + let client = create_client(pool.clone(), "part_stale") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let claim_timeout = Duration::from_secs(2); // Short lease diff --git a/tests/polling_test.rs b/tests/polling_test.rs index 7242b7d..ed7de0d 100644 --- a/tests/polling_test.rs +++ b/tests/polling_test.rs @@ -15,12 +15,13 @@ async fn test_poll_completed_task(pool: PgPool) { let durable = Durable::builder() 
         .pool(pool.clone())
         .queue_name("test_poll_completed")
+        .register::<EchoTask>()
+        .unwrap()
         .build()
         .await
         .unwrap();
 
     durable.create_queue(None).await.unwrap();
-    durable.register::<EchoTask>().await.unwrap();
 
     let spawned = durable
         .spawn::<EchoTask>(EchoParams {
@@ -88,12 +89,13 @@ async fn test_poll_failed_task(pool: PgPool) {
     let durable = Durable::builder()
         .pool(pool.clone())
         .queue_name("test_poll_failed")
+        .register::<FailingTask>()
+        .unwrap()
         .build()
         .await
         .unwrap();
 
     durable.create_queue(None).await.unwrap();
-    durable.register::<FailingTask>().await.unwrap();
 
     // Spawn with max_attempts=1 and no retry so it fails immediately
     let spawned = durable
@@ -151,12 +153,13 @@ async fn test_poll_cancelled_task(pool: PgPool) {
     let durable = Durable::builder()
         .pool(pool.clone())
         .queue_name("test_poll_cancelled")
+        .register::<EchoTask>()
+        .unwrap()
         .build()
         .await
         .unwrap();
 
     durable.create_queue(None).await.unwrap();
-    durable.register::<EchoTask>().await.unwrap();
 
     let spawned = durable
         .spawn::<EchoTask>(EchoParams {
@@ -183,12 +186,13 @@ async fn test_poll_failed_task_user_error(pool: PgPool) {
     let durable = Durable::builder()
         .pool(pool.clone())
         .queue_name("test_poll_user_error")
+        .register::<UserErrorTask>()
+        .unwrap()
         .build()
         .await
         .unwrap();
 
     durable.create_queue(None).await.unwrap();
-    durable.register::<UserErrorTask>().await.unwrap();
 
     // UserErrorTask returns a non-retryable User error, so max_attempts doesn't matter
     // but set to 1 to be explicit
diff --git a/tests/queue_test.rs b/tests/queue_test.rs
index e33b077..250c975 100644
--- a/tests/queue_test.rs
+++ b/tests/queue_test.rs
@@ -2,17 +2,12 @@
 
 mod common;
 
-use durable::{Durable, MIGRATOR};
+use durable::{Durable, DurableBuilder, MIGRATOR};
 use sqlx::PgPool;
 
-/// Helper to create a Durable client from the test pool.
-async fn create_client(pool: PgPool, queue_name: &str) -> Durable {
-    Durable::builder()
-        .pool(pool)
-        .queue_name(queue_name)
-        .build()
-        .await
-        .expect("Failed to create Durable client")
+/// Helper to create a DurableBuilder from the test pool.
+fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -21,7 +16,10 @@ async fn create_client(pool: PgPool, queue_name: &str) -> Durable { #[sqlx::test(migrator = "MIGRATOR")] async fn test_create_queue_successfully(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "test_queue").await; + let client = create_client(pool.clone(), "test_queue") + .build() + .await + .unwrap(); // Create the queue client @@ -38,7 +36,10 @@ async fn test_create_queue_successfully(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_create_queue_is_idempotent(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "idempotent_queue").await; + let client = create_client(pool.clone(), "idempotent_queue") + .build() + .await + .unwrap(); // Create the same queue twice - should not error client @@ -60,7 +61,10 @@ async fn test_create_queue_is_idempotent(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_create_queue_with_explicit_name(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "default_queue").await; + let client = create_client(pool.clone(), "default_queue") + .build() + .await + .unwrap(); // Create a queue with an explicit name different from default client @@ -80,7 +84,10 @@ async fn test_create_queue_with_explicit_name(pool: PgPool) -> sqlx::Result<()> #[sqlx::test(migrator = "MIGRATOR")] async fn test_drop_queue_removes_it(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "drop_test_queue").await; + let client = create_client(pool.clone(), "drop_test_queue") + .build() + .await + .unwrap(); // Create then drop the queue client @@ -101,7 +108,10 @@ async fn test_drop_queue_removes_it(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = 
"MIGRATOR")] async fn test_drop_queue_with_explicit_name(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "default").await; + let client = create_client(pool.clone(), "default") + .build() + .await + .unwrap(); // Create a queue with explicit name client @@ -127,7 +137,10 @@ async fn test_drop_queue_with_explicit_name(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_list_queues_returns_all_created_queues(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "default").await; + let client = create_client(pool.clone(), "default") + .build() + .await + .unwrap(); // Create multiple queues client @@ -154,7 +167,10 @@ async fn test_list_queues_returns_all_created_queues(pool: PgPool) -> sqlx::Resu #[sqlx::test(migrator = "MIGRATOR")] async fn test_list_queues_empty_initially(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "default").await; + let client = create_client(pool.clone(), "default") + .build() + .await + .unwrap(); // Without creating any queues, list should be empty let queues = client.list_queues().await.expect("Failed to list queues"); @@ -169,7 +185,10 @@ async fn test_list_queues_empty_initially(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_create_queue_with_underscores(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "queue_with_underscores").await; + let client = create_client(pool.clone(), "queue_with_underscores") + .build() + .await + .unwrap(); client .create_queue(None) @@ -184,7 +203,10 @@ async fn test_create_queue_with_underscores(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_create_queue_with_numbers(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "queue123").await; + let client = create_client(pool.clone(), "queue123") + .build() + .await + .unwrap(); client .create_queue(None) 
diff --git a/tests/retry_test.rs b/tests/retry_test.rs index 63717d8..ae4613d 100644 --- a/tests/retry_test.rs +++ b/tests/retry_test.rs @@ -4,17 +4,12 @@ mod common; use common::helpers::{advance_time, count_runs_for_task, set_fake_time, wait_for_task_terminal}; use common::tasks::{FailingParams, FailingTask, UserErrorParams, UserErrorTask}; -use durable::{Durable, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; +use durable::{Durable, DurableBuilder, MIGRATOR, RetryStrategy, SpawnOptions, WorkerOptions}; use sqlx::{AssertSqlSafe, PgPool}; use std::time::Duration; -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -24,9 +19,13 @@ async fn create_client(pool: PgPool, queue_name: &str) -> Durable { /// Test that RetryStrategy::None creates no retry run. #[sqlx::test(migrator = "MIGRATOR")] async fn test_retry_strategy_none_no_retry(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "retry_none").await; + let client = create_client(pool.clone(), "retry_none") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn task with no retry strategy let spawn_result = client @@ -74,9 +73,13 @@ async fn test_retry_strategy_none_no_retry(pool: PgPool) -> sqlx::Result<()> { /// Test that RetryStrategy::Fixed creates retry at T + base_seconds. 
 #[sqlx::test(migrator = "MIGRATOR")]
 async fn test_retry_strategy_fixed_delay(pool: PgPool) -> sqlx::Result<()> {
-    let client = create_client(pool.clone(), "retry_fixed").await;
+    let client = create_client(pool.clone(), "retry_fixed")
+        .register::<FailingTask>()
+        .unwrap()
+        .build()
+        .await
+        .unwrap();
     client.create_queue(None).await.unwrap();
-    client.register::<FailingTask>().await.unwrap();
 
     // Set fake time for deterministic testing
     let start_time = chrono::Utc::now();
@@ -154,9 +157,13 @@ async fn test_retry_strategy_fixed_delay(pool: PgPool) -> sqlx::Result<()> {
 
 /// Test that RetryStrategy::Exponential increases delays correctly.
 #[sqlx::test(migrator = "MIGRATOR")]
 async fn test_retry_strategy_exponential_backoff(pool: PgPool) -> sqlx::Result<()> {
-    let client = create_client(pool.clone(), "retry_exp").await;
+    let client = create_client(pool.clone(), "retry_exp")
+        .register::<FailingTask>()
+        .unwrap()
+        .build()
+        .await
+        .unwrap();
     client.create_queue(None).await.unwrap();
-    client.register::<FailingTask>().await.unwrap();
 
     let start_time = chrono::Utc::now();
     set_fake_time(&pool, start_time).await?;
@@ -239,9 +246,13 @@ async fn test_retry_strategy_exponential_backoff(pool: PgPool) -> sqlx::Result<(
 
 /// Test that max_attempts is honored and task fails permanently after N attempts.
 #[sqlx::test(migrator = "MIGRATOR")]
 async fn test_max_attempts_honored(pool: PgPool) -> sqlx::Result<()> {
-    let client = create_client(pool.clone(), "retry_max").await;
+    let client = create_client(pool.clone(), "retry_max")
+        .register::<FailingTask>()
+        .unwrap()
+        .build()
+        .await
+        .unwrap();
     client.create_queue(None).await.unwrap();
-    client.register::<FailingTask>().await.unwrap();
 
     let start_time = chrono::Utc::now();
     set_fake_time(&pool, start_time).await?;
@@ -305,9 +316,13 @@ async fn test_max_attempts_honored(pool: PgPool) -> sqlx::Result<()> {
 
 /// Test that a UserError (non-retryable error) does not get retried even with retry strategy.
#[sqlx::test(migrator = "MIGRATOR")] async fn test_user_error_not_retried(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "user_error_no_retry").await; + let client = create_client(pool.clone(), "user_error_no_retry") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn task WITH retry strategy configured - but UserError should still not retry let spawn_result = client diff --git a/tests/spawn_test.rs b/tests/spawn_test.rs index 41635ba..a5e92ee 100644 --- a/tests/spawn_test.rs +++ b/tests/spawn_test.rs @@ -3,19 +3,17 @@ mod common; use common::tasks::{EchoParams, EchoTask, FailingParams, FailingTask}; -use durable::{CancellationPolicy, Durable, DurableError, MIGRATOR, RetryStrategy, SpawnOptions}; +use durable::{ + CancellationPolicy, Durable, DurableBuilder, DurableError, MIGRATOR, RetryStrategy, + SpawnOptions, +}; use sqlx::PgPool; use std::collections::HashMap; use std::time::Duration; -/// Helper to create a Durable client from the test pool. -async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +/// Helper to create a DurableBuilder from the test pool. 
+fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -24,9 +22,13 @@ async fn create_client(pool: PgPool, queue_name: &str) -> Durable { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_returns_valid_ids(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_test").await; + let client = create_client(pool.clone(), "spawn_test") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let result = client .spawn::(EchoParams { @@ -45,9 +47,13 @@ async fn test_spawn_returns_valid_ids(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_multiple_tasks_get_unique_ids(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_multi").await; + let client = create_client(pool.clone(), "spawn_multi") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let result1 = client .spawn::(EchoParams { @@ -78,9 +84,13 @@ async fn test_spawn_multiple_tasks_get_unique_ids(pool: PgPool) -> sqlx::Result< #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_with_custom_max_attempts(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_attempts").await; + let client = create_client(pool.clone(), "spawn_attempts") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let mut options = SpawnOptions::default(); options.max_attempts = Some(10); @@ -103,9 +113,13 @@ async fn test_spawn_with_custom_max_attempts(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_with_retry_strategy_none(pool: PgPool) -> 
sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_retry_none").await; + let client = create_client(pool.clone(), "spawn_retry_none") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let mut options = SpawnOptions::default(); options.retry_strategy = Some(RetryStrategy::None); @@ -127,9 +141,13 @@ async fn test_spawn_with_retry_strategy_none(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_with_retry_strategy_fixed(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_retry_fixed").await; + let client = create_client(pool.clone(), "spawn_retry_fixed") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let mut options = SpawnOptions::default(); options.retry_strategy = Some(RetryStrategy::Fixed { @@ -153,9 +171,13 @@ async fn test_spawn_with_retry_strategy_fixed(pool: PgPool) -> sqlx::Result<()> #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_with_retry_strategy_exponential(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_retry_exp").await; + let client = create_client(pool.clone(), "spawn_retry_exp") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let mut options = SpawnOptions::default(); options.retry_strategy = Some(RetryStrategy::Exponential { @@ -181,9 +203,13 @@ async fn test_spawn_with_retry_strategy_exponential(pool: PgPool) -> sqlx::Resul #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_with_headers(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_headers").await; + let client = create_client(pool.clone(), "spawn_headers") + .register::() + .unwrap() + .build() + .await + .unwrap(); 
client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let mut headers = HashMap::new(); headers.insert("correlation_id".to_string(), serde_json::json!("abc-123")); @@ -209,9 +235,13 @@ async fn test_spawn_with_headers(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_with_cancellation_policy(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_cancel").await; + let client = create_client(pool.clone(), "spawn_cancel") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let mut options = SpawnOptions::default(); options.cancellation = Some(CancellationPolicy { @@ -240,9 +270,13 @@ async fn test_spawn_with_cancellation_policy(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_by_name(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_by_name").await; + let client = create_client(pool.clone(), "spawn_by_name") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let params = serde_json::json!({ "message": "dynamic spawn" @@ -260,9 +294,13 @@ async fn test_spawn_by_name(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_by_name_invalid_params(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_by_name").await; + let client = create_client(pool.clone(), "spawn_by_name") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let params = serde_json::json!({ "message": 12345 @@ -287,9 +325,13 @@ async fn test_spawn_by_name_invalid_params(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_by_name_with_options(pool: PgPool) 
-> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_by_name_opts").await; + let client = create_client(pool.clone(), "spawn_by_name_opts") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let params = serde_json::json!({ "message": "value" @@ -317,9 +359,13 @@ async fn test_spawn_by_name_with_options(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_with_empty_params(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_empty").await; + let client = create_client(pool.clone(), "spawn_empty") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Empty object is not valid JSON params for EchoTask, // but spawn_by_name_unchecked does not validate the JSON @@ -335,9 +381,13 @@ async fn test_spawn_with_empty_params(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_with_complex_params(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_complex").await; + let client = create_client(pool.clone(), "spawn_complex") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Complex nested JSON structure - the params don't need to match the task's Params type // because spawn_by_name_unchecked does not validate the JSON @@ -375,12 +425,13 @@ async fn test_client_default_max_attempts(pool: PgPool) -> sqlx::Result<()> { .pool(pool) .queue_name("default_attempts") .default_max_attempts(3) + .register::() + .unwrap() .build() .await .expect("Failed to create client"); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn without specifying max_attempts - should use default of 3 let result = client @@ -401,9 +452,13 
@@ async fn test_client_default_max_attempts(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_with_transaction_commit(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_tx_commit").await; + let client = create_client(pool.clone(), "spawn_tx_commit") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Create a test table sqlx::query("CREATE TABLE test_orders (id UUID PRIMARY KEY, status TEXT)") @@ -454,9 +509,13 @@ async fn test_spawn_with_transaction_commit(pool: PgPool) -> sqlx::Result<()> { #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_with_transaction_rollback(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "spawn_tx_rollback").await; + let client = create_client(pool.clone(), "spawn_tx_rollback") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Create a test table sqlx::query("CREATE TABLE test_orders_rb (id UUID PRIMARY KEY, status TEXT)") @@ -514,9 +573,13 @@ async fn test_spawn_with_transaction_rollback(pool: PgPool) -> sqlx::Result<()> #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_rejects_reserved_header_prefix(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "reserved_headers").await; + let client = create_client(pool.clone(), "reserved_headers") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let mut headers = HashMap::new(); headers.insert("durable::custom".to_string(), serde_json::json!("value")); @@ -546,9 +609,13 @@ async fn test_spawn_rejects_reserved_header_prefix(pool: PgPool) -> sqlx::Result #[sqlx::test(migrator = "MIGRATOR")] async fn test_spawn_allows_non_reserved_headers(pool: PgPool) -> sqlx::Result<()> { - 
let client = create_client(pool.clone(), "allowed_headers").await; + let client = create_client(pool.clone(), "allowed_headers") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let mut headers = HashMap::new(); // These should all be allowed - they don't start with "durable::" @@ -582,17 +649,11 @@ async fn test_spawn_allows_non_reserved_headers(pool: PgPool) -> sqlx::Result<() #[sqlx::test(migrator = "MIGRATOR")] async fn test_register_duplicate_task_errors(pool: PgPool) -> sqlx::Result<()> { - let client = create_client(pool.clone(), "register_dup").await; - - // First registration should succeed - client + let result = create_client(pool.clone(), "register_dup") .register::() - .await - .expect("First registration should succeed"); - - // Second registration of the same task should fail - let result2 = client.register::().await; - match result2 { + .unwrap() + .register::(); // second registration - should be Err + match result { Ok(_) => panic!("Duplicate registration should fail"), Err(err) => { assert!( diff --git a/tests/telemetry_test.rs b/tests/telemetry_test.rs index 9971624..1a2325e 100644 --- a/tests/telemetry_test.rs +++ b/tests/telemetry_test.rs @@ -12,7 +12,7 @@ mod common; use common::helpers::wait_for_task_state; use common::tasks::{EchoParams, EchoTask, FailingParams, FailingTask, MultiStepTask}; -use durable::{Durable, MIGRATOR, WorkerOptions}; +use durable::{Durable, DurableBuilder, MIGRATOR, WorkerOptions}; use metrics_util::CompositeKey; use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshot}; use ordered_float::OrderedFloat; @@ -20,14 +20,9 @@ use sqlx::PgPool; use std::sync::OnceLock; use std::time::Duration; -/// Helper to create a Durable client from the test pool. 
-async fn create_client(pool: PgPool, queue_name: &str) -> Durable { - Durable::builder() - .pool(pool) - .queue_name(queue_name) - .build() - .await - .expect("Failed to create Durable client") +/// Helper to create a DurableBuilder from the test pool. +fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder { + Durable::builder().pool(pool).queue_name(queue_name) } // ============================================================================ @@ -140,9 +135,13 @@ async fn test_task_lifecycle_metrics(pool: PgPool) -> sqlx::Result<()> { let snapshotter = get_snapshotter(); let queue_name = "metrics_lifecycle"; - let client = create_client(pool.clone(), queue_name).await; + let client = create_client(pool.clone(), queue_name) + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn a task let spawn_result = client @@ -227,9 +226,13 @@ async fn test_task_failure_metrics(pool: PgPool) -> sqlx::Result<()> { let baseline = snapshotter.snapshot(); let baseline_failed_count = count_metrics_by_name(baseline, "durable_tasks_failed_total"); - let client = create_client(pool.clone(), "metrics_failure").await; + let client = create_client(pool.clone(), "metrics_failure") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // Spawn a task that will fail let spawn_result = client @@ -280,9 +283,13 @@ async fn test_worker_gauge_metrics(pool: PgPool) -> sqlx::Result<()> { let snapshotter = get_snapshotter(); let queue_name = "metrics_worker"; - let client = create_client(pool.clone(), queue_name).await; + let client = create_client(pool.clone(), queue_name) + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let worker = client .start_worker(WorkerOptions { @@ -343,9 +350,13 @@ async fn 
test_checkpoint_metrics(pool: PgPool) -> sqlx::Result<()> { let baseline_ckpt_count = count_metrics_by_name(baseline, "durable_checkpoint_duration_seconds"); - let client = create_client(pool.clone(), "metrics_checkpoint").await; + let client = create_client(pool.clone(), "metrics_checkpoint") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); // MultiStepTask has steps which record checkpoint metrics let spawn_result = client @@ -403,9 +414,13 @@ async fn test_task_execution_duration_metrics(pool: PgPool) -> sqlx::Result<()> let baseline_duration_count = count_metrics_by_name(baseline, "durable_task_execution_duration_seconds"); - let client = create_client(pool.clone(), "metrics_exec_dur").await; + let client = create_client(pool.clone(), "metrics_exec_dur") + .register::() + .unwrap() + .build() + .await + .unwrap(); client.create_queue(None).await.unwrap(); - client.register::().await.unwrap(); let spawn_result = client .spawn::(EchoParams {