src/corosio/src/tcp_server.cpp

66.2% Lines (47/71) 87.5% List of functions (14/16)
f(x) Functions (16)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #include <boost/corosio/tcp_server.hpp>
12 #include <boost/corosio/detail/except.hpp>
13 #include <condition_variable>
14 #include <mutex>
15 #include <utility>
16
17 namespace boost::corosio {
18
19 36x tcp_server::worker_base::worker_base() = default;
20 36x tcp_server::worker_base::~worker_base() = default;
21
22 struct tcp_server::impl
23 {
24 std::mutex join_mutex;
25 std::condition_variable join_cv;
26 capy::execution_context& ctx;
27 std::vector<tcp_acceptor> ports;
28 std::stop_source stop;
29
30 9x explicit impl(capy::execution_context& c) noexcept : ctx(c) {}
31 };
32
33 tcp_server::impl*
34 9x tcp_server::make_impl(capy::execution_context& ctx)
35 {
36 9x return new impl(ctx);
37 }
38
39 9x tcp_server::~tcp_server()
40 {
41 9x delete impl_;
42 9x }
43
44 tcp_server::tcp_server(tcp_server&& o) noexcept
45 : impl_(std::exchange(o.impl_, nullptr))
46 , ex_(o.ex_)
47 , waiters_(std::exchange(o.waiters_, nullptr))
48 , idle_head_(std::exchange(o.idle_head_, nullptr))
49 , active_head_(std::exchange(o.active_head_, nullptr))
50 , active_tail_(std::exchange(o.active_tail_, nullptr))
51 , active_accepts_(std::exchange(o.active_accepts_, 0))
52 , storage_(std::move(o.storage_))
53 , running_(std::exchange(o.running_, false))
54 {
55 }
56
57 tcp_server&
58 tcp_server::operator=(tcp_server&& o) noexcept
59 {
60 delete impl_;
61 impl_ = std::exchange(o.impl_, nullptr);
62 ex_ = o.ex_;
63 waiters_ = std::exchange(o.waiters_, nullptr);
64 idle_head_ = std::exchange(o.idle_head_, nullptr);
65 active_head_ = std::exchange(o.active_head_, nullptr);
66 active_tail_ = std::exchange(o.active_tail_, nullptr);
67 active_accepts_ = std::exchange(o.active_accepts_, 0);
68 storage_ = std::move(o.storage_);
69 running_ = std::exchange(o.running_, false);
70 return *this;
71 }
72
73 // Accept loop: wait for idle worker, accept connection, dispatch
74 capy::task<void>
75 8x tcp_server::do_accept(tcp_acceptor& acc)
76 {
77 // Analyzer can't trace value through coroutine await_transform
78 // NOLINTNEXTLINE(clang-analyzer-core.uninitialized.UndefReturn)
79 auto env = co_await capy::this_coro::environment;
80 while (!env->stop_token.stop_requested())
81 {
82 // Wait for an idle worker before blocking on accept
83 auto& w = co_await pop();
84 auto [ec] = co_await acc.accept(w.socket());
85 if (ec)
86 {
87 co_await push(w);
88 continue;
89 }
90 w.run(launcher{*this, w});
91 }
92 16x }
93
94 std::error_code
95 9x tcp_server::bind(endpoint ep)
96 {
97 try
98 {
99 9x impl_->ports.emplace_back(impl_->ctx, ep);
100 8x return {};
101 }
102 1x catch (std::system_error const& e)
103 {
104 1x return e.code();
105 1x }
106 }
107
108 endpoint
109 2x tcp_server::local_endpoint(std::size_t index) const noexcept
110 {
111 2x if (index >= impl_->ports.size())
112 return endpoint{};
113 2x return impl_->ports[index].local_endpoint();
114 }
115
116 void
117 10x tcp_server::start()
118 {
119 // Idempotent - only start if not already running
120 10x if (running_)
121 1x return;
122
123 // Previous session must be fully stopped before restart
124 9x if (active_accepts_ != 0)
125 1x detail::throw_logic_error(
126 "tcp_server::start: previous session not joined");
127
128 8x running_ = true;
129
130 8x impl_->stop = {}; // Fresh stop source
131 8x auto st = impl_->stop.get_token();
132
133 8x active_accepts_ = impl_->ports.size();
134
135 // Launch with completion handler that decrements counter
136 16x for (auto& t : impl_->ports)
137 16x capy::run_async(ex_, st, [this]() {
138 8x std::lock_guard lock(impl_->join_mutex);
139 8x if (--active_accepts_ == 0)
140 8x impl_->join_cv.notify_all();
141 16x })(do_accept(t));
142 8x }
143
144 void
145 10x tcp_server::stop()
146 {
147 // Idempotent - only stop if running
148 10x if (!running_)
149 2x return;
150 8x running_ = false;
151
152 // Stop accept loops
153 8x impl_->stop.request_stop();
154
155 // Launch cancellation coroutine on server executor
156 8x capy::run_async(ex_, std::stop_token{})(do_stop());
157 }
158
159 void
160 4x tcp_server::join()
161 {
162 4x std::unique_lock lock(impl_->join_mutex);
163 8x impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
164 4x }
165
166 capy::task<>
167 8x tcp_server::do_stop()
168 {
169 // Running on server executor - safe to iterate active list
170 // Just cancel, don't modify list - workers return themselves when done
171 for (auto* w = active_head_; w; w = w->next_)
172 w->stop_.request_stop();
173 co_return;
174 16x }
175
176 } // namespace boost::corosio
177