include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

80.1% Lines (395/493) 87.5% List of functions (42/48)
f(x) Functions (48)
Function Calls Lines Branches Blocks
boost::corosio::detail::epoll_scheduler::task_op::operator()() :318 0 0.0% boost::corosio::detail::epoll_scheduler::task_op::destroy() :319 0 0.0% boost::corosio::detail::epoll::scheduler_context::scheduler_context(boost::corosio::detail::epoll_scheduler const*, boost::corosio::detail::epoll::scheduler_context*) :404 0 100.0% boost::corosio::detail::epoll::thread_context_guard::thread_context_guard(boost::corosio::detail::epoll_scheduler const*) :421 0 100.0% boost::corosio::detail::epoll::thread_context_guard::~thread_context_guard() :427 0 66.7% boost::corosio::detail::epoll::find_context(boost::corosio::detail::epoll_scheduler const*) :437 0 100.0% boost::corosio::detail::epoll_scheduler::reset_inline_budget() const :448 0 54.5% boost::corosio::detail::epoll_scheduler::try_consume_inline_budget() const :472 0 100.0% boost::corosio::detail::descriptor_state::operator()() :486 0 68.8% boost::corosio::detail::epoll_scheduler::epoll_scheduler(boost::capy::execution_context&, int) :608 0 58.1% boost::corosio::detail::epoll_scheduler::epoll_scheduler(boost::capy::execution_context&, int)::{lambda(void*)#1}::operator()(void*) const :665 0 90.0% boost::corosio::detail::epoll_scheduler::~epoll_scheduler() :682 0 100.0% boost::corosio::detail::epoll_scheduler::shutdown() :693 0 100.0% boost::corosio::detail::epoll_scheduler::post(std::__n4861::coroutine_handle<void>) const :715 0 100.0% boost::corosio::detail::epoll_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :721 0 100.0% boost::corosio::detail::epoll_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :723 0 100.0% boost::corosio::detail::epoll_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :725 0 100.0% boost::corosio::detail::epoll_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :732 0 100.0% 
boost::corosio::detail::epoll_scheduler::post(boost::corosio::detail::scheduler_op*) const :760 0 100.0% boost::corosio::detail::epoll_scheduler::running_in_this_thread() const :780 0 100.0% boost::corosio::detail::epoll_scheduler::stop() :789 0 100.0% boost::corosio::detail::epoll_scheduler::stopped() const :801 0 100.0% boost::corosio::detail::epoll_scheduler::restart() :808 0 100.0% boost::corosio::detail::epoll_scheduler::run() :815 0 100.0% boost::corosio::detail::epoll_scheduler::run_one() :840 0 75.0% boost::corosio::detail::epoll_scheduler::wait_one(long) :854 0 100.0% boost::corosio::detail::epoll_scheduler::poll() :868 0 100.0% boost::corosio::detail::epoll_scheduler::poll_one() :893 0 100.0% boost::corosio::detail::epoll_scheduler::register_descriptor(int, boost::corosio::detail::descriptor_state*) const :907 0 92.3% boost::corosio::detail::epoll_scheduler::deregister_descriptor(int) const :926 0 100.0% boost::corosio::detail::epoll_scheduler::work_started() :932 0 100.0% boost::corosio::detail::epoll_scheduler::work_finished() :938 0 100.0% boost::corosio::detail::epoll_scheduler::compensating_work_started() const :945 0 100.0% boost::corosio::detail::epoll_scheduler::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :953 0 0.0% boost::corosio::detail::epoll_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :963 0 30.0% boost::corosio::detail::epoll_scheduler::interrupt_reactor() const :982 0 100.0% boost::corosio::detail::epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const :996 0 100.0% boost::corosio::detail::epoll_scheduler::maybe_unlock_and_signal_one(std::unique_lock<std::mutex>&) const :1003 0 57.1% boost::corosio::detail::epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>&) const :1017 0 85.7% boost::corosio::detail::epoll_scheduler::clear_signal() const :1028 0 0.0% 
boost::corosio::detail::epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>&) const :1034 0 0.0% boost::corosio::detail::epoll_scheduler::wait_for_signal_for(std::unique_lock<std::mutex>&, long) const :1045 0 0.0% boost::corosio::detail::epoll_scheduler::wake_one_thread_and_unlock(std::unique_lock<std::mutex>&) const :1057 0 87.5% boost::corosio::detail::epoll_scheduler::work_cleanup::~work_cleanup() :1075 0 92.3% boost::corosio::detail::epoll_scheduler::task_cleanup::~task_cleanup() :1099 0 83.3% boost::corosio::detail::epoll_scheduler::update_timerfd() const :1120 0 88.9% boost::corosio::detail::epoll_scheduler::run_task(std::unique_lock<std::mutex>&, boost::corosio::detail::epoll::scheduler_context*) :1157 0 97.1% boost::corosio::detail::epoll_scheduler::do_one(std::unique_lock<std::mutex>&, long, boost::corosio::detail::epoll::scheduler_context*) :1233 0 77.8%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/native_scheduler.hpp>
21 #include <boost/corosio/detail/scheduler_op.hpp>
22
23 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28
29 #include <boost/corosio/detail/except.hpp>
30 #include <boost/corosio/detail/thread_local_ptr.hpp>
31
32 #include <atomic>
33 #include <chrono>
34 #include <condition_variable>
35 #include <cstddef>
36 #include <cstdint>
37 #include <limits>
38 #include <mutex>
39 #include <utility>
40
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <sys/epoll.h>
44 #include <sys/eventfd.h>
45 #include <sys/socket.h>
46 #include <sys/timerfd.h>
47 #include <unistd.h>
48
49 namespace boost::corosio::detail {
50
51 struct epoll_op;
52 struct descriptor_state;
53 namespace epoll {
54 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
55 } // namespace epoll
56
57 /** Linux scheduler using epoll for I/O multiplexing.
58
59 This scheduler implements the scheduler interface using Linux epoll
60 for efficient I/O event notification. It uses a single reactor model
61 where one thread runs epoll_wait while other threads
62 wait on a condition variable for handler work. This design provides:
63
64 - Handler parallelism: N posted handlers can execute on N threads
65 - No thundering herd: condition_variable wakes exactly one thread
66 - IOCP parity: Behavior matches Windows I/O completion port semantics
67
68 When threads call run(), they first try to execute queued handlers.
69 If the queue is empty and no reactor is running, one thread becomes
70 the reactor and runs epoll_wait. Other threads wait on a condition
71 variable until handlers are available.
72
73 @par Thread Safety
74 All public member functions are thread-safe.
75 */
76 class BOOST_COROSIO_DECL epoll_scheduler final
77 : public native_scheduler
78 , public capy::execution_context::service
79 {
80 public:
// NOTE(review): presumably the key under which the execution_context
// looks this service up - confirm against capy::execution_context.
81 using key_type = scheduler;
82
83 /** Construct the scheduler.
84
85 Creates an epoll instance, eventfd for reactor interruption,
86 and timerfd for kernel-managed timer expiry.
87
88 @param ctx Reference to the owning execution_context.
89 @param concurrency_hint Hint for expected thread count (unused).
90 */
91 epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92
93 /// Destroy the scheduler.
94 ~epoll_scheduler() override;
95
96 epoll_scheduler(epoll_scheduler const&) = delete;
97 epoll_scheduler& operator=(epoll_scheduler const&) = delete;
98
/// Destroy every queued operation (the internal sentinel is skipped),
/// wake all waiting threads and interrupt the reactor.
99 void shutdown() override;
/// Queue a coroutine for resumption. A thread that is running this
/// scheduler uses its private queue; any other thread uses the global
/// queue under the mutex and wakes a thread.
100 void post(std::coroutine_handle<> h) const override;
/// Queue an operation; same queue selection as the coroutine overload.
101 void post(scheduler_op* h) const override;
/// Return true if the calling thread has a context frame for this
/// scheduler on its context stack.
102 bool running_in_this_thread() const noexcept override;
/// Set the stopped flag, wake all waiters and interrupt the reactor.
/// Does nothing if the flag is already set.
103 void stop() override;
/// Return the stopped flag (read under the mutex).
104 bool stopped() const noexcept override;
/// Clear the stopped flag.
105 void restart() override;
/// Call do_one with a timeout of -1 until it returns zero and return
/// the number of non-zero iterations (saturating). If no work is
/// outstanding on entry, calls stop() and returns 0.
106 std::size_t run() override;
/// Like run(), but performs at most one do_one call (timeout -1).
107 std::size_t run_one() override;
/// Like run_one(), but passes `usec` to do_one as the timeout.
108 std::size_t wait_one(long usec) override;
/// Like run(), but passes a timeout of 0 to every do_one call.
109 std::size_t poll() override;
/// Like run_one(), but passes a timeout of 0 to do_one.
110 std::size_t poll_one() override;
111
112 /** Return the epoll file descriptor.
113
114 Used by socket services to register file descriptors
115 for I/O event notification.
116
117 @return The epoll file descriptor.
118 */
119 int epoll_fd() const noexcept
120 {
121 return epoll_fd_;
122 }
123
124 /** Reset the thread's inline completion budget.
125
126 Called at the start of each posted completion handler to
127 grant a fresh budget for speculative inline completions.
128 */
129 void reset_inline_budget() const noexcept;
130
131 /** Consume one unit of inline budget if available.
132
133 @return True if budget was available and consumed.
134 */
135 bool try_consume_inline_budget() const noexcept;
136
137 /** Register a descriptor for persistent monitoring.
138
139 The fd is registered once and stays registered until explicitly
140 deregistered. Events are dispatched via descriptor_state which
141 tracks pending read/write/connect operations.
142
143 @param fd The file descriptor to register.
144 @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
145 */
146 void register_descriptor(int fd, descriptor_state* desc) const;
147
148 /** Deregister a persistently registered descriptor.
149
150 @param fd The file descriptor to deregister.
151 */
152 void deregister_descriptor(int fd) const;
153
/// Increment the outstanding work count.
154 void work_started() noexcept override;
/// Decrement the outstanding work count; calls stop() when the
/// decrement takes the count from one to zero.
155 void work_finished() noexcept override;
156
157 /** Offset a forthcoming work_finished from work_cleanup.
158
159 Called by descriptor_state when all I/O returned EAGAIN and no
160 handler will be executed. Must be called from a scheduler thread.
161 */
162 void compensating_work_started() const noexcept;
163
164 /** Drain work from thread context's private queue to global queue.
165
166 Called by thread_context_guard destructor when a thread exits run().
167 Transfers pending work to the global queue under mutex protection.
168
169 @param queue The private queue to drain.
170 @param count Item count for wakeup decisions (wakes other threads if positive).
171 */
172 void drain_thread_queue(op_queue& queue, long count) const;
173
174 /** Post completed operations for deferred invocation.
175
176 If called from a thread running this scheduler, operations go to
177 the thread's private queue (fast path). Otherwise, operations are
178 added to the global queue under mutex and a waiter is signaled.
179
180 @par Preconditions
181 work_started() must have been called for each operation.
182
183 @param ops Queue of operations to post.
184 */
185 void post_deferred_completions(op_queue& ops) const;
186
187 private:
// Scope guard whose destructor is defined later in this file.
// NOTE(review): per the compensating_work_started() documentation it
// issues a work_finished after a handler runs - confirm in ~work_cleanup.
188 struct work_cleanup
189 {
190 epoll_scheduler* scheduler;
191 std::unique_lock<std::mutex>* lock;
192 epoll::scheduler_context* ctx;
193 ~work_cleanup();
194 };
195
// Scope guard whose destructor is defined later in this file.
// NOTE(review): presumably the reactor-side counterpart of
// work_cleanup used by run_task - confirm in ~task_cleanup.
196 struct task_cleanup
197 {
198 epoll_scheduler const* scheduler;
199 std::unique_lock<std::mutex>* lock;
200 epoll::scheduler_context* ctx;
201 ~task_cleanup();
202 };
203
// One iteration of the event loop described in the implementation notes
// below the class. The public run/poll entry points treat a zero return
// as "nothing executed" and pass -1, 0 or a caller-supplied microsecond
// value as timeout_us.
204 std::size_t do_one(
205 std::unique_lock<std::mutex>& lock,
206 long timeout_us,
207 epoll::scheduler_context* ctx);
// NOTE(review): presumably the reactor step (epoll_wait plus queuing of
// I/O completions); the definition is outside this view - confirm.
208 void
209 run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
// Wake coordination used after pushing to the global queue; see
// "Wake Coordination" in the implementation notes below the class.
210 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
// Interrupt the reactor via the eventfd (see the eventfd_armed_ member).
211 void interrupt_reactor() const;
// NOTE(review): presumably re-arms timer_fd_ when timerfd_stale_ is
// set; the definition is outside this view - confirm.
212 void update_timerfd() const;
213
214 /** Set the signaled state and wake all waiting threads.
215
216 @par Preconditions
217 Mutex must be held.
218
219 @param lock The held mutex lock.
220 */
221 void signal_all(std::unique_lock<std::mutex>& lock) const;
222
223 /** Set the signaled state and wake one waiter if any exist.
224
225 Only unlocks and signals if at least one thread is waiting.
226 Use this when the caller needs to perform a fallback action
227 (such as interrupting the reactor) when no waiters exist.
228
229 @par Preconditions
230 Mutex must be held.
231
232 @param lock The held mutex lock.
233
234 @return `true` if unlocked and signaled, `false` if lock still held.
235 */
236 bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
237
238 /** Set the signaled state, unlock, and wake one waiter if any exist.
239
240 Always unlocks the mutex. Use this when the caller will release
241 the lock regardless of whether a waiter exists.
242
243 @par Preconditions
244 Mutex must be held.
245
246 @param lock The held mutex lock.
247
248 @return `true` if a waiter was signaled, `false` otherwise.
249 */
250 bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
251
252 /** Clear the signaled state before waiting.
253
254 @par Preconditions
255 Mutex must be held.
256 */
257 void clear_signal() const;
258
259 /** Block until the signaled state is set.
260
261 Returns immediately if already signaled (fast-path). Otherwise
262 increments the waiter count, waits on the condition variable,
263 and decrements the waiter count upon waking.
264
265 @par Preconditions
266 Mutex must be held.
267
268 @param lock The held mutex lock.
269 */
270 void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
271
272 /** Block until signaled or timeout expires.
273
274 @par Preconditions
275 Mutex must be held.
276
277 @param lock The held mutex lock.
278 @param timeout_us Maximum time to wait in microseconds.
279 */
280 void wait_for_signal_for(
281 std::unique_lock<std::mutex>& lock, long timeout_us) const;
282
// Descriptors are initialised to -1 and only closed by the destructor
// when non-negative.
283 int epoll_fd_;
284 int event_fd_; // for interrupting reactor
285 int timer_fd_; // timerfd for kernel-managed timer expiry
// Held around every visible access to completed_ops_ and stopped_
// outside the constructor.
286 mutable std::mutex mutex_;
287 mutable std::condition_variable cond_;
// Global queue of operations; also holds the task_op_ sentinel.
288 mutable op_queue completed_ops_;
// Global count of pending work; run()/poll() return immediately when
// it is zero, and work_finished() stops the scheduler when it drops
// to zero.
289 mutable std::atomic<long> outstanding_work_;
// Set by stop(), cleared by restart().
290 bool stopped_;
291
292 // True while a thread is blocked in epoll_wait. Used by
293 // wake_one_thread_and_unlock and work_finished to know when
294 // an eventfd interrupt is needed instead of a condvar signal.
295 mutable std::atomic<bool> task_running_{false};
296
297 // True when the reactor has been told to do a non-blocking poll
298 // (more handlers queued or poll mode). Prevents redundant eventfd
299 // writes and controls the epoll_wait timeout.
300 mutable bool task_interrupted_ = false;
301
302 // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
303 mutable std::size_t state_ = 0;
304
305 // Edge-triggered eventfd state
306 mutable std::atomic<bool> eventfd_armed_{false};
307
308 // Set when the earliest timer changes; flushed before epoll_wait
309 // blocks. Avoids timerfd_settime syscalls for timers that are
310 // scheduled then cancelled without being waited on.
311 mutable std::atomic<bool> timerfd_stale_{false};
312
313 // Sentinel operation for interleaving reactor runs with handler execution.
314 // Ensures the reactor runs periodically even when handlers are continuously
315 // posted, preventing starvation of I/O events, timers, and signals.
316 struct task_op final : scheduler_op
317 {
// Both overrides are intentionally empty: the sentinel is recognised
// by address (see shutdown()) rather than invoked for effect.
318 void operator()() override {}
319 void destroy() override {}
320 };
321 task_op task_op_;
322 };
323
324 //--------------------------------------------------------------------------
325 //
326 // Implementation
327 //
328 //--------------------------------------------------------------------------
329
330 /*
331 epoll Scheduler - Single Reactor Model
332 ======================================
333
334 This scheduler uses a thread coordination strategy to provide handler
335 parallelism and avoid the thundering herd problem.
336 Instead of all threads blocking on epoll_wait(), one thread becomes the
337 "reactor" while others wait on a condition variable for handler work.
338
339 Thread Model
340 ------------
341 - ONE thread runs epoll_wait() at a time (the reactor thread)
342 - OTHER threads wait on cond_ (condition variable) for handlers
343 - When work is posted, exactly one waiting thread wakes via notify_one()
344 - This matches Windows IOCP semantics where N posted items wake N threads
345
346 Event Loop Structure (do_one)
347 -----------------------------
348 1. Lock mutex, try to pop handler from queue
349 2. If got handler: execute it (unlocked), return
350 3. If queue empty and no reactor running: become reactor
351 - Run epoll_wait (unlocked), queue I/O completions, loop back
352 4. If queue empty and reactor running: wait on condvar for work
353
354 The task_running_ flag ensures only one thread owns epoll_wait().
355 After the reactor queues I/O completions, it loops back to try getting
356 a handler, giving priority to handler execution over more I/O polling.
357
358 Signaling State (state_)
359 ------------------------
360 The state_ variable encodes two pieces of information:
361 - Bit 0: signaled flag (1 = signaled, persists until cleared)
362 - Upper bits: waiter count (each waiter adds 2 before blocking)
363
364 This allows efficient coordination:
365 - Signalers only call notify when waiters exist (state_ > 1)
366 - Waiters check if already signaled before blocking (fast-path)
367
368 Wake Coordination (wake_one_thread_and_unlock)
369 ----------------------------------------------
370 When posting work:
371 - If waiters exist (state_ > 1): signal and notify_one()
372 - Else if reactor running: interrupt via eventfd write
373 - Else: no-op (thread will find work when it checks queue)
374
375 This avoids waking threads unnecessarily. With cascading wakes,
376 each handler execution wakes at most one additional thread if
377 more work exists in the queue.
378
379 Work Counting
380 -------------
381 outstanding_work_ tracks pending operations. When it hits zero, run()
382 returns. Each operation increments on start, decrements on completion.
383
384 Timer Integration
385 -----------------
386 Timers are handled by timer_service; expiry is delivered through a
387 timerfd registered with epoll. When the earliest timer changes, the
388 scheduler marks the timerfd stale and, if a reactor is blocked in
389 epoll_wait, calls interrupt_reactor() so the new expiry is picked up.
390 */
391
392 namespace epoll {
393
394 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
395 {
396 epoll_scheduler const* key;
397 scheduler_context* next;
398 op_queue private_queue;
399 long private_outstanding_work;
400 int inline_budget;
401 int inline_budget_max;
402 bool unassisted;
403
404 210x scheduler_context(epoll_scheduler const* k, scheduler_context* n)
405 210x : key(k)
406 210x , next(n)
407 210x , private_outstanding_work(0)
408 210x , inline_budget(0)
409 210x , inline_budget_max(2)
410 210x , unassisted(false)
411 {
412 210x }
413 };
414
// Head of the stack of scheduler_context frames; pushed and popped by
// thread_context_guard and walked by find_context().
// NOTE(review): presumably per-thread storage, given thread_local_ptr - confirm.
415 inline thread_local_ptr<scheduler_context> context_stack;
416
417 struct thread_context_guard
418 {
419 scheduler_context frame_;
420
421 210x explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
422 210x : frame_(ctx, context_stack.get())
423 {
424 210x context_stack.set(&frame_);
425 210x }
426
427 210x ~thread_context_guard() noexcept
428 {
429 210x if (!frame_.private_queue.empty())
430 frame_.key->drain_thread_queue(
431 frame_.private_queue, frame_.private_outstanding_work);
432 210x context_stack.set(frame_.next);
433 210x }
434 };
435
436 inline scheduler_context*
437 192150x find_context(epoll_scheduler const* self) noexcept
438 {
439 192150x for (auto* c = context_stack.get(); c != nullptr; c = c->next)
440 190415x if (c->key == self)
441 190415x return c;
442 1735x return nullptr;
443 }
444
445 } // namespace epoll
446
447 inline void
448 29637x epoll_scheduler::reset_inline_budget() const noexcept
449 {
450 29637x if (auto* ctx = epoll::find_context(this))
451 {
452 // Cap when no other thread absorbed queued work. A moderate
453 // cap (4) amortizes scheduling for small buffers while avoiding
454 // bursty I/O that fills socket buffers and stalls large transfers.
455 29637x if (ctx->unassisted)
456 {
457 29637x ctx->inline_budget_max = 4;
458 29637x ctx->inline_budget = 4;
459 29637x return;
460 }
461 // Ramp up when previous cycle fully consumed budget.
462 // Reset on partial consumption (EAGAIN hit or peer got scheduled).
463 if (ctx->inline_budget == 0)
464 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
465 else if (ctx->inline_budget < ctx->inline_budget_max)
466 ctx->inline_budget_max = 2;
467 ctx->inline_budget = ctx->inline_budget_max;
468 }
469 }
470
471 inline bool
472 116076x epoll_scheduler::try_consume_inline_budget() const noexcept
473 {
474 116076x if (auto* ctx = epoll::find_context(this))
475 {
476 116076x if (ctx->inline_budget > 0)
477 {
478 92926x --ctx->inline_budget;
479 92926x return true;
480 }
481 }
482 23150x return false;
483 }
484
// Process the epoll events accumulated for this descriptor.
//
// Takes and clears the pending event mask, then, under the descriptor
// mutex, performs (or fails with the socket error) the pending read,
// connect and write operations. When no operation is pending for an
// event, the matching read_ready/write_ready flag is set instead. The
// first completed operation is invoked inline; the remainder are passed
// to the scheduler via post_deferred_completions(). When nothing
// completed, compensating_work_started() is called instead.
485 inline void
486 20733x descriptor_state::operator()()
487 {
// NOTE(review): presumably lets the descriptor be queued again while
// this invocation is still running - confirm against the enqueue site.
488 20733x is_enqueued_.store(false, std::memory_order_relaxed);
489
490 // Take ownership of impl ref set by close_socket() to prevent
491 // the owning impl from being freed while we're executing
492 20733x auto prevent_impl_destruction = std::move(impl_ref_);
493
// Atomically take and clear the accumulated event mask.
494 20733x std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
495 20733x if (ev == 0)
496 {
// Nothing to do and no handler will run.
497 scheduler_->compensating_work_started();
498 return;
499 }
500
// Operations completed during this call, in completion order.
501 20733x op_queue local_ops;
502
// On EPOLLERR fetch the socket error. If getsockopt itself fails use
// its errno; if no error is reported fall back to EIO so that `err`
// is always non-zero on this path.
503 20733x int err = 0;
504 20733x if (ev & EPOLLERR)
505 {
506 1x socklen_t len = sizeof(err);
507 1x if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
508 err = errno;
509 1x if (err == 0)
510 err = EIO;
511 }
512
513 {
514 20733x std::lock_guard lock(mutex);
515 20733x if (ev & EPOLLIN)
516 {
517 5876x if (read_op)
518 {
519 3145x auto* rd = read_op;
520 3145x if (err)
521 rd->complete(err, 0);
522 else
523 3145x rd->perform_io();
524
// EAGAIN/EWOULDBLOCK: keep the op pending and clear the transient
// error; anything else means the op is finished.
525 3145x if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
526 {
527 rd->errn = 0;
528 }
529 else
530 {
531 3145x read_op = nullptr;
532 3145x local_ops.push(rd);
533 }
534 }
535 else
536 {
// No reader waiting: remember the readiness for later.
537 2731x read_ready = true;
538 }
539 }
540 20733x if (ev & EPOLLOUT)
541 {
// Sampled before the ops run, because completing them clears the pointers.
542 17592x bool had_write_op = (connect_op || write_op);
543 17592x if (connect_op)
544 {
545 3145x auto* cn = connect_op;
546 3145x if (err)
547 1x cn->complete(err, 0);
548 else
549 3144x cn->perform_io();
// Unlike read/write, a connect op is always completed here; there is
// no EAGAIN retry path.
550 3145x connect_op = nullptr;
551 3145x local_ops.push(cn);
552 }
553 17592x if (write_op)
554 {
555 auto* wr = write_op;
556 if (err)
557 wr->complete(err, 0);
558 else
559 wr->perform_io();
560
// Same EAGAIN handling as the read path above.
561 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
562 {
563 wr->errn = 0;
564 }
565 else
566 {
567 write_op = nullptr;
568 local_ops.push(wr);
569 }
570 }
// No connect/write was waiting: remember the readiness for later.
571 17592x if (!had_write_op)
572 14447x write_ready = true;
573 }
// On error, fail every operation that is still pending, including
// those whose event bit was not set in `ev`.
574 20733x if (err)
575 {
576 1x if (read_op)
577 {
578 read_op->complete(err, 0);
579 local_ops.push(std::exchange(read_op, nullptr));
580 }
581 1x if (write_op)
582 {
583 write_op->complete(err, 0);
584 local_ops.push(std::exchange(write_op, nullptr));
585 }
586 1x if (connect_op)
587 {
588 connect_op->complete(err, 0);
589 local_ops.push(std::exchange(connect_op, nullptr));
590 }
591 }
592 20733x }
593
594 // Execute first handler inline — the scheduler's work_cleanup
595 // accounts for this as the "consumed" work item
596 20733x scheduler_op* first = local_ops.pop();
597 20733x if (first)
598 {
// Hand the remaining completions to the scheduler before invoking the
// first one.
599 6290x scheduler_->post_deferred_completions(local_ops);
600 6290x (*first)();
601 }
602 else
603 {
// Every attempted I/O returned EAGAIN (or nothing was pending): no
// handler runs, so offset the forthcoming work_finished.
604 14443x scheduler_->compensating_work_started();
605 }
606 20733x }
607
608 244x inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
609 244x : epoll_fd_(-1)
610 244x , event_fd_(-1)
611 244x , timer_fd_(-1)
612 244x , outstanding_work_(0)
613 244x , stopped_(false)
614 244x , task_running_{false}
615 244x , task_interrupted_(false)
616 488x , state_(0)
617 {
618 244x epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
619 244x if (epoll_fd_ < 0)
620 detail::throw_system_error(make_err(errno), "epoll_create1");
621
622 244x event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
623 244x if (event_fd_ < 0)
624 {
625 int errn = errno;
626 ::close(epoll_fd_);
627 detail::throw_system_error(make_err(errn), "eventfd");
628 }
629
630 244x timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
631 244x if (timer_fd_ < 0)
632 {
633 int errn = errno;
634 ::close(event_fd_);
635 ::close(epoll_fd_);
636 detail::throw_system_error(make_err(errn), "timerfd_create");
637 }
638
639 244x epoll_event ev{};
640 244x ev.events = EPOLLIN | EPOLLET;
641 244x ev.data.ptr = nullptr;
642 244x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
643 {
644 int errn = errno;
645 ::close(timer_fd_);
646 ::close(event_fd_);
647 ::close(epoll_fd_);
648 detail::throw_system_error(make_err(errn), "epoll_ctl");
649 }
650
651 244x epoll_event timer_ev{};
652 244x timer_ev.events = EPOLLIN | EPOLLERR;
653 244x timer_ev.data.ptr = &timer_fd_;
654 244x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
655 {
656 int errn = errno;
657 ::close(timer_fd_);
658 ::close(event_fd_);
659 ::close(epoll_fd_);
660 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
661 }
662
663 244x timer_svc_ = &get_timer_service(ctx, *this);
664 244x timer_svc_->set_on_earliest_changed(
665 3597x timer_service::callback(this, [](void* p) {
666 3353x auto* self = static_cast<epoll_scheduler*>(p);
667 3353x self->timerfd_stale_.store(true, std::memory_order_release);
668 3353x if (self->task_running_.load(std::memory_order_acquire))
669 self->interrupt_reactor();
670 3353x }));
671
672 // Initialize resolver service
673 244x get_resolver_service(ctx, *this);
674
675 // Initialize signal service
676 244x get_signal_service(ctx, *this);
677
678 // Push task sentinel to interleave reactor runs with handler execution
679 244x completed_ops_.push(&task_op_);
680 244x }
681
682 488x inline epoll_scheduler::~epoll_scheduler()
683 {
684 244x if (timer_fd_ >= 0)
685 244x ::close(timer_fd_);
686 244x if (event_fd_ >= 0)
687 244x ::close(event_fd_);
688 244x if (epoll_fd_ >= 0)
689 244x ::close(epoll_fd_);
690 488x }
691
692 inline void
693 244x epoll_scheduler::shutdown()
694 {
695 {
696 244x std::unique_lock lock(mutex_);
697
698 528x while (auto* h = completed_ops_.pop())
699 {
700 284x if (h == &task_op_)
701 244x continue;
702 40x lock.unlock();
703 40x h->destroy();
704 40x lock.lock();
705 284x }
706
707 244x signal_all(lock);
708 244x }
709
710 244x if (event_fd_ >= 0)
711 244x interrupt_reactor();
712 244x }
713
714 inline void
715 5236x epoll_scheduler::post(std::coroutine_handle<> h) const
716 {
717 struct post_handler final : scheduler_op
718 {
719 std::coroutine_handle<> h_;
720
721 5236x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
722
723 10472x ~post_handler() override = default;
724
725 5230x void operator()() override
726 {
727 5230x auto h = h_;
728 5230x delete this;
729 5230x h.resume();
730 5230x }
731
732 6x void destroy() override
733 {
734 6x auto h = h_;
735 6x delete this;
736 6x h.destroy();
737 6x }
738 };
739
740 5236x auto ph = std::make_unique<post_handler>(h);
741
742 // Fast path: same thread posts to private queue
743 // Only count locally; work_cleanup batches to global counter
744 5236x if (auto* ctx = epoll::find_context(this))
745 {
746 3531x ++ctx->private_outstanding_work;
747 3531x ctx->private_queue.push(ph.release());
748 3531x return;
749 }
750
751 // Slow path: cross-thread post requires mutex
752 1705x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
753
754 1705x std::unique_lock lock(mutex_);
755 1705x completed_ops_.push(ph.release());
756 1705x wake_one_thread_and_unlock(lock);
757 5236x }
758
759 inline void
760 26758x epoll_scheduler::post(scheduler_op* h) const
761 {
762 // Fast path: same thread posts to private queue
763 // Only count locally; work_cleanup batches to global counter
764 26758x if (auto* ctx = epoll::find_context(this))
765 {
766 26728x ++ctx->private_outstanding_work;
767 26728x ctx->private_queue.push(h);
768 26728x return;
769 }
770
771 // Slow path: cross-thread post requires mutex
772 30x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
773
774 30x std::unique_lock lock(mutex_);
775 30x completed_ops_.push(h);
776 30x wake_one_thread_and_unlock(lock);
777 30x }
778
779 inline bool
780 713x epoll_scheduler::running_in_this_thread() const noexcept
781 {
782 713x for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
783 427x if (c->key == this)
784 427x return true;
785 286x return false;
786 }
787
788 inline void
789 229x epoll_scheduler::stop()
790 {
791 229x std::unique_lock lock(mutex_);
792 229x if (!stopped_)
793 {
794 191x stopped_ = true;
795 191x signal_all(lock);
796 191x interrupt_reactor();
797 }
798 229x }
799
800 inline bool
801 18x epoll_scheduler::stopped() const noexcept
802 {
803 18x std::unique_lock lock(mutex_);
804 36x return stopped_;
805 18x }
806
807 inline void
808 53x epoll_scheduler::restart()
809 {
810 53x std::unique_lock lock(mutex_);
811 53x stopped_ = false;
812 53x }
813
814 inline std::size_t
815 209x epoll_scheduler::run()
816 {
817 418x if (outstanding_work_.load(std::memory_order_acquire) == 0)
818 {
819 33x stop();
820 33x return 0;
821 }
822
823 176x epoll::thread_context_guard ctx(this);
824 176x std::unique_lock lock(mutex_);
825
826 176x std::size_t n = 0;
827 for (;;)
828 {
829 52858x if (!do_one(lock, -1, &ctx.frame_))
830 176x break;
831 52682x if (n != (std::numeric_limits<std::size_t>::max)())
832 52682x ++n;
833 52682x if (!lock.owns_lock())
834 25783x lock.lock();
835 }
836 176x return n;
837 176x }
838
839 inline std::size_t
840 2x epoll_scheduler::run_one()
841 {
842 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
843 {
844 stop();
845 return 0;
846 }
847
848 2x epoll::thread_context_guard ctx(this);
849 2x std::unique_lock lock(mutex_);
850 2x return do_one(lock, -1, &ctx.frame_);
851 2x }
852
853 inline std::size_t
854 34x epoll_scheduler::wait_one(long usec)
855 {
856 68x if (outstanding_work_.load(std::memory_order_acquire) == 0)
857 {
858 7x stop();
859 7x return 0;
860 }
861
862 27x epoll::thread_context_guard ctx(this);
863 27x std::unique_lock lock(mutex_);
864 27x return do_one(lock, usec, &ctx.frame_);
865 27x }
866
867 inline std::size_t
868 4x epoll_scheduler::poll()
869 {
870 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
871 {
872 1x stop();
873 1x return 0;
874 }
875
876 3x epoll::thread_context_guard ctx(this);
877 3x std::unique_lock lock(mutex_);
878
879 3x std::size_t n = 0;
880 for (;;)
881 {
882 7x if (!do_one(lock, 0, &ctx.frame_))
883 3x break;
884 4x if (n != (std::numeric_limits<std::size_t>::max)())
885 4x ++n;
886 4x if (!lock.owns_lock())
887 4x lock.lock();
888 }
889 3x return n;
890 3x }
891
892 inline std::size_t
893 4x epoll_scheduler::poll_one()
894 {
895 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
896 {
897 2x stop();
898 2x return 0;
899 }
900
901 2x epoll::thread_context_guard ctx(this);
902 2x std::unique_lock lock(mutex_);
903 2x return do_one(lock, 0, &ctx.frame_);
904 2x }
905
906 inline void
907 6378x epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
908 {
909 6378x epoll_event ev{};
910 6378x ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
911 6378x ev.data.ptr = desc;
912
913 6378x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
914 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
915
916 6378x desc->registered_events = ev.events;
917 6378x desc->fd = fd;
918 6378x desc->scheduler_ = this;
919
920 6378x std::lock_guard lock(desc->mutex);
921 6378x desc->read_ready = false;
922 6378x desc->write_ready = false;
923 6378x }
924
925 inline void
926 6378x epoll_scheduler::deregister_descriptor(int fd) const
927 {
928 6378x ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
929 6378x }
930
931 inline void
932 10586x epoll_scheduler::work_started() noexcept
933 {
934 10586x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
935 10586x }
936
937 inline void
938 15660x epoll_scheduler::work_finished() noexcept
939 {
940 31320x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
941 184x stop();
942 15660x }
943
944 inline void
945 14443x epoll_scheduler::compensating_work_started() const noexcept
946 {
947 14443x auto* ctx = epoll::find_context(this);
948 14443x if (ctx)
949 14443x ++ctx->private_outstanding_work;
950 14443x }
951
952 inline void
953 epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
954 {
955 // Note: outstanding_work_ was already incremented when posting
956 std::unique_lock lock(mutex_);
957 completed_ops_.splice(queue);
958 if (count > 0)
959 maybe_unlock_and_signal_one(lock);
960 }
961
962 inline void
963 6290x epoll_scheduler::post_deferred_completions(op_queue& ops) const
964 {
965 6290x if (ops.empty())
966 6290x return;
967
968 // Fast path: if on scheduler thread, use private queue
969 if (auto* ctx = epoll::find_context(this))
970 {
971 ctx->private_queue.splice(ops);
972 return;
973 }
974
975 // Slow path: add to global queue and wake a thread
976 std::unique_lock lock(mutex_);
977 completed_ops_.splice(ops);
978 wake_one_thread_and_unlock(lock);
979 }
980
981 inline void
982 456x epoll_scheduler::interrupt_reactor() const
983 {
984 // Only write if not already armed to avoid redundant writes
985 456x bool expected = false;
986 456x if (eventfd_armed_.compare_exchange_strong(
987 expected, true, std::memory_order_release,
988 std::memory_order_relaxed))
989 {
990 308x std::uint64_t val = 1;
991 308x [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
992 }
993 456x }
994
995 inline void
996 435x epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
997 {
998 435x state_ |= 1;
999 435x cond_.notify_all();
1000 435x }
1001
1002 inline bool
1003 1735x epoll_scheduler::maybe_unlock_and_signal_one(
1004 std::unique_lock<std::mutex>& lock) const
1005 {
1006 1735x state_ |= 1;
1007 1735x if (state_ > 1)
1008 {
1009 lock.unlock();
1010 cond_.notify_one();
1011 return true;
1012 }
1013 1735x return false;
1014 }
1015
1016 inline bool
1017 64100x epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
1018 {
1019 64100x state_ |= 1;
1020 64100x bool have_waiters = state_ > 1;
1021 64100x lock.unlock();
1022 64100x if (have_waiters)
1023 cond_.notify_one();
1024 64100x return have_waiters;
1025 }
1026
1027 inline void
1028 epoll_scheduler::clear_signal() const
1029 {
1030 state_ &= ~std::size_t(1);
1031 }
1032
1033 inline void
1034 epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
1035 {
1036 while ((state_ & 1) == 0)
1037 {
1038 state_ += 2;
1039 cond_.wait(lock);
1040 state_ -= 2;
1041 }
1042 }
1043
1044 inline void
1045 epoll_scheduler::wait_for_signal_for(
1046 std::unique_lock<std::mutex>& lock, long timeout_us) const
1047 {
1048 if ((state_ & 1) == 0)
1049 {
1050 state_ += 2;
1051 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
1052 state_ -= 2;
1053 }
1054 }
1055
1056 inline void
1057 1735x epoll_scheduler::wake_one_thread_and_unlock(
1058 std::unique_lock<std::mutex>& lock) const
1059 {
1060 1735x if (maybe_unlock_and_signal_one(lock))
1061 return;
1062
1063 1735x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
1064 {
1065 21x task_interrupted_ = true;
1066 21x lock.unlock();
1067 21x interrupt_reactor();
1068 }
1069 else
1070 {
1071 1714x lock.unlock();
1072 }
1073 }
1074
1075 52717x inline epoll_scheduler::work_cleanup::~work_cleanup()
1076 {
1077 52717x if (ctx)
1078 {
1079 52717x long produced = ctx->private_outstanding_work;
1080 52717x if (produced > 1)
1081 8x scheduler->outstanding_work_.fetch_add(
1082 produced - 1, std::memory_order_relaxed);
1083 52709x else if (produced < 1)
1084 11364x scheduler->work_finished();
1085 52717x ctx->private_outstanding_work = 0;
1086
1087 52717x if (!ctx->private_queue.empty())
1088 {
1089 26910x lock->lock();
1090 26910x scheduler->completed_ops_.splice(ctx->private_queue);
1091 }
1092 }
1093 else
1094 {
1095 scheduler->work_finished();
1096 }
1097 52717x }
1098
1099 35768x inline epoll_scheduler::task_cleanup::~task_cleanup()
1100 {
1101 17884x if (!ctx)
1102 return;
1103
1104 17884x if (ctx->private_outstanding_work > 0)
1105 {
1106 3336x scheduler->outstanding_work_.fetch_add(
1107 3336x ctx->private_outstanding_work, std::memory_order_relaxed);
1108 3336x ctx->private_outstanding_work = 0;
1109 }
1110
1111 17884x if (!ctx->private_queue.empty())
1112 {
1113 3336x if (!lock->owns_lock())
1114 lock->lock();
1115 3336x scheduler->completed_ops_.splice(ctx->private_queue);
1116 }
1117 17884x }
1118
1119 inline void
1120 6668x epoll_scheduler::update_timerfd() const
1121 {
1122 6668x auto nearest = timer_svc_->nearest_expiry();
1123
1124 6668x itimerspec ts{};
1125 6668x int flags = 0;
1126
1127 6668x if (nearest == timer_service::time_point::max())
1128 {
1129 // No timers - disarm by setting to 0 (relative)
1130 }
1131 else
1132 {
1133 6623x auto now = std::chrono::steady_clock::now();
1134 6623x if (nearest <= now)
1135 {
1136 // Use 1ns instead of 0 - zero disarms the timerfd
1137 263x ts.it_value.tv_nsec = 1;
1138 }
1139 else
1140 {
1141 6360x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1142 6360x nearest - now)
1143 6360x .count();
1144 6360x ts.it_value.tv_sec = nsec / 1000000000;
1145 6360x ts.it_value.tv_nsec = nsec % 1000000000;
1146 // Ensure non-zero to avoid disarming if duration rounds to 0
1147 6360x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1148 ts.it_value.tv_nsec = 1;
1149 }
1150 }
1151
1152 6668x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1153 detail::throw_system_error(make_err(errno), "timerfd_settime");
1154 6668x }
1155
1156 inline void
1157 17884x epoll_scheduler::run_task(
1158 std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
1159 {
1160 17884x int timeout_ms = task_interrupted_ ? 0 : -1;
1161
1162 17884x if (lock.owns_lock())
1163 6501x lock.unlock();
1164
1165 17884x task_cleanup on_exit{this, &lock, ctx};
1166
1167 // Flush deferred timerfd programming before blocking
1168 17884x if (timerfd_stale_.exchange(false, std::memory_order_acquire))
1169 3332x update_timerfd();
1170
1171 // Event loop runs without mutex held
1172 epoll_event events[128];
1173 17884x int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
1174
1175 17884x if (nfds < 0 && errno != EINTR)
1176 detail::throw_system_error(make_err(errno), "epoll_wait");
1177
1178 17884x bool check_timers = false;
1179 17884x op_queue local_ops;
1180
1181 // Process events without holding the mutex
1182 42047x for (int i = 0; i < nfds; ++i)
1183 {
1184 24163x if (events[i].data.ptr == nullptr)
1185 {
1186 std::uint64_t val;
1187 // Mutex released above; analyzer can't track unlock via ref
1188 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
1189 64x [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
1190 64x eventfd_armed_.store(false, std::memory_order_relaxed);
1191 64x continue;
1192 64x }
1193
1194 24099x if (events[i].data.ptr == &timer_fd_)
1195 {
1196 std::uint64_t expirations;
1197 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
1198 [[maybe_unused]] auto r =
1199 3336x ::read(timer_fd_, &expirations, sizeof(expirations));
1200 3336x check_timers = true;
1201 3336x continue;
1202 3336x }
1203
1204 // Deferred I/O: just set ready events and enqueue descriptor
1205 // No per-descriptor mutex locking in reactor hot path!
1206 20763x auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
1207 20763x desc->add_ready_events(events[i].events);
1208
1209 // Only enqueue if not already enqueued
1210 20763x bool expected = false;
1211 20763x if (desc->is_enqueued_.compare_exchange_strong(
1212 expected, true, std::memory_order_release,
1213 std::memory_order_relaxed))
1214 {
1215 20763x local_ops.push(desc);
1216 }
1217 }
1218
1219 // Process timers only when timerfd fires
1220 17884x if (check_timers)
1221 {
1222 3336x timer_svc_->process_expired();
1223 3336x update_timerfd();
1224 }
1225
1226 17884x lock.lock();
1227
1228 17884x if (!local_ops.empty())
1229 10891x completed_ops_.splice(local_ops);
1230 17884x }
1231
1232 inline std::size_t
1233 52896x epoll_scheduler::do_one(
1234 std::unique_lock<std::mutex>& lock,
1235 long timeout_us,
1236 epoll::scheduler_context* ctx)
1237 {
1238 for (;;)
1239 {
1240 70780x if (stopped_)
1241 177x return 0;
1242
1243 70603x scheduler_op* op = completed_ops_.pop();
1244
1245 // Handle reactor sentinel - time to poll for I/O
1246 70603x if (op == &task_op_)
1247 {
1248 17886x bool more_handlers = !completed_ops_.empty();
1249
1250 // Nothing to run the reactor for: no pending work to wait on,
1251 // or caller requested a non-blocking poll
1252 24389x if (!more_handlers &&
1253 13006x (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1254 timeout_us == 0))
1255 {
1256 2x completed_ops_.push(&task_op_);
1257 2x return 0;
1258 }
1259
1260 17884x task_interrupted_ = more_handlers || timeout_us == 0;
1261 17884x task_running_.store(true, std::memory_order_release);
1262
1263 17884x if (more_handlers)
1264 11383x unlock_and_signal_one(lock);
1265
1266 17884x run_task(lock, ctx);
1267
1268 17884x task_running_.store(false, std::memory_order_relaxed);
1269 17884x completed_ops_.push(&task_op_);
1270 17884x continue;
1271 17884x }
1272
1273 // Handle operation
1274 52717x if (op != nullptr)
1275 {
1276 52717x bool more = !completed_ops_.empty();
1277
1278 52717x if (more)
1279 52717x ctx->unassisted = !unlock_and_signal_one(lock);
1280 else
1281 {
1282 ctx->unassisted = false;
1283 lock.unlock();
1284 }
1285
1286 52717x work_cleanup on_exit{this, &lock, ctx};
1287
1288 52717x (*op)();
1289 52717x return 1;
1290 52717x }
1291
1292 // No pending work to wait on, or caller requested non-blocking poll
1293 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1294 timeout_us == 0)
1295 return 0;
1296
1297 clear_signal();
1298 if (timeout_us < 0)
1299 wait_for_signal(lock);
1300 else
1301 wait_for_signal_for(lock, timeout_us);
1302 17884x }
1303 }
1304
1305 } // namespace boost::corosio::detail
1306
1307 #endif // BOOST_COROSIO_HAS_EPOLL
1308
1309 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
1310