diff --git a/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp b/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp index 2efe82a0c..ff5fb71b3 100644 --- a/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp +++ b/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp @@ -117,7 +117,8 @@ class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base private: void - run_task(std::unique_lock& lock, context_type* ctx) override; + run_task(std::unique_lock& lock, context_type* ctx, + long timeout_us) override; void interrupt_reactor() const override; void update_timerfd() const; @@ -294,9 +295,16 @@ epoll_scheduler::update_timerfd() const } inline void -epoll_scheduler::run_task(std::unique_lock& lock, context_type* ctx) +epoll_scheduler::run_task( + std::unique_lock& lock, context_type* ctx, long timeout_us) { - int timeout_ms = task_interrupted_ ? 0 : -1; + int timeout_ms; + if (task_interrupted_) + timeout_ms = 0; + else if (timeout_us < 0) + timeout_ms = -1; + else + timeout_ms = static_cast((timeout_us + 999) / 1000); if (lock.owns_lock()) lock.unlock(); diff --git a/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp b/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp index a16a46b0c..018d7d2b5 100644 --- a/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp +++ b/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp @@ -139,7 +139,8 @@ class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler_base private: void - run_task(std::unique_lock& lock, context_type* ctx) override; + run_task(std::unique_lock& lock, context_type* ctx, + long timeout_us) override; void interrupt_reactor() const override; long calculate_timeout(long requested_timeout_us) const; @@ -285,9 +286,10 @@ kqueue_scheduler::calculate_timeout(long requested_timeout_us) const inline void kqueue_scheduler::run_task( - std::unique_lock& lock, context_type* ctx) + std::unique_lock& lock, context_type* ctx, long timeout_us) { - long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1); + long effective_timeout_us = + task_interrupted_ ? 0 : calculate_timeout(timeout_us); if (lock.owns_lock()) lock.unlock(); diff --git a/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp b/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp index 8eeb25281..d1fd2e798 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp @@ -279,7 +279,8 @@ class reactor_scheduler_base /// Run the platform-specific reactor poll. virtual void - run_task(std::unique_lock& lock, context_type* ctx) = 0; + run_task(std::unique_lock& lock, context_type* ctx, + long timeout_us) = 0; /// Wake a blocked reactor (e.g. write to eventfd or pipe). virtual void interrupt_reactor() const = 0; @@ -775,7 +776,8 @@ reactor_scheduler_base::do_one( return 0; } - task_interrupted_ = more_handlers || timeout_us == 0; + long task_timeout_us = more_handlers ? 0 : timeout_us; + task_interrupted_ = task_timeout_us == 0; task_running_.store(true, std::memory_order_release); if (more_handlers) @@ -783,7 +785,7 @@ reactor_scheduler_base::do_one( try { - run_task(lock, ctx); + run_task(lock, ctx, task_timeout_us); } catch (...) { @@ -793,6 +795,8 @@ reactor_scheduler_base::do_one( task_running_.store(false, std::memory_order_relaxed); completed_ops_.push(&task_op_); + if (timeout_us > 0) + return 0; continue; } diff --git a/include/boost/corosio/native/detail/select/select_scheduler.hpp b/include/boost/corosio/native/detail/select/select_scheduler.hpp index f818d7e41..f87858659 100644 --- a/include/boost/corosio/native/detail/select/select_scheduler.hpp +++ b/include/boost/corosio/native/detail/select/select_scheduler.hpp @@ -127,7 +127,8 @@ class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base private: void - run_task(std::unique_lock& lock, context_type* ctx) override; + run_task(std::unique_lock& lock, context_type* ctx, + long timeout_us) override; void interrupt_reactor() const override; long calculate_timeout(long requested_timeout_us) const; @@ -302,9 +303,10 @@ select_scheduler::calculate_timeout(long requested_timeout_us) const inline void select_scheduler::run_task( - std::unique_lock& lock, context_type* ctx) + std::unique_lock& lock, context_type* ctx, long timeout_us) { - long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1); + long effective_timeout_us = + task_interrupted_ ? 0 : calculate_timeout(timeout_us); // Snapshot registered descriptors while holding lock. // Record which fds need write monitoring to avoid a hot loop: diff --git a/test/unit/io_context.cpp b/test/unit/io_context.cpp index 9dc768011..cb4ff5586 100644 --- a/test/unit/io_context.cpp +++ b/test/unit/io_context.cpp @@ -463,6 +463,50 @@ struct io_context_test BOOST_TEST(counter == 1); } + void testRunForWithOutstandingWork() + { + io_context ioc; + auto ex = ioc.get_executor(); + + // Simulate persistent outstanding work (like a listening acceptor) + ex.on_work_started(); + + auto start = std::chrono::steady_clock::now(); + std::size_t n = ioc.run_for(std::chrono::milliseconds(200)); + auto elapsed = std::chrono::steady_clock::now() - start; + + auto ms = std::chrono::duration_cast(elapsed) + .count(); + + // Must return after ~200ms, not block forever + BOOST_TEST(n == 0); + BOOST_TEST(ms >= 150); + BOOST_TEST(ms < 1000); + + ex.on_work_finished(); + } + + void testRunOneForWithOutstandingWork() + { + io_context ioc; + auto ex = ioc.get_executor(); + + ex.on_work_started(); + + auto start = std::chrono::steady_clock::now(); + std::size_t n = ioc.run_one_for(std::chrono::milliseconds(200)); + auto elapsed = std::chrono::steady_clock::now() - start; + + auto ms = std::chrono::duration_cast(elapsed) + .count(); + + BOOST_TEST(n == 0); + BOOST_TEST(ms >= 150); + BOOST_TEST(ms < 1000); + + ex.on_work_finished(); + } + void testExecutorRunningInThisThread() { io_context ioc; @@ -610,6 +654,8 @@ struct io_context_test testRunOneFor(); testRunOneUntil(); testRunFor(); + testRunForWithOutstandingWork(); + testRunOneForWithOutstandingWork(); testExecutorRunningInThisThread(); testMultithreaded(); testMultithreadedStress();