include/boost/corosio/detail/timer_service.hpp

91.5% Lines (334/365) 97.7% List of functions (43/44)
f(x) Functions (44)
Function Calls Lines Branches Blocks
boost::corosio::detail::timer_service::callback::callback() :97 0 100.0% boost::corosio::detail::timer_service::callback::callback(void*, void (*)(void*)) :100 0 100.0% boost::corosio::detail::timer_service::callback::operator()() const :109 0 100.0% boost::corosio::detail::timer_service::timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :137 0 100.0% boost::corosio::detail::timer_service::get_scheduler() :143 0 100.0% boost::corosio::detail::timer_service::~timer_service() :149 0 100.0% boost::corosio::detail::timer_service::set_on_earliest_changed(boost::corosio::detail::timer_service::callback) :155 0 100.0% boost::corosio::detail::timer_service::nearest_expiry() const :168 0 100.0% boost::corosio::detail::timer_service::refresh_cached_nearest() :211 0 100.0% boost::corosio::detail::waiter_node::completion_op::completion_op() :235 0 100.0% boost::corosio::detail::waiter_node::waiter_node() :260 0 100.0% boost::corosio::detail::try_pop_tl_cache(boost::corosio::detail::timer_service*) :296 0 87.5% boost::corosio::detail::try_push_tl_cache(boost::corosio::detail::timer_service::implementation*) :311 0 100.0% boost::corosio::detail::try_pop_waiter_tl_cache() :322 0 100.0% boost::corosio::detail::try_push_waiter_tl_cache(boost::corosio::detail::waiter_node*) :334 0 100.0% boost::corosio::detail::timer_service_invalidate_cache() :345 0 100.0% boost::corosio::detail::timer_service::implementation::implementation(boost::corosio::detail::timer_service&) :356 0 100.0% boost::corosio::detail::timer_service::shutdown() :363 0 100.0% boost::corosio::detail::timer_service::construct() :410 0 66.7% boost::corosio::detail::timer_service::destroy(boost::corosio::io_object::implementation*) :439 0 100.0% boost::corosio::detail::timer_service::destroy_impl(boost::corosio::detail::timer_service::implementation&) :445 0 69.2% boost::corosio::detail::timer_service::create_waiter() :465 0 100.0% 
boost::corosio::detail::timer_service::destroy_waiter(boost::corosio::detail::waiter_node*) :483 0 100.0% boost::corosio::detail::timer_service::update_timer(boost::corosio::detail::timer_service::implementation&, std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l> > >) :494 0 93.1% boost::corosio::detail::timer_service::insert_waiter(boost::corosio::detail::timer_service::implementation&, boost::corosio::detail::waiter_node*) :544 0 100.0% boost::corosio::detail::timer_service::cancel_timer(boost::corosio::detail::timer_service::implementation&) :564 0 87.5% boost::corosio::detail::timer_service::cancel_waiter(boost::corosio::detail::waiter_node*) :604 0 92.9% boost::corosio::detail::timer_service::cancel_one_waiter(boost::corosio::detail::timer_service::implementation&) :627 0 76.5% boost::corosio::detail::timer_service::process_expired() :654 0 100.0% boost::corosio::detail::timer_service::remove_timer_impl(boost::corosio::detail::timer_service::implementation&) :689 0 84.6% boost::corosio::detail::timer_service::up_heap(unsigned long) :716 0 100.0% boost::corosio::detail::timer_service::down_heap(unsigned long) :729 0 69.2% boost::corosio::detail::timer_service::swap_heap(unsigned long, unsigned long) :749 0 100.0% boost::corosio::detail::waiter_node::canceller::operator()() const :761 0 100.0% boost::corosio::detail::waiter_node::completion_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :767 0 0.0% boost::corosio::detail::waiter_node::completion_op::operator()() :781 0 100.0% boost::corosio::detail::waiter_node::completion_op::destroy() :806 0 100.0% boost::corosio::detail::timer_service::implementation::wait(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*) :833 0 100.0% boost::corosio::detail::timer_service_access::get_scheduler(boost::corosio::io_context&) :875 0 100.0% 
boost::corosio::detail::timer_service_direct(boost::capy::execution_context&) :883 0 100.0% boost::corosio::detail::timer_service_update_expiry(boost::corosio::io_timer::implementation&) :890 0 100.0% boost::corosio::detail::timer_service_cancel(boost::corosio::io_timer::implementation&) :897 0 100.0% boost::corosio::detail::timer_service_cancel_one(boost::corosio::io_timer::implementation&) :904 0 100.0% boost::corosio::detail::get_timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :911 0 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/io_context.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/native/native_scheduler.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/detail/thread_local_ptr.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/capy/ex/execution_context.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <system_error>
24
25 #include <atomic>
26 #include <chrono>
27 #include <coroutine>
28 #include <cstddef>
29 #include <limits>
30 #include <mutex>
31 #include <optional>
32 #include <stop_token>
33 #include <utility>
34 #include <vector>
35
36 namespace boost::corosio::detail {
37
38 struct scheduler;
39
40 /*
41 Timer Service
42 =============
43
44 Data Structures
45 ---------------
46 waiter_node holds per-waiter state: coroutine handle, executor,
47 error output, stop_token, embedded completion_op. Each concurrent
48 co_await t.wait() allocates one waiter_node.
49
50 timer_service::implementation holds per-timer state: expiry,
51 heap index, and an intrusive_list of waiter_nodes. Multiple
52 coroutines can wait on the same timer simultaneously.
53
54 timer_service owns a min-heap of active timers, a free list
55 of recycled impls, and a free list of recycled waiter_nodes. The
56 heap is ordered by expiry time; the scheduler queries
57 nearest_expiry() to set the epoll/timerfd timeout.
58
59 Optimization Strategy
60 ---------------------
61 1. Deferred heap insertion — expires_after() stores the expiry
62 but does not insert into the heap. Insertion happens in wait().
63 2. Thread-local impl cache — single-slot per-thread cache.
64 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66 5. might_have_pending_waits_ flag — skips lock when no wait issued.
67 6. Thread-local waiter cache — single-slot per-thread cache.
68
69 Concurrency
70 -----------
71 stop_token callbacks can fire from any thread. The impl_
72 pointer on waiter_node is used as a "still in list" marker.
73 */
74
75 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
76
77 inline void timer_service_invalidate_cache() noexcept;
78
79 // timer_service class body — member function definitions are
80 // out-of-class (after implementation and waiter_node are complete)
81 class BOOST_COROSIO_DECL timer_service final
82 : public capy::execution_context::service
83 , public io_object::io_service
84 {
85 public:
86 using clock_type = std::chrono::steady_clock;
87 using time_point = clock_type::time_point;
88
89 /// Type-erased callback for earliest-expiry-changed notifications.
90 class callback
91 {
92 void* ctx_ = nullptr;
93 void (*fn_)(void*) = nullptr;
94
95 public:
96 /// Construct an empty callback.
97 412x callback() = default;
98
99 /// Construct a callback with the given context and function.
100 412x callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
101
102 /// Return true if the callback is non-empty.
103 explicit operator bool() const noexcept
104 {
105 return fn_ != nullptr;
106 }
107
108 /// Invoke the callback.
109 4831x void operator()() const
110 {
111 4831x if (fn_)
112 4831x fn_(ctx_);
113 4831x }
114 };
115
116 struct implementation;
117
118 private:
119 struct heap_entry
120 {
121 time_point time_;
122 implementation* timer_;
123 };
124
125 scheduler* sched_ = nullptr;
126 mutable std::mutex mutex_;
127 std::vector<heap_entry> heap_;
128 implementation* free_list_ = nullptr;
129 waiter_node* waiter_free_list_ = nullptr;
130 callback on_earliest_changed_;
131 // Avoids mutex in nearest_expiry() and empty()
132 mutable std::atomic<std::int64_t> cached_nearest_ns_{
133 (std::numeric_limits<std::int64_t>::max)()};
134
135 public:
136 /// Construct the timer service bound to a scheduler.
137 412x inline timer_service(capy::execution_context&, scheduler& sched)
138 412x : sched_(&sched)
139 {
140 412x }
141
142 /// Return the associated scheduler.
143 9746x inline scheduler& get_scheduler() noexcept
144 {
145 9746x return *sched_;
146 }
147
148 /// Destroy the timer service.
149 824x ~timer_service() override = default;
150
151 timer_service(timer_service const&) = delete;
152 timer_service& operator=(timer_service const&) = delete;
153
154 /// Register a callback invoked when the earliest expiry changes.
155 412x inline void set_on_earliest_changed(callback cb)
156 {
157 412x on_earliest_changed_ = cb;
158 412x }
159
160 /// Return true if no timers are in the heap.
161 inline bool empty() const noexcept
162 {
163 return cached_nearest_ns_.load(std::memory_order_acquire) ==
164 (std::numeric_limits<std::int64_t>::max)();
165 }
166
167 /// Return the nearest timer expiry without acquiring the mutex.
168 10736x inline time_point nearest_expiry() const noexcept
169 {
170 10736x auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
171 10736x return time_point(time_point::duration(ns));
172 }
173
174 /// Cancel all pending timers and free cached resources.
175 inline void shutdown() override;
176
177 /// Construct a new timer implementation.
178 inline io_object::implementation* construct() override;
179
180 /// Destroy a timer implementation, cancelling pending waiters.
181 inline void destroy(io_object::implementation* p) override;
182
183 /// Cancel and recycle a timer implementation.
184 inline void destroy_impl(implementation& impl);
185
186 /// Create or recycle a waiter node.
187 inline waiter_node* create_waiter();
188
189 /// Return a waiter node to the cache or free list.
190 inline void destroy_waiter(waiter_node* w);
191
192 /// Update the timer expiry, cancelling existing waiters.
193 inline std::size_t update_timer(implementation& impl, time_point new_time);
194
195 /// Insert a waiter into the timer's waiter list and the heap.
196 inline void insert_waiter(implementation& impl, waiter_node* w);
197
198 /// Cancel all waiters on a timer.
199 inline std::size_t cancel_timer(implementation& impl);
200
201 /// Cancel a single waiter ( stop_token callback path ).
202 inline void cancel_waiter(waiter_node* w);
203
204 /// Cancel one waiter on a timer.
205 inline std::size_t cancel_one_waiter(implementation& impl);
206
207 /// Complete all waiters whose timers have expired.
208 inline std::size_t process_expired();
209
210 private:
211 58041x inline void refresh_cached_nearest() noexcept
212 {
213 58041x auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
214 57563x : heap_[0].time_.time_since_epoch().count();
215 58041x cached_nearest_ns_.store(ns, std::memory_order_release);
216 58041x }
217
218 inline void remove_timer_impl(implementation& impl);
219 inline void up_heap(std::size_t index);
220 inline void down_heap(std::size_t index);
221 inline void swap_heap(std::size_t i1, std::size_t i2);
222 };
223
224 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
225 : intrusive_list<waiter_node>::node
226 {
227 // Embedded completion op — avoids heap allocation per fire/cancel
228 struct completion_op final : scheduler_op
229 {
230 waiter_node* waiter_ = nullptr;
231
232 static void do_complete(
233 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
234
235 194x completion_op() noexcept : scheduler_op(&do_complete) {}
236
237 void operator()() override;
238 void destroy() override;
239 };
240
241 // Per-waiter stop_token cancellation
242 struct canceller
243 {
244 waiter_node* waiter_;
245 void operator()() const;
246 };
247
248 // nullptr once removed from timer's waiter list (concurrency marker)
249 timer_service::implementation* impl_ = nullptr;
250 timer_service* svc_ = nullptr;
251 std::coroutine_handle<> h_;
252 capy::executor_ref d_;
253 std::error_code* ec_out_ = nullptr;
254 std::stop_token token_;
255 std::optional<std::stop_callback<canceller>> stop_cb_;
256 completion_op op_;
257 std::error_code ec_value_;
258 waiter_node* next_free_ = nullptr;
259
260 194x waiter_node() noexcept
261 194x {
262 194x op_.waiter_ = this;
263 194x }
264 };
265
266 struct timer_service::implementation final : timer::implementation
267 {
268 using clock_type = std::chrono::steady_clock;
269 using time_point = clock_type::time_point;
270 using duration = clock_type::duration;
271
272 timer_service* svc_ = nullptr;
273 intrusive_list<waiter_node> waiters_;
274
275 // Free list linkage (reused when impl is on free_list)
276 implementation* next_free_ = nullptr;
277
278 inline explicit implementation(timer_service& svc) noexcept;
279
280 inline std::coroutine_handle<> wait(
281 std::coroutine_handle<>,
282 capy::executor_ref,
283 std::stop_token,
284 std::error_code*) override;
285 };
286
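// Thread-local caches avoid hot-path mutex acquisitions:
//   1. Impl cache   — single slot, validated by comparing svc_
//   2. Waiter cache — single slot, no service affinity
// timer_service_invalidate_cache(), called from shutdown(), deletes and
// clears whatever these two slots hold when read through get()/set().
// NOTE(review): presumably that is only the calling thread's slots — confirm
// against thread_local_ptr.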
291
292 inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
293 inline thread_local_ptr<waiter_node> tl_cached_waiter;
294
295 inline timer_service::implementation*
296 5128x try_pop_tl_cache(timer_service* svc) noexcept
297 {
298 5128x auto* impl = tl_cached_impl.get();
299 5128x if (impl)
300 {
301 4901x tl_cached_impl.set(nullptr);
302 4901x if (impl->svc_ == svc)
303 4901x return impl;
304 // Stale impl from a destroyed service
305 delete impl;
306 }
307 227x return nullptr;
308 }
309
310 inline bool
311 5126x try_push_tl_cache(timer_service::implementation* impl) noexcept
312 {
313 5126x if (!tl_cached_impl.get())
314 {
315 5052x tl_cached_impl.set(impl);
316 5052x return true;
317 }
318 74x return false;
319 }
320
321 inline waiter_node*
322 4874x try_pop_waiter_tl_cache() noexcept
323 {
324 4874x auto* w = tl_cached_waiter.get();
325 4874x if (w)
326 {
327 4678x tl_cached_waiter.set(nullptr);
328 4678x return w;
329 }
330 196x return nullptr;
331 }
332
333 inline bool
334 4864x try_push_waiter_tl_cache(waiter_node* w) noexcept
335 {
336 4864x if (!tl_cached_waiter.get())
337 {
338 4784x tl_cached_waiter.set(w);
339 4784x return true;
340 }
341 80x return false;
342 }
343
344 inline void
345 412x timer_service_invalidate_cache() noexcept
346 {
347 412x delete tl_cached_impl.get();
348 412x tl_cached_impl.set(nullptr);
349
350 412x delete tl_cached_waiter.get();
351 412x tl_cached_waiter.set(nullptr);
352 412x }
353
354 // timer_service out-of-class member function definitions
355
356 227x inline timer_service::implementation::implementation(
357 227x timer_service& svc) noexcept
358 227x : svc_(&svc)
359 {
360 227x }
361
362 inline void
363 412x timer_service::shutdown()
364 {
365 412x timer_service_invalidate_cache();
366
367 // Cancel waiting timers still in the heap.
368 // Each waiter called work_started() in implementation::wait().
369 // On IOCP the scheduler shutdown loop exits when outstanding_work_
370 // reaches zero, so we must call work_finished() here to balance it.
371 // On other backends this is harmless (their drain loops exit when
372 // the queue is empty, not based on outstanding_work_).
373 414x for (auto& entry : heap_)
374 {
375 2x auto* impl = entry.timer_;
376 4x while (auto* w = impl->waiters_.pop_front())
377 {
378 2x w->stop_cb_.reset();
379 2x auto h = std::exchange(w->h_, {});
380 2x sched_->work_finished();
381 2x if (h)
382 2x h.destroy();
383 2x delete w;
384 2x }
385 2x impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
386 2x delete impl;
387 }
388 412x heap_.clear();
389 412x cached_nearest_ns_.store(
390 (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
391
392 // Delete free-listed impls
393 486x while (free_list_)
394 {
395 74x auto* next = free_list_->next_free_;
396 74x delete free_list_;
397 74x free_list_ = next;
398 }
399
400 // Delete free-listed waiters
401 490x while (waiter_free_list_)
402 {
403 78x auto* next = waiter_free_list_->next_free_;
404 78x delete waiter_free_list_;
405 78x waiter_free_list_ = next;
406 }
407 412x }
408
409 inline io_object::implementation*
410 5128x timer_service::construct()
411 {
412 5128x implementation* impl = try_pop_tl_cache(this);
413 5128x if (impl)
414 {
415 4901x impl->svc_ = this;
416 4901x impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
417 4901x impl->might_have_pending_waits_ = false;
418 4901x return impl;
419 }
420
421 227x std::lock_guard lock(mutex_);
422 227x if (free_list_)
423 {
424 impl = free_list_;
425 free_list_ = impl->next_free_;
426 impl->next_free_ = nullptr;
427 impl->svc_ = this;
428 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
429 impl->might_have_pending_waits_ = false;
430 }
431 else
432 {
433 227x impl = new implementation(*this);
434 }
435 227x return impl;
436 227x }
437
438 inline void
439 5126x timer_service::destroy(io_object::implementation* p)
440 {
441 5126x destroy_impl(static_cast<implementation&>(*p));
442 5126x }
443
444 inline void
445 5126x timer_service::destroy_impl(implementation& impl)
446 {
447 5126x cancel_timer(impl);
448
449 5126x if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
450 {
451 std::lock_guard lock(mutex_);
452 remove_timer_impl(impl);
453 refresh_cached_nearest();
454 }
455
456 5126x if (try_push_tl_cache(&impl))
457 5052x return;
458
459 74x std::lock_guard lock(mutex_);
460 74x impl.next_free_ = free_list_;
461 74x free_list_ = &impl;
462 74x }
463
464 inline waiter_node*
465 4874x timer_service::create_waiter()
466 {
467 4874x if (auto* w = try_pop_waiter_tl_cache())
468 4678x return w;
469
470 196x std::lock_guard lock(mutex_);
471 196x if (waiter_free_list_)
472 {
473 2x auto* w = waiter_free_list_;
474 2x waiter_free_list_ = w->next_free_;
475 2x w->next_free_ = nullptr;
476 2x return w;
477 }
478
479 194x return new waiter_node();
480 196x }
481
482 inline void
483 4864x timer_service::destroy_waiter(waiter_node* w)
484 {
485 4864x if (try_push_waiter_tl_cache(w))
486 4784x return;
487
488 80x std::lock_guard lock(mutex_);
489 80x w->next_free_ = waiter_free_list_;
490 80x waiter_free_list_ = w;
491 80x }
492
493 inline std::size_t
494 6x timer_service::update_timer(implementation& impl, time_point new_time)
495 {
496 bool in_heap =
497 6x (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
498 6x if (!in_heap && impl.waiters_.empty())
499 return 0;
500
501 6x bool notify = false;
502 6x intrusive_list<waiter_node> canceled;
503
504 {
505 6x std::lock_guard lock(mutex_);
506
507 16x while (auto* w = impl.waiters_.pop_front())
508 {
509 10x w->impl_ = nullptr;
510 10x canceled.push_back(w);
511 10x }
512
513 6x if (impl.heap_index_ < heap_.size())
514 {
515 6x time_point old_time = heap_[impl.heap_index_].time_;
516 6x heap_[impl.heap_index_].time_ = new_time;
517
518 6x if (new_time < old_time)
519 6x up_heap(impl.heap_index_);
520 else
521 down_heap(impl.heap_index_);
522
523 6x notify = (impl.heap_index_ == 0);
524 }
525
526 6x refresh_cached_nearest();
527 6x }
528
529 6x std::size_t count = 0;
530 16x while (auto* w = canceled.pop_front())
531 {
532 10x w->ec_value_ = make_error_code(capy::error::canceled);
533 10x sched_->post(&w->op_);
534 10x ++count;
535 10x }
536
537 6x if (notify)
538 6x on_earliest_changed_();
539
540 6x return count;
541 }
542
543 inline void
544 4874x timer_service::insert_waiter(implementation& impl, waiter_node* w)
545 {
546 4874x bool notify = false;
547 {
548 4874x std::lock_guard lock(mutex_);
549 4874x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
550 {
551 4852x impl.heap_index_ = heap_.size();
552 4852x heap_.push_back({impl.expiry_, &impl});
553 4852x up_heap(heap_.size() - 1);
554 4852x notify = (impl.heap_index_ == 0);
555 4852x refresh_cached_nearest();
556 }
557 4874x impl.waiters_.push_back(w);
558 4874x }
559 4874x if (notify)
560 4825x on_earliest_changed_();
561 4874x }
562
563 inline std::size_t
564 5134x timer_service::cancel_timer(implementation& impl)
565 {
566 5134x if (!impl.might_have_pending_waits_)
567 5110x return 0;
568
569 // Not in heap and no waiters — just clear the flag
570 24x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
571 impl.waiters_.empty())
572 {
573 impl.might_have_pending_waits_ = false;
574 return 0;
575 }
576
577 24x intrusive_list<waiter_node> canceled;
578
579 {
580 24x std::lock_guard lock(mutex_);
581 24x remove_timer_impl(impl);
582 52x while (auto* w = impl.waiters_.pop_front())
583 {
584 28x w->impl_ = nullptr;
585 28x canceled.push_back(w);
586 28x }
587 24x refresh_cached_nearest();
588 24x }
589
590 24x impl.might_have_pending_waits_ = false;
591
592 24x std::size_t count = 0;
593 52x while (auto* w = canceled.pop_front())
594 {
595 28x w->ec_value_ = make_error_code(capy::error::canceled);
596 28x sched_->post(&w->op_);
597 28x ++count;
598 28x }
599
600 24x return count;
601 }
602
603 inline void
604 30x timer_service::cancel_waiter(waiter_node* w)
605 {
606 {
607 30x std::lock_guard lock(mutex_);
608 // Already removed by cancel_timer or process_expired
609 30x if (!w->impl_)
610 return;
611 30x auto* impl = w->impl_;
612 30x w->impl_ = nullptr;
613 30x impl->waiters_.remove(w);
614 30x if (impl->waiters_.empty())
615 {
616 28x remove_timer_impl(*impl);
617 28x impl->might_have_pending_waits_ = false;
618 }
619 30x refresh_cached_nearest();
620 30x }
621
622 30x w->ec_value_ = make_error_code(capy::error::canceled);
623 30x sched_->post(&w->op_);
624 }
625
626 inline std::size_t
627 2x timer_service::cancel_one_waiter(implementation& impl)
628 {
629 2x if (!impl.might_have_pending_waits_)
630 return 0;
631
632 2x waiter_node* w = nullptr;
633
634 {
635 2x std::lock_guard lock(mutex_);
636 2x w = impl.waiters_.pop_front();
637 2x if (!w)
638 return 0;
639 2x w->impl_ = nullptr;
640 2x if (impl.waiters_.empty())
641 {
642 remove_timer_impl(impl);
643 impl.might_have_pending_waits_ = false;
644 }
645 2x refresh_cached_nearest();
646 2x }
647
648 2x w->ec_value_ = make_error_code(capy::error::canceled);
649 2x sched_->post(&w->op_);
650 2x return 1;
651 }
652
653 inline std::size_t
654 53127x timer_service::process_expired()
655 {
656 53127x intrusive_list<waiter_node> expired;
657
658 {
659 53127x std::lock_guard lock(mutex_);
660 53127x auto now = clock_type::now();
661
662 57925x while (!heap_.empty() && heap_[0].time_ <= now)
663 {
664 4798x implementation* t = heap_[0].timer_;
665 4798x remove_timer_impl(*t);
666 9600x while (auto* w = t->waiters_.pop_front())
667 {
668 4802x w->impl_ = nullptr;
669 4802x w->ec_value_ = {};
670 4802x expired.push_back(w);
671 4802x }
672 4798x t->might_have_pending_waits_ = false;
673 }
674
675 53127x refresh_cached_nearest();
676 53127x }
677
678 53127x std::size_t count = 0;
679 57929x while (auto* w = expired.pop_front())
680 {
681 4802x sched_->post(&w->op_);
682 4802x ++count;
683 4802x }
684
685 53127x return count;
686 }
687
688 inline void
689 4850x timer_service::remove_timer_impl(implementation& impl)
690 {
691 4850x std::size_t index = impl.heap_index_;
692 4850x if (index >= heap_.size())
693 return; // Not in heap
694
695 4850x if (index == heap_.size() - 1)
696 {
697 // Last element, just pop
698 136x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
699 136x heap_.pop_back();
700 }
701 else
702 {
703 // Swap with last and reheapify
704 4714x swap_heap(index, heap_.size() - 1);
705 4714x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
706 4714x heap_.pop_back();
707
708 4714x if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
709 up_heap(index);
710 else
711 4714x down_heap(index);
712 }
713 }
714
715 inline void
716 4858x timer_service::up_heap(std::size_t index)
717 {
718 9551x while (index > 0)
719 {
720 4720x std::size_t parent = (index - 1) / 2;
721 4720x if (!(heap_[index].time_ < heap_[parent].time_))
722 27x break;
723 4693x swap_heap(index, parent);
724 4693x index = parent;
725 }
726 4858x }
727
728 inline void
729 4714x timer_service::down_heap(std::size_t index)
730 {
731 4714x std::size_t child = index * 2 + 1;
732 4714x while (child < heap_.size())
733 {
734 6x std::size_t min_child = (child + 1 == heap_.size() ||
735 heap_[child].time_ < heap_[child + 1].time_)
736 6x ? child
737 6x : child + 1;
738
739 6x if (heap_[index].time_ < heap_[min_child].time_)
740 6x break;
741
742 swap_heap(index, min_child);
743 index = min_child;
744 child = index * 2 + 1;
745 }
746 4714x }
747
748 inline void
749 9407x timer_service::swap_heap(std::size_t i1, std::size_t i2)
750 {
751 9407x heap_entry tmp = heap_[i1];
752 9407x heap_[i1] = heap_[i2];
753 9407x heap_[i2] = tmp;
754 9407x heap_[i1].timer_->heap_index_ = i1;
755 9407x heap_[i2].timer_->heap_index_ = i2;
756 9407x }
757
758 // waiter_node out-of-class member function definitions
759
760 inline void
761 30x waiter_node::canceller::operator()() const
762 {
763 30x waiter_->svc_->cancel_waiter(waiter_);
764 30x }
765
766 inline void
767 waiter_node::completion_op::do_complete(
768 [[maybe_unused]] void* owner,
769 scheduler_op* base,
770 std::uint32_t,
771 std::uint32_t)
772 {
773 // owner is always non-null here. The destroy path (owner == nullptr)
774 // is unreachable because completion_op overrides destroy() directly,
775 // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
776 BOOST_COROSIO_ASSERT(owner);
777 static_cast<completion_op*>(base)->operator()();
778 }
779
780 inline void
781 4864x waiter_node::completion_op::operator()()
782 {
783 4864x auto* w = waiter_;
784 4864x w->stop_cb_.reset();
785 4864x if (w->ec_out_)
786 4864x *w->ec_out_ = w->ec_value_;
787
788 4864x auto h = w->h_;
789 4864x auto d = w->d_;
790 4864x auto* svc = w->svc_;
791 4864x auto& sched = svc->get_scheduler();
792
793 4864x svc->destroy_waiter(w);
794
795 4864x d.post(h);
796 4864x sched.work_finished();
797 4864x }
798
799 // GCC 14 false-positive: inlining ~optional<stop_callback> through
800 // delete loses track that stop_cb_ was already .reset() above.
801 #if defined(__GNUC__) && !defined(__clang__)
802 #pragma GCC diagnostic push
803 #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
804 #endif
805 inline void
806 8x waiter_node::completion_op::destroy()
807 {
808 // Called during scheduler shutdown drain when this completion_op is
809 // in the scheduler's ready queue (posted by cancel_timer() or
810 // process_expired()). Balances the work_started() from
811 // implementation::wait(). The scheduler drain loop separately
812 // balances the work_started() from post(). On IOCP both decrements
813 // are required for outstanding_work_ to reach zero; on other
814 // backends this is harmless.
815 //
816 // This override also prevents scheduler_op::destroy() from calling
817 // do_complete(nullptr, ...). See also: timer_service::shutdown()
818 // which drains waiters still in the timer heap (the other path).
819 8x auto* w = waiter_;
820 8x w->stop_cb_.reset();
821 8x auto h = std::exchange(w->h_, {});
822 8x auto& sched = w->svc_->get_scheduler();
823 8x delete w;
824 8x sched.work_finished();
825 8x if (h)
826 8x h.destroy();
827 8x }
828 #if defined(__GNUC__) && !defined(__clang__)
829 #pragma GCC diagnostic pop
830 #endif
831
832 inline std::coroutine_handle<>
833 4875x timer_service::implementation::wait(
834 std::coroutine_handle<> h,
835 capy::executor_ref d,
836 std::stop_token token,
837 std::error_code* ec)
838 {
839 // Already-expired fast path — no waiter_node, no mutex.
840 // Post instead of dispatch so the coroutine yields to the
841 // scheduler, allowing other queued work to run.
842 4875x if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
843 {
844 4853x if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
845 {
846 1x if (ec)
847 1x *ec = {};
848 1x d.post(h);
849 1x return std::noop_coroutine();
850 }
851 }
852
853 4874x auto* w = svc_->create_waiter();
854 4874x w->impl_ = this;
855 4874x w->svc_ = svc_;
856 4874x w->h_ = h;
857 4874x w->d_ = d;
858 4874x w->token_ = std::move(token);
859 4874x w->ec_out_ = ec;
860
861 4874x svc_->insert_waiter(*this, w);
862 4874x might_have_pending_waits_ = true;
863 4874x svc_->get_scheduler().work_started();
864
865 4874x if (w->token_.stop_possible())
866 48x w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
867
868 4874x return std::noop_coroutine();
869 }
870
871 // Free functions
872
873 struct timer_service_access
874 {
875 5128x static native_scheduler& get_scheduler(io_context& ctx) noexcept
876 {
877 5128x return static_cast<native_scheduler&>(*ctx.sched_);
878 }
879 };
880
881 // Bypass find_service() mutex by reading the scheduler's cached pointer
882 inline io_object::io_service&
883 5128x timer_service_direct(capy::execution_context& ctx) noexcept
884 {
885 5128x return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
886 5128x .timer_svc_;
887 }
888
889 inline std::size_t
890 6x timer_service_update_expiry(timer::implementation& base)
891 {
892 6x auto& impl = static_cast<timer_service::implementation&>(base);
893 6x return impl.svc_->update_timer(impl, impl.expiry_);
894 }
895
896 inline std::size_t
897 8x timer_service_cancel(timer::implementation& base) noexcept
898 {
899 8x auto& impl = static_cast<timer_service::implementation&>(base);
900 8x return impl.svc_->cancel_timer(impl);
901 }
902
903 inline std::size_t
904 2x timer_service_cancel_one(timer::implementation& base) noexcept
905 {
906 2x auto& impl = static_cast<timer_service::implementation&>(base);
907 2x return impl.svc_->cancel_one_waiter(impl);
908 }
909
910 inline timer_service&
911 412x get_timer_service(capy::execution_context& ctx, scheduler& sched)
912 {
913 412x return ctx.make_service<timer_service>(sched);
914 }
915
916 } // namespace boost::corosio::detail
917
918 #endif
919