From 76965e69cf13f91754a7ea7cf10b33b4dc9b72c8 Mon Sep 17 00:00:00 2001 From: Michael Vandeberg Date: Mon, 30 Mar 2026 13:38:23 -0600 Subject: [PATCH] Add io_context_options for runtime scheduler and service tuning Introduce io_context_options with seven configurable knobs: max_events_per_poll, inline_budget_initial/max, unassisted_budget, gqcs_timeout_ms, thread_pool_size, and single_threaded. The single_threaded option disables all scheduler and descriptor mutex/condvar operations via conditionally_enabled_mutex/event wrappers, following Asio's model. Cross-thread post is UB when enabled; DNS and file I/O return operation_not_supported. Benchmarks show 2x throughput on the single-threaded post path with zero regression on multi-threaded paths. Add lockless benchmark variants across all single-threaded suites: io_context, socket_throughput, socket_latency, http_server, timer, accept_churn, and fan_out. Add Asio lockless benchmarks for comparison (concurrency_hint=1). --- doc/modules/ROOT/nav.adoc | 1 + .../ROOT/pages/4.guide/4c2.configuration.adoc | 157 +++++++++++ .../detail/conditionally_enabled_event.hpp | 77 ++++++ .../detail/conditionally_enabled_mutex.hpp | 101 +++++++ include/boost/corosio/io_context.hpp | 130 +++++++++ .../native/detail/epoll/epoll_scheduler.hpp | 46 +++- .../native/detail/iocp/win_scheduler.hpp | 18 +- .../native/detail/kqueue/kqueue_scheduler.hpp | 53 +++- .../posix_random_access_file_service.hpp | 3 + .../detail/posix/posix_resolver_service.hpp | 22 ++ .../posix/posix_stream_file_service.hpp | 3 + .../reactor/reactor_descriptor_state.hpp | 8 +- .../detail/reactor/reactor_scheduler.hpp | 191 +++++++++---- .../native/detail/select/select_scheduler.hpp | 13 +- .../corosio/native/native_io_context.hpp | 14 + .../boost/corosio/native/native_scheduler.hpp | 3 + perf/bench/asio/callback/io_context_bench.cpp | 58 +++- .../bench/asio/coroutine/io_context_bench.cpp | 58 +++- perf/bench/corosio/accept_churn_bench.cpp | 172 ++++++++++++ perf/bench/corosio/fan_out_bench.cpp | 260 ++++++++++++++++++ perf/bench/corosio/http_server_bench.cpp | 34 +++ perf/bench/corosio/io_context_bench.cpp | 135 ++++++++- perf/bench/corosio/socket_latency_bench.cpp | 93 +++++++ .../bench/corosio/socket_throughput_bench.cpp | 158 +++++++++++ perf/bench/corosio/timer_bench.cpp | 79 ++++++ src/corosio/src/io_context.cpp | 99 ++++++- 26 files changed, 1887 insertions(+), 99 deletions(-) create mode 100644 doc/modules/ROOT/pages/4.guide/4c2.configuration.adoc create mode 100644 include/boost/corosio/detail/conditionally_enabled_event.hpp create mode 100644 include/boost/corosio/detail/conditionally_enabled_mutex.hpp diff --git a/doc/modules/ROOT/nav.adoc b/doc/modules/ROOT/nav.adoc index a32ba9bed..a543e96d4 100644 --- a/doc/modules/ROOT/nav.adoc +++ b/doc/modules/ROOT/nav.adoc @@ -23,6 +23,7 @@ ** xref:4.guide/4a.tcp-networking.adoc[TCP/IP Networking] ** xref:4.guide/4b.concurrent-programming.adoc[Concurrent Programming] ** xref:4.guide/4c.io-context.adoc[I/O Context] +*** xref:4.guide/4c2.configuration.adoc[Configuration] ** xref:4.guide/4d.sockets.adoc[Sockets] ** xref:4.guide/4e.tcp-acceptor.adoc[Acceptors] ** xref:4.guide/4f.endpoints.adoc[Endpoints] diff --git a/doc/modules/ROOT/pages/4.guide/4c2.configuration.adoc b/doc/modules/ROOT/pages/4.guide/4c2.configuration.adoc new file mode 100644 index 000000000..bf6708f28 --- /dev/null +++ b/doc/modules/ROOT/pages/4.guide/4c2.configuration.adoc @@ -0,0 +1,157 @@ += Configuration +:navtitle: Configuration + +The `io_context_options` struct provides runtime tuning knobs for the +I/O context and its backend scheduler. All defaults match the +library's built-in values, so an unconfigured context behaves +identically to previous releases. + +[source,cpp] +---- +#include + +corosio::io_context_options opts; +opts.max_events_per_poll = 256; +opts.inline_budget_max = 32; + +corosio::io_context ioc(opts); +---- + +Both `io_context` and `native_io_context` accept options: + +[source,cpp] +---- +#include + +corosio::io_context_options opts; +opts.max_events_per_poll = 512; + +corosio::native_io_context ioc(opts); +---- + +== Available Options + +[cols="1,1,1,3"] +|=== +| Option | Default | Backends | Description + +| `max_events_per_poll` +| 128 +| epoll, kqueue +| Number of events fetched per reactor poll call. Larger values + reduce syscall frequency under high load; smaller values improve + fairness between connections. + +| `inline_budget_initial` +| 2 +| epoll, kqueue, select +| Starting inline completion budget per handler chain. After a + posted handler executes, the reactor grants this many speculative + inline completions before forcing a re-queue. + +| `inline_budget_max` +| 16 +| epoll, kqueue, select +| Hard ceiling on adaptive inline budget ramp-up. The budget + doubles each cycle it is fully consumed, up to this limit. + +| `unassisted_budget` +| 4 +| epoll, kqueue, select +| Inline budget when no other thread is running the event loop. + Prevents a single-threaded context from starving connections. + +| `gqcs_timeout_ms` +| 500 +| IOCP +| Maximum `GetQueuedCompletionStatus` blocking time in + milliseconds. Lower values improve timer responsiveness at the + cost of more syscalls. + +| `thread_pool_size` +| 1 +| POSIX (epoll, kqueue, select) +| Number of worker threads in the shared thread pool used for + blocking file I/O and DNS resolution. Ignored on IOCP where + file I/O uses native overlapped I/O. + +| `single_threaded` +| false +| all +| Disable all scheduler mutex and condition variable operations. + Eliminates synchronization overhead when only one thread calls + `run()`. See <> for restrictions. +|=== + +Options that do not apply to the active backend are silently ignored. + +== Tuning Guidelines + +=== Event Buffer Size (`max_events_per_poll`) + +The event buffer controls how many I/O events are fetched in a single +`epoll_wait()` or `kevent()` call. + +* *High-throughput streaming* (few connections, high bandwidth): + increase to 256-512 to reduce syscall overhead. +* *Many idle connections* (chat servers, WebSocket hubs): + keep at 128 or lower for better fairness. + +=== Inline Completion Budget + +The inline budget controls how many I/O completions the reactor +completes speculatively within a single handler chain before forcing +a re-queue through the scheduler. + +* *Streaming workloads* (file transfer, video): + `inline_budget_max = 32` or higher reduces context switches. +* *Request-response workloads* (HTTP, RPC): + keep at 16 to prevent one connection from monopolizing a thread. +* *Single-threaded contexts*: + `unassisted_budget` caps the budget when only one thread is + running the event loop, preserving fairness. + +=== IOCP Timeout (`gqcs_timeout_ms`) + +On Windows, the IOCP scheduler periodically wakes to recheck timers. +The default 500ms balances responsiveness with efficiency. + +* *Sub-second timer precision*: reduce to 50-100ms. +* *Minimal syscall overhead*: increase to 1000ms or higher. + +=== Thread Pool Size (`thread_pool_size`) + +On POSIX platforms, file I/O (`stream_file`, `random_access_file`) +and DNS resolution use a shared thread pool. + +* *Concurrent file operations*: increase to match expected + parallelism (e.g. 4 for four concurrent file reads). +* *No file I/O*: leave at 1 (the pool is created lazily). + +[#single-threaded-mode] +=== Single-Threaded Mode (`single_threaded`) + +Disables all mutex and condition variable operations inside the +scheduler and per-socket descriptor states. This eliminates +15-25% of overhead on the post-and-dispatch hot path. + +[source,cpp] +---- +corosio::io_context_options opts; +opts.single_threaded = true; + +corosio::io_context ioc(opts); +ioc.run(); // only one thread may call this +---- + +WARNING: Single-threaded mode imposes hard restrictions. +Violating them is undefined behavior. + +* Only **one thread** may call `run()` (or any run/poll variant). +* **Posting work from another thread** is undefined behavior. +* **DNS resolution** returns `operation_not_supported`. +* **POSIX file I/O** (`stream_file`, `random_access_file`) returns + `operation_not_supported` on `open()`. +* **Signal sets** should not be shared across contexts. +* **Timer cancellation via `stop_token`** from another thread + remains safe (the timer service retains its own mutex). diff --git a/include/boost/corosio/detail/conditionally_enabled_event.hpp b/include/boost/corosio/detail/conditionally_enabled_event.hpp new file mode 100644 index 000000000..7d58b4e7f --- /dev/null +++ b/include/boost/corosio/detail/conditionally_enabled_event.hpp @@ -0,0 +1,77 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_DETAIL_CONDITIONALLY_ENABLED_EVENT_HPP +#define BOOST_COROSIO_DETAIL_CONDITIONALLY_ENABLED_EVENT_HPP + +#include + +#include +#include + +namespace boost::corosio::detail { + +/* Condition variable wrapper that becomes a no-op when disabled. + + When enabled, notify/wait delegate to an underlying + std::condition_variable. When disabled, all operations + are no-ops. The wait paths are unreachable in + single-threaded mode because the task sentinel prevents + the empty-queue state in do_one(). +*/ +class conditionally_enabled_event +{ + std::condition_variable cond_; + bool enabled_; + +public: + explicit conditionally_enabled_event(bool enabled = true) noexcept + : enabled_(enabled) + { + } + + conditionally_enabled_event(conditionally_enabled_event const&) = delete; + conditionally_enabled_event& operator=(conditionally_enabled_event const&) = delete; + + void set_enabled(bool v) noexcept + { + enabled_ = v; + } + + void notify_one() + { + if (enabled_) + cond_.notify_one(); + } + + void notify_all() + { + if (enabled_) + cond_.notify_all(); + } + + void wait(conditionally_enabled_mutex::scoped_lock& lock) + { + if (enabled_) + cond_.wait(lock.underlying()); + } + + template + void wait_for( + conditionally_enabled_mutex::scoped_lock& lock, + std::chrono::duration const& d) + { + if (enabled_) + cond_.wait_for(lock.underlying(), d); + } +}; + +} // namespace boost::corosio::detail + +#endif // BOOST_COROSIO_DETAIL_CONDITIONALLY_ENABLED_EVENT_HPP diff --git a/include/boost/corosio/detail/conditionally_enabled_mutex.hpp b/include/boost/corosio/detail/conditionally_enabled_mutex.hpp new file mode 100644 index 000000000..37e6c3be1 --- /dev/null +++ b/include/boost/corosio/detail/conditionally_enabled_mutex.hpp @@ -0,0 +1,101 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_DETAIL_CONDITIONALLY_ENABLED_MUTEX_HPP +#define BOOST_COROSIO_DETAIL_CONDITIONALLY_ENABLED_MUTEX_HPP + +#include + +namespace boost::corosio::detail { + +/* Mutex wrapper that becomes a no-op when disabled. + + When enabled (the default), lock/unlock delegate to an + underlying std::mutex. When disabled, all operations are + no-ops. The enabled flag is fixed after construction. + + scoped_lock wraps std::unique_lock internally + so that condvar wait paths (which require the real lock + type) compile and work in multi-threaded mode. +*/ +class conditionally_enabled_mutex +{ + std::mutex mutex_; + bool enabled_; + +public: + explicit conditionally_enabled_mutex(bool enabled = true) noexcept + : enabled_(enabled) + { + } + + conditionally_enabled_mutex(conditionally_enabled_mutex const&) = delete; + conditionally_enabled_mutex& operator=(conditionally_enabled_mutex const&) = delete; + + bool enabled() const noexcept + { + return enabled_; + } + + void set_enabled(bool v) noexcept + { + enabled_ = v; + } + + // Lockable interface — allows std::lock_guard + void lock() { if (enabled_) mutex_.lock(); } + void unlock() { if (enabled_) mutex_.unlock(); } + bool try_lock() { return !enabled_ || mutex_.try_lock(); } + + class scoped_lock + { + std::unique_lock lock_; + bool enabled_; + + public: + explicit scoped_lock(conditionally_enabled_mutex& m) + : lock_(m.mutex_, std::defer_lock) + , enabled_(m.enabled_) + { + if (enabled_) + lock_.lock(); + } + + scoped_lock(scoped_lock const&) = delete; + scoped_lock& operator=(scoped_lock const&) = delete; + + void lock() + { + if (enabled_) + lock_.lock(); + } + + void unlock() + { + if (enabled_) + lock_.unlock(); + } + + bool owns_lock() const noexcept + { + return enabled_ && lock_.owns_lock(); + } + + // Access the underlying unique_lock for condvar wait(). + // Only called when locking is enabled. + std::unique_lock& underlying() noexcept + { + return lock_; + } + }; +}; + +} // namespace boost::corosio::detail + +#endif // BOOST_COROSIO_DETAIL_CONDITIONALLY_ENABLED_MUTEX_HPP diff --git a/include/boost/corosio/io_context.hpp b/include/boost/corosio/io_context.hpp index d8e58a81f..3e4ad2a35 100644 --- a/include/boost/corosio/io_context.hpp +++ b/include/boost/corosio/io_context.hpp @@ -27,6 +27,95 @@ namespace boost::corosio { +/** Runtime tuning options for @ref io_context. + + All fields have defaults that match the library's built-in + values, so constructing a default `io_context_options` produces + identical behavior to an unconfigured context. + + Options that apply only to a specific backend family are + silently ignored when the active backend does not support them. + + @par Example + @code + io_context_options opts; + opts.max_events_per_poll = 256; // larger batch per syscall + opts.inline_budget_max = 32; // more speculative completions + opts.thread_pool_size = 4; // more file-I/O workers + + io_context ioc(opts); + @endcode + + @see io_context, native_io_context +*/ +struct io_context_options +{ + /** Maximum events fetched per reactor poll call. + + Controls the buffer size passed to `epoll_wait()` or + `kevent()`. Larger values reduce syscall frequency under + high load; smaller values improve fairness between + connections. Ignored on IOCP and select backends. + */ + unsigned max_events_per_poll = 128; + + /** Starting inline completion budget per handler chain. + + After a posted handler executes, the reactor grants this + many speculative inline completions before forcing a + re-queue. Applies to reactor backends only. + */ + unsigned inline_budget_initial = 2; + + /** Hard ceiling on adaptive inline budget ramp-up. + + The budget doubles each cycle it is fully consumed, up to + this limit. Applies to reactor backends only. + */ + unsigned inline_budget_max = 16; + + /** Inline budget when no other thread assists the reactor. + + When only one thread is running the event loop, this + value caps the inline budget to preserve fairness. + Applies to reactor backends only. + */ + unsigned unassisted_budget = 4; + + /** Maximum `GetQueuedCompletionStatus` timeout in milliseconds. + + Bounds how long the IOCP scheduler blocks between timer + rechecks. Lower values improve timer responsiveness at the + cost of more syscalls. Applies to IOCP only. + */ + unsigned gqcs_timeout_ms = 500; + + /** Thread pool size for blocking I/O (file I/O, DNS resolution). + + Sets the number of worker threads in the shared thread pool + used by POSIX file services and DNS resolution. Must be at + least 1. Applies to POSIX backends only; ignored on IOCP + where file I/O uses native overlapped I/O. + */ + unsigned thread_pool_size = 1; + + /** Enable single-threaded mode (disable scheduler locking). + + When true, the scheduler skips all mutex lock/unlock and + condition variable operations on the hot path. This + eliminates synchronization overhead when only one thread + calls `run()`. + + @par Restrictions + - Only one thread may call `run()` (or any run variant). + - Posting work from another thread is undefined behavior. + - DNS resolution returns `operation_not_supported`. + - POSIX file I/O returns `operation_not_supported`. + - Signal sets should not be shared across contexts. + */ + bool single_threaded = false; +}; + namespace detail { struct timer_service_access; } // namespace detail @@ -64,6 +153,12 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context { friend struct detail::timer_service_access; + /// Pre-create services that depend on options (before construct). + void apply_options_pre_(io_context_options const& opts); + + /// Apply runtime tuning to the scheduler (after construct). + void apply_options_post_(io_context_options const& opts); + protected: detail::scheduler* sched_; @@ -81,6 +176,17 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context */ explicit io_context(unsigned concurrency_hint); + /** Construct with runtime tuning options and platform backend. + + @param opts Runtime options controlling scheduler and + service behavior. + @param concurrency_hint Hint for the number of threads + that will call `run()`. + */ + explicit io_context( + io_context_options const& opts, + unsigned concurrency_hint = std::thread::hardware_concurrency()); + /** Construct with an explicit backend tag. @param backend The backend tag value selecting the I/O @@ -100,6 +206,30 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context sched_ = &Backend::construct(*this, concurrency_hint); } + /** Construct with an explicit backend tag and runtime options. + + @param backend The backend tag value selecting the I/O + multiplexer (e.g. `corosio::epoll`). + @param opts Runtime options controlling scheduler and + service behavior. + @param concurrency_hint Hint for the number of threads + that will call `run()`. + */ + template + requires requires { Backend::construct; } + explicit io_context( + Backend backend, + io_context_options const& opts, + unsigned concurrency_hint = std::thread::hardware_concurrency()) + : capy::execution_context(this) + , sched_(nullptr) + { + (void)backend; + apply_options_pre_(opts); + sched_ = &Backend::construct(*this, concurrency_hint); + apply_options_post_(opts); + } + ~io_context(); io_context(io_context const&) = delete; diff --git a/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp b/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp index ff5fb71b3..454c40712 100644 --- a/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp +++ b/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -86,6 +87,13 @@ class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base /// Shut down the scheduler, draining pending operations. void shutdown() override; + /// Apply runtime configuration, resizing the event buffer. + void configure_reactor( + unsigned max_events, + unsigned budget_init, + unsigned budget_max, + unsigned unassisted) override; + /** Return the epoll file descriptor. Used by socket services to register file descriptors @@ -117,7 +125,7 @@ class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base private: void - run_task(std::unique_lock& lock, context_type* ctx, + run_task(lock_type& lock, context_type* ctx, long timeout_us) override; void interrupt_reactor() const override; void update_timerfd() const; @@ -131,12 +139,17 @@ class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base // Set when the earliest timer changes; flushed before epoll_wait mutable std::atomic timerfd_stale_{false}; + + // Event buffer sized from max_events_per_poll_ (set at construction, + // resized by configure_reactor via io_context_options). + std::vector event_buffer_; }; inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int) : epoll_fd_(-1) , event_fd_(-1) , timer_fd_(-1) + , event_buffer_(max_events_per_poll_) { epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC); if (epoll_fd_ < 0) @@ -218,6 +231,18 @@ epoll_scheduler::shutdown() interrupt_reactor(); } +inline void +epoll_scheduler::configure_reactor( + unsigned max_events, + unsigned budget_init, + unsigned budget_max, + unsigned unassisted) +{ + reactor_scheduler_base::configure_reactor( + max_events, budget_init, budget_max, unassisted); + event_buffer_.resize(max_events_per_poll_); +} + inline void epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const { @@ -231,9 +256,10 @@ epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const desc->registered_events = ev.events; desc->fd = fd; desc->scheduler_ = this; + desc->mutex.set_enabled(!single_threaded_); desc->ready_events_.store(0, std::memory_order_relaxed); - std::lock_guard lock(desc->mutex); + conditionally_enabled_mutex::scoped_lock lock(desc->mutex); desc->impl_ref_.reset(); desc->read_ready = false; desc->write_ready = false; @@ -296,7 +322,7 @@ epoll_scheduler::update_timerfd() const inline void epoll_scheduler::run_task( - std::unique_lock& lock, context_type* ctx, long timeout_us) + lock_type& lock, context_type* ctx, long timeout_us) { int timeout_ms; if (task_interrupted_) @@ -315,8 +341,9 @@ epoll_scheduler::run_task( if (timerfd_stale_.exchange(false, std::memory_order_acquire)) update_timerfd(); - epoll_event events[128]; - int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms); + int nfds = ::epoll_wait( + epoll_fd_, event_buffer_.data(), + static_cast(event_buffer_.size()), timeout_ms); if (nfds < 0 && errno != EINTR) detail::throw_system_error(make_err(errno), "epoll_wait"); @@ -326,7 +353,7 @@ epoll_scheduler::run_task( for (int i = 0; i < nfds; ++i) { - if (events[i].data.ptr == nullptr) + if (event_buffer_[i].data.ptr == nullptr) { std::uint64_t val; // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) @@ -335,7 +362,7 @@ epoll_scheduler::run_task( continue; } - if (events[i].data.ptr == &timer_fd_) + if (event_buffer_[i].data.ptr == &timer_fd_) { std::uint64_t expirations; // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) @@ -345,8 +372,9 @@ epoll_scheduler::run_task( continue; } - auto* desc = static_cast(events[i].data.ptr); - desc->add_ready_events(events[i].events); + auto* desc = + static_cast(event_buffer_[i].data.ptr); + desc->add_ready_events(event_buffer_[i].events); bool expected = false; if (desc->is_enqueued_.compare_exchange_strong( diff --git a/include/boost/corosio/native/detail/iocp/win_scheduler.hpp b/include/boost/corosio/native/detail/iocp/win_scheduler.hpp index ae8f568d5..efef0914b 100644 --- a/include/boost/corosio/native/detail/iocp/win_scheduler.hpp +++ b/include/boost/corosio/native/detail/iocp/win_scheduler.hpp @@ -80,6 +80,15 @@ class BOOST_COROSIO_DECL win_scheduler final void work_started() noexcept override; void work_finished() noexcept override; + /** Apply runtime IOCP configuration. + + @param gqcs_timeout_ms Max GQCS blocking time in milliseconds. + */ + void configure_iocp(unsigned gqcs_timeout_ms) noexcept + { + gqcs_timeout_ms_ = gqcs_timeout_ms; + } + /** Signal that an overlapped I/O operation is now pending. Coordinates with do_one() via the ready_ CAS protocol. */ void on_pending(overlapped_op* op) const; @@ -102,6 +111,7 @@ class BOOST_COROSIO_DECL win_scheduler final mutable long stopped_; long stop_event_posted_; mutable long dispatch_required_; + unsigned long gqcs_timeout_ms_ = 500; mutable win_mutex dispatch_mutex_; mutable op_queue completed_ops_; @@ -226,7 +236,7 @@ win_scheduler::shutdown() ULONG_PTR key; LPOVERLAPPED overlapped; ::GetQueuedCompletionStatus( - iocp_, &bytes, &key, &overlapped, iocp::max_gqcs_timeout); + iocp_, &bytes, &key, &overlapped, gqcs_timeout_ms_); if (overlapped) { ::InterlockedDecrement(&outstanding_work_); @@ -389,7 +399,7 @@ win_scheduler::stop() { // PQCS failure is non-fatal: stopped_ is already set. // The run() loop will notice via the GQCS timeout - // (max_gqcs_timeout = 500ms) and exit. + // (gqcs_timeout_ms_, default 500ms) and exit. ::InterlockedExchange(&dispatch_required_, 1); } } @@ -547,8 +557,8 @@ win_scheduler::do_one(unsigned long timeout_ms) BOOL result = ::GetQueuedCompletionStatus( iocp_, &bytes, &key, &overlapped, - timeout_ms < iocp::max_gqcs_timeout ? timeout_ms - : iocp::max_gqcs_timeout); + timeout_ms < gqcs_timeout_ms_ ? timeout_ms + : gqcs_timeout_ms_); DWORD dwError = ::GetLastError(); // Handle based on completion key diff --git a/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp b/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp index 018d7d2b5..025c15a3c 100644 --- a/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp +++ b/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include @@ -102,6 +103,13 @@ class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler_base /// Shut down the scheduler, draining pending operations. void shutdown() override; + /// Apply runtime configuration, resizing the event buffer. + void configure_reactor( + unsigned max_events, + unsigned budget_init, + unsigned budget_max, + unsigned unassisted) override; + /** Return the kqueue file descriptor. Used by socket services to register file descriptors @@ -139,7 +147,7 @@ class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler_base private: void - run_task(std::unique_lock& lock, context_type* ctx, + run_task(lock_type& lock, context_type* ctx, long timeout_us) override; void interrupt_reactor() const override; long calculate_timeout(long requested_timeout_us) const; @@ -148,10 +156,14 @@ class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler_base // EVFILT_USER idempotency mutable std::atomic user_event_armed_{false}; + + // Event buffer sized from max_events_per_poll_. + std::vector event_buffer_; }; inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int) : kq_fd_(-1) + , event_buffer_(max_events_per_poll_) { kq_fd_ = ::kqueue(); if (kq_fd_ < 0) @@ -202,6 +214,18 @@ kqueue_scheduler::shutdown() interrupt_reactor(); } +inline void +kqueue_scheduler::configure_reactor( + unsigned max_events, + unsigned budget_init, + unsigned budget_max, + unsigned unassisted) +{ + reactor_scheduler_base::configure_reactor( + max_events, budget_init, budget_max, unassisted); + event_buffer_.resize(max_events_per_poll_); +} + inline void kqueue_scheduler::register_descriptor(int fd, descriptor_state* desc) const { @@ -219,9 +243,10 @@ kqueue_scheduler::register_descriptor(int fd, descriptor_state* desc) const desc->registered_events = kqueue_event_read | kqueue_event_write; desc->fd = fd; desc->scheduler_ = this; + desc->mutex.set_enabled(!single_threaded_); desc->ready_events_.store(0, std::memory_order_relaxed); - std::lock_guard lock(desc->mutex); + conditionally_enabled_mutex::scoped_lock lock(desc->mutex); desc->impl_ref_.reset(); desc->read_ready = false; desc->write_ready = false; @@ -286,7 +311,7 @@ kqueue_scheduler::calculate_timeout(long requested_timeout_us) const inline void kqueue_scheduler::run_task( - std::unique_lock& lock, context_type* ctx, long timeout_us) + lock_type& lock, context_type* ctx, long timeout_us) { long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(timeout_us); @@ -305,8 +330,9 @@ kqueue_scheduler::run_task( ts_ptr = &ts; } - struct kevent events[128]; - int nev = ::kevent(kq_fd_, nullptr, 0, events, 128, ts_ptr); + int nev = ::kevent( + kq_fd_, nullptr, 0, event_buffer_.data(), + static_cast(event_buffer_.size()), ts_ptr); int saved_errno = errno; if (nev < 0 && saved_errno != EINTR) @@ -316,31 +342,32 @@ kqueue_scheduler::run_task( for (int i = 0; i < nev; ++i) { - if (events[i].filter == EVFILT_USER) + if (event_buffer_[i].filter == EVFILT_USER) { user_event_armed_.store(false, std::memory_order_release); continue; } - auto* desc = static_cast(events[i].udata); + auto* desc = + static_cast(event_buffer_[i].udata); if (!desc) continue; std::uint32_t ready = 0; - if (events[i].filter == EVFILT_READ) + if (event_buffer_[i].filter == EVFILT_READ) ready |= kqueue_event_read; - else if (events[i].filter == EVFILT_WRITE) + else if (event_buffer_[i].filter == EVFILT_WRITE) ready |= kqueue_event_write; - if (events[i].flags & EV_ERROR) + if (event_buffer_[i].flags & EV_ERROR) ready |= kqueue_event_error; - if (events[i].flags & EV_EOF) + if (event_buffer_[i].flags & EV_EOF) { - if (events[i].filter == EVFILT_READ) + if (event_buffer_[i].filter == EVFILT_READ) ready |= kqueue_event_read; - if (events[i].fflags != 0) + if (event_buffer_[i].fflags != 0) ready |= kqueue_event_error; } diff --git a/include/boost/corosio/native/detail/posix/posix_random_access_file_service.hpp b/include/boost/corosio/native/detail/posix/posix_random_access_file_service.hpp index 6802108bb..c187b06b3 100644 --- a/include/boost/corosio/native/detail/posix/posix_random_access_file_service.hpp +++ b/include/boost/corosio/native/detail/posix/posix_random_access_file_service.hpp @@ -15,6 +15,7 @@ #if BOOST_COROSIO_POSIX #include +#include #include #include @@ -80,6 +81,8 @@ class BOOST_COROSIO_DECL posix_random_access_file_service final std::filesystem::path const& path, file_base::flags mode) override { + if (static_cast(sched_)->single_threaded_) + return std::make_error_code(std::errc::operation_not_supported); return static_cast(impl).open_file( path, mode); } diff --git a/include/boost/corosio/native/detail/posix/posix_resolver_service.hpp b/include/boost/corosio/native/detail/posix/posix_resolver_service.hpp index c6cbe9c2a..dca15bb7a 100644 --- a/include/boost/corosio/native/detail/posix/posix_resolver_service.hpp +++ b/include/boost/corosio/native/detail/posix/posix_resolver_service.hpp @@ -15,6 +15,7 @@ #if BOOST_COROSIO_POSIX #include +#include #include #include @@ -66,6 +67,13 @@ class BOOST_COROSIO_DECL posix_resolver_service final return pool_; } + /** Return true if single-threaded mode is active. */ + bool single_threaded() const noexcept + { + return static_cast(sched_) + ->single_threaded_; + } + private: scheduler* sched_; thread_pool& pool_; @@ -370,6 +378,13 @@ posix_resolver::resolve( std::error_code* ec, resolver_results* out) { + if (svc_.single_threaded()) + { + *ec = std::make_error_code(std::errc::operation_not_supported); + op_.cont_op.cont.h = h; + return dispatch_coro(ex, op_.cont_op.cont); + } + auto& op = op_; op.reset(); op.h = h; @@ -409,6 +424,13 @@ posix_resolver::reverse_resolve( std::error_code* ec, reverse_resolver_result* result_out) { + if (svc_.single_threaded()) + { + *ec = std::make_error_code(std::errc::operation_not_supported); + reverse_op_.cont_op.cont.h = h; + return dispatch_coro(ex, reverse_op_.cont_op.cont); + } + auto& op = reverse_op_; op.reset(); op.h = h; diff --git a/include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp b/include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp index ef13807c1..f1bc13d57 100644 --- a/include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp +++ b/include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp @@ -15,6 +15,7 @@ #if BOOST_COROSIO_POSIX #include +#include #include #include @@ -81,6 +82,8 @@ class BOOST_COROSIO_DECL posix_stream_file_service final std::filesystem::path const& path, file_base::flags mode) override { + if (static_cast(sched_)->single_threaded_) + return std::make_error_code(std::errc::operation_not_supported); return static_cast(impl).open_file(path, mode); } diff --git a/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp b/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp index 56c653018..c09a52567 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp @@ -13,10 +13,11 @@ #include #include +#include + #include #include #include -#include #include #include @@ -48,7 +49,8 @@ static constexpr std::uint32_t reactor_event_error = 0x008; struct reactor_descriptor_state : scheduler_op { /// Protects operation pointers and ready/cancel flags. - std::mutex mutex; + /// Becomes a no-op in single-threaded mode. + conditionally_enabled_mutex mutex{true}; /// Pending read operation (guarded by `mutex`). reactor_op_base* read_op = nullptr; @@ -131,7 +133,7 @@ reactor_descriptor_state::invoke_deferred_io() op_queue local_ops; { - std::lock_guard lock(mutex); + conditionally_enabled_mutex::scoped_lock lock(mutex); // Must clear is_enqueued_ and move impl_ref_ under the same // lock that processes I/O. close_socket() checks is_enqueued_ diff --git a/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp b/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp index d1fd2e798..dbd1b9c73 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp @@ -19,13 +19,15 @@ #include #include -#include #include #include #include #include #include -#include +#include + +#include +#include namespace boost::corosio::detail { @@ -63,15 +65,8 @@ struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context /// Construct a context frame linked to @a n. reactor_scheduler_context( - reactor_scheduler_base const* k, reactor_scheduler_context* n) - : key(k) - , next(n) - , private_outstanding_work(0) - , inline_budget(0) - , inline_budget_max(2) - , unassisted(false) - { - } + reactor_scheduler_base const* k, + reactor_scheduler_context* n); }; /// Thread-local context stack for reactor schedulers. @@ -148,6 +143,9 @@ class reactor_scheduler_base public: using key_type = scheduler; using context_type = reactor_scheduler_context; + using mutex_type = conditionally_enabled_mutex; + using lock_type = mutex_type::scoped_lock; + using event_type = conditionally_enabled_event; /// Post a coroutine for deferred execution. void post(std::coroutine_handle<> h) const override; @@ -233,6 +231,41 @@ class reactor_scheduler_base */ void post_deferred_completions(op_queue& ops) const; + /** Apply runtime configuration to the scheduler. + + Called by `io_context` after construction. Values that do + not apply to this backend are silently ignored. + + @param max_events Event buffer size for epoll/kqueue. + @param budget_init Starting inline completion budget. + @param budget_max Hard ceiling on adaptive budget ramp-up. + @param unassisted Budget when single-threaded. + */ + virtual void configure_reactor( + unsigned max_events, + unsigned budget_init, + unsigned budget_max, + unsigned unassisted); + + /// Return the configured initial inline budget. + unsigned inline_budget_initial() const noexcept + { + return inline_budget_initial_; + } + + /** Enable or disable single-threaded (lockless) mode. + + When enabled, all scheduler mutex and condition variable + operations become no-ops. Cross-thread post() is + undefined behavior. + */ + void configure_single_threaded(bool v) noexcept + { + single_threaded_ = v; + mutex_.set_enabled(!v); + cond_.set_enabled(!v); + } + protected: reactor_scheduler_base() = default; @@ -249,19 +282,26 @@ class reactor_scheduler_base struct task_cleanup { reactor_scheduler_base const* sched; - std::unique_lock* lock; + lock_type* lock; context_type* ctx; ~task_cleanup(); }; - mutable std::mutex mutex_; - mutable std::condition_variable cond_; + mutable mutex_type mutex_{true}; + mutable event_type cond_{true}; mutable op_queue completed_ops_; mutable std::atomic outstanding_work_{0}; - bool stopped_ = false; + std::atomic stopped_{false}; mutable std::atomic task_running_{false}; mutable bool task_interrupted_ = false; + // Runtime-configurable reactor tuning parameters. + // Defaults match the library's built-in values. + unsigned max_events_per_poll_ = 128; + unsigned inline_budget_initial_ = 2; + unsigned inline_budget_max_ = 16; + unsigned unassisted_budget_ = 4; + /// Bit 0 of `state_`: set when the condvar should be signaled. static constexpr std::size_t signaled_bit = 1; @@ -279,7 +319,7 @@ class reactor_scheduler_base /// Run the platform-specific reactor poll. virtual void - run_task(std::unique_lock& lock, context_type* ctx, + run_task(lock_type& lock, context_type* ctx, long timeout_us) = 0; /// Wake a blocked reactor (e.g. write to eventfd or pipe). @@ -289,22 +329,22 @@ class reactor_scheduler_base struct work_cleanup { reactor_scheduler_base* sched; - std::unique_lock* lock; + lock_type* lock; context_type* ctx; ~work_cleanup(); }; std::size_t do_one( - std::unique_lock& lock, long timeout_us, context_type* ctx); + lock_type& lock, long timeout_us, context_type* ctx); - void signal_all(std::unique_lock& lock) const; - bool maybe_unlock_and_signal_one(std::unique_lock& lock) const; - bool unlock_and_signal_one(std::unique_lock& lock) const; + void signal_all(lock_type& lock) const; + bool maybe_unlock_and_signal_one(lock_type& lock) const; + bool unlock_and_signal_one(lock_type& lock) const; void clear_signal() const; - void wait_for_signal(std::unique_lock& lock) const; + void wait_for_signal(lock_type& lock) const; void wait_for_signal_for( - std::unique_lock& lock, long timeout_us) const; - void wake_one_thread_and_unlock(std::unique_lock& lock) const; + lock_type& lock, long timeout_us) const; + void wake_one_thread_and_unlock(lock_type& lock) const; }; /** RAII guard that pushes/pops a scheduler context frame. @@ -338,6 +378,48 @@ struct reactor_thread_context_guard // ---- Inline implementations ------------------------------------------------ +inline +reactor_scheduler_context::reactor_scheduler_context( + reactor_scheduler_base const* k, + reactor_scheduler_context* n) + : key(k) + , next(n) + , private_outstanding_work(0) + , inline_budget(0) + , inline_budget_max( + static_cast(k->inline_budget_initial())) + , unassisted(false) +{ +} + +inline void +reactor_scheduler_base::configure_reactor( + unsigned max_events, + unsigned budget_init, + unsigned budget_max, + unsigned unassisted) +{ + if (max_events < 1 || + max_events > static_cast(std::numeric_limits::max())) + throw std::out_of_range( + "max_events_per_poll must be in [1, INT_MAX]"); + if (budget_max < 1 || + budget_max > static_cast(std::numeric_limits::max())) + throw std::out_of_range( + "inline_budget_max must be in [1, INT_MAX]"); + + // Clamp initial and unassisted to budget_max. + if (budget_init > budget_max) + budget_init = budget_max; + if (unassisted > budget_max) + unassisted = budget_max; + + max_events_per_poll_ = max_events; + inline_budget_initial_ = budget_init; + inline_budget_max_ = budget_max; + unassisted_budget_ = unassisted; +} + inline void reactor_scheduler_base::reset_inline_budget() const noexcept { @@ -346,15 +428,20 @@ reactor_scheduler_base::reset_inline_budget() const noexcept // Cap when no other thread absorbed queued work if (ctx->unassisted) { - ctx->inline_budget_max = 4; - ctx->inline_budget = 4; + ctx->inline_budget_max = + static_cast(unassisted_budget_); + ctx->inline_budget = + static_cast(unassisted_budget_); return; } // Ramp up when previous cycle fully consumed budget if (ctx->inline_budget == 0) - ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16); + ctx->inline_budget_max = (std::min)( + ctx->inline_budget_max * 2, + static_cast(inline_budget_max_)); else if (ctx->inline_budget < ctx->inline_budget_max) - ctx->inline_budget_max = 2; + ctx->inline_budget_max = + static_cast(inline_budget_initial_); ctx->inline_budget = ctx->inline_budget_max; } } @@ -411,7 +498,7 @@ reactor_scheduler_base::post(std::coroutine_handle<> h) const outstanding_work_.fetch_add(1, std::memory_order_relaxed); - std::unique_lock lock(mutex_); + lock_type lock(mutex_); completed_ops_.push(ph.release()); wake_one_thread_and_unlock(lock); } @@ -428,7 +515,7 @@ reactor_scheduler_base::post(scheduler_op* h) const outstanding_work_.fetch_add(1, std::memory_order_relaxed); - std::unique_lock lock(mutex_); + lock_type lock(mutex_); completed_ops_.push(h); wake_one_thread_and_unlock(lock); } @@ -442,10 +529,10 @@ reactor_scheduler_base::running_in_this_thread() const noexcept inline void reactor_scheduler_base::stop() { - std::unique_lock lock(mutex_); - if (!stopped_) + lock_type lock(mutex_); + if (!stopped_.load(std::memory_order_acquire)) { - stopped_ = true; + stopped_.store(true, std::memory_order_release); signal_all(lock); interrupt_reactor(); } @@ -454,15 +541,13 @@ reactor_scheduler_base::stop() inline bool reactor_scheduler_base::stopped() const noexcept { - std::unique_lock lock(mutex_); - return stopped_; + return stopped_.load(std::memory_order_acquire); } inline void reactor_scheduler_base::restart() { - std::unique_lock lock(mutex_); - stopped_ = false; + stopped_.store(false, std::memory_order_release); } inline std::size_t @@ -475,7 +560,7 @@ reactor_scheduler_base::run() } reactor_thread_context_guard ctx(this); - std::unique_lock lock(mutex_); + lock_type lock(mutex_); std::size_t n = 0; for (;;) @@ -500,7 +585,7 @@ reactor_scheduler_base::run_one() } reactor_thread_context_guard ctx(this); - std::unique_lock lock(mutex_); + lock_type lock(mutex_); return do_one(lock, -1, &ctx.frame_); } @@ -514,7 +599,7 @@ reactor_scheduler_base::wait_one(long usec) } reactor_thread_context_guard ctx(this); - std::unique_lock lock(mutex_); + lock_type lock(mutex_); return do_one(lock, usec, &ctx.frame_); } @@ -528,7 +613,7 @@ reactor_scheduler_base::poll() } reactor_thread_context_guard ctx(this); - std::unique_lock lock(mutex_); + lock_type lock(mutex_); std::size_t n = 0; for (;;) @@ -553,7 +638,7 @@ reactor_scheduler_base::poll_one() } reactor_thread_context_guard ctx(this); - std::unique_lock lock(mutex_); + lock_type lock(mutex_); return do_one(lock, 0, &ctx.frame_); } @@ -585,7 +670,7 @@ reactor_scheduler_base::drain_thread_queue( if (count > 0) outstanding_work_.fetch_add(count, std::memory_order_relaxed); - std::unique_lock lock(mutex_); + lock_type lock(mutex_); completed_ops_.splice(queue); if (count > 0) maybe_unlock_and_signal_one(lock); @@ -603,7 +688,7 @@ reactor_scheduler_base::post_deferred_completions(op_queue& ops) const return; } - std::unique_lock lock(mutex_); + lock_type lock(mutex_); completed_ops_.splice(ops); wake_one_thread_and_unlock(lock); } @@ -611,7 +696,7 @@ reactor_scheduler_base::post_deferred_completions(op_queue& ops) const inline void reactor_scheduler_base::shutdown_drain() { - std::unique_lock lock(mutex_); + lock_type lock(mutex_); while (auto* h = completed_ops_.pop()) { @@ -626,7 +711,7 @@ reactor_scheduler_base::shutdown_drain() } inline void -reactor_scheduler_base::signal_all(std::unique_lock&) const +reactor_scheduler_base::signal_all(lock_type&) const { state_ |= signaled_bit; cond_.notify_all(); @@ -634,7 +719,7 @@ reactor_scheduler_base::signal_all(std::unique_lock&) const inline bool reactor_scheduler_base::maybe_unlock_and_signal_one( - std::unique_lock& lock) const + lock_type& lock) const { state_ |= signaled_bit; if (state_ > signaled_bit) @@ -648,7 +733,7 @@ reactor_scheduler_base::maybe_unlock_and_signal_one( inline bool reactor_scheduler_base::unlock_and_signal_one( - std::unique_lock& lock) const + lock_type& lock) const { state_ |= signaled_bit; bool have_waiters = state_ > signaled_bit; @@ -666,7 +751,7 @@ reactor_scheduler_base::clear_signal() const inline void reactor_scheduler_base::wait_for_signal( - std::unique_lock& lock) const + lock_type& lock) const { while ((state_ & signaled_bit) == 0) { @@ -678,7 +763,7 @@ reactor_scheduler_base::wait_for_signal( inline void reactor_scheduler_base::wait_for_signal_for( - std::unique_lock& lock, long timeout_us) const + lock_type& lock, long timeout_us) const { if ((state_ & signaled_bit) == 0) { @@ -690,7 +775,7 @@ reactor_scheduler_base::wait_for_signal_for( inline void reactor_scheduler_base::wake_one_thread_and_unlock( - std::unique_lock& lock) const + lock_type& lock) const { if (maybe_unlock_and_signal_one(lock)) return; @@ -753,11 +838,11 @@ inline reactor_scheduler_base::task_cleanup::~task_cleanup() inline std::size_t reactor_scheduler_base::do_one( - std::unique_lock& lock, long timeout_us, context_type* ctx) + lock_type& lock, long timeout_us, context_type* ctx) { for (;;) { - if (stopped_) + if (stopped_.load(std::memory_order_acquire)) return 0; scheduler_op* op = completed_ops_.pop(); diff --git a/include/boost/corosio/native/detail/select/select_scheduler.hpp b/include/boost/corosio/native/detail/select/select_scheduler.hpp index f87858659..ad0285ed2 100644 --- a/include/boost/corosio/native/detail/select/select_scheduler.hpp +++ b/include/boost/corosio/native/detail/select/select_scheduler.hpp @@ -127,7 +127,7 @@ class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base private: void - run_task(std::unique_lock& lock, context_type* ctx, + run_task(lock_type& lock, context_type* ctx, long timeout_us) override; void interrupt_reactor() const override; long calculate_timeout(long requested_timeout_us) const; @@ -214,17 +214,18 @@ select_scheduler::register_descriptor( desc->registered_events = reactor_event_read | reactor_event_write; desc->fd = fd; desc->scheduler_ = this; + desc->mutex.set_enabled(!single_threaded_); desc->ready_events_.store(0, std::memory_order_relaxed); { - std::lock_guard lock(desc->mutex); + conditionally_enabled_mutex::scoped_lock lock(desc->mutex); desc->impl_ref_.reset(); desc->read_ready = false; desc->write_ready = false; } { - std::lock_guard lock(mutex_); + mutex_type::scoped_lock lock(mutex_); registered_descs_[fd] = desc; if (fd > max_fd_) max_fd_ = fd; @@ -236,7 +237,7 @@ select_scheduler::register_descriptor( inline void select_scheduler::deregister_descriptor(int fd) const { - std::lock_guard lock(mutex_); + mutex_type::scoped_lock lock(mutex_); auto it = registered_descs_.find(fd); if (it == registered_descs_.end()) @@ -303,7 +304,7 @@ select_scheduler::calculate_timeout(long requested_timeout_us) const inline void select_scheduler::run_task( - std::unique_lock& lock, context_type* ctx, long timeout_us) + lock_type& lock, context_type* ctx, long timeout_us) { long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(timeout_us); @@ -326,7 +327,7 @@ select_scheduler::run_task( { if (snapshot_count < FD_SETSIZE) { - std::lock_guard desc_lock(desc->mutex); + conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex); snapshot[snapshot_count].fd = fd; snapshot[snapshot_count].desc = desc; snapshot[snapshot_count].needs_write = diff --git a/include/boost/corosio/native/native_io_context.hpp b/include/boost/corosio/native/native_io_context.hpp index 606a176c3..14daefd6c 100644 --- a/include/boost/corosio/native/native_io_context.hpp +++ b/include/boost/corosio/native/native_io_context.hpp @@ -85,6 +85,20 @@ class native_io_context : public io_context { } + /** Construct with runtime tuning options. + + @param opts Runtime options controlling scheduler and + service behavior. + @param concurrency_hint Hint for the number of threads that + will call `run()`. + */ + explicit native_io_context( + io_context_options const& opts, + unsigned concurrency_hint = std::thread::hardware_concurrency()) + : io_context(Backend, opts, concurrency_hint) + { + } + // Non-copyable, non-movable native_io_context(native_io_context const&) = delete; native_io_context& operator=(native_io_context const&) = delete; diff --git a/include/boost/corosio/native/native_scheduler.hpp b/include/boost/corosio/native/native_scheduler.hpp index 0e8d723b8..e0b35e629 100644 --- a/include/boost/corosio/native/native_scheduler.hpp +++ b/include/boost/corosio/native/native_scheduler.hpp @@ -28,6 +28,9 @@ struct native_scheduler : scheduler { /// Store the timer service pointer, set during construction. timer_service* timer_svc_ = nullptr; + + /// True when single-threaded (lockless) mode is active. + bool single_threaded_ = false; }; } // namespace boost::corosio::detail diff --git a/perf/bench/asio/callback/io_context_bench.cpp b/perf/bench/asio/callback/io_context_bench.cpp index 9daaafad6..b279fa33a 100644 --- a/perf/bench/asio/callback/io_context_bench.cpp +++ b/perf/bench/asio/callback/io_context_bench.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -175,6 +176,59 @@ bench_concurrent_post_run(bench::state& state) state.counters["threads"] = num_threads; } +void +bench_single_threaded_lockless(bench::state& state) +{ + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + int64_t counter = 0; + int constexpr batch_size = 1000; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < batch_size; ++i) + asio::post(ioc, [&counter] { ++counter; }); + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + +void +bench_interleaved_lockless(bench::state& state) +{ + int handlers_per_iteration = 100; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + int64_t counter = 0; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < handlers_per_iteration; ++i) + asio::post(ioc, [&counter] { ++counter; }); + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + } // anonymous namespace bench::benchmark_suite @@ -194,7 +248,9 @@ make_io_context_suite() .args({8}) .add("interleaved", bench_interleaved_post_run) .add("concurrent", bench_concurrent_post_run) - .args({4}); + .args({4}) + .add("single_threaded_lockless", bench_single_threaded_lockless) + .add("interleaved_lockless", bench_interleaved_lockless); } } // namespace asio_callback_bench diff --git a/perf/bench/asio/coroutine/io_context_bench.cpp b/perf/bench/asio/coroutine/io_context_bench.cpp index e502fd5fd..851f1d6dc 100644 --- a/perf/bench/asio/coroutine/io_context_bench.cpp +++ b/perf/bench/asio/coroutine/io_context_bench.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -187,6 +188,59 @@ bench_concurrent_post_run(bench::state& state) state.counters["threads"] = num_threads; } +void +bench_single_threaded_lockless(bench::state& state) +{ + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + int64_t counter = 0; + int constexpr batch_size = 1000; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < batch_size; ++i) + asio::co_spawn(ioc, increment_task(counter), asio::detached); + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + +void +bench_interleaved_lockless(bench::state& state) +{ + int handlers_per_iteration = 100; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + int64_t counter = 0; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < handlers_per_iteration; ++i) + asio::co_spawn(ioc, increment_task(counter), asio::detached); + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + } // anonymous namespace bench::benchmark_suite @@ -206,7 +260,9 @@ make_io_context_suite() .args({8}) .add("interleaved", bench_interleaved_post_run) .add("concurrent", bench_concurrent_post_run) - .args({4}); + .args({4}) + .add("single_threaded_lockless", bench_single_threaded_lockless) + .add("interleaved_lockless", bench_interleaved_lockless); } } // namespace asio_bench diff --git a/perf/bench/corosio/accept_churn_bench.cpp b/perf/bench/corosio/accept_churn_bench.cpp index 1506a4c3b..a43efd468 100644 --- a/perf/bench/corosio/accept_churn_bench.cpp +++ b/perf/bench/corosio/accept_churn_bench.cpp @@ -124,6 +124,89 @@ bench_sequential_churn(bench::state& state) acc.close(); } +template +void +bench_sequential_churn_lockless(bench::state& state) +{ + using socket_type = corosio::native_tcp_socket; + using acceptor_type = corosio::native_tcp_acceptor; + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + acceptor_type acc(ioc); + acc.open(); + acc.set_option(corosio::native_socket_option::reuse_address(true)); + + if (auto ec = + acc.bind(corosio::endpoint(corosio::ipv4_address::loopback(), 0))) + { + std::cerr << " Bind failed: " << ec.message() << "\n"; + return; + } + if (auto ec = acc.listen()) + { + std::cerr << " Listen failed: " << ec.message() << "\n"; + return; + } + + auto ep = acc.local_endpoint(); + + auto task = [&]() -> capy::task<> { + while (state.running()) + { + auto lp = state.lap(); + + socket_type client(ioc); + socket_type server(ioc); + client.open(); + configure_churn_socket(client); + + capy::run_async(ioc.get_executor())( + [](socket_type& c, corosio::endpoint ep) -> capy::task<> { + auto [ec] = co_await c.connect(ep); + (void)ec; + }(client, ep)); + + auto [aec] = co_await acc.accept(server); + if (aec) + co_return; + + char byte = 'X'; + auto [wec, wn] = + co_await capy::write(client, capy::const_buffer(&byte, 1)); + if (wec) + co_return; + + char recv = 0; + auto [rec, rn] = + co_await capy::read(server, capy::mutable_buffer(&recv, 1)); + if (rec) + co_return; + + client.close(); + server.close(); + } + }; + + perf::stopwatch sw; + + capy::run_async(ioc.get_executor())(task()); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + ioc.stop(); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + acc.close(); +} + // N independent accept loops on separate listeners template void @@ -303,6 +386,92 @@ bench_burst_churn(bench::state& state) acc.close(); } +template +void +bench_burst_churn_lockless(bench::state& state) +{ + using socket_type = corosio::native_tcp_socket; + using acceptor_type = corosio::native_tcp_acceptor; + + int burst_size = static_cast(state.range(0)); + state.counters["burst_size"] = burst_size; + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + acceptor_type acc(ioc); + acc.open(); + acc.set_option(corosio::native_socket_option::reuse_address(true)); + + if (auto ec = + acc.bind(corosio::endpoint(corosio::ipv4_address::loopback(), 0))) + { + std::cerr << " Bind failed: " << ec.message() << "\n"; + return; + } + if (auto ec = acc.listen()) + { + std::cerr << " Listen failed: " << ec.message() << "\n"; + return; + } + + auto ep = acc.local_endpoint(); + + auto task = [&]() -> capy::task<> { + while (state.running()) + { + auto lp = state.lap(); + + std::vector clients; + std::vector servers; + clients.reserve(burst_size); + servers.reserve(burst_size); + + for (int i = 0; i < burst_size; ++i) + { + clients.emplace_back(ioc); + clients.back().open(); + configure_churn_socket(clients.back()); + capy::run_async(ioc.get_executor())( + [](socket_type& c, corosio::endpoint ep) -> capy::task<> { + auto [ec] = co_await c.connect(ep); + (void)ec; + }(clients.back(), ep)); + } + + for (int i = 0; i < burst_size; ++i) + { + servers.emplace_back(ioc); + auto [aec] = co_await acc.accept(servers.back()); + if (aec) + co_return; + } + + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); + } + }; + + perf::stopwatch sw; + + capy::run_async(ioc.get_executor())(task()); + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + ioc.stop(); + }); + + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); + acc.close(); +} + } // anonymous namespace template @@ -312,9 +481,12 @@ make_accept_churn_suite() using F = bench::bench_flags; return bench::benchmark_suite("accept_churn", F::needs_conntrack_drain) .add("sequential", bench_sequential_churn) + .add("sequential_lockless", bench_sequential_churn_lockless) .add("concurrent", bench_concurrent_churn) .args({1, 4, 16}) .add("burst", bench_burst_churn) + .args({10, 100}) + .add("burst_lockless", bench_burst_churn_lockless) .args({10, 100}); } diff --git a/perf/bench/corosio/fan_out_bench.cpp b/perf/bench/corosio/fan_out_bench.cpp index 5f497cdd2..a18c754be 100644 --- a/perf/bench/corosio/fan_out_bench.cpp +++ b/perf/bench/corosio/fan_out_bench.cpp @@ -325,6 +325,260 @@ bench_concurrent_parents(bench::state& state) state.set_elapsed(sw.elapsed_seconds()); } +template +void +bench_fork_join_lockless(bench::state& state) +{ + using socket_type = corosio::native_tcp_socket; + using timer_type = corosio::native_timer; + + int fan_out = static_cast(state.range(0)); + state.counters["fan_out"] = fan_out; + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + + std::vector clients; + std::vector servers; + clients.reserve(fan_out); + servers.reserve(fan_out); + + for (int i = 0; i < fan_out; ++i) + { + auto [c, s] = corosio::test::make_socket_pair< + socket_type, corosio::native_tcp_acceptor>(ioc); + c.set_option(corosio::native_socket_option::no_delay(true)); + s.set_option(corosio::native_socket_option::no_delay(true)); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + for (int i = 0; i < fan_out; ++i) + capy::run_async(ioc.get_executor())(echo_server(servers[i])); + + auto parent = [&]() -> capy::task<> { + timer_type t(ioc); + while (state.running()) + { + auto lp = state.lap(); + + std::atomic remaining{fan_out}; + for (int i = 0; i < fan_out; ++i) + capy::run_async(ioc.get_executor())( + sub_request(clients[i], remaining)); + + while (remaining.load(std::memory_order_acquire) > 0) + { + t.expires_after(std::chrono::nanoseconds(0)); + auto [ec] = co_await t.wait(); + (void)ec; + } + } + + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); + }; + + perf::stopwatch sw; + + capy::run_async(ioc.get_executor())(parent()); + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); +} + +template +void +bench_nested_lockless(bench::state& state) +{ + using socket_type = corosio::native_tcp_socket; + using timer_type = corosio::native_timer; + + int groups = static_cast(state.range(0)); + int subs_per_group = 4; + int total_subs = groups * subs_per_group; + + state.counters["groups"] = groups; + state.counters["subs_per_group"] = subs_per_group; + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + + std::vector clients; + std::vector servers; + clients.reserve(total_subs); + servers.reserve(total_subs); + + for (int i = 0; i < total_subs; ++i) + { + auto [c, s] = corosio::test::make_socket_pair< + socket_type, corosio::native_tcp_acceptor>(ioc); + c.set_option(corosio::native_socket_option::no_delay(true)); + s.set_option(corosio::native_socket_option::no_delay(true)); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + for (int i = 0; i < total_subs; ++i) + capy::run_async(ioc.get_executor())(echo_server(servers[i])); + + auto group_task = [&](int base_idx, int n, + std::atomic& groups_remaining) -> capy::task<> { + std::atomic subs_remaining{n}; + for (int i = 0; i < n; ++i) + capy::run_async(ioc.get_executor())( + sub_request(clients[base_idx + i], subs_remaining)); + + timer_type t(ioc); + while (subs_remaining.load(std::memory_order_acquire) > 0) + { + t.expires_after(std::chrono::nanoseconds(0)); + auto [ec] = co_await t.wait(); + (void)ec; + } + + groups_remaining.fetch_sub(1, std::memory_order_release); + }; + + auto parent = [&]() -> capy::task<> { + timer_type t(ioc); + while (state.running()) + { + auto lp = state.lap(); + + std::atomic groups_remaining{groups}; + for (int g = 0; g < groups; ++g) + capy::run_async(ioc.get_executor())(group_task( + g * subs_per_group, subs_per_group, groups_remaining)); + + while (groups_remaining.load(std::memory_order_acquire) > 0) + { + t.expires_after(std::chrono::nanoseconds(0)); + auto [ec] = co_await t.wait(); + (void)ec; + } + } + + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); + }; + + perf::stopwatch sw; + + capy::run_async(ioc.get_executor())(parent()); + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); +} + +template +void +bench_concurrent_parents_lockless(bench::state& state) +{ + using socket_type = corosio::native_tcp_socket; + using timer_type = corosio::native_timer; + + int num_parents = static_cast(state.range(0)); + int fan_out = 16; + int total_subs = num_parents * fan_out; + + state.counters["num_parents"] = num_parents; + state.counters["fan_out"] = fan_out; + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + + std::vector clients; + std::vector servers; + clients.reserve(total_subs); + servers.reserve(total_subs); + + for (int i = 0; i < total_subs; ++i) + { + auto [c, s] = corosio::test::make_socket_pair< + socket_type, corosio::native_tcp_acceptor>(ioc); + c.set_option(corosio::native_socket_option::no_delay(true)); + s.set_option(corosio::native_socket_option::no_delay(true)); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + for (int i = 0; i < total_subs; ++i) + capy::run_async(ioc.get_executor())(echo_server(servers[i])); + + std::atomic parents_done{0}; + + auto parent_task = [&](int parent_idx) -> capy::task<> { + int base = parent_idx * fan_out; + timer_type t(ioc); + + while (state.running()) + { + auto lp = state.lap(); + + std::atomic remaining{fan_out}; + for (int i = 0; i < fan_out; ++i) + capy::run_async(ioc.get_executor())( + sub_request(clients[base + i], remaining)); + + while (remaining.load(std::memory_order_acquire) > 0) + { + t.expires_after(std::chrono::nanoseconds(0)); + auto [ec] = co_await t.wait(); + (void)ec; + } + } + + if (parents_done.fetch_add(1, std::memory_order_acq_rel) == + num_parents - 1) + { + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); + } + }; + + perf::stopwatch sw; + + for (int p = 0; p < num_parents; ++p) + capy::run_async(ioc.get_executor())(parent_task(p)); + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); +} + } // anonymous namespace template @@ -335,9 +589,15 @@ make_fan_out_suite() return bench::benchmark_suite("fan_out", F::needs_conntrack_drain) .add("fork_join", bench_fork_join) .args({1, 4, 16, 64}) + .add("fork_join_lockless", bench_fork_join_lockless) + .args({1, 4, 16, 64}) .add("nested", bench_nested) .args({4, 16}) + .add("nested_lockless", bench_nested_lockless) + .args({4, 16}) .add("concurrent_parents", bench_concurrent_parents) + .args({1, 4, 16}) + .add("concurrent_parents_lockless", bench_concurrent_parents_lockless) .args({1, 4, 16}); } diff --git a/perf/bench/corosio/http_server_bench.cpp b/perf/bench/corosio/http_server_bench.cpp index 64ed21888..8d90546b2 100644 --- a/perf/bench/corosio/http_server_bench.cpp +++ b/perf/bench/corosio/http_server_bench.cpp @@ -148,6 +148,39 @@ bench_single_connection(bench::state& state) server.close(); } +template +void +bench_single_connection_lockless(bench::state& state) +{ + using socket_type = corosio::native_tcp_socket; + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + auto [client, server] = corosio::test::make_socket_pair< + socket_type, corosio::native_tcp_acceptor>(ioc); + + client.set_option(corosio::native_socket_option::no_delay(true)); + server.set_option(corosio::native_socket_option::no_delay(true)); + + capy::run_async(ioc.get_executor())(server_task(server)); + capy::run_async(ioc.get_executor())(client_task(client, state)); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + client.close(); + server.close(); +} + template void bench_concurrent_connections(bench::state& state) @@ -308,6 +341,7 @@ make_http_server_suite() s.close(); }) .add("single_conn", bench_single_connection) + .add("single_conn_lockless", bench_single_connection_lockless) .add("concurrent", bench_concurrent_connections) .args({1, 4, 16, 32}) .add("multithread", bench_multithread) diff --git a/perf/bench/corosio/io_context_bench.cpp b/perf/bench/corosio/io_context_bench.cpp index 2e3b8cdbd..15aab9051 100644 --- a/perf/bench/corosio/io_context_bench.cpp +++ b/perf/bench/corosio/io_context_bench.cpp @@ -193,6 +193,135 @@ bench_concurrent_post_run(bench::state& state) state.counters["threads"] = num_threads; } +// Configuration variant: high inline budget +template +void +bench_high_inline_budget(bench::state& state) +{ + corosio::io_context_options opts; + opts.inline_budget_max = 64; + + corosio::native_io_context ioc(opts); + auto ex = ioc.get_executor(); + int64_t counter = 0; + int constexpr batch_size = 1000; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < batch_size; ++i) + capy::run_async(ex)(increment_task(counter)); + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + +// Configuration variant: large event buffer +template +void +bench_large_event_buffer(bench::state& state) +{ + corosio::io_context_options opts; + opts.max_events_per_poll = 512; + + corosio::native_io_context ioc(opts); + auto ex = ioc.get_executor(); + int64_t counter = 0; + int constexpr batch_size = 1000; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < batch_size; ++i) + capy::run_async(ex)(increment_task(counter)); + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + +// Lockless variant of single_threaded (batch 1000) +template +void +bench_single_threaded_lockless(bench::state& state) +{ + corosio::io_context_options opts; + opts.single_threaded = true; + + corosio::native_io_context ioc(opts); + auto ex = ioc.get_executor(); + int64_t counter = 0; + int constexpr batch_size = 1000; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < batch_size; ++i) + capy::run_async(ex)(increment_task(counter)); + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + +// Lockless variant of interleaved (batch 100) +template +void +bench_interleaved_lockless(bench::state& state) +{ + corosio::io_context_options opts; + opts.single_threaded = true; + + int handlers_per_iteration = 100; + + corosio::native_io_context ioc(opts); + auto ex = ioc.get_executor(); + int64_t counter = 0; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < handlers_per_iteration; ++i) + capy::run_async(ex)(increment_task(counter)); + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + } // anonymous namespace template @@ -214,7 +343,11 @@ make_io_context_suite() .args({8}) .add("interleaved", bench_interleaved_post_run) .add("concurrent", bench_concurrent_post_run) - .args({4}); + .args({4}) + .add("high_inline_budget", bench_high_inline_budget) + .add("large_event_buffer", bench_large_event_buffer) + .add("single_threaded_lockless", bench_single_threaded_lockless) + .add("interleaved_lockless", bench_interleaved_lockless); } } // namespace corosio_bench diff --git a/perf/bench/corosio/socket_latency_bench.cpp b/perf/bench/corosio/socket_latency_bench.cpp index abc5bb7bd..647f82f9b 100644 --- a/perf/bench/corosio/socket_latency_bench.cpp +++ b/perf/bench/corosio/socket_latency_bench.cpp @@ -157,6 +157,95 @@ bench_concurrent_latency(bench::state& state) s.close(); } +template +void +bench_pingpong_latency_lockless(bench::state& state) +{ + using socket_type = corosio::native_tcp_socket; + + auto message_size = static_cast(state.range(0)); + state.counters["message_size"] = static_cast(message_size); + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + auto [client, server] = corosio::test::make_socket_pair< + socket_type, corosio::native_tcp_acceptor>(ioc); + + client.set_option(corosio::native_socket_option::no_delay(true)); + server.set_option(corosio::native_socket_option::no_delay(true)); + + capy::run_async(ioc.get_executor())( + pingpong_client_task(client, server, message_size, state)); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + client.close(); + server.close(); +} + +template +void +bench_concurrent_latency_lockless(bench::state& state) +{ + using socket_type = corosio::native_tcp_socket; + + int num_pairs = static_cast(state.range(0)); + state.counters["num_pairs"] = num_pairs; + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + + std::vector clients; + std::vector servers; + + clients.reserve(num_pairs); + servers.reserve(num_pairs); + + for (int i = 0; i < num_pairs; ++i) + { + auto [c, s] = corosio::test::make_socket_pair< + socket_type, corosio::native_tcp_acceptor>(ioc); + c.set_option(corosio::native_socket_option::no_delay(true)); + s.set_option(corosio::native_socket_option::no_delay(true)); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + for (int p = 0; p < num_pairs; ++p) + { + capy::run_async(ioc.get_executor())( + pingpong_client_task(clients[p], servers[p], 64, state)); + } + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); +} + } // anonymous namespace template @@ -188,7 +277,11 @@ make_socket_latency_suite() }) .add("pingpong", bench_pingpong_latency) .args({1, 64, 1024}) + .add("pingpong_lockless", bench_pingpong_latency_lockless) + .args({1, 64, 1024}) .add("concurrent", bench_concurrent_latency) + .args({1, 4, 16}) + .add("concurrent_lockless", bench_concurrent_latency_lockless) .args({1, 4, 16}); } diff --git a/perf/bench/corosio/socket_throughput_bench.cpp b/perf/bench/corosio/socket_throughput_bench.cpp index 8a6c7d5ef..c906d7e8a 100644 --- a/perf/bench/corosio/socket_throughput_bench.cpp +++ b/perf/bench/corosio/socket_throughput_bench.cpp @@ -239,6 +239,160 @@ mt_read_coro( } } +template +void +bench_throughput_lockless(bench::state& state) +{ + using socket_type = corosio::native_tcp_socket; + + auto chunk_size = static_cast(state.range(0)); + state.counters["chunk_size"] = static_cast(chunk_size); + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + auto [writer, reader] = corosio::test::make_socket_pair< + socket_type, corosio::native_tcp_acceptor>(ioc); + + set_nodelay(writer); + set_nodelay(reader); + + std::vector write_buf(chunk_size, 'x'); + std::vector read_buf(chunk_size); + + std::atomic running{true}; + + auto write_task = [&]() -> capy::task<> { + while (running.load(std::memory_order_relaxed)) + { + auto [ec, n] = co_await writer.write_some( + capy::const_buffer(write_buf.data(), chunk_size)); + if (ec) + break; + } + writer.shutdown(corosio::tcp_socket::shutdown_send); + }; + + auto read_task = [&]() -> capy::task<> { + for (;;) + { + auto [ec, n] = co_await reader.read_some( + capy::mutable_buffer(read_buf.data(), read_buf.size())); + if (ec || n == 0) + break; + state.add_bytes(static_cast(n)); + } + }; + + perf::stopwatch sw; + + capy::run_async(ioc.get_executor())(write_task()); + capy::run_async(ioc.get_executor())(read_task()); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + writer.close(); + reader.close(); +} + +template +void +bench_bidirectional_throughput_lockless(bench::state& state) +{ + using socket_type = corosio::native_tcp_socket; + + auto chunk_size = static_cast(state.range(0)); + state.counters["chunk_size"] = static_cast(chunk_size); + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + auto [sock1, sock2] = corosio::test::make_socket_pair< + socket_type, corosio::native_tcp_acceptor>(ioc); + + set_nodelay(sock1); + set_nodelay(sock2); + + std::vector buf1(chunk_size, 'a'); + std::vector buf2(chunk_size, 'b'); + + std::atomic running{true}; + + auto write1_task = [&]() -> capy::task<> { + while (running.load(std::memory_order_relaxed)) + { + auto [ec, n] = co_await sock1.write_some( + capy::const_buffer(buf1.data(), chunk_size)); + if (ec) + break; + } + sock1.shutdown(corosio::tcp_socket::shutdown_send); + }; + + auto read1_task = [&]() -> capy::task<> { + std::vector rbuf(chunk_size); + for (;;) + { + auto [ec, n] = co_await sock2.read_some( + capy::mutable_buffer(rbuf.data(), rbuf.size())); + if (ec || n == 0) + break; + state.add_bytes(static_cast(n)); + } + }; + + auto write2_task = [&]() -> capy::task<> { + while (running.load(std::memory_order_relaxed)) + { + auto [ec, n] = co_await sock2.write_some( + capy::const_buffer(buf2.data(), chunk_size)); + if (ec) + break; + } + sock2.shutdown(corosio::tcp_socket::shutdown_send); + }; + + auto read2_task = [&]() -> capy::task<> { + std::vector rbuf(chunk_size); + for (;;) + { + auto [ec, n] = co_await sock1.read_some( + capy::mutable_buffer(rbuf.data(), rbuf.size())); + if (ec || n == 0) + break; + state.add_bytes(static_cast(n)); + } + }; + + perf::stopwatch sw; + + capy::run_async(ioc.get_executor())(write1_task()); + capy::run_async(ioc.get_executor())(read1_task()); + capy::run_async(ioc.get_executor())(write2_task()); + capy::run_async(ioc.get_executor())(read2_task()); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + sock1.close(); + sock2.close(); +} + template void bench_multithread_throughput(bench::state& state) @@ -351,8 +505,12 @@ make_socket_throughput_suite() }) .add("unidirectional", bench_throughput) .range(1024, 1048576, 4) + .add("unidirectional_lockless", bench_throughput_lockless) + .range(1024, 1048576, 4) .add("bidirectional", bench_bidirectional_throughput) .range(1024, 1048576, 4) + .add("bidirectional_lockless", bench_bidirectional_throughput_lockless) + .range(1024, 1048576, 4) .add("multithread", bench_multithread_throughput) .args({2, 4, 8}); } diff --git a/perf/bench/corosio/timer_bench.cpp b/perf/bench/corosio/timer_bench.cpp index 5c4e9deae..cc1036265 100644 --- a/perf/bench/corosio/timer_bench.cpp +++ b/perf/bench/corosio/timer_bench.cpp @@ -104,6 +104,83 @@ bench_fire_rate(bench::state& state) state.add_items(counter); } +template +void +bench_schedule_cancel_lockless(bench::state& state) +{ + using timer_type = corosio::native_timer; + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + int64_t counter = 0; + int constexpr batch_size = 1000; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < batch_size; ++i) + { + timer_type t(ioc); + t.expires_after(std::chrono::hours(1)); + t.cancel(); + ++counter; + } + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + +template +void +bench_fire_rate_lockless(bench::state& state) +{ + using timer_type = corosio::native_timer; + + corosio::io_context_options opts; + opts.single_threaded = true; + corosio::native_io_context ioc(opts); + std::atomic running{true}; + int64_t counter = 0; + + auto task = [&]() -> capy::task<> { + timer_type t(ioc); + while (running.load(std::memory_order_relaxed)) + { + t.expires_after(std::chrono::nanoseconds(0)); + auto [ec] = co_await t.wait(); + if (ec) + co_return; + ++counter; + } + }; + + perf::stopwatch sw; + + capy::run_async(ioc.get_executor())(task()); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + // N timers with staggered intervals (100us–1000us) firing concurrently. // Stresses the timer heap under contention. template @@ -168,7 +245,9 @@ make_timer_suite() { return bench::benchmark_suite("timer") .add("schedule_cancel", bench_schedule_cancel) + .add("schedule_cancel_lockless", bench_schedule_cancel_lockless) .add("fire_rate", bench_fire_rate) + .add("fire_rate_lockless", bench_fire_rate_lockless) .add("concurrent", bench_concurrent_timers) .args({10, 100, 1000}); } diff --git a/src/corosio/src/io_context.cpp b/src/corosio/src/io_context.cpp index 6cee5676e..72b74e75b 100644 --- a/src/corosio/src/io_context.cpp +++ b/src/corosio/src/io_context.cpp @@ -10,7 +10,9 @@ #include #include +#include +#include #include #if BOOST_COROSIO_HAS_EPOLL @@ -108,23 +110,104 @@ iocp_t::construct(capy::execution_context& ctx, unsigned concurrency_hint) } #endif -io_context::io_context() : io_context(std::thread::hardware_concurrency()) {} +namespace { -io_context::io_context(unsigned concurrency_hint) - : capy::execution_context(this) - , sched_(nullptr) +// Pre-create services that must exist before construct() runs. +void +pre_create_services( + capy::execution_context& ctx, + io_context_options const& opts) +{ +#if BOOST_COROSIO_POSIX + if (opts.thread_pool_size < 1) + throw std::invalid_argument( + "thread_pool_size must be at least 1"); + // Pre-create the shared thread pool with the configured size. + // This must happen before construct() because the scheduler + // constructor creates file and resolver services that call + // get_or_create_pool(), which would create a 1-thread pool. + if (opts.thread_pool_size != 1) + ctx.make_service(opts.thread_pool_size); +#endif + + (void)ctx; + (void)opts; +} + +// Apply runtime tuning to the scheduler after construction. +void +apply_scheduler_options( + detail::scheduler& sched, + io_context_options const& opts) { +#if BOOST_COROSIO_HAS_EPOLL || BOOST_COROSIO_HAS_KQUEUE || BOOST_COROSIO_HAS_SELECT + auto& reactor = + static_cast(sched); + reactor.configure_reactor( + opts.max_events_per_poll, + opts.inline_budget_initial, + opts.inline_budget_max, + opts.unassisted_budget); + if (opts.single_threaded) + reactor.configure_single_threaded(true); +#endif + #if BOOST_COROSIO_HAS_IOCP - sched_ = &iocp_t::construct(*this, concurrency_hint); + static_cast(sched).configure_iocp( + opts.gqcs_timeout_ms); +#endif + + (void)sched; + (void)opts; +} + +detail::scheduler& +construct_default(capy::execution_context& ctx, unsigned concurrency_hint) +{ +#if BOOST_COROSIO_HAS_IOCP + return iocp_t::construct(ctx, concurrency_hint); #elif BOOST_COROSIO_HAS_EPOLL - sched_ = &epoll_t::construct(*this, concurrency_hint); + return epoll_t::construct(ctx, concurrency_hint); #elif BOOST_COROSIO_HAS_KQUEUE - sched_ = &kqueue_t::construct(*this, concurrency_hint); + return kqueue_t::construct(ctx, concurrency_hint); #elif BOOST_COROSIO_HAS_SELECT - sched_ = &select_t::construct(*this, concurrency_hint); + return select_t::construct(ctx, concurrency_hint); #endif } +} // anonymous namespace + +io_context::io_context() : io_context(std::thread::hardware_concurrency()) {} + +io_context::io_context(unsigned concurrency_hint) + : capy::execution_context(this) + , sched_(&construct_default(*this, concurrency_hint)) +{ +} + +io_context::io_context( + io_context_options const& opts, + unsigned concurrency_hint) + : capy::execution_context(this) + , sched_(nullptr) +{ + pre_create_services(*this, opts); + sched_ = &construct_default(*this, concurrency_hint); + apply_scheduler_options(*sched_, opts); +} + +void +io_context::apply_options_pre_(io_context_options const& opts) +{ + pre_create_services(*this, opts); +} + +void +io_context::apply_options_post_(io_context_options const& opts) +{ + apply_scheduler_options(*sched_, opts); +} + io_context::~io_context() { shutdown();