libs/capy/include/boost/capy/io/any_read_source.hpp

91.2% Lines (83/91) 92.0% Functions (23/25) 79.2% Branches (19/24)
libs/capy/include/boost/capy/io/any_read_source.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/io_awaitable.hpp>
19 #include <boost/capy/concept/read_source.hpp>
20 #include <boost/capy/ex/io_env.hpp>
21 #include <boost/capy/io_result.hpp>
22 #include <boost/capy/io_task.hpp>
23
24 #include <concepts>
25 #include <coroutine>
26 #include <cstddef>
27 #include <new>
28 #include <span>
29 #include <stop_token>
30 #include <system_error>
31 #include <utility>
32
33 namespace boost {
34 namespace capy {
35
36 /** Type-erased wrapper for any ReadSource.
37
38 This class provides type erasure for any type satisfying the
39 @ref ReadSource concept, enabling runtime polymorphism for
40 source read operations. It uses cached awaitable storage to achieve
41 zero steady-state allocation after construction.
42
43 The wrapper supports two construction modes:
44 - **Owning**: Pass by value to transfer ownership. The wrapper
45 allocates storage and owns the source.
46 - **Reference**: Pass a pointer to wrap without ownership. The
47 pointed-to source must outlive this wrapper.
48
49 @par Awaitable Preallocation
50 The constructor preallocates storage for the type-erased awaitable.
51 This reserves all virtual address space at server startup
52 so memory usage can be measured up front, rather than
53 allocating piecemeal as traffic arrives.
54
55 @par Immediate Completion
56 Operations complete immediately without suspending when the
57 buffer sequence is empty, or when the underlying source's
58 awaitable reports readiness via `await_ready`.
59
60 @par Thread Safety
61 Not thread-safe. Concurrent operations on the same wrapper
62 are undefined behavior.
63
64 @par Example
65 @code
66 // Owning - takes ownership of the source
67 any_read_source rs(some_source{args...});
68
69 // Reference - wraps without ownership
70 some_source source;
71 any_read_source rs(&source);
72
73 mutable_buffer buf(data, size);
74 auto [ec, n] = co_await rs.read(std::span(&buf, 1));
75 @endcode
76
77 @see any_read_stream, ReadSource
78 */
79 class any_read_source
80 {
81 struct vtable;
82 struct awaitable_ops;
83
84 template<ReadSource S>
85 struct vtable_for_impl;
86
87 void* source_ = nullptr;
88 vtable const* vt_ = nullptr;
89 void* cached_awaitable_ = nullptr;
90 void* storage_ = nullptr;
91 awaitable_ops const* active_ops_ = nullptr;
92
93 public:
94 /** Destructor.
95
96 Destroys the owned source (if any) and releases the cached
97 awaitable storage.
98 */
99 ~any_read_source();
100
101 /** Default constructor.
102
103 Constructs an empty wrapper. Operations on a default-constructed
104 wrapper result in undefined behavior.
105 */
106 any_read_source() = default;
107
108 /** Non-copyable.
109
110 The awaitable cache is per-instance and cannot be shared.
111 */
112 any_read_source(any_read_source const&) = delete;
113 any_read_source& operator=(any_read_source const&) = delete;
114
115 /** Move constructor.
116
117 Transfers ownership of the wrapped source (if owned) and
118 cached awaitable storage from `other`. After the move, `other` is
119 in a default-constructed state.
120
121 @param other The wrapper to move from.
122 */
123 1 any_read_source(any_read_source&& other) noexcept
124 1 : source_(std::exchange(other.source_, nullptr))
125 1 , vt_(std::exchange(other.vt_, nullptr))
126 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127 1 , storage_(std::exchange(other.storage_, nullptr))
128 1 , active_ops_(std::exchange(other.active_ops_, nullptr))
129 {
130 1 }
131
132 /** Move assignment operator.
133
134 Destroys any owned source and releases existing resources,
135 then transfers ownership from `other`.
136
137 @param other The wrapper to move from.
138 @return Reference to this wrapper.
139 */
140 any_read_source&
141 operator=(any_read_source&& other) noexcept;
142
143 /** Construct by taking ownership of a ReadSource.
144
145 Allocates storage and moves the source into this wrapper.
146 The wrapper owns the source and will destroy it.
147
148 @param s The source to take ownership of.
149 */
150 template<ReadSource S>
151 requires (!std::same_as<std::decay_t<S>, any_read_source>)
152 any_read_source(S s);
153
154 /** Construct by wrapping a ReadSource without ownership.
155
156 Wraps the given source by pointer. The source must remain
157 valid for the lifetime of this wrapper.
158
159 @param s Pointer to the source to wrap.
160 */
161 template<ReadSource S>
162 any_read_source(S* s);
163
164 /** Check if the wrapper contains a valid source.
165
166 @return `true` if wrapping a source, `false` if default-constructed
167 or moved-from.
168 */
169 bool
170 27 has_value() const noexcept
171 {
172 27 return source_ != nullptr;
173 }
174
175 /** Check if the wrapper contains a valid source.
176
177 @return `true` if wrapping a source, `false` if default-constructed
178 or moved-from.
179 */
180 explicit
181 8 operator bool() const noexcept
182 {
183 8 return has_value();
184 }
185
186 /** Initiate a partial read operation.
187
188 Reads one or more bytes into the provided buffer sequence.
189 May fill less than the full sequence.
190
191 @param buffers The buffer sequence to read into.
192
193 @return An awaitable yielding `(error_code,std::size_t)`.
194
195 @par Immediate Completion
196 The operation completes immediately without suspending
197 the calling coroutine when:
198 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
199 @li The underlying source's awaitable reports immediate
200 readiness via `await_ready`.
201
202 @note This is a partial operation and may not process the
203 entire buffer sequence. Use @ref read for guaranteed
204 complete transfer.
205
206 @par Preconditions
207 The wrapper must contain a valid source (`has_value() == true`).
208 The caller must not call this function again after a prior
209 call returned an error (including EOF).
210 */
211 template<MutableBufferSequence MB>
212 auto
213 read_some(MB buffers);
214
215 /** Initiate a complete read operation.
216
217 Reads data into the provided buffer sequence by forwarding
218 to the underlying source's `read` operation. Large buffer
219 sequences are processed in windows, with each window
220 forwarded as a separate `read` call to the underlying source.
221 The operation completes when the entire buffer sequence is
222 filled, end-of-file is reached, or an error occurs.
223
224 @param buffers The buffer sequence to read into.
225
226 @return An awaitable yielding `(error_code,std::size_t)`.
227
228 @par Immediate Completion
229 The operation completes immediately without suspending
230 the calling coroutine when:
231 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
232 @li The underlying source's `read` awaitable reports
233 immediate readiness via `await_ready`.
234
235 @par Postconditions
236 Exactly one of the following is true on return:
237 @li **Success**: `!ec` and `n == buffer_size(buffers)`.
238 The entire buffer was filled.
239 @li **End-of-stream or Error**: `ec` and `n` indicates
240 the number of bytes transferred before the failure.
241
242 @par Preconditions
243 The wrapper must contain a valid source (`has_value() == true`).
244 The caller must not call this function again after a prior
245 call returned an error (including EOF).
246 */
247 template<MutableBufferSequence MB>
248 io_task<std::size_t>
249 read(MB buffers);
250
251 protected:
252 /** Rebind to a new source after move.
253
254 Updates the internal pointer to reference a new source object.
255 Used by owning wrappers after move assignment when the owned
256 object has moved to a new location.
257
258 @param new_source The new source to bind to. Must be the same
259 type as the original source.
260
261 @note Terminates if called with a source of different type
262 than the original.
263 */
264 template<ReadSource S>
265 void
266 rebind(S& new_source) noexcept
267 {
268 if(vt_ != &vtable_for_impl<S>::value)
269 std::terminate();
270 source_ = &new_source;
271 }
272
273 private:
274 auto
275 read_(std::span<mutable_buffer const> buffers);
276 };
277
278 //----------------------------------------------------------
279
280 // ordered by call sequence for cache line coherence
281 struct any_read_source::awaitable_ops
282 {
283 bool (*await_ready)(void*);
284 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
285 io_result<std::size_t> (*await_resume)(void*);
286 void (*destroy)(void*) noexcept;
287 };
288
289 // ordered by call frequency for cache line coherence
290 struct any_read_source::vtable
291 {
292 awaitable_ops const* (*construct_read_some_awaitable)(
293 void* source,
294 void* storage,
295 std::span<mutable_buffer const> buffers);
296 awaitable_ops const* (*construct_read_awaitable)(
297 void* source,
298 void* storage,
299 std::span<mutable_buffer const> buffers);
300 std::size_t awaitable_size;
301 std::size_t awaitable_align;
302 void (*destroy)(void*) noexcept;
303 };
304
305 template<ReadSource S>
306 struct any_read_source::vtable_for_impl
307 {
308 using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
309 std::span<mutable_buffer const>{}));
310 using ReadAwaitable = decltype(std::declval<S&>().read(
311 std::span<mutable_buffer const>{}));
312
313 static void
314 6 do_destroy_impl(void* source) noexcept
315 {
316 6 static_cast<S*>(source)->~S();
317 6 }
318
319 static awaitable_ops const*
320 52 construct_read_some_awaitable_impl(
321 void* source,
322 void* storage,
323 std::span<mutable_buffer const> buffers)
324 {
325 52 auto& s = *static_cast<S*>(source);
326 52 ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
327
328 static constexpr awaitable_ops ops = {
329 +[](void* p) {
330 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
331 },
332 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
333 return detail::call_await_suspend(
334 static_cast<ReadSomeAwaitable*>(p), h, env);
335 },
336 +[](void* p) {
337 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
338 },
339 +[](void* p) noexcept {
340 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
341 }
342 };
343 52 return &ops;
344 }
345
346 static awaitable_ops const*
347 116 construct_read_awaitable_impl(
348 void* source,
349 void* storage,
350 std::span<mutable_buffer const> buffers)
351 {
352 116 auto& s = *static_cast<S*>(source);
353 116 ::new(storage) ReadAwaitable(s.read(buffers));
354
355 static constexpr awaitable_ops ops = {
356 +[](void* p) {
357 return static_cast<ReadAwaitable*>(p)->await_ready();
358 },
359 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
360 return detail::call_await_suspend(
361 static_cast<ReadAwaitable*>(p), h, env);
362 },
363 +[](void* p) {
364 return static_cast<ReadAwaitable*>(p)->await_resume();
365 },
366 +[](void* p) noexcept {
367 static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
368 }
369 };
370 116 return &ops;
371 }
372
373 static constexpr std::size_t max_awaitable_size =
374 sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
375 ? sizeof(ReadSomeAwaitable)
376 : sizeof(ReadAwaitable);
377 static constexpr std::size_t max_awaitable_align =
378 alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
379 ? alignof(ReadSomeAwaitable)
380 : alignof(ReadAwaitable);
381
382 static constexpr vtable value = {
383 &construct_read_some_awaitable_impl,
384 &construct_read_awaitable_impl,
385 max_awaitable_size,
386 max_awaitable_align,
387 &do_destroy_impl
388 };
389 };
390
391 //----------------------------------------------------------
392
393 inline
394 145 any_read_source::~any_read_source()
395 {
396
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 139 times.
145 if(storage_)
397 {
398 6 vt_->destroy(source_);
399 6 ::operator delete(storage_);
400 }
401
2/2
✓ Branch 0 taken 139 times.
✓ Branch 1 taken 6 times.
145 if(cached_awaitable_)
402 {
403
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 138 times.
139 if(active_ops_)
404 1 active_ops_->destroy(cached_awaitable_);
405 139 ::operator delete(cached_awaitable_);
406 }
407 145 }
408
409 inline any_read_source&
410 4 any_read_source::operator=(any_read_source&& other) noexcept
411 {
412
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 if(this != &other)
413 {
414
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(storage_)
415 {
416 vt_->destroy(source_);
417 ::operator delete(storage_);
418 }
419
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 if(cached_awaitable_)
420 {
421
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2 if(active_ops_)
422 1 active_ops_->destroy(cached_awaitable_);
423 2 ::operator delete(cached_awaitable_);
424 }
425 3 source_ = std::exchange(other.source_, nullptr);
426 3 vt_ = std::exchange(other.vt_, nullptr);
427 3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
428 3 storage_ = std::exchange(other.storage_, nullptr);
429 3 active_ops_ = std::exchange(other.active_ops_, nullptr);
430 }
431 4 return *this;
432 }
433
434 template<ReadSource S>
435 requires (!std::same_as<std::decay_t<S>, any_read_source>)
436 6 any_read_source::any_read_source(S s)
437 6 : vt_(&vtable_for_impl<S>::value)
438 {
439 struct guard {
440 any_read_source* self;
441 bool committed = false;
442 6 ~guard() {
443
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6 if(!committed && self->storage_) {
444 self->vt_->destroy(self->source_);
445 ::operator delete(self->storage_);
446 self->storage_ = nullptr;
447 self->source_ = nullptr;
448 }
449 6 }
450 6 } g{this};
451
452
1/1
✓ Branch 1 taken 6 times.
6 storage_ = ::operator new(sizeof(S));
453 6 source_ = ::new(storage_) S(std::move(s));
454
455 // Preallocate the awaitable storage
456
1/1
✓ Branch 1 taken 6 times.
6 cached_awaitable_ = ::operator new(vt_->awaitable_size);
457
458 6 g.committed = true;
459 6 }
460
461 template<ReadSource S>
462 135 any_read_source::any_read_source(S* s)
463 135 : source_(s)
464 135 , vt_(&vtable_for_impl<S>::value)
465 {
466 // Preallocate the awaitable storage
467 135 cached_awaitable_ = ::operator new(vt_->awaitable_size);
468 135 }
469
470 //----------------------------------------------------------
471
472 template<MutableBufferSequence MB>
473 auto
474 54 any_read_source::read_some(MB buffers)
475 {
476 struct awaitable
477 {
478 any_read_source* self_;
479 mutable_buffer_array<detail::max_iovec_> ba_;
480
481 awaitable(any_read_source* self, MB const& buffers)
482 : self_(self)
483 , ba_(buffers)
484 {
485 }
486
487 bool
488 await_ready() const noexcept
489 {
490 return ba_.to_span().empty();
491 }
492
493 std::coroutine_handle<>
494 await_suspend(std::coroutine_handle<> h, io_env const* env)
495 {
496 self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
497 self_->source_,
498 self_->cached_awaitable_,
499 ba_.to_span());
500
501 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
502 return h;
503
504 return self_->active_ops_->await_suspend(
505 self_->cached_awaitable_, h, env);
506 }
507
508 io_result<std::size_t>
509 await_resume()
510 {
511 if(ba_.to_span().empty())
512 return {{}, 0};
513
514 struct guard {
515 any_read_source* self;
516 ~guard() {
517 self->active_ops_->destroy(self->cached_awaitable_);
518 self->active_ops_ = nullptr;
519 }
520 } g{self_};
521 return self_->active_ops_->await_resume(
522 self_->cached_awaitable_);
523 }
524 };
525 54 return awaitable(this, buffers);
526 }
527
528 inline auto
529 116 any_read_source::read_(std::span<mutable_buffer const> buffers)
530 {
531 struct awaitable
532 {
533 any_read_source* self_;
534 std::span<mutable_buffer const> buffers_;
535
536 bool
537 116 await_ready() const noexcept
538 {
539 116 return false;
540 }
541
542 std::coroutine_handle<>
543 116 await_suspend(std::coroutine_handle<> h, io_env const* env)
544 {
545 232 self_->active_ops_ = self_->vt_->construct_read_awaitable(
546 116 self_->source_,
547 116 self_->cached_awaitable_,
548 buffers_);
549
550
1/2
✓ Branch 1 taken 116 times.
✗ Branch 2 not taken.
116 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
551 116 return h;
552
553 return self_->active_ops_->await_suspend(
554 self_->cached_awaitable_, h, env);
555 }
556
557 io_result<std::size_t>
558 116 await_resume()
559 {
560 struct guard {
561 any_read_source* self;
562 116 ~guard() {
563 116 self->active_ops_->destroy(self->cached_awaitable_);
564 116 self->active_ops_ = nullptr;
565 116 }
566 116 } g{self_};
567 116 return self_->active_ops_->await_resume(
568
1/1
✓ Branch 1 taken 84 times.
200 self_->cached_awaitable_);
569 116 }
570 };
571 116 return awaitable{this, buffers};
572 }
573
574 template<MutableBufferSequence MB>
575 io_task<std::size_t>
576
1/1
✓ Branch 1 taken 110 times.
110 any_read_source::read(MB buffers)
577 {
578 buffer_param bp(buffers);
579 std::size_t total = 0;
580
581 for(;;)
582 {
583 auto bufs = bp.data();
584 if(bufs.empty())
585 break;
586
587 auto [ec, n] = co_await read_(bufs);
588 total += n;
589 if(ec)
590 co_return {ec, total};
591 bp.consume(n);
592 }
593
594 co_return {{}, total};
595 220 }
596
597 } // namespace capy
598 } // namespace boost
599
600 #endif
601