From 32ba6204b8cd849d8f96c05edea1a67aec77be8c Mon Sep 17 00:00:00 2001 From: Michael Vandeberg Date: Tue, 31 Mar 2026 10:58:45 -0600 Subject: [PATCH] Update docs for io_result-based when_all/when_any signatures (#244) All documentation and Antora example pages still described the old API where when_all/when_any accepted plain task and returned tuples/variants without error_code. Update to match the current io_result-aware combinators: - when_all children must be io_task, result is io_result - when_any result is variant with error at index 0; only !ec wins - Replace monostate references with tuple<> for void tasks - Replace fictitious timeout_after examples with the real timeout combinator - Fix when_any docs that incorrectly described exceptions as valid completions --- .../pages/4.coroutines/4f.composition.adoc | 153 +++++----- .../7.examples/7b.producer-consumer.adoc | 20 +- .../pages/7.examples/7g.parallel-fetch.adoc | 113 ++++---- .../7.examples/7k.strand-serialization.adoc | 16 +- .../ROOT/pages/7.examples/7l.async-mutex.adoc | 14 +- .../pages/7.examples/7m.parallel-tasks.adoc | 10 +- .../pages/7.examples/7n.custom-executor.adoc | 17 +- doc/unlisted/coroutines-when-all.adoc | 122 +++++--- doc/unlisted/coroutines-when-any.adoc | 267 ++++++++++-------- .../when_any_cancellation.cpp | 17 -- 10 files changed, 414 insertions(+), 335 deletions(-) diff --git a/doc/modules/ROOT/pages/4.coroutines/4f.composition.adoc b/doc/modules/ROOT/pages/4.coroutines/4f.composition.adoc index ab4c6a81e..d4750b19b 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4f.composition.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4f.composition.adoc @@ -34,82 +34,95 @@ task<> concurrent() == when_all: Wait for All Tasks -`when_all` launches multiple tasks concurrently and waits for all of them to complete: +`when_all` launches multiple `io_task` children concurrently and waits for all of them to complete. 
It returns `task>`, a single `ec` plus the flattened payloads: [source,cpp] ---- #include -task fetch_a() { co_return 1; } -task fetch_b() { co_return 2; } -task fetch_c() { co_return "hello"; } +io_task fetch_a() { co_return io_result{{}, 1}; } +io_task fetch_b() { co_return io_result{{}, 2}; } +io_task fetch_c() { co_return io_result{{}, "hello"}; } task<> example() { - auto [a, b, c] = co_await when_all(fetch_a(), fetch_b(), fetch_c()); - + auto [ec, a, b, c] = co_await when_all(fetch_a(), fetch_b(), fetch_c()); + + // ec == std::error_code{} (success) // a == 1 // b == 2 // c == "hello" } ---- -=== Result Tuple +=== Result Type -`when_all` returns a tuple of results in the same order as the input tasks. Use structured bindings to unpack them. +`when_all` returns `io_result` where each `Ri` is the child's payload flattened: `io_result` contributes `T`, `io_result<>` contributes `tuple<>`. Check `ec` first; values are only meaningful when `!ec`. -=== Void Tasks +=== Void io_tasks -Tasks returning `void` contribute `std::monostate` to the result tuple, preserving the task-index-to-result-index mapping: +`io_task<>` children contribute `tuple<>` to the result: [source,cpp] ---- -task<> void_task() { co_return; } -task int_task() { co_return 42; } +io_task<> void_task() { co_return io_result<>{}; } +io_task int_task() { co_return io_result{{}, 42}; } task<> example() { - auto [a, b, c] = co_await when_all(int_task(), void_task(), int_task()); - // a == 42 (int, index 0) - // b == monostate (monostate, index 1) - // c == 42 (int, index 2) + auto [ec, a, b, c] = co_await when_all(int_task(), void_task(), int_task()); + // a == 42 (int) + // b == tuple<> (from void io_task) + // c == 42 (int) } ---- -If all tasks return `void`, `when_all` returns a tuple of `std::monostate`: +When all children are `io_task<>`, just check `r.ec`: [source,cpp] ---- task<> example() { - auto [a, b] = co_await when_all(void_task_a(), void_task_b()); - // a and b are std::monostate + auto r 
= co_await when_all(void_task_a(), void_task_b()); + if (r.ec) + // handle error } ---- === Error Handling -If any task throws an exception: +I/O errors are reported through the `ec` field of the `io_result`. When any child returns a non-zero `ec`: + +1. Stop is requested for sibling tasks +2. All tasks complete (or respond to stop) +3. The first `ec` is propagated in the outer `io_result` + +[source,cpp] +---- +task<> example() +{ + auto [ec, a, b] = co_await when_all(task_a(), task_b()); + if (ec) + std::cerr << "Error: " << ec.message() << "\n"; +} +---- -1. The exception is captured -2. Stop is requested for sibling tasks -3. All tasks are allowed to complete (or respond to stop) -4. The *first* exception is rethrown; later exceptions are discarded +If a task throws an exception, it is captured and rethrown after all tasks complete. Exceptions take priority over `ec`. [source,cpp] ---- -task might_fail(bool fail) +io_task might_throw(bool fail) { if (fail) throw std::runtime_error("failed"); - co_return 42; + co_return io_result{{}, 42}; } task<> example() { try { - co_await when_all(might_fail(true), might_fail(false)); + co_await when_all(might_throw(true), might_throw(false)); } catch (std::runtime_error const& e) { @@ -124,23 +137,24 @@ When one task fails, `when_all` requests stop for its siblings. 
Well-behaved tas [source,cpp] ---- -task<> long_running() +io_task<> long_running() { - auto token = co_await get_stop_token(); - + auto token = co_await this_coro::stop_token; + for (int i = 0; i < 1000; ++i) { if (token.stop_requested()) - co_return; // Exit early when sibling fails - + co_return io_result<>{}; // Exit early when sibling fails + co_await do_iteration(); } + co_return io_result<>{}; } ---- -== when_any: First-to-Finish Wins +== when_any: First-to-Succeed Wins -`when_any` launches multiple tasks concurrently and returns when the *first* one completes: +`when_any` launches multiple `io_task` children concurrently and returns when the first one *succeeds* (`!ec`): [source,cpp] ---- @@ -149,17 +163,19 @@ task<> long_running() task<> example() { auto result = co_await when_any( - fetch_int(), // task - fetch_string() // task + fetch_int(), // io_task + fetch_string() // io_task ); - // result.index() indicates which task won (0 or 1) - // result is std::variant + // result is std::variant + // index 0: all tasks failed (error_code) + // index 1: fetch_int won + // index 2: fetch_string won } ---- -The result is a variant with one alternative per input task, preserving positional correspondence. Use `.index()` to determine the winner. When a winner is determined, stop is requested for all siblings. All tasks complete before `when_any` returns. +The result is a `variant` with `error_code` at index 0 (failure/no winner) and one alternative per input task at indices 1..N. Only tasks returning `!ec` can win; errors and exceptions do not count as winning. When a winner is found, stop is requested for all siblings. All tasks complete before `when_any` returns. -For detailed coverage including error handling, cancellation, and the vector overload, see Racing Tasks. +For detailed coverage including error handling, cancellation, and the range overload, see Racing Tasks. 
== Practical Patterns @@ -169,66 +185,68 @@ Fetch multiple resources simultaneously: [source,cpp] ---- -task fetch_page_data(std::string url) +io_task fetch_page_data(std::string url) { - auto [header, body, sidebar] = co_await when_all( + auto [ec, header, body, sidebar] = co_await when_all( fetch_header(url), fetch_body(url), fetch_sidebar(url) ); - - co_return page_data{ + if (ec) + co_return io_result{ec, {}}; + + co_return io_result{{}, { std::move(header), std::move(body), std::move(sidebar) - }; + }}; } ---- === Fan-Out/Fan-In -Process items in parallel, then combine results: +Process items in parallel, then combine results using the range overload: [source,cpp] ---- -task process_item(item const& i); +io_task process_item(item const& i); task process_all(std::vector const& items) { - std::vector> tasks; + std::vector> tasks; for (auto const& item : items) tasks.push_back(process_item(item)); - - // This requires a range-based when_all (not yet available) - // For now, use fixed-arity when_all - + + auto [ec, results] = co_await when_all(std::move(tasks)); + if (ec) + co_return 0; + int total = 0; - // ... accumulate results + for (auto v : results) + total += v; co_return total; } ---- -=== Timeout with Fallback +=== Timeout -Use `when_any` to implement timeout with fallback: +The `timeout` combinator races an awaitable against a deadline: [source,cpp] ---- -task fetch_with_timeout(Request req) -{ - auto result = co_await when_any( - fetch_data(req), - timeout_after(100ms) - ); - - if (result.index() == 1) - throw timeout_error{"Request timed out"}; +#include - co_return std::get<0>(result); +task<> example() +{ + auto [ec, n] = co_await timeout(sock.read_some(buf), 50ms); + if (ec == cond::timeout) + { + // deadline expired before read completed + } } ---- -The `timeout_after` helper waits for the specified duration then throws. If `fetch_data` completes first, its result is returned. If the timer wins, the timeout exception propagates. 
+`timeout` returns the same `io_result` type as the inner awaitable. On timeout, `ec` is set to `error::timeout` and payload values are default-initialized. Unlike `when_any`, exceptions from the inner awaitable are always propagated and never swallowed by the timer. == Implementation Notes @@ -262,6 +280,9 @@ This design ensures proper context propagation to all children. | `` | First-completion racing with when_any + +| `` +| Race an awaitable against a deadline |=== You have now learned how to compose tasks concurrently with `when_all` and `when_any`. In the next section, you will learn about frame allocators for customizing coroutine memory allocation. diff --git a/doc/modules/ROOT/pages/7.examples/7b.producer-consumer.adoc b/doc/modules/ROOT/pages/7.examples/7b.producer-consumer.adoc index c00cf6859..00bf16419 100644 --- a/doc/modules/ROOT/pages/7.examples/7b.producer-consumer.adoc +++ b/doc/modules/ROOT/pages/7.examples/7b.producer-consumer.adoc @@ -37,27 +37,27 @@ int main() capy::async_event data_ready; int shared_value = 0; - auto producer = [&]() -> capy::task<> { + auto producer = [&]() -> capy::io_task<> { std::cout << "Producer: preparing data...\n"; shared_value = 42; std::cout << "Producer: data ready, signaling\n"; data_ready.set(); - co_return; + co_return capy::io_result<>{}; }; - auto consumer = [&]() -> capy::task<> { + auto consumer = [&]() -> capy::io_task<> { std::cout << "Consumer: waiting for data...\n"; auto [ec] = co_await data_ready.wait(); (void)ec; std::cout << "Consumer: received value " << shared_value << "\n"; - co_return; + co_return capy::io_result<>{}; }; // Run both tasks concurrently using when_all, through a strand. // The strand serializes execution, ensuring thread-safe access // to the shared async_event and shared_value. 
auto run_both = [&]() -> capy::task<> { - co_await capy::when_all(producer(), consumer()); + (void) co_await capy::when_all(producer(), consumer()); }; capy::run_async(s, on_complete, on_error)(run_both()); @@ -99,12 +99,12 @@ capy::async_event data_ready; [source,cpp] ---- -auto producer = [&]() -> capy::task<> { +auto producer = [&]() -> capy::io_task<> { std::cout << "Producer: preparing data...\n"; shared_value = 42; std::cout << "Producer: data ready, signaling\n"; data_ready.set(); - co_return; + co_return capy::io_result<>{}; }; ---- @@ -114,12 +114,12 @@ The producer prepares data and signals completion by calling `set()`. [source,cpp] ---- -auto consumer = [&]() -> capy::task<> { +auto consumer = [&]() -> capy::io_task<> { std::cout << "Consumer: waiting for data...\n"; auto [ec] = co_await data_ready.wait(); (void)ec; std::cout << "Consumer: received value " << shared_value << "\n"; - co_return; + co_return capy::io_result<>{}; }; ---- @@ -133,7 +133,7 @@ The consumer waits until the event is set. The `co_await data_ready.wait()` susp // The strand serializes execution, ensuring thread-safe access // to the shared async_event and shared_value. auto run_both = [&]() -> capy::task<> { - co_await capy::when_all(producer(), consumer()); + (void) co_await capy::when_all(producer(), consumer()); }; capy::run_async(s, on_complete, on_error)(run_both()); diff --git a/doc/modules/ROOT/pages/7.examples/7g.parallel-fetch.adoc b/doc/modules/ROOT/pages/7.examples/7g.parallel-fetch.adoc index e36db4e51..78e610a76 100644 --- a/doc/modules/ROOT/pages/7.examples/7g.parallel-fetch.adoc +++ b/doc/modules/ROOT/pages/7.examples/7g.parallel-fetch.adoc @@ -21,6 +21,7 @@ Running multiple operations concurrently with `when_all`. 
#include #include #include +#include namespace capy = boost::capy; @@ -50,24 +51,29 @@ capy::task fetch_account_balance(int user_id) co_return user_id * 1.5; // Fake balance } -// Fetch all user data in parallel +// Fetch all user data in parallel using variadic when_all. +// Heterogeneous return types are flattened into the result. capy::task<> fetch_user_dashboard(std::string username) { std::cout << "\n=== Fetching dashboard for: " << username << " ===\n"; - + // First, get the user ID (needed for other queries) int user_id = co_await fetch_user_id(username); std::cout << "Got user ID: " << user_id << "\n\n"; - - // Now fetch all user data in parallel + + // when_all requires io_task children. Wrap plain tasks: std::cout << "Starting parallel fetches...\n"; - // name: std::string, orders: int, balance: double - auto [name, orders, balance] = co_await capy::when_all( - fetch_user_name(user_id), - fetch_order_count(user_id), - fetch_account_balance(user_id) - ); - + + auto wrap = [](auto inner) -> capy::io_task { + co_return capy::io_result{ + {}, co_await std::move(inner)}; + }; + + auto [ec, name, orders, balance] = co_await capy::when_all( + wrap(fetch_user_name(user_id)), + wrap(fetch_order_count(user_id)), + wrap(fetch_account_balance(user_id))); + std::cout << "\nDashboard results:\n"; std::cout << " Name: " << name << "\n"; std::cout << " Orders: " << orders << "\n"; @@ -75,61 +81,60 @@ capy::task<> fetch_user_dashboard(std::string username) } // Example with void tasks -capy::task<> log_access(std::string resource) +capy::io_task<> log_access(std::string resource) { std::cout << "Logging access to: " << resource << "\n"; - co_return; + co_return capy::io_result<>{}; } -capy::task<> update_metrics(std::string metric) +capy::io_task<> update_metrics(std::string metric) { std::cout << "Updating metric: " << metric << "\n"; - co_return; + co_return capy::io_result<>{}; } capy::task fetch_with_side_effects() { std::cout << "\n=== Fetch with side effects ===\n"; 
- - // void tasks don't contribute to result tuple - std::tuple results = co_await capy::when_all( - log_access("api/data"), // void - no result - update_metrics("api_calls"), // void - no result - fetch_user_name(42) // returns string - ); - std::string data = std::get<0>(results); // std::string - + + auto r = co_await capy::when_all( + log_access("api/data"), + update_metrics("api_calls")); + if (r.ec) + co_return "error"; + + auto data = co_await fetch_user_name(42); + std::cout << "Data: " << data << "\n"; co_return data; } // Error handling example -capy::task might_fail(bool should_fail, std::string name) +capy::io_task might_fail(bool should_fail, std::string name) { std::cout << "Task " << name << " starting\n"; - + if (should_fail) { throw std::runtime_error(name + " failed!"); } - + std::cout << "Task " << name << " completed\n"; - co_return 42; + co_return capy::io_result{{}, 42}; } capy::task<> demonstrate_error_handling() { std::cout << "\n=== Error handling ===\n"; - + try { - // a: int, b: int, c: int - auto [a, b, c] = co_await capy::when_all( + auto [ec2, a, b, c] = co_await capy::when_all( might_fail(false, "A"), might_fail(true, "B"), // This one fails - might_fail(false, "C") - ); - std::cout << "All succeeded: " << a << ", " << b << ", " << c << "\n"; + might_fail(false, "C")); + std::cout << "All succeeded: " << a << ", " + << b << ", " << c << "\n"; } catch (std::runtime_error const& e) { @@ -152,7 +157,7 @@ int main() capy::run_async(pool.get_executor(), on_complete, on_error)(fetch_user_dashboard("alice")); capy::run_async(pool.get_executor(), on_complete, on_error)(fetch_with_side_effects()); capy::run_async(pool.get_executor(), on_complete, on_error)(demonstrate_error_handling()); - + done.wait(); // Block until all tasks complete return 0; } @@ -172,28 +177,26 @@ target_link_libraries(parallel_fetch PRIVATE capy) [source,cpp] ---- -auto [name, orders, balance] = co_await capy::when_all( - fetch_user_name(user_id), - 
fetch_order_count(user_id), - fetch_account_balance(user_id) -); +auto [ec, name, orders, balance] = co_await capy::when_all( + wrap(fetch_user_name(user_id)), + wrap(fetch_order_count(user_id)), + wrap(fetch_account_balance(user_id))); ---- -All three tasks run concurrently. `when_all` completes when all tasks finish. Results are returned in a tuple matching input order. +`when_all` requires children returning `io_result`, so plain tasks are wrapped. All three run concurrently. The result is `io_result`, a single `ec` plus the flattened payloads in input order. -=== Void Filtering +=== Void io_tasks [source,cpp] ---- -std::tuple results = co_await capy::when_all( - log_access("api/data"), // void - filtered out - update_metrics("api_calls"), // void - filtered out - fetch_user_name(42) // string - in tuple -); -std::string data = std::get<0>(results); // std::string +auto r = co_await capy::when_all( + log_access("api/data"), + update_metrics("api_calls")); +if (r.ec) + co_return "error"; ---- -Tasks returning `void` don't contribute to the result tuple. Only non-void results appear. +`io_task<>` children return `io_result<>` (just an error code, no payload). Check `r.ec` to detect failure. === Error Propagation @@ -201,7 +204,10 @@ Tasks returning `void` don't contribute to the result tuple. Only non-void resul ---- try { - auto results = co_await capy::when_all(task_a(), task_b(), task_c()); + auto [ec2, a, b, c] = co_await capy::when_all( + might_fail(false, "A"), + might_fail(true, "B"), + might_fail(false, "C")); } catch (...) { @@ -210,12 +216,7 @@ catch (...) } ---- -When a task throws: - -1. The exception is captured -2. Stop is requested for siblings -3. All tasks complete (or respond to stop) -4. First exception is rethrown +I/O errors are reported via `ec` in the `io_result`. Thrown exceptions are captured separately — Upon error cancellation is requested and the first exception is rethrown after all tasks complete. 
== Output diff --git a/doc/modules/ROOT/pages/7.examples/7k.strand-serialization.adoc b/doc/modules/ROOT/pages/7.examples/7k.strand-serialization.adoc index 2b0374d3d..8509d7403 100644 --- a/doc/modules/ROOT/pages/7.examples/7k.strand-serialization.adoc +++ b/doc/modules/ROOT/pages/7.examples/7k.strand-serialization.adoc @@ -20,6 +20,7 @@ Protecting shared state with a strand instead of a mutex. #include #include #include +#include namespace capy = boost::capy; @@ -48,20 +49,19 @@ int main() // Each coroutine increments the shared counter without locks. // The strand ensures only one coroutine runs at a time. - auto increment = [&](int id) -> capy::task<> { + auto increment = [&](int id) -> capy::io_task<> { for (int i = 0; i < increments_per_coro; ++i) ++counter; std::cout << "Coroutine " << id << " finished, counter = " << counter << "\n"; - co_return; + co_return capy::io_result<>{}; }; auto run_all = [&]() -> capy::task<> { - co_await capy::when_all( - increment(0), increment(1), increment(2), - increment(3), increment(4), increment(5), - increment(6), increment(7), increment(8), - increment(9)); + std::vector> tasks; + for (int i = 0; i < num_coroutines; ++i) + tasks.push_back(increment(i)); + (void) co_await capy::when_all(std::move(tasks)); }; capy::run_async(s, on_complete, on_error)(run_all()); @@ -100,7 +100,7 @@ A `strand` wraps an executor and guarantees that handlers dispatched through it ---- int counter = 0; -auto increment = [&](int id) -> capy::task<> { +auto increment = [&](int id) -> capy::io_task<> { for (int i = 0; i < increments_per_coro; ++i) ++counter; // ... 
diff --git a/doc/modules/ROOT/pages/7.examples/7l.async-mutex.adoc b/doc/modules/ROOT/pages/7.examples/7l.async-mutex.adoc index 1ebc4a5c5..c0eb10a7a 100644 --- a/doc/modules/ROOT/pages/7.examples/7l.async-mutex.adoc +++ b/doc/modules/ROOT/pages/7.examples/7l.async-mutex.adoc @@ -46,14 +46,14 @@ int main() int acquisition_order = 0; std::vector order_log; - auto worker = [&](int id) -> capy::task<> { + auto worker = [&](int id) -> capy::io_task<> { std::cout << "Worker " << id << " waiting for lock\n"; auto [ec, guard] = co_await mtx.scoped_lock(); if (ec) { std::cout << "Worker " << id << " canceled: " << ec.message() << "\n"; - co_return; + co_return capy::io_result<>{ec}; } int seq = acquisition_order++; @@ -61,15 +61,17 @@ int main() std::cout << "Worker " << id << " acquired lock (sequence " << seq << ")\n"; - // Simulate work inside the critical section std::cout << "Worker " << id << " releasing lock\n"; - co_return; + co_return capy::io_result<>{}; }; auto run_all = [&]() -> capy::task<> { - co_await capy::when_all( + auto r = co_await capy::when_all( worker(0), worker(1), worker(2), worker(3), worker(4), worker(5)); + if(r.ec) + std::cerr << "when_all error: " + << r.ec.message() << "\n"; }; // Run on a strand so async_mutex operations are single-threaded @@ -116,7 +118,7 @@ auto [ec, guard] = co_await mtx.scoped_lock(); if (ec) { // Lock was canceled - co_return; + co_return capy::io_result<>{ec}; } ---- diff --git a/doc/modules/ROOT/pages/7.examples/7m.parallel-tasks.adoc b/doc/modules/ROOT/pages/7.examples/7m.parallel-tasks.adoc index 5da558441..a35250346 100644 --- a/doc/modules/ROOT/pages/7.examples/7m.parallel-tasks.adoc +++ b/doc/modules/ROOT/pages/7.examples/7m.parallel-tasks.adoc @@ -25,7 +25,7 @@ Distributing CPU-bound work across a thread pool and collecting results. 
namespace capy = boost::capy; // Sum integers in [lo, hi) -capy::task partial_sum(int lo, int hi) +capy::io_task partial_sum(int lo, int hi) { std::ostringstream oss; oss << " range [" << lo << ", " << hi @@ -35,7 +35,7 @@ capy::task partial_sum(int lo, int hi) long long sum = 0; for (int i = lo; i < hi; ++i) sum += i; - co_return sum; + co_return capy::io_result{{}, sum}; } int main() @@ -63,7 +63,7 @@ int main() std::cout << "Dispatching " << num_tasks << " parallel tasks...\n"; - auto [s0, s1, s2, s3] = co_await capy::when_all( + auto [ec, s0, s1, s2, s3] = co_await capy::when_all( partial_sum(0 * chunk, 1 * chunk), partial_sum(1 * chunk, 2 * chunk), partial_sum(2 * chunk, 3 * chunk), @@ -113,14 +113,14 @@ The range `[0, 10000)` is divided into 4 equal chunks, one per task. Each task c [source,cpp] ---- -auto [s0, s1, s2, s3] = co_await capy::when_all( +auto [ec, s0, s1, s2, s3] = co_await capy::when_all( partial_sum(0 * chunk, 1 * chunk), partial_sum(1 * chunk, 2 * chunk), partial_sum(2 * chunk, 3 * chunk), partial_sum(3 * chunk, 4 * chunk)); ---- -`when_all` launches all four tasks concurrently on the thread pool. Each task may run on a different thread. Results are returned via structured bindings in the same order as the input tasks. +`when_all` launches all four tasks concurrently on the thread pool. Each task may run on a different thread. The result is `io_result`, a single `ec` plus the four partial sums in input order. 
=== Observing Thread IDs diff --git a/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc b/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc index 7647ddcc1..2c9183118 100644 --- a/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc +++ b/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc @@ -125,24 +125,23 @@ run_loop::get_executor() noexcept // Verify the concept is satisfied static_assert(capy::Executor); -capy::task compute(int x) +capy::io_task compute(int x) { std::cout << " computing " << x << " * " << x << "\n"; - co_return x * x; + co_return capy::io_result{{}, x * x}; } capy::task<> run_tasks() { std::cout << "Launching 3 tasks with when_all...\n"; - auto [a, b, c] = co_await capy::when_all( - compute(3), - compute(7), - compute(11)); + auto [ec, r1, r2, r3] = co_await capy::when_all( + compute(3), compute(7), compute(11)); - std::cout << "\nResults: " << a << ", " << b << ", " << c - << "\n"; - std::cout << "Sum of squares: " << a + b + c << "\n"; + std::cout << "\nResults: " << r1 << ", " << r2 + << ", " << r3 << "\n"; + std::cout << "Sum of squares: " + << r1 + r2 + r3 << "\n"; } int main() diff --git a/doc/unlisted/coroutines-when-all.adoc b/doc/unlisted/coroutines-when-all.adoc index 69d1912f1..e58644e55 100644 --- a/doc/unlisted/coroutines-when-all.adoc +++ b/doc/unlisted/coroutines-when-all.adoc @@ -19,11 +19,15 @@ Tasks are sequential by default. 
When you await multiple tasks: [source,cpp] ---- -task sequential() +io_task fetch_a(); +io_task fetch_b(); +io_task fetch_c(); + +task<> sequential() { - int a = co_await fetch_a(); // Wait for A - int b = co_await fetch_b(); // Then wait for B - int c = co_await fetch_c(); // Then wait for C + auto [ec_a, a] = co_await fetch_a(); // Wait for A + auto [ec_b, b] = co_await fetch_b(); // Then wait for B + auto [ec_c, c] = co_await fetch_c(); // Then wait for C // Total time: A + B + C } ---- @@ -40,9 +44,9 @@ all of them to complete: ---- #include -task concurrent() +task<> concurrent() { - auto [a, b, c] = co_await when_all( + auto [ec, a, b, c] = co_await when_all( fetch_a(), fetch_b(), fetch_c() @@ -52,55 +56,80 @@ task concurrent() ---- All three fetches run in parallel. The `co_await` completes when the slowest -one finishes. +one finishes. Children must return `io_result<...>` (i.e., be `io_task`). == Return Value -`when_all` returns a tuple of results. Void tasks contribute `std::monostate` to preserve the task-index-to-result-index mapping: +`when_all` returns `task>`, a single `ec` followed by each child's payload type. Each `Ri` is the child's payload flattened: `io_result` contributes `T`, `io_result<>` contributes `tuple<>`. 
[source,cpp] ---- -// All non-void: get a tuple of all results -auto [x, y] = co_await when_all( - task_returning_int(), // task - task_returning_string() // task +// All non-void: single ec + flattened payloads +auto [ec, x, y] = co_await when_all( + task_returning_int(), // io_task + task_returning_string() // io_task ); -// x is int, y is std::string +// ec is std::error_code, x is int, y is std::string -// Mixed with void: void tasks contribute monostate -auto [a, b, c] = co_await when_all( - task_returning_int(), // task — index 0 - task_void(), // task — index 1 → monostate - task_void() // task — index 2 → monostate +// Mixed with void: io_task<> contributes tuple<> +auto [ec2, a, b, c] = co_await when_all( + task_returning_int(), // io_task — a is int + task_void(), // io_task<> — b is tuple<> + task_void() // io_task<> — c is tuple<> ); -// a is int, b and c are std::monostate -// All void: returns tuple of monostate -auto [m, n] = co_await when_all( +// All void: just check ec +auto r = co_await when_all( task_void(), task_void() ); -// m and n are std::monostate +if (r.ec) + // handle error ---- -Results appear in the same order as the input tasks. +Results appear in the same order as the input tasks. Values are only meaningful when `!ec`. == Error Handling -Exceptions propagate from child tasks to the parent. When a task throws: +There are two error paths. I/O errors are reported through the `ec` field of the returned `io_result`. Thrown exceptions are captured separately. + +=== I/O Errors (ec) + +When a child returns a non-zero `ec`: + +1. Stop is requested for sibling tasks +2. All tasks are allowed to complete (or respond to stop) +3. The first `ec` is propagated in the outer `io_result` + +[source,cpp] +---- +task<> handle_errors() +{ + auto [ec, a, b] = co_await when_all( + might_fail_io(), + another_io_task() + ); + if (ec) + std::cerr << "I/O error: " << ec.message() << "\n"; +} +---- + +=== Exceptions + +When a child throws: 1. 
The exception is captured 2. Stop is requested for sibling tasks 3. All tasks are allowed to complete (or respond to stop) -4. The first exception is rethrown +4. The first exception is rethrown (exception takes priority over `ec`) [source,cpp] ---- -task handle_errors() +task<> handle_exceptions() { try { co_await when_all( - might_fail(), + might_throw(), another_task(), third_task() ); @@ -113,8 +142,9 @@ task handle_errors() === First-Error Semantics -Only the first exception is captured; subsequent exceptions are discarded. -This matches the behavior of most concurrent frameworks. +Only the first error wins, whether `ec` or exception. Subsequent errors +are discarded. If both an exception and an `ec` occur, the exception takes +priority. === Stop Propagation @@ -123,20 +153,21 @@ that support cancellation can respond by exiting early: [source,cpp] ---- -task cancellable_work() +io_task<> cancellable_work() { auto token = co_await this_coro::stop_token; for (int i = 0; i < 1000; ++i) { if (token.stop_requested()) - co_return; // Exit early + co_return io_result<>{}; // Exit early co_await do_chunk(i); } + co_return io_result<>{}; } -task example() +task<> example() { - // If failing_task throws, cancellable_work sees stop_requested + // If failing_task returns ec, cancellable_work sees stop_requested co_await when_all( failing_task(), cancellable_work() @@ -151,7 +182,7 @@ cancelled, all children see the request: [source,cpp] ---- -task parent() +task<> parent() { // Parent has a stop token from run_async co_await when_all( @@ -173,7 +204,7 @@ All child tasks inherit the parent's executor affinity: [source,cpp] ---- -task parent() // Running on executor ex +task<> parent() // Running on executor ex { co_await when_all( child_a(), // Runs on ex @@ -212,19 +243,26 @@ run_async(pool.get_executor())(parent()); [source,cpp] ---- -task fetch(http_client& client, std::string url) +io_task fetch(http_client& client, std::string url) { - co_return co_await 
client.get(url); + auto [ec, body] = co_await client.get(url); + co_return io_result{ec, std::move(body)}; } -task fetch_all(http_client& client) +task<> fetch_all(http_client& client) { - auto [home, about, contact] = co_await when_all( + auto [ec, home, about, contact] = co_await when_all( fetch(client, "https://example.com/"), fetch(client, "https://example.com/about"), fetch(client, "https://example.com/contact") ); + if (ec) + { + std::cerr << "Fetch failed: " << ec.message() << "\n"; + co_return; + } + std::cout << "Home: " << home.size() << " bytes\n"; std::cout << "About: " << about.size() << " bytes\n"; std::cout << "Contact: " << contact.size() << " bytes\n"; @@ -252,13 +290,13 @@ Do NOT use `when_all` when: | Feature | Description | `when_all(tasks...)` -| Launch tasks concurrently, wait for all +| Launch `io_task` children concurrently, wait for all | Return type -| Tuple of results in input order (`monostate` for void tasks) +| `task>` — single `ec` + flattened payloads | Error handling -| First exception propagated, siblings get stop +| First `ec` or exception propagated, siblings get stop | Affinity | Children inherit parent's executor diff --git a/doc/unlisted/coroutines-when-any.adoc b/doc/unlisted/coroutines-when-any.adoc index 0ecb560ea..eb21ca7c8 100644 --- a/doc/unlisted/coroutines-when-any.adoc +++ b/doc/unlisted/coroutines-when-any.adoc @@ -9,7 +9,7 @@ = Racing Tasks -In this tutorial, you will learn how to race multiple concurrent tasks using `when_any`, returning as soon as the first task completes. This pattern is essential for implementing timeouts, redundant requests, and speculative execution. +In this tutorial, you will learn how to race multiple concurrent tasks using `when_any`, returning as soon as the first task *succeeds* (`!ec`). This pattern is essential for implementing redundant requests and speculative execution. 
By the end of this page, you will understand how to use both the variadic and range-based overloads of `when_any`, handle the result types correctly, and manage cancellation of sibling tasks. @@ -26,152 +26,198 @@ NOTE: Code snippets assume `using namespace boost::capy;` is in effect. Sometimes you need the result from whichever task finishes first, not all of them. Common scenarios include: * Racing requests to multiple servers, using the first response -* Implementing timeouts by racing against a timer * Speculative execution of multiple algorithms * Waiting for first available resource from a pool == when_any The `when_any` function launches multiple tasks concurrently and returns when -the first one completes: +the first one *succeeds* (`!ec`): [source,cpp] ---- #include -task race() +task<> race() { auto result = co_await when_any( - fetch_from_primary(), - fetch_from_backup() + fetch_from_primary(), // io_task + fetch_from_backup() // io_task ); - // result.index() is 0 or 1 + // result is variant + // result.index() == 0: all tasks failed + // result.index() == 1: primary won + // result.index() == 2: backup won } ---- -The return value is a `std::variant` with one alternative per input task. Use `.index()` to determine which task completed first (0 for the first argument, 1 for the second). The active alternative holds the winning task's return value. +The return value is a `std::variant` with `error_code` at index 0 (failure/no winner) followed by one alternative per input task. Index 1 corresponds to the first argument, index 2 to the second, and so on. A task *wins* only if it returns `!ec`. Errors and exceptions do not count as winning. -The winning task's result is returned immediately. All sibling tasks receive -a stop request and are allowed to complete before `when_any` returns. +Once a winner is found, all sibling tasks receive a stop request. All tasks +complete before `when_any` returns. 
== Return Value
 
-The variadic `when_any` returns a `std::variant` with positional correspondence to the input tasks.
+The variadic `when_any` returns `task<variant<error_code, R1..RN>>`. Index 0 is the failure case (`error_code`). Indices 1..N correspond to the winning child's payload.
 
 === Heterogeneous Tasks (Variadic)
 
-When racing tasks with different return types, the result is a variant:
+When racing tasks with different return types:
 
 [source,cpp]
 ----
 auto result = co_await when_any(
-    task_returning_int(),      // task<int>
-    task_returning_string()    // task<std::string>
+    task_returning_int(),      // io_task<int>
+    task_returning_string()    // io_task<std::string>
 );
 
 if (result.index() == 0)
-    std::cout << "Got int: " << std::get<0>(result) << "\n";
+    std::cerr << "All failed: " << std::get<0>(result).message() << "\n";
+else if (result.index() == 1)
+    std::cout << "Got int: " << std::get<1>(result) << "\n";
 else
-    std::cout << "Got string: " << std::get<1>(result) << "\n";
+    std::cout << "Got string: " << std::get<2>(result) << "\n";
 ----
 
-The `result` variable is a `std::variant<int, std::string>`. Use `.index()` to determine which alternative is active, then extract the value with `std::get`.
+The `result` variable is a `std::variant<std::error_code, int, std::string>`. Index 0 is the failure case. Use `result.index() - 1` to identify which child won.
 
 === Void Tasks
 
-Void tasks contribute `std::monostate` to the variant:
+`io_task<>` children contribute `std::tuple<>` to the variant:
 
 [source,cpp]
 ----
 auto result = co_await when_any(
-    task_returning_int(),      // task<int>
-    task_void()                // task<void>
+    task_returning_int(),      // io_task<int>
+    task_void()                // io_task<>
 );
 
 if (result.index() == 0)
-    std::cout << "Got int: " << std::get<0>(result) << "\n";
+    std::cerr << "All failed\n";
+else if (result.index() == 1)
+    std::cout << "Got int: " << std::get<1>(result) << "\n";
 else
-    std::cout << "Void task completed\n";
+    std::cout << "Void task won\n";
 ----
 
-Tasks returning `void` contribute `std::monostate` to the variant. In this example, `result` has type `std::variant<int, std::monostate>`.
Use `.index()` to detect which task won.
+Here `result` has type `std::variant<std::error_code, int, std::tuple<>>`. Index 0 is failure, index 1 is the int task, index 2 is the void task.
 
 === Same-Type Tasks
 
-The variant preserves one alternative per task.
+The variant preserves one alternative per task, plus `error_code` at index 0. Use `.index()` to identify which task won:
 
 [source,cpp]
 ----
 auto result = co_await when_any(
-    fetch_from_server_a(),   // task<Response>
-    fetch_from_server_b(),   // task<Response>
-    fetch_from_server_c()    // task<Response>
+    fetch_from_server_a(),   // io_task<Response>
+    fetch_from_server_b(),   // io_task<Response>
+    fetch_from_server_c()    // io_task<Response>
 );
+// result: variant<error_code, Response, Response, Response>
 
-auto index = result.index();
-auto response = std::visit([](auto&& v) -> Response { return v; }, result);
-std::cout << "Server " << index << " responded first\n";
+if (result.index() == 0)
+{
+    std::cerr << "All servers failed\n";
+}
+else
+{
+    std::visit([](auto const& v) {
+        if constexpr (!std::is_same_v<std::decay_t<decltype(v)>, std::error_code>)
+            std::cout << "Winner: " << v << "\n";
+    }, result);
+    std::cout << "Server " << (result.index() - 1) << " responded first\n";
+}
 ----
 
-When multiple tasks share the same return type, the variant contains one alternative per task. Here, `result` is `std::variant<Response, Response, Response>`. The `.index()` value (0, 1, or 2) tells you which server responded first. Use `std::visit` to extract the value.
-
 === Homogeneous Tasks (Vector)
 
-For a dynamic number of tasks with the same type, use the vector overload:
+For a dynamic number of tasks with the same type, use the range overload:
 
 [source,cpp]
 ----
-std::vector<task<Response>> requests;
+std::vector<io_task<Response>> requests;
 for (auto& server : servers)
     requests.push_back(fetch_from(server));
 
-auto [index, response] = co_await when_any(std::move(requests));
-std::cout << "Server " << index << " responded: " << response << "\n";
+auto result = co_await when_any(std::move(requests));
+// result: variant<error_code, pair<size_t, Response>>
+
+if (result.index() == 0)
+{
+    std::cerr << "All failed: " << std::get<0>(result).message() << "\n";
+}
+else
+{
+    auto& [index, response] = std::get<1>(result);
+    std::cout << "Server " << index << " responded: " << response << "\n";
+}
 ----
 
-The vector overload accepts any sized input range of awaitables with the same result type. Since all tasks return `Response`, the result is `std::pair<std::size_t, Response>` directly—no variant wrapper is needed.
+The range overload returns `variant<error_code, pair<size_t, Response>>`. Index 0 is the failure case. Index 1 holds the winner's index and payload.
 
-For void tasks in a vector, only the winner's index is returned:
+For void tasks in a range:
 
 [source,cpp]
 ----
-std::vector<task<>> tasks;
+std::vector<io_task<>> tasks;
 // ... populate tasks
 
-std::size_t winner = co_await when_any(std::move(tasks));
-std::cout << "Task " << winner << " completed first\n";
+auto result = co_await when_any(std::move(tasks));
+// result: variant<error_code, size_t>
+
+if (result.index() == 1)
+    std::cout << "Task " << std::get<1>(result) << " completed first\n";
 ----
 
-Since void tasks produce no result value, the return type is `std::size_t` rather than a pair.
+Since void tasks produce no payload, the success alternative is just `std::size_t` (the winner's index).
 
 == Error Handling
 
-Exceptions are treated as valid completions. If the winning task throws,
-that exception is rethrown from `when_any`:
+Only a successful task (`!ec`) can win.
Tasks that return an error code or throw an exception do *not* win. `when_any` keeps waiting for a success. + +=== All Tasks Fail + +If every task returns `ec`, the variant holds `error_code` at index 0: [source,cpp] ---- -task handle_errors() +task<> handle_all_fail() +{ + auto result = co_await when_any( + might_fail_a(), + might_fail_b() + ); + if (result.index() == 0) + std::cerr << "No winner: " << std::get<0>(result).message() << "\n"; +} +---- + +=== All Tasks Throw + +If every task throws, the first exception is rethrown: + +[source,cpp] +---- +task<> handle_all_throw() { try { - auto result = co_await when_any( - might_fail(), - might_succeed() + co_await when_any( + throws_a(), + throws_b() ); - // If we get here, the winner succeeded } catch (std::exception const& e) { - // The winning task threw this exception - std::cerr << "Winner failed: " << e.what() << "\n"; + // All threw — first exception is rethrown + std::cerr << "Error: " << e.what() << "\n"; } } ---- -=== First-Completion Semantics +=== Success-Only Winner Semantics -Unlike `when_all` (which captures the first _error_), `when_any` returns -whichever task completes first, whether it succeeds or fails. Exceptions -from non-winning tasks are discarded. +Unlike `when_all` (which propagates the first error), `when_any` treats errors +as non-winning. A failed task does not disqualify pending siblings; they keep +running until one succeeds or all complete. 
=== Stop Propagation @@ -180,23 +226,23 @@ Tasks that support cancellation can exit early: [source,cpp] ---- -task fetch_with_cancel_support() +io_task fetch_with_cancel_support() { - auto token = co_await get_stop_token(); + auto token = co_await this_coro::stop_token; for (auto& chunk : data_source) { if (token.stop_requested()) - co_return partial_response(); // Exit early + co_return io_result{{}, partial_response()}; co_await send_chunk(chunk); } - co_return complete_response(); + co_return io_result{{}, complete_response()}; } -task example() +task<> example() { // When one fetch wins, the other sees stop_requested - auto response = co_await when_any( + auto result = co_await when_any( fetch_with_cancel_support(), fetch_with_cancel_support() ); @@ -213,7 +259,7 @@ cancelled, all children see the request: [source,cpp] ---- -task parent() +task<> parent() { auto result = co_await when_any( child_a(), // Sees parent's stop token @@ -224,7 +270,7 @@ task parent() std::stop_source source; run_async(ex, source.get_token())(parent()); -// Later: cancel everything +// Later: cancel everything — variant holds error_code at index 0 source.request_stop(); ---- @@ -234,7 +280,7 @@ All child tasks inherit the parent's executor affinity: [source,cpp] ---- -task parent() // Running on executor ex +task<> parent() // Running on executor ex { auto result = co_await when_any( child_a(), // Runs on ex @@ -267,69 +313,55 @@ Race requests to multiple servers for reliability: [source,cpp] ---- -task fetch_with_redundancy(Request req) +io_task fetch_with_redundancy(Request req) { auto result = co_await when_any( fetch_from(primary_server, req), fetch_from(backup_server, req) ); + // result: variant - std::cout << (result.index() == 0 ? 
"Primary" : "Backup") - << " server responded\n"; - co_return std::visit([](auto&& v) -> Response { return v; }, result); -} ----- - -== Example: Timeout Pattern - -Race an operation against a timer: + if (result.index() == 0) + co_return io_result{std::get<0>(result), {}}; -[source,cpp] ----- -task fetch_with_timeout(Request req) -{ - auto result = co_await when_any( - fetch_data(req), - timeout_after(100ms) - ); - - if (result.index() == 1) - throw timeout_error{"Request timed out"}; + std::cout << (result.index() == 1 ? "Primary" : "Backup") + << " server responded\n"; - co_return std::get<0>(result); -} - -// Helper that waits then throws -template -task timeout_after(std::chrono::milliseconds ms) -{ - co_await sleep(ms); - throw timeout_error{"Timeout"}; + // Extract the winner's Response + Response resp; + std::visit([&](auto const& v) { + if constexpr (!std::is_same_v, std::error_code>) + resp = v; + }, result); + co_return io_result{{}, std::move(resp)}; } ---- -The `timeout_after` helper waits for the specified duration then throws an exception. If `fetch_data` completes before the timer, its result is returned. If the timer wins, the timeout exception propagates from `when_any`. - == Example: First Available Resource Wait for the first available connection from a pool: [source,cpp] ---- -task get_connection(std::vector& pools) +io_task get_connection(std::vector& pools) { - std::vector> attempts; + std::vector> attempts; for (auto& pool : pools) attempts.push_back(pool.acquire()); - auto [index, conn] = co_await when_any(std::move(attempts)); + auto result = co_await when_any(std::move(attempts)); + // result: variant> + + if (result.index() == 0) + co_return io_result{std::get<0>(result), {}}; + auto& [index, conn] = std::get<1>(result); std::cout << "Got connection from pool " << index << "\n"; - co_return conn; + co_return io_result{{}, std::move(conn)}; } ---- -This function creates an acquire task for each pool, then races them. 
Whichever pool provides a connection first wins, and the remaining acquire attempts are cancelled. The `index` indicates which pool provided the connection. +This function creates an acquire task for each pool, then races them. Whichever pool provides a connection first (with `!ec`) wins, and the remaining acquire attempts are cancelled. The `index` indicates which pool provided the connection. == Comparison with when_all @@ -339,19 +371,19 @@ This function creates an acquire task for each pool, then races them. Whichever | Completion | Waits for all tasks -| Returns on first completion +| Returns on first success (`!ec`) | Return type -| Tuple of results -| Variant (variadic) or pair (range) +| `task>` +| `task>` | Error handling -| First exception wins, siblings get stop -| Exceptions are valid completions +| First `ec` or exception wins, siblings get stop +| Only `!ec` wins; errors do not win | Use case | Need all results -| Need fastest result +| Need fastest successful result |=== == Summary @@ -361,22 +393,25 @@ This function creates an acquire task for each pool, then races them. 
Whichever | Feature | Description | `when_any(tasks...)` -| Race tasks, return first completion +| Race `io_task` children, return first success -| `when_any(vector>)` -| Race homogeneous tasks from a vector +| `when_any(range)` +| Race homogeneous `io_task` children from a range | Return type (variadic) -| `variant<...>` with positional alternatives (use `.index()` for winner) +| `variant` — index 0 is failure, 1..N is winner -| Return type (vector) -| `pair` or `size_t` for void +| Return type (range, non-void) +| `variant>` -| Error handling -| Winner's exception propagated, others discarded +| Return type (range, void) +| `variant` + +| Winner selection +| Only `!ec` wins; errors and exceptions do not count | Stop propagation -| Siblings receive stop request on winner +| Siblings receive stop request when winner found | Cleanup | All tasks complete before returning diff --git a/example/when-any-cancellation/when_any_cancellation.cpp b/example/when-any-cancellation/when_any_cancellation.cpp index 031e6d2ee..020e45f92 100644 --- a/example/when-any-cancellation/when_any_cancellation.cpp +++ b/example/when-any-cancellation/when_any_cancellation.cpp @@ -114,22 +114,6 @@ capy::io_task<> timeout(int ms) co_return capy::io_result<>{}; } -// Use when_any with a timeout to bound the lifetime of a background worker. -// With void tasks, the range overload returns size_t (the winner's index). -capy::task<> timeout_a_worker() -{ - std::cout << "\n=== Timeout a background worker ===\n\n"; - - auto result = co_await capy::when_any( - background_worker("worker", 30), - timeout(100)); - - if (result.index() == 1) - std::cout << "\nWorker finished before timeout\n"; - else if (result.index() == 2) - std::cout << "\nTimeout fired — worker was cancelled\n"; -} - // Race three replicas using variadic overload. 
capy::task<> race_vector_of_sources() { @@ -154,7 +138,6 @@ capy::task<> race_vector_of_sources() capy::task<> run_demos() { co_await race_data_sources(); - co_await timeout_a_worker(); co_await race_vector_of_sources(); }