Merged
153 changes: 87 additions & 66 deletions doc/modules/ROOT/pages/4.coroutines/4f.composition.adoc
@@ -34,82 +34,95 @@ task<> concurrent()

== when_all: Wait for All Tasks

`when_all` launches multiple tasks concurrently and waits for all of them to complete:
`when_all` launches multiple `io_task` children concurrently and waits for all of them to complete. It returns `task<io_result<R1, R2, ..., Rn>>`, a single `ec` plus the flattened payloads of all children:

[source,cpp]
----
#include <boost/capy/when_all.hpp>

task<int> fetch_a() { co_return 1; }
task<int> fetch_b() { co_return 2; }
task<std::string> fetch_c() { co_return "hello"; }
io_task<int> fetch_a() { co_return io_result<int>{{}, 1}; }
io_task<int> fetch_b() { co_return io_result<int>{{}, 2}; }
io_task<std::string> fetch_c() { co_return io_result<std::string>{{}, "hello"}; }

task<> example()
{
auto [a, b, c] = co_await when_all(fetch_a(), fetch_b(), fetch_c());

auto [ec, a, b, c] = co_await when_all(fetch_a(), fetch_b(), fetch_c());

// ec == std::error_code{} (success)
// a == 1
// b == 2
// c == "hello"
}
----

=== Result Tuple
=== Result Type

`when_all` returns a tuple of results in the same order as the input tasks. Use structured bindings to unpack them.
`when_all` returns `io_result<R1, ..., Rn>`, where each `Ri` is the corresponding child's payload, flattened: a child's `io_result<T>` contributes `T`, and `io_result<>` contributes `tuple<>`. Check `ec` first; the values are only meaningful when `!ec`.
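One way to picture the flattening rule is as a type mapping over each child's payload list. The sketch below is standalone and hypothetical; `payload` and `flatten_one` are illustrative names, not capy's actual metaprogramming:

[source,cpp]
----
#include <cassert>
#include <tuple>
#include <type_traits>

// Hypothetical model of the payload-flattening rule. A child producing
// io_result<T> contributes T to the combined result; a child producing
// io_result<> contributes the placeholder tuple<>.
template<class... Ts>
struct payload {};   // stand-in for a child's io_result payload list

template<class P> struct flatten_one;
template<class T> struct flatten_one<payload<T>> { using type = T; };
template<>        struct flatten_one<payload<>> { using type = std::tuple<>; };

// io_task<int> contributes int; io_task<> contributes tuple<>
static_assert(std::is_same_v<flatten_one<payload<int>>::type, int>);
static_assert(std::is_same_v<flatten_one<payload<>>::type, std::tuple<>>);
----

The `static_assert` lines mirror the mixed example below, where an `io_task<int>` child yields an `int` and a void `io_task<>` child yields `tuple<>`.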

=== Void Tasks
=== Void io_tasks

Tasks returning `void` contribute `std::monostate` to the result tuple, preserving the task-index-to-result-index mapping:
`io_task<>` children contribute `tuple<>` to the result:

[source,cpp]
----
task<> void_task() { co_return; }
task<int> int_task() { co_return 42; }
io_task<> void_task() { co_return io_result<>{}; }
io_task<int> int_task() { co_return io_result<int>{{}, 42}; }

task<> example()
{
auto [a, b, c] = co_await when_all(int_task(), void_task(), int_task());
// a == 42 (int, index 0)
// b == monostate (monostate, index 1)
// c == 42 (int, index 2)
auto [ec, a, b, c] = co_await when_all(int_task(), void_task(), int_task());
// a == 42 (int)
// b == tuple<> (from void io_task)
// c == 42 (int)
}
----

If all tasks return `void`, `when_all` returns a tuple of `std::monostate`:
When all children are `io_task<>`, just check `r.ec`:

[source,cpp]
----
task<> example()
{
auto [a, b] = co_await when_all(void_task_a(), void_task_b());
// a and b are std::monostate
auto r = co_await when_all(void_task_a(), void_task_b());
if (r.ec)
// handle error
}
----

=== Error Handling

If any task throws an exception:
I/O errors are reported through the `ec` field of the `io_result`. When any child returns a non-zero `ec`:

1. Stop is requested for sibling tasks
2. All tasks complete (or respond to stop)
3. The first `ec` is propagated in the outer `io_result`

[source,cpp]
----
task<> example()
{
auto [ec, a, b] = co_await when_all(task_a(), task_b());
if (ec)
std::cerr << "Error: " << ec.message() << "\n";
}
----

1. The exception is captured
2. Stop is requested for sibling tasks
3. All tasks are allowed to complete (or respond to stop)
4. The *first* exception is rethrown; later exceptions are discarded
If a task throws an exception, it is captured and rethrown after all tasks complete. Exceptions take priority over `ec`.

[source,cpp]
----
task<int> might_fail(bool fail)
io_task<int> might_throw(bool fail)
{
if (fail)
throw std::runtime_error("failed");
co_return 42;
co_return io_result<int>{{}, 42};
}

task<> example()
{
try
{
co_await when_all(might_fail(true), might_fail(false));
co_await when_all(might_throw(true), might_throw(false));
}
catch (std::runtime_error const& e)
{
@@ -124,23 +137,24 @@ When one task fails, `when_all` requests stop for its siblings. Well-behaved tas

[source,cpp]
----
task<> long_running()
io_task<> long_running()
{
auto token = co_await get_stop_token();
auto token = co_await this_coro::stop_token;

for (int i = 0; i < 1000; ++i)
{
if (token.stop_requested())
co_return; // Exit early when sibling fails
co_return io_result<>{}; // Exit early when sibling fails

co_await do_iteration();
}
co_return io_result<>{};
}
----
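The poll-the-token pattern above is the same one used with the standard `std::stop_token`. Here is a blocking, non-coroutine sketch of the loop (standard library only, nothing capy-specific):

[source,cpp]
----
#include <cassert>
#include <stop_token>

// Cooperative cancellation: poll the token on each iteration and
// bail out early once stop has been requested.
int run_until_stopped(std::stop_token token, int max_iters)
{
    int completed = 0;
    for (int i = 0; i < max_iters; ++i)
    {
        if (token.stop_requested())
            break;          // exit early, as when a sibling fails
        ++completed;        // stands in for co_await do_iteration()
    }
    return completed;
}
----

With no stop requested the loop runs to completion; after `stop_source::request_stop()` it exits before doing any work.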

== when_any: First-to-Finish Wins
== when_any: First-to-Succeed Wins

`when_any` launches multiple tasks concurrently and returns when the *first* one completes:
`when_any` launches multiple `io_task` children concurrently and returns when the first one *succeeds* (`!ec`):

[source,cpp]
----
@@ -149,17 +163,19 @@ task<> long_running()
task<> example()
{
auto result = co_await when_any(
fetch_int(), // task<int>
fetch_string() // task<std::string>
fetch_int(), // io_task<int>
fetch_string() // io_task<std::string>
);
// result.index() indicates which task won (0 or 1)
// result is std::variant<int, std::string>
// result is std::variant<std::error_code, int, std::string>
// index 0: all tasks failed (error_code)
// index 1: fetch_int won
// index 2: fetch_string won
}
----

The result is a variant with one alternative per input task, preserving positional correspondence. Use `.index()` to determine the winner. When a winner is determined, stop is requested for all siblings. All tasks complete before `when_any` returns.
The result is a `variant` with `error_code` at index 0 (failure/no winner) and one alternative per input task at indices 1..N. Only tasks returning `!ec` can win; errors and exceptions do not count as winning. When a winner is found, stop is requested for all siblings. All tasks complete before `when_any` returns.

For detailed coverage including error handling, cancellation, and the vector overload, see Racing Tasks.
For detailed coverage including error handling, cancellation, and the range overload, see Racing Tasks.
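The index logic can be exercised with a plain `std::variant` of the same shape. The alias `any_result` and the helper `describe` below are illustrative, not capy types:

[source,cpp]
----
#include <cassert>
#include <string>
#include <system_error>
#include <variant>

// Same shape as a two-task when_any result: error_code at index 0,
// one payload alternative per child at indices 1..N.
using any_result = std::variant<std::error_code, int, std::string>;

std::string describe(any_result const& r)
{
    switch (r.index())
    {
    case 0:  return "all failed: " + std::get<0>(r).message();
    case 1:  return "int task won";
    default: return "string task won";
    }
}
----

Holding an `int` means the first child won, holding a `std::string` means the second did, and holding an `error_code` means no child succeeded.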

== Practical Patterns

@@ -169,66 +185,68 @@ Fetch multiple resources simultaneously:

[source,cpp]
----
task<page_data> fetch_page_data(std::string url)
io_task<page_data> fetch_page_data(std::string url)
{
auto [header, body, sidebar] = co_await when_all(
auto [ec, header, body, sidebar] = co_await when_all(
fetch_header(url),
fetch_body(url),
fetch_sidebar(url)
);

co_return page_data{
if (ec)
co_return io_result<page_data>{ec, {}};

co_return io_result<page_data>{{}, {
std::move(header),
std::move(body),
std::move(sidebar)
};
}};
}
----

=== Fan-Out/Fan-In

Process items in parallel, then combine results:
Process items in parallel, then combine results using the range overload:

[source,cpp]
----
task<int> process_item(item const& i);
io_task<int> process_item(item const& i);

task<int> process_all(std::vector<item> const& items)
{
std::vector<task<int>> tasks;
std::vector<io_task<int>> tasks;
for (auto const& item : items)
tasks.push_back(process_item(item));

// This requires a range-based when_all (not yet available)
// For now, use fixed-arity when_all


auto [ec, results] = co_await when_all(std::move(tasks));
if (ec)
co_return 0;

int total = 0;
// ... accumulate results
for (auto v : results)
total += v;
co_return total;
}
----

=== Timeout with Fallback
=== Timeout

Use `when_any` to implement timeout with fallback:
The `timeout` combinator races an awaitable against a deadline:

[source,cpp]
----
task<Response> fetch_with_timeout(Request req)
{
auto result = co_await when_any(
fetch_data(req),
timeout_after<Response>(100ms)
);

if (result.index() == 1)
throw timeout_error{"Request timed out"};
#include <boost/capy/timeout.hpp>

co_return std::get<0>(result);
task<> example()
{
auto [ec, n] = co_await timeout(sock.read_some(buf), 50ms);
if (ec == cond::timeout)
{
// deadline expired before read completed
}
}
----

The `timeout_after` helper waits for the specified duration then throws. If `fetch_data` completes first, its result is returned. If the timer wins, the timeout exception propagates.
`timeout` returns the same `io_result` type as the inner awaitable. On timeout, `ec` is set to `error::timeout` and payload values are default-initialized. Unlike `when_any`, exceptions from the inner awaitable are always propagated and never swallowed by the timer.
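The `ec == cond::timeout` check above follows the standard library's code-versus-condition model, in which a concrete `error_code` compares equal to the portable `error_condition` it maps to. A standard-library analogue (the helper name is hypothetical):

[source,cpp]
----
#include <cassert>
#include <system_error>

// A concrete error_code compares equal to the portable condition it
// maps to; this is how `ec == cond::timeout` style checks work.
std::error_code simulated_timeout()
{
    return std::make_error_code(std::errc::timed_out);
}
----

A default-constructed (success) `error_code` compares unequal to the timeout condition, so the check correctly distinguishes timeouts from successful completions.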

== Implementation Notes

@@ -262,6 +280,9 @@ This design ensures proper context propagation to all children.

| `<boost/capy/when_any.hpp>`
| First-completion racing with when_any

| `<boost/capy/timeout.hpp>`
| Race an awaitable against a deadline
|===

You have now learned how to compose tasks concurrently with `when_all` and `when_any`. In the next section, you will learn about frame allocators for customizing coroutine memory allocation.
20 changes: 10 additions & 10 deletions doc/modules/ROOT/pages/7.examples/7b.producer-consumer.adoc
@@ -37,27 +37,27 @@ int main()
capy::async_event data_ready;
int shared_value = 0;

auto producer = [&]() -> capy::task<> {
auto producer = [&]() -> capy::io_task<> {
std::cout << "Producer: preparing data...\n";
shared_value = 42;
std::cout << "Producer: data ready, signaling\n";
data_ready.set();
co_return;
co_return capy::io_result<>{};
};

auto consumer = [&]() -> capy::task<> {
auto consumer = [&]() -> capy::io_task<> {
std::cout << "Consumer: waiting for data...\n";
auto [ec] = co_await data_ready.wait();
(void)ec;
std::cout << "Consumer: received value " << shared_value << "\n";
co_return;
co_return capy::io_result<>{};
};

// Run both tasks concurrently using when_all, through a strand.
// The strand serializes execution, ensuring thread-safe access
// to the shared async_event and shared_value.
auto run_both = [&]() -> capy::task<> {
co_await capy::when_all(producer(), consumer());
(void) co_await capy::when_all(producer(), consumer());
};

capy::run_async(s, on_complete, on_error)(run_both());
@@ -99,12 +99,12 @@ capy::async_event data_ready;

[source,cpp]
----
auto producer = [&]() -> capy::task<> {
auto producer = [&]() -> capy::io_task<> {
std::cout << "Producer: preparing data...\n";
shared_value = 42;
std::cout << "Producer: data ready, signaling\n";
data_ready.set();
co_return;
co_return capy::io_result<>{};
};
----

@@ -114,12 +114,12 @@ The producer prepares data and signals completion by calling `set()`.

[source,cpp]
----
auto consumer = [&]() -> capy::task<> {
auto consumer = [&]() -> capy::io_task<> {
std::cout << "Consumer: waiting for data...\n";
auto [ec] = co_await data_ready.wait();
(void)ec;
std::cout << "Consumer: received value " << shared_value << "\n";
co_return;
co_return capy::io_result<>{};
};
----
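The signal-then-wait handshake can be mimicked with blocking standard-library primitives. This thread-based sketch is not capy's coroutine event; it only illustrates the semantics: `set()` releases `wait()`, and writes made before `set()` are visible after `wait()` returns:

[source,cpp]
----
#include <cassert>
#include <future>
#include <thread>

// Blocking analogue of async_event: one-shot, set() releases wait(),
// and the promise/future pair provides the happens-before ordering.
struct blocking_event
{
    std::promise<void> p;
    std::future<void>  f = p.get_future();
    void set()  { p.set_value(); }
    void wait() { f.wait(); }
};
----

A producer thread can store a value, call `set()`, and the waiting consumer is guaranteed to observe the stored value, just as in the coroutine version above.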

@@ -133,7 +133,7 @@ The consumer waits until the event is set. The `co_await data_ready.wait()` susp
// The strand serializes execution, ensuring thread-safe access
// to the shared async_event and shared_value.
auto run_both = [&]() -> capy::task<> {
co_await capy::when_all(producer(), consumer());
(void) co_await capy::when_all(producer(), consumer());
};

capy::run_async(s, on_complete, on_error)(run_both());