include/boost/corosio/native/detail/epoll/epoll_acceptor_service.hpp

80.6% Lines (203/252) 95.8% List of functions (23/24)
f(x) Functions (24)
Function Calls Lines Branches Blocks
boost::corosio::detail::epoll_acceptor_state::epoll_acceptor_state(boost::corosio::detail::epoll_scheduler&) :46 0 100.0% boost::corosio::detail::epoll_acceptor_service::scheduler() const :87 0 100.0% boost::corosio::detail::epoll_accept_op::cancel() :110 0 80.0% boost::corosio::detail::epoll_accept_op::operator()() :119 0 88.9% boost::corosio::detail::epoll_acceptor::epoll_acceptor(boost::corosio::detail::epoll_acceptor_service&) :192 0 100.0% boost::corosio::detail::epoll_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :198 0 52.3% boost::corosio::detail::epoll_acceptor::cancel() :312 0 100.0% boost::corosio::detail::epoll_acceptor::cancel_single_op(boost::corosio::detail::epoll_op&) :318 0 93.3% boost::corosio::detail::epoll_acceptor::close_socket() :341 0 96.0% boost::corosio::detail::epoll_acceptor_service::epoll_acceptor_service(boost::capy::execution_context&) :381 0 100.0% boost::corosio::detail::epoll_acceptor_service::~epoll_acceptor_service() :390 0 100.0% boost::corosio::detail::epoll_acceptor_service::shutdown() :393 0 80.0% boost::corosio::detail::epoll_acceptor_service::construct() :406 0 100.0% boost::corosio::detail::epoll_acceptor_service::destroy(boost::corosio::io_object::implementation*) :419 0 100.0% boost::corosio::detail::epoll_acceptor_service::close(boost::corosio::io_object::handle&) :429 0 100.0% boost::corosio::detail::epoll_acceptor::set_option(int, int, void const*, unsigned long) :435 0 75.0% boost::corosio::detail::epoll_acceptor::get_option(int, int, void*, unsigned long*) const :445 0 0.0% boost::corosio::detail::epoll_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :456 0 93.3% boost::corosio::detail::epoll_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :485 0 100.0% boost::corosio::detail::epoll_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :506 0 85.7% boost::corosio::detail::epoll_acceptor_service::post(boost::corosio::detail::epoll_op*) :522 0 100.0% boost::corosio::detail::epoll_acceptor_service::work_started() :528 0 100.0% boost::corosio::detail::epoll_acceptor_service::work_finished() :534 0 100.0% boost::corosio::detail::epoll_acceptor_service::socket_service() const :540 0 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24
25 #include <boost/corosio/native/detail/endpoint_convert.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/native/detail/make_err.hpp>
28
29 #include <memory>
30 #include <mutex>
31 #include <unordered_map>
32 #include <utility>
33
34 #include <errno.h>
35 #include <netinet/in.h>
36 #include <sys/epoll.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39
40 namespace boost::corosio::detail {
41
42 /** State for epoll acceptor service. */
43 class epoll_acceptor_state
44 {
45 public:
46 244x explicit epoll_acceptor_state(epoll_scheduler& sched) noexcept
47 244x : sched_(sched)
48 {
49 244x }
50
51 epoll_scheduler& sched_;
52 std::mutex mutex_;
53 intrusive_list<epoll_acceptor> acceptor_list_;
54 std::unordered_map<epoll_acceptor*, std::shared_ptr<epoll_acceptor>>
55 acceptor_ptrs_;
56 };
57
58 /** epoll acceptor service implementation.
59
60 Inherits from acceptor_service to enable runtime polymorphism.
61 Uses key_type = acceptor_service for service lookup.
62 */
63 class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
64 {
65 public:
66 explicit epoll_acceptor_service(capy::execution_context& ctx);
67 ~epoll_acceptor_service() override;
68
69 epoll_acceptor_service(epoll_acceptor_service const&) = delete;
70 epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
71
72 void shutdown() override;
73
74 io_object::implementation* construct() override;
75 void destroy(io_object::implementation*) override;
76 void close(io_object::handle&) override;
77 std::error_code open_acceptor_socket(
78 tcp_acceptor::implementation& impl,
79 int family,
80 int type,
81 int protocol) override;
82 std::error_code
83 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
84 std::error_code
85 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
86
87 3304x epoll_scheduler& scheduler() const noexcept
88 {
89 3304x return state_->sched_;
90 }
91 void post(epoll_op* op);
92 void work_started() noexcept;
93 void work_finished() noexcept;
94
95 /** Get the socket service for creating peer sockets during accept. */
96 epoll_socket_service* socket_service() const noexcept;
97
98 private:
99 capy::execution_context& ctx_;
100 std::unique_ptr<epoll_acceptor_state> state_;
101 };
102
103 //--------------------------------------------------------------------------
104 //
105 // Implementation
106 //
107 //--------------------------------------------------------------------------
108
109 inline void
110 6x epoll_accept_op::cancel() noexcept
111 {
112 6x if (acceptor_impl_)
113 6x acceptor_impl_->cancel_single_op(*this);
114 else
115 request_cancel();
116 6x }
117
118 inline void
119 3152x epoll_accept_op::operator()()
120 {
121 3152x stop_cb.reset();
122
123 3152x static_cast<epoll_acceptor*>(acceptor_impl_)
124 3152x ->service()
125 3152x .scheduler()
126 3152x .reset_inline_budget();
127
128 3152x bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
129
130 3152x if (cancelled.load(std::memory_order_acquire))
131 9x *ec_out = capy::error::canceled;
132 3143x else if (errn != 0)
133 *ec_out = make_err(errn);
134 else
135 3143x *ec_out = {};
136
137 // Set up the peer socket on success
138 3152x if (success && accepted_fd >= 0 && acceptor_impl_)
139 {
140 3143x auto* socket_svc = static_cast<epoll_acceptor*>(acceptor_impl_)
141 3143x ->service()
142 3143x .socket_service();
143 3143x if (socket_svc)
144 {
145 3143x auto& impl = static_cast<epoll_socket&>(*socket_svc->construct());
146 3143x impl.set_socket(accepted_fd);
147
148 3143x impl.desc_state_.fd = accepted_fd;
149 {
150 3143x std::lock_guard lock(impl.desc_state_.mutex);
151 3143x impl.desc_state_.read_op = nullptr;
152 3143x impl.desc_state_.write_op = nullptr;
153 3143x impl.desc_state_.connect_op = nullptr;
154 3143x }
155 3143x socket_svc->scheduler().register_descriptor(
156 accepted_fd, &impl.desc_state_);
157
158 3143x impl.set_endpoints(
159 3143x static_cast<epoll_acceptor*>(acceptor_impl_)->local_endpoint(),
160 3143x from_sockaddr(peer_storage));
161
162 3143x if (impl_out)
163 3143x *impl_out = &impl;
164 3143x accepted_fd = -1;
165 }
166 else
167 {
168 // No socket service — treat as error
169 *ec_out = make_err(ENOENT);
170 success = false;
171 }
172 }
173
174 3152x if (!success || !acceptor_impl_)
175 {
176 9x if (accepted_fd >= 0)
177 {
178 ::close(accepted_fd);
179 accepted_fd = -1;
180 }
181 9x if (impl_out)
182 9x *impl_out = nullptr;
183 }
184
185 // Move to stack before resuming. See epoll_op::operator()() for rationale.
186 3152x capy::executor_ref saved_ex(ex);
187 3152x std::coroutine_handle<> saved_h(h);
188 3152x auto prevent_premature_destruction = std::move(impl_ptr);
189 3152x dispatch_coro(saved_ex, saved_h).resume();
190 3152x }
191
192 80x inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
193 80x : svc_(svc)
194 {
195 80x }
196
197 inline std::coroutine_handle<>
198 3152x epoll_acceptor::accept(
199 std::coroutine_handle<> h,
200 capy::executor_ref ex,
201 std::stop_token token,
202 std::error_code* ec,
203 io_object::implementation** impl_out)
204 {
205 3152x auto& op = acc_;
206 3152x op.reset();
207 3152x op.h = h;
208 3152x op.ex = ex;
209 3152x op.ec_out = ec;
210 3152x op.impl_out = impl_out;
211 3152x op.fd = fd_;
212 3152x op.start(token, this);
213
214 3152x sockaddr_storage peer_storage{};
215 3152x socklen_t addrlen = sizeof(peer_storage);
216 int accepted;
217 do
218 {
219 3152x accepted = ::accept4(
220 fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
221 SOCK_NONBLOCK | SOCK_CLOEXEC);
222 }
223 3152x while (accepted < 0 && errno == EINTR);
224
225 3152x if (accepted >= 0)
226 {
227 {
228 2x std::lock_guard lock(desc_state_.mutex);
229 2x desc_state_.read_ready = false;
230 2x }
231
232 2x if (svc_.scheduler().try_consume_inline_budget())
233 {
234 auto* socket_svc = svc_.socket_service();
235 if (socket_svc)
236 {
237 auto& impl =
238 static_cast<epoll_socket&>(*socket_svc->construct());
239 impl.set_socket(accepted);
240
241 impl.desc_state_.fd = accepted;
242 {
243 std::lock_guard lock(impl.desc_state_.mutex);
244 impl.desc_state_.read_op = nullptr;
245 impl.desc_state_.write_op = nullptr;
246 impl.desc_state_.connect_op = nullptr;
247 }
248 socket_svc->scheduler().register_descriptor(
249 accepted, &impl.desc_state_);
250
251 impl.set_endpoints(
252 local_endpoint_, from_sockaddr(peer_storage));
253
254 *ec = {};
255 if (impl_out)
256 *impl_out = &impl;
257 }
258 else
259 {
260 ::close(accepted);
261 *ec = make_err(ENOENT);
262 if (impl_out)
263 *impl_out = nullptr;
264 }
265 return dispatch_coro(ex, h);
266 }
267
268 2x op.accepted_fd = accepted;
269 2x op.peer_storage = peer_storage;
270 2x op.complete(0, 0);
271 2x op.impl_ptr = shared_from_this();
272 2x svc_.post(&op);
273 2x return std::noop_coroutine();
274 }
275
276 3150x if (errno == EAGAIN || errno == EWOULDBLOCK)
277 {
278 3150x op.impl_ptr = shared_from_this();
279 3150x svc_.work_started();
280
281 3150x std::lock_guard lock(desc_state_.mutex);
282 3150x bool io_done = false;
283 3150x if (desc_state_.read_ready)
284 {
285 desc_state_.read_ready = false;
286 op.perform_io();
287 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
288 if (!io_done)
289 op.errn = 0;
290 }
291
292 3150x if (io_done || op.cancelled.load(std::memory_order_acquire))
293 {
294 svc_.post(&op);
295 svc_.work_finished();
296 }
297 else
298 {
299 3150x desc_state_.read_op = &op;
300 }
301 3150x return std::noop_coroutine();
302 3150x }
303
304 op.complete(errno, 0);
305 op.impl_ptr = shared_from_this();
306 svc_.post(&op);
307 // completion is always posted to scheduler queue, never inline.
308 return std::noop_coroutine();
309 }
310
311 inline void
312 2x epoll_acceptor::cancel() noexcept
313 {
314 2x cancel_single_op(acc_);
315 2x }
316
317 inline void
318 8x epoll_acceptor::cancel_single_op(epoll_op& op) noexcept
319 {
320 8x auto self = weak_from_this().lock();
321 8x if (!self)
322 return;
323
324 8x op.request_cancel();
325
326 8x epoll_op* claimed = nullptr;
327 {
328 8x std::lock_guard lock(desc_state_.mutex);
329 8x if (desc_state_.read_op == &op)
330 7x claimed = std::exchange(desc_state_.read_op, nullptr);
331 8x }
332 8x if (claimed)
333 {
334 7x op.impl_ptr = self;
335 7x svc_.post(&op);
336 7x svc_.work_finished();
337 }
338 8x }
339
340 inline void
341 318x epoll_acceptor::close_socket() noexcept
342 {
343 318x auto self = weak_from_this().lock();
344 318x if (self)
345 {
346 318x acc_.request_cancel();
347
348 318x epoll_op* claimed = nullptr;
349 {
350 318x std::lock_guard lock(desc_state_.mutex);
351 318x claimed = std::exchange(desc_state_.read_op, nullptr);
352 318x desc_state_.read_ready = false;
353 318x desc_state_.write_ready = false;
354 318x }
355
356 318x if (claimed)
357 {
358 2x acc_.impl_ptr = self;
359 2x svc_.post(&acc_);
360 2x svc_.work_finished();
361 }
362
363 318x if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
364 desc_state_.impl_ref_ = self;
365 }
366
367 318x if (fd_ >= 0)
368 {
369 79x if (desc_state_.registered_events != 0)
370 75x svc_.scheduler().deregister_descriptor(fd_);
371 79x ::close(fd_);
372 79x fd_ = -1;
373 }
374
375 318x desc_state_.fd = -1;
376 318x desc_state_.registered_events = 0;
377
378 318x local_endpoint_ = endpoint{};
379 318x }
380
381 244x inline epoll_acceptor_service::epoll_acceptor_service(
382 244x capy::execution_context& ctx)
383 244x : ctx_(ctx)
384 244x , state_(
385 std::make_unique<epoll_acceptor_state>(
386 244x ctx.use_service<epoll_scheduler>()))
387 {
388 244x }
389
390 488x inline epoll_acceptor_service::~epoll_acceptor_service() {}
391
392 inline void
393 244x epoll_acceptor_service::shutdown()
394 {
395 244x std::lock_guard lock(state_->mutex_);
396
397 244x while (auto* impl = state_->acceptor_list_.pop_front())
398 impl->close_socket();
399
400 // Don't clear acceptor_ptrs_ here — same rationale as
401 // epoll_socket_service::shutdown(). Let ~state_ release ptrs
402 // after scheduler shutdown has drained all queued ops.
403 244x }
404
405 inline io_object::implementation*
406 80x epoll_acceptor_service::construct()
407 {
408 80x auto impl = std::make_shared<epoll_acceptor>(*this);
409 80x auto* raw = impl.get();
410
411 80x std::lock_guard lock(state_->mutex_);
412 80x state_->acceptor_list_.push_back(raw);
413 80x state_->acceptor_ptrs_.emplace(raw, std::move(impl));
414
415 80x return raw;
416 80x }
417
418 inline void
419 80x epoll_acceptor_service::destroy(io_object::implementation* impl)
420 {
421 80x auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
422 80x epoll_impl->close_socket();
423 80x std::lock_guard lock(state_->mutex_);
424 80x state_->acceptor_list_.remove(epoll_impl);
425 80x state_->acceptor_ptrs_.erase(epoll_impl);
426 80x }
427
428 inline void
429 159x epoll_acceptor_service::close(io_object::handle& h)
430 {
431 159x static_cast<epoll_acceptor*>(h.get())->close_socket();
432 159x }
433
434 inline std::error_code
435 77x epoll_acceptor::set_option(
436 int level, int optname, void const* data, std::size_t size) noexcept
437 {
438 77x if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
439 0)
440 return make_err(errno);
441 77x return {};
442 }
443
444 inline std::error_code
445 epoll_acceptor::get_option(
446 int level, int optname, void* data, std::size_t* size) const noexcept
447 {
448 socklen_t len = static_cast<socklen_t>(*size);
449 if (::getsockopt(fd_, level, optname, data, &len) != 0)
450 return make_err(errno);
451 *size = static_cast<std::size_t>(len);
452 return {};
453 }
454
455 inline std::error_code
456 79x epoll_acceptor_service::open_acceptor_socket(
457 tcp_acceptor::implementation& impl, int family, int type, int protocol)
458 {
459 79x auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
460 79x epoll_impl->close_socket();
461
462 79x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
463 79x if (fd < 0)
464 return make_err(errno);
465
466 79x if (family == AF_INET6)
467 {
468 8x int val = 0; // dual-stack default
469 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
470 }
471
472 79x epoll_impl->fd_ = fd;
473
474 // Set up descriptor state but do NOT register with epoll yet
475 79x epoll_impl->desc_state_.fd = fd;
476 {
477 79x std::lock_guard lock(epoll_impl->desc_state_.mutex);
478 79x epoll_impl->desc_state_.read_op = nullptr;
479 79x }
480
481 79x return {};
482 }
483
484 inline std::error_code
485 78x epoll_acceptor_service::bind_acceptor(
486 tcp_acceptor::implementation& impl, endpoint ep)
487 {
488 78x auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
489 78x int fd = epoll_impl->fd_;
490
491 78x sockaddr_storage storage{};
492 78x socklen_t addrlen = detail::to_sockaddr(ep, storage);
493 78x if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
494 3x return make_err(errno);
495
496 // Cache local endpoint (resolves ephemeral port)
497 75x sockaddr_storage local{};
498 75x socklen_t local_len = sizeof(local);
499 75x if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
500 75x epoll_impl->set_local_endpoint(detail::from_sockaddr(local));
501
502 75x return {};
503 }
504
505 inline std::error_code
506 75x epoll_acceptor_service::listen_acceptor(
507 tcp_acceptor::implementation& impl, int backlog)
508 {
509 75x auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
510 75x int fd = epoll_impl->fd_;
511
512 75x if (::listen(fd, backlog) < 0)
513 return make_err(errno);
514
515 // Register fd with epoll (edge-triggered mode)
516 75x scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
517
518 75x return {};
519 }
520
521 inline void
522 11x epoll_acceptor_service::post(epoll_op* op)
523 {
524 11x state_->sched_.post(op);
525 11x }
526
527 inline void
528 3150x epoll_acceptor_service::work_started() noexcept
529 {
530 3150x state_->sched_.work_started();
531 3150x }
532
533 inline void
534 9x epoll_acceptor_service::work_finished() noexcept
535 {
536 9x state_->sched_.work_finished();
537 9x }
538
539 inline epoll_socket_service*
540 3143x epoll_acceptor_service::socket_service() const noexcept
541 {
542 3143x auto* svc = ctx_.find_service<detail::socket_service>();
543 3143x return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
544 }
545
546 } // namespace boost::corosio::detail
547
548 #endif // BOOST_COROSIO_HAS_EPOLL
549
550 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
551