Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #include "src/ex/detail/strand_queue.hpp"
11 : #include <boost/capy/ex/detail/strand_service.hpp>
12 : #include <atomic>
13 : #include <coroutine>
14 : #include <mutex>
15 : #include <thread>
16 : #include <utility>
17 :
18 : namespace boost {
19 : namespace capy {
20 : namespace detail {
21 :
22 : //----------------------------------------------------------
23 :
24 : /** Implementation state for a strand.
25 :
26 : Each strand_impl provides serialization for coroutines
27 : dispatched through strands that share it.
28 : */
29 : struct strand_impl
30 : {
31 : std::mutex mutex_;
32 : strand_queue pending_;
33 : bool locked_ = false;
34 : std::atomic<std::thread::id> dispatch_thread_{};
35 : void* cached_frame_ = nullptr;
36 : };
37 :
38 : //----------------------------------------------------------
39 :
40 : /** Invoker coroutine for strand dispatch.
41 :
42 : Uses custom allocator to recycle frame - one allocation
43 : per strand_impl lifetime, stored in trailer for recovery.
44 : */
45 : struct strand_invoker
46 : {
47 : struct promise_type
48 : {
49 9 : void* operator new(std::size_t n, strand_impl& impl)
50 : {
51 9 : constexpr auto A = alignof(strand_impl*);
52 9 : std::size_t padded = (n + A - 1) & ~(A - 1);
53 9 : std::size_t total = padded + sizeof(strand_impl*);
54 :
55 9 : void* p = impl.cached_frame_
56 9 : ? std::exchange(impl.cached_frame_, nullptr)
57 7 : : ::operator new(total);
58 :
59 : // Trailer lets delete recover impl
60 9 : *reinterpret_cast<strand_impl**>(
61 9 : static_cast<char*>(p) + padded) = &impl;
62 9 : return p;
63 : }
64 :
65 9 : void operator delete(void* p, std::size_t n) noexcept
66 : {
67 9 : constexpr auto A = alignof(strand_impl*);
68 9 : std::size_t padded = (n + A - 1) & ~(A - 1);
69 :
70 9 : auto* impl = *reinterpret_cast<strand_impl**>(
71 : static_cast<char*>(p) + padded);
72 :
73 9 : if (!impl->cached_frame_)
74 9 : impl->cached_frame_ = p;
75 : else
76 0 : ::operator delete(p);
77 9 : }
78 :
79 9 : strand_invoker get_return_object() noexcept
80 9 : { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
81 :
82 9 : std::suspend_always initial_suspend() noexcept { return {}; }
83 9 : std::suspend_never final_suspend() noexcept { return {}; }
84 9 : void return_void() noexcept {}
85 0 : void unhandled_exception() { std::terminate(); }
86 : };
87 :
88 : std::coroutine_handle<promise_type> h_;
89 : };
90 :
91 : //----------------------------------------------------------
92 :
93 : /** Concrete implementation of strand_service.
94 :
95 : Holds the fixed pool of strand_impl objects.
96 : */
97 : class strand_service_impl : public strand_service
98 : {
99 : static constexpr std::size_t num_impls = 211;
100 :
101 : strand_impl impls_[num_impls];
102 : std::size_t salt_ = 0;
103 : std::mutex mutex_;
104 :
105 : public:
106 : explicit
107 19 : strand_service_impl(execution_context&)
108 4028 : {
109 19 : }
110 :
111 : strand_impl*
112 23 : get_implementation() override
113 : {
114 23 : std::lock_guard<std::mutex> lock(mutex_);
115 23 : std::size_t index = salt_++;
116 23 : index = index % num_impls;
117 23 : return &impls_[index];
118 23 : }
119 :
120 : protected:
121 : void
122 19 : shutdown() override
123 : {
124 4028 : for(std::size_t i = 0; i < num_impls; ++i)
125 : {
126 4009 : std::lock_guard<std::mutex> lock(impls_[i].mutex_);
127 4009 : impls_[i].locked_ = true;
128 :
129 4009 : if(impls_[i].cached_frame_)
130 : {
131 7 : ::operator delete(impls_[i].cached_frame_);
132 7 : impls_[i].cached_frame_ = nullptr;
133 : }
134 4009 : }
135 19 : }
136 :
137 : private:
138 : static bool
139 322 : enqueue(strand_impl& impl, std::coroutine_handle<> h)
140 : {
141 322 : std::lock_guard<std::mutex> lock(impl.mutex_);
142 322 : impl.pending_.push(h);
143 322 : if(!impl.locked_)
144 : {
145 9 : impl.locked_ = true;
146 9 : return true;
147 : }
148 313 : return false;
149 322 : }
150 :
151 : static void
152 17 : dispatch_pending(strand_impl& impl)
153 : {
154 17 : strand_queue::taken_batch batch;
155 : {
156 17 : std::lock_guard<std::mutex> lock(impl.mutex_);
157 17 : batch = impl.pending_.take_all();
158 17 : }
159 17 : impl.pending_.dispatch_batch(batch);
160 17 : }
161 :
162 : static bool
163 17 : try_unlock(strand_impl& impl)
164 : {
165 17 : std::lock_guard<std::mutex> lock(impl.mutex_);
166 17 : if(impl.pending_.empty())
167 : {
168 9 : impl.locked_ = false;
169 9 : return true;
170 : }
171 8 : return false;
172 17 : }
173 :
174 : static void
175 17 : set_dispatch_thread(strand_impl& impl) noexcept
176 : {
177 17 : impl.dispatch_thread_.store(std::this_thread::get_id());
178 17 : }
179 :
180 : static void
181 9 : clear_dispatch_thread(strand_impl& impl) noexcept
182 : {
183 9 : impl.dispatch_thread_.store(std::thread::id{});
184 9 : }
185 :
186 : // Loops until queue empty (aggressive). Alternative: per-batch fairness
187 : // (repost after each batch to let other work run) - explore if starvation observed.
188 : static strand_invoker
189 9 : make_invoker(strand_impl& impl)
190 : {
191 : strand_impl* p = &impl;
192 : for(;;)
193 : {
194 : set_dispatch_thread(*p);
195 : dispatch_pending(*p);
196 : if(try_unlock(*p))
197 : {
198 : clear_dispatch_thread(*p);
199 : co_return;
200 : }
201 : }
202 18 : }
203 :
204 : friend class strand_service;
205 : };
206 :
207 : //----------------------------------------------------------
208 :
209 19 : strand_service::
210 19 : strand_service()
211 19 : : service()
212 : {
213 19 : }
214 :
215 19 : strand_service::
216 : ~strand_service() = default;
217 :
218 : bool
219 2 : strand_service::
220 : running_in_this_thread(strand_impl& impl) noexcept
221 : {
222 2 : return impl.dispatch_thread_.load() == std::this_thread::get_id();
223 : }
224 :
225 : std::coroutine_handle<>
226 1 : strand_service::
227 : dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
228 : {
229 1 : if(running_in_this_thread(impl))
230 0 : return h;
231 :
232 1 : if(strand_service_impl::enqueue(impl, h))
233 1 : ex.post(strand_service_impl::make_invoker(impl).h_);
234 1 : return std::noop_coroutine();
235 : }
236 :
237 : void
238 321 : strand_service::
239 : post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
240 : {
241 321 : if(strand_service_impl::enqueue(impl, h))
242 8 : ex.post(strand_service_impl::make_invoker(impl).h_);
243 321 : }
244 :
245 : strand_service&
246 23 : get_strand_service(execution_context& ctx)
247 : {
248 23 : return ctx.use_service<strand_service_impl>();
249 : }
250 :
251 : } // namespace detail
252 : } // namespace capy
253 : } // namespace boost
|