libs/capy/src/ex/detail/strand_service.cpp

96.7% Lines (88/91) 95.5% Functions (21/22) 90.3% Branches (28/31)
libs/capy/src/ex/detail/strand_service.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #include "src/ex/detail/strand_queue.hpp"
11 #include <boost/capy/ex/detail/strand_service.hpp>
12 #include <atomic>
13 #include <coroutine>
14 #include <mutex>
15 #include <thread>
16 #include <utility>
17
18 namespace boost {
19 namespace capy {
20 namespace detail {
21
22 //----------------------------------------------------------
23
24 /** Implementation state for a strand.
25
26 Each strand_impl provides serialization for coroutines
27 dispatched through strands that share it.
28 */
29 struct strand_impl
30 {
31 std::mutex mutex_;
32 strand_queue pending_;
33 bool locked_ = false;
34 std::atomic<std::thread::id> dispatch_thread_{};
35 void* cached_frame_ = nullptr;
36 };
37
38 //----------------------------------------------------------
39
40 /** Invoker coroutine for strand dispatch.
41
42 Uses custom allocator to recycle frame - one allocation
43 per strand_impl lifetime, stored in trailer for recovery.
44 */
45 struct strand_invoker
46 {
47 struct promise_type
48 {
49 9 void* operator new(std::size_t n, strand_impl& impl)
50 {
51 9 constexpr auto A = alignof(strand_impl*);
52 9 std::size_t padded = (n + A - 1) & ~(A - 1);
53 9 std::size_t total = padded + sizeof(strand_impl*);
54
55 9 void* p = impl.cached_frame_
56
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 7 times.
9 ? std::exchange(impl.cached_frame_, nullptr)
57
1/1
✓ Branch 1 taken 7 times.
7 : ::operator new(total);
58
59 // Trailer lets delete recover impl
60 9 *reinterpret_cast<strand_impl**>(
61 9 static_cast<char*>(p) + padded) = &impl;
62 9 return p;
63 }
64
65 9 void operator delete(void* p, std::size_t n) noexcept
66 {
67 9 constexpr auto A = alignof(strand_impl*);
68 9 std::size_t padded = (n + A - 1) & ~(A - 1);
69
70 9 auto* impl = *reinterpret_cast<strand_impl**>(
71 static_cast<char*>(p) + padded);
72
73
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (!impl->cached_frame_)
74 9 impl->cached_frame_ = p;
75 else
76 ::operator delete(p);
77 9 }
78
79 9 strand_invoker get_return_object() noexcept
80 9 { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
81
82 9 std::suspend_always initial_suspend() noexcept { return {}; }
83 9 std::suspend_never final_suspend() noexcept { return {}; }
84 9 void return_void() noexcept {}
85 void unhandled_exception() { std::terminate(); }
86 };
87
88 std::coroutine_handle<promise_type> h_;
89 };
90
91 //----------------------------------------------------------
92
93 /** Concrete implementation of strand_service.
94
95 Holds the fixed pool of strand_impl objects.
96 */
97 class strand_service_impl : public strand_service
98 {
99 static constexpr std::size_t num_impls = 211;
100
101 strand_impl impls_[num_impls];
102 std::size_t salt_ = 0;
103 std::mutex mutex_;
104
105 public:
106 explicit
107 19 strand_service_impl(execution_context&)
108 4028 {
109 19 }
110
111 strand_impl*
112 23 get_implementation() override
113 {
114
1/1
✓ Branch 1 taken 23 times.
23 std::lock_guard<std::mutex> lock(mutex_);
115 23 std::size_t index = salt_++;
116 23 index = index % num_impls;
117 23 return &impls_[index];
118 23 }
119
120 protected:
121 void
122 19 shutdown() override
123 {
124
2/2
✓ Branch 0 taken 4009 times.
✓ Branch 1 taken 19 times.
4028 for(std::size_t i = 0; i < num_impls; ++i)
125 {
126
1/1
✓ Branch 1 taken 4009 times.
4009 std::lock_guard<std::mutex> lock(impls_[i].mutex_);
127 4009 impls_[i].locked_ = true;
128
129
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 4002 times.
4009 if(impls_[i].cached_frame_)
130 {
131 7 ::operator delete(impls_[i].cached_frame_);
132 7 impls_[i].cached_frame_ = nullptr;
133 }
134 4009 }
135 19 }
136
137 private:
138 static bool
139 322 enqueue(strand_impl& impl, std::coroutine_handle<> h)
140 {
141
1/1
✓ Branch 1 taken 322 times.
322 std::lock_guard<std::mutex> lock(impl.mutex_);
142
1/1
✓ Branch 1 taken 322 times.
322 impl.pending_.push(h);
143
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 313 times.
322 if(!impl.locked_)
144 {
145 9 impl.locked_ = true;
146 9 return true;
147 }
148 313 return false;
149 322 }
150
151 static void
152 17 dispatch_pending(strand_impl& impl)
153 {
154 17 strand_queue::taken_batch batch;
155 {
156
1/1
✓ Branch 1 taken 17 times.
17 std::lock_guard<std::mutex> lock(impl.mutex_);
157 17 batch = impl.pending_.take_all();
158 17 }
159
1/1
✓ Branch 1 taken 17 times.
17 impl.pending_.dispatch_batch(batch);
160 17 }
161
162 static bool
163 17 try_unlock(strand_impl& impl)
164 {
165
1/1
✓ Branch 1 taken 17 times.
17 std::lock_guard<std::mutex> lock(impl.mutex_);
166
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 8 times.
17 if(impl.pending_.empty())
167 {
168 9 impl.locked_ = false;
169 9 return true;
170 }
171 8 return false;
172 17 }
173
174 static void
175 17 set_dispatch_thread(strand_impl& impl) noexcept
176 {
177 17 impl.dispatch_thread_.store(std::this_thread::get_id());
178 17 }
179
180 static void
181 9 clear_dispatch_thread(strand_impl& impl) noexcept
182 {
183 9 impl.dispatch_thread_.store(std::thread::id{});
184 9 }
185
186 // Loops until queue empty (aggressive). Alternative: per-batch fairness
187 // (repost after each batch to let other work run) - explore if starvation observed.
188 static strand_invoker
189
1/1
✓ Branch 1 taken 9 times.
9 make_invoker(strand_impl& impl)
190 {
191 strand_impl* p = &impl;
192 for(;;)
193 {
194 set_dispatch_thread(*p);
195 dispatch_pending(*p);
196 if(try_unlock(*p))
197 {
198 clear_dispatch_thread(*p);
199 co_return;
200 }
201 }
202 18 }
203
204 friend class strand_service;
205 };
206
207 //----------------------------------------------------------
208
209 19 strand_service::
210 19 strand_service()
211 19 : service()
212 {
213 19 }
214
215 19 strand_service::
216 ~strand_service() = default;
217
218 bool
219 2 strand_service::
220 running_in_this_thread(strand_impl& impl) noexcept
221 {
222 2 return impl.dispatch_thread_.load() == std::this_thread::get_id();
223 }
224
225 std::coroutine_handle<>
226 1 strand_service::
227 dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
228 {
229
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if(running_in_this_thread(impl))
230 return h;
231
232
1/2
✓ Branch 1 taken 1 time.
✗ Branch 2 not taken.
1 if(strand_service_impl::enqueue(impl, h))
233
2/2
✓ Branch 1 taken 1 time.
✓ Branch 5 taken 1 time.
1 ex.post(strand_service_impl::make_invoker(impl).h_);
234 1 return std::noop_coroutine();
235 }
236
237 void
238 321 strand_service::
239 post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
240 {
241
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 313 times.
321 if(strand_service_impl::enqueue(impl, h))
242
2/2
✓ Branch 1 taken 8 times.
✓ Branch 5 taken 8 times.
8 ex.post(strand_service_impl::make_invoker(impl).h_);
243 321 }
244
245 strand_service&
246 23 get_strand_service(execution_context& ctx)
247 {
248 23 return ctx.use_service<strand_service_impl>();
249 }
250
251 } // namespace detail
252 } // namespace capy
253 } // namespace boost
254