libs/capy/include/boost/capy/io/write_now.hpp

93.9% Lines (62/66) 96.2% Functions (25/26) 73.3% Branches (11/15)
libs/capy/include/boost/capy/io/write_now.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
11 #define BOOST_CAPY_IO_WRITE_NOW_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/consuming_buffers.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <coroutine>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/ex/io_env.hpp>
22 #include <boost/capy/io_result.hpp>
23
24 #include <cstddef>
25 #include <exception>
26 #include <new>
27 #include <stop_token>
28 #include <utility>
29
30 #ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND
31 # if defined(__GNUC__) && !defined(__clang__)
32 # define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
33 # else
34 # define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
35 # endif
36 #endif
37
38 namespace boost {
39 namespace capy {
40
41 /** Eagerly writes complete buffer sequences with frame caching.
42
43 This class wraps a @ref WriteStream and provides an `operator()`
44 that writes an entire buffer sequence, attempting to complete
45 synchronously. If every `write_some` completes without suspending,
46 the entire operation finishes in `await_ready` with no coroutine
47 suspension.
48
49 The class maintains a one-element coroutine frame cache. After
50 the first call, subsequent calls reuse the cached frame memory,
51 avoiding repeated allocation for the internal coroutine.
52
53 @tparam Stream The stream type, must satisfy @ref WriteStream.
54
55 @par Thread Safety
56 Distinct objects: Safe.
57 Shared objects: Unsafe.
58
59 @par Preconditions
60 Only one operation may be outstanding at a time. A new call to
61 `operator()` must not be made until the previous operation has
62 completed (i.e., the returned awaitable has been fully consumed).
63
64 @par Example
65
66 @code
67 template< WriteStream Stream >
68 task<> send_messages( Stream& stream )
69 {
70 write_now wn( stream );
71 auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
72 if( ec1 )
73 detail::throw_system_error( ec1 );
74 auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
75 if( ec2 )
76 detail::throw_system_error( ec2 );
77 }
78 @endcode
79
80 @see write, write_some, WriteStream, ConstBufferSequence
81 */
82 template<class Stream>
83 requires WriteStream<Stream>
84 class write_now
85 {
86 Stream& stream_;
87 void* cached_frame_ = nullptr;
88 std::size_t cached_size_ = 0;
89
90 struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
91 op_type
92 {
93 struct promise_type
94 {
95 io_result<std::size_t> result_;
96 std::exception_ptr ep_;
97 std::coroutine_handle<> cont_{nullptr};
98 io_env const* env_ = nullptr;
99 bool done_ = false;
100
101 68 op_type get_return_object()
102 {
103 return op_type{
104 std::coroutine_handle<
105 68 promise_type>::from_promise(*this)};
106 }
107
108 68 auto initial_suspend() noexcept
109 {
110 #if BOOST_CAPY_WRITE_NOW_WORKAROUND
111 68 return std::suspend_always{};
112 #else
113 return std::suspend_never{};
114 #endif
115 }
116
117 68 auto final_suspend() noexcept
118 {
119 struct awaiter
120 {
121 promise_type* p_;
122
123 68 bool await_ready() const noexcept
124 {
125 68 return false;
126 }
127
128 68 std::coroutine_handle<> await_suspend(std::coroutine_handle<>) const noexcept
129 {
130 68 p_->done_ = true;
131
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 68 times.
68 if(!p_->cont_)
132 return std::noop_coroutine();
133 68 return p_->cont_;
134 }
135
136 void await_resume() const noexcept
137 {
138 }
139 };
140 68 return awaiter{this};
141 }
142
143 46 void return_value(
144 io_result<std::size_t> r) noexcept
145 {
146 46 result_ = r;
147 46 }
148
149 22 void unhandled_exception()
150 {
151 22 ep_ = std::current_exception();
152 22 }
153
154 std::suspend_always yield_value(int) noexcept
155 {
156 return {};
157 }
158
159 template<class A>
160 84 auto await_transform(A&& a)
161 {
162 using decayed = std::decay_t<A>;
163 if constexpr (IoAwaitable<decayed>)
164 {
165 struct wrapper
166 {
167 decayed inner_;
168 promise_type* p_;
169
170 bool await_ready()
171 {
172 return inner_.await_ready();
173 }
174
175 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h)
176 {
177 return detail::call_await_suspend(
178 &inner_, h,
179 p_->env_);
180 }
181
182 decltype(auto) await_resume()
183 {
184 return inner_.await_resume();
185 }
186 };
187 return wrapper{
188 84 std::forward<A>(a), this};
189 }
190 else
191 {
192 return std::forward<A>(a);
193 }
194 }
195
196 static void*
197 68 operator new(
198 std::size_t size,
199 write_now& self,
200 auto&)
201 {
202
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 64 times.
68 if(self.cached_frame_ &&
203
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 self.cached_size_ >= size)
204 4 return self.cached_frame_;
205 64 void* p = ::operator new(size);
206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 64 times.
64 if(self.cached_frame_)
207 ::operator delete(self.cached_frame_);
208 64 self.cached_frame_ = p;
209 64 self.cached_size_ = size;
210 64 return p;
211 }
212
213 static void
214 68 operator delete(void*, std::size_t) noexcept
215 {
216 68 }
217 };
218
219 std::coroutine_handle<promise_type> h_;
220
221 136 ~op_type()
222 {
223
2/2
✓ Branch 1 taken 68 times.
✓ Branch 2 taken 68 times.
136 if(h_)
224 68 h_.destroy();
225 136 }
226
227 op_type(op_type const&) = delete;
228 op_type& operator=(op_type const&) = delete;
229
230 68 op_type(op_type&& other) noexcept
231 68 : h_(std::exchange(other.h_, nullptr))
232 {
233 68 }
234
235 op_type& operator=(op_type&&) = delete;
236
237 68 bool await_ready() const noexcept
238 {
239 68 return h_.promise().done_;
240 }
241
242 68 std::coroutine_handle<> await_suspend(
243 std::coroutine_handle<> cont,
244 io_env const* env)
245 {
246 68 auto& p = h_.promise();
247 68 p.cont_ = cont;
248 68 p.env_ = env;
249 68 return h_;
250 }
251
252 68 io_result<std::size_t> await_resume()
253 {
254 68 auto& p = h_.promise();
255
2/2
✓ Branch 1 taken 22 times.
✓ Branch 2 taken 46 times.
68 if(p.ep_)
256 22 std::rethrow_exception(p.ep_);
257 46 return p.result_;
258 }
259
260 private:
261 68 explicit op_type(
262 std::coroutine_handle<promise_type> h)
263 68 : h_(h)
264 {
265 68 }
266 };
267
268 public:
269 /** Destructor. Frees the cached coroutine frame. */
270 64 ~write_now()
271 {
272
1/2
✓ Branch 0 taken 64 times.
✗ Branch 1 not taken.
64 if(cached_frame_)
273 64 ::operator delete(cached_frame_);
274 64 }
275
276 /** Construct from a stream reference.
277
278 @param s The stream to write to. Must outlive this object.
279 */
280 explicit
281 64 write_now(Stream& s) noexcept
282 64 : stream_(s)
283 {
284 64 }
285
286 write_now(write_now const&) = delete;
287 write_now& operator=(write_now const&) = delete;
288
289 /** Eagerly write the entire buffer sequence.
290
291 Writes data to the stream by calling `write_some` repeatedly
292 until the entire buffer sequence is written or an error
293 occurs. The operation attempts to complete synchronously:
294 if every `write_some` completes without suspending, the
295 entire operation finishes in `await_ready`.
296
297 When the fast path cannot complete, the coroutine suspends
298 and continues asynchronously. The internal coroutine frame
299 is cached and reused across calls.
300
301 @param buffers The buffer sequence to write. Passed by
302 value to ensure the sequence lives in the coroutine
303 frame across suspension points.
304
305 @return An awaitable yielding `(error_code,std::size_t)`.
306 On success, `n` equals `buffer_size(buffers)`. On
307 error, `n` is the number of bytes written before the
308 error. Compare error codes to conditions:
309 @li `cond::canceled` - Operation was cancelled
310 @li `std::errc::broken_pipe` - Peer closed connection
311
312 @par Example
313
314 @code
315 write_now wn( stream );
316 auto [ec, n] = co_await wn( make_buffer( body ) );
317 if( ec )
318 detail::throw_system_error( ec );
319 @endcode
320
321 @see write, write_some, WriteStream
322 */
323 // GCC falsely warns that the coroutine promise's
324 // placement operator new(size_t, write_now&, auto&)
325 // mismatches operator delete(void*, size_t). Per the
326 // standard, coroutine deallocation lookup is separate.
327 #if defined(__GNUC__) && !defined(__clang__)
328 #pragma GCC diagnostic push
329 #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
330 #endif
331
332 #if BOOST_CAPY_WRITE_NOW_WORKAROUND
333 template<ConstBufferSequence Buffers>
334 op_type
335
1/1
✓ Branch 1 taken 68 times.
68 operator()(Buffers buffers)
336 {
337 std::size_t const total_size = buffer_size(buffers);
338 std::size_t total_written = 0;
339 consuming_buffers cb(buffers);
340 while(total_written < total_size)
341 {
342 auto r =
343 co_await stream_.write_some(cb);
344 if(r.ec)
345 co_return io_result<std::size_t>{
346 r.ec, total_written};
347 cb.consume(r.t1);
348 total_written += r.t1;
349 }
350 co_return io_result<std::size_t>{
351 {}, total_written};
352 136 }
353 #else
354 template<ConstBufferSequence Buffers>
355 op_type
356 operator()(Buffers buffers)
357 {
358 std::size_t const total_size = buffer_size(buffers);
359 std::size_t total_written = 0;
360
361 // GCC ICE in expand_expr_real_1 (expr.cc:11376)
362 // when consuming_buffers spans a co_yield, so
363 // the GCC path uses a separate simple coroutine.
364 consuming_buffers cb(buffers);
365 while(total_written < total_size)
366 {
367 auto inner = stream_.write_some(cb);
368 if(!inner.await_ready())
369 break;
370 auto r = inner.await_resume();
371 if(r.ec)
372 co_return io_result<std::size_t>{
373 r.ec, total_written};
374 cb.consume(r.t1);
375 total_written += r.t1;
376 }
377
378 if(total_written >= total_size)
379 co_return io_result<std::size_t>{
380 {}, total_written};
381
382 co_yield 0;
383
384 while(total_written < total_size)
385 {
386 auto r =
387 co_await stream_.write_some(cb);
388 if(r.ec)
389 co_return io_result<std::size_t>{
390 r.ec, total_written};
391 cb.consume(r.t1);
392 total_written += r.t1;
393 }
394 co_return io_result<std::size_t>{
395 {}, total_written};
396 }
397 #endif
398
399 #if defined(__GNUC__) && !defined(__clang__)
400 #pragma GCC diagnostic pop
401 #endif
402 };
403
404 template<WriteStream S>
405 write_now(S&) -> write_now<S>;
406
407 } // namespace capy
408 } // namespace boost
409
410 #endif
411