Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/buffer_sink.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/write_sink.hpp>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/io_result.hpp>
23 : #include <boost/capy/io_task.hpp>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <new>
30 : #include <span>
31 : #include <stop_token>
32 : #include <system_error>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any BufferSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref BufferSink concept, enabling runtime polymorphism for
42 : buffer sink operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper exposes two interfaces for producing data:
46 : the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47 : and the @ref WriteSink interface (`write_some`, `write`,
48 : `write_eof`). Choose the interface that matches how your data
49 : is produced:
50 :
51 : @par Choosing an Interface
52 :
53 : Use the **BufferSink** interface when you are a generator that
54 : produces data into externally-provided buffers. The sink owns
55 : the memory; you call @ref prepare to obtain writable buffers,
56 : fill them, then call @ref commit or @ref commit_eof.
57 :
58 : Use the **WriteSink** interface when you already have buffers
59 : containing the data to write:
60 : - If the entire body is available up front, call
61 : @ref write_eof(buffers) to send everything atomically.
62 : - If data arrives incrementally, call @ref write or
63 : @ref write_some in a loop, then @ref write_eof() when done.
64 : Prefer `write` (complete) unless your streaming pattern
65 : benefits from partial writes via `write_some`.
66 :
67 : If the wrapped type only satisfies @ref BufferSink, the
68 : @ref WriteSink operations are provided automatically.
69 :
70 : @par Construction Modes
71 :
72 : - **Owning**: Pass by value to transfer ownership. The wrapper
73 : allocates storage and owns the sink.
74 : - **Reference**: Pass a pointer to wrap without ownership. The
75 : pointed-to sink must outlive this wrapper.
76 :
77 : @par Awaitable Preallocation
78 : The constructor preallocates storage for the type-erased awaitable.
79 : This reserves all virtual address space at server startup
80 : so memory usage can be measured up front, rather than
81 : allocating piecemeal as traffic arrives.
82 :
83 : @par Thread Safety
84 : Not thread-safe. Concurrent operations on the same wrapper
85 : are undefined behavior.
86 :
87 : @par Example
88 : @code
89 : // Owning - takes ownership of the sink
90 : any_buffer_sink abs(some_buffer_sink{args...});
91 :
92 : // Reference - wraps without ownership
93 : some_buffer_sink sink;
94 : any_buffer_sink abs(&sink);
95 :
96 : // BufferSink interface: generate into callee-owned buffers
97 : mutable_buffer arr[16];
98 : auto bufs = abs.prepare(arr);
99 : // Write data into bufs[0..bufs.size())
100 : auto [ec] = co_await abs.commit(bytes_written);
101 : auto [ec2] = co_await abs.commit_eof(0);
102 :
103 : // WriteSink interface: send caller-owned buffers
104 : auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105 : auto [ec4] = co_await abs.write_eof();
106 :
107 : // Or send everything at once
108 : auto [ec5, n2] = co_await abs.write_eof(
109 : make_buffer(body_data));
110 : @endcode
111 :
112 : @see any_buffer_source, BufferSink, WriteSink
113 : */
114 : class any_buffer_sink
115 : {
116 : struct vtable;
117 : struct awaitable_ops;
118 : struct write_awaitable_ops;
119 :
120 : template<BufferSink S>
121 : struct vtable_for_impl;
122 :
123 : // hot-path members first for cache locality
124 : void* sink_ = nullptr;
125 : vtable const* vt_ = nullptr;
126 : void* cached_awaitable_ = nullptr;
127 : awaitable_ops const* active_ops_ = nullptr;
128 : write_awaitable_ops const* active_write_ops_ = nullptr;
129 : void* storage_ = nullptr;
130 :
131 : public:
132 : /** Destructor.
133 :
134 : Destroys the owned sink (if any) and releases the cached
135 : awaitable storage.
136 : */
137 : ~any_buffer_sink();
138 :
139 : /** Default constructor.
140 :
141 : Constructs an empty wrapper. Operations on a default-constructed
142 : wrapper result in undefined behavior.
143 : */
144 : any_buffer_sink() = default;
145 :
146 : /** Non-copyable.
147 :
148 : The awaitable cache is per-instance and cannot be shared.
149 : */
150 : any_buffer_sink(any_buffer_sink const&) = delete;
151 : any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152 :
153 : /** Move constructor.
154 :
155 : Transfers ownership of the wrapped sink (if owned) and
156 : cached awaitable storage from `other`. After the move, `other` is
157 : in a default-constructed state.
158 :
159 : @param other The wrapper to move from.
160 : */
161 2 : any_buffer_sink(any_buffer_sink&& other) noexcept
162 2 : : sink_(std::exchange(other.sink_, nullptr))
163 2 : , vt_(std::exchange(other.vt_, nullptr))
164 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
166 2 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
167 2 : , storage_(std::exchange(other.storage_, nullptr))
168 : {
169 2 : }
170 :
171 : /** Move assignment operator.
172 :
173 : Destroys any owned sink and releases existing resources,
174 : then transfers ownership from `other`.
175 :
176 : @param other The wrapper to move from.
177 : @return Reference to this wrapper.
178 : */
179 : any_buffer_sink&
180 : operator=(any_buffer_sink&& other) noexcept;
181 :
182 : /** Construct by taking ownership of a BufferSink.
183 :
184 : Allocates storage and moves the sink into this wrapper.
185 : The wrapper owns the sink and will destroy it. If `S` also
186 : satisfies @ref WriteSink, native write operations are
187 : forwarded through the virtual boundary.
188 :
189 : @param s The sink to take ownership of.
190 : */
191 : template<BufferSink S>
192 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
193 : any_buffer_sink(S s);
194 :
195 : /** Construct by wrapping a BufferSink without ownership.
196 :
197 : Wraps the given sink by pointer. The sink must remain
198 : valid for the lifetime of this wrapper. If `S` also
199 : satisfies @ref WriteSink, native write operations are
200 : forwarded through the virtual boundary.
201 :
202 : @param s Pointer to the sink to wrap.
203 : */
204 : template<BufferSink S>
205 : any_buffer_sink(S* s);
206 :
207 : /** Check if the wrapper contains a valid sink.
208 :
209 : @return `true` if wrapping a sink, `false` if default-constructed
210 : or moved-from.
211 : */
212 : bool
213 26 : has_value() const noexcept
214 : {
215 26 : return sink_ != nullptr;
216 : }
217 :
218 : /** Check if the wrapper contains a valid sink.
219 :
220 : @return `true` if wrapping a sink, `false` if default-constructed
221 : or moved-from.
222 : */
223 : explicit
224 3 : operator bool() const noexcept
225 : {
226 3 : return has_value();
227 : }
228 :
229 : /** Prepare writable buffers.
230 :
231 : Fills the provided span with mutable buffer descriptors
232 : pointing to the underlying sink's internal storage. This
233 : operation is synchronous.
234 :
235 : @param dest Span of mutable_buffer to fill.
236 :
237 : @return A span of filled buffers.
238 :
239 : @par Preconditions
240 : The wrapper must contain a valid sink (`has_value() == true`).
241 : */
242 : std::span<mutable_buffer>
243 : prepare(std::span<mutable_buffer> dest);
244 :
245 : /** Commit bytes written to the prepared buffers.
246 :
247 : Commits `n` bytes written to the buffers returned by the
248 : most recent call to @ref prepare. The operation may trigger
249 : underlying I/O.
250 :
251 : @param n The number of bytes to commit.
252 :
253 : @return An awaitable yielding `(error_code)`.
254 :
255 : @par Preconditions
256 : The wrapper must contain a valid sink (`has_value() == true`).
257 : */
258 : auto
259 : commit(std::size_t n);
260 :
261 : /** Commit final bytes and signal end-of-stream.
262 :
263 : Commits `n` bytes written to the buffers returned by the
264 : most recent call to @ref prepare and finalizes the sink.
265 : After success, no further operations are permitted.
266 :
267 : @param n The number of bytes to commit.
268 :
269 : @return An awaitable yielding `(error_code)`.
270 :
271 : @par Preconditions
272 : The wrapper must contain a valid sink (`has_value() == true`).
273 : */
274 : auto
275 : commit_eof(std::size_t n);
276 :
277 : /** Write some data from a buffer sequence.
278 :
279 : Writes one or more bytes from the buffer sequence to the
280 : underlying sink. May consume less than the full sequence.
281 :
282 : When the wrapped type provides native @ref WriteSink support,
283 : the operation forwards directly. Otherwise it is synthesized
284 : from @ref prepare and @ref commit with a buffer copy.
285 :
286 : @param buffers The buffer sequence to write.
287 :
288 : @return An awaitable yielding `(error_code,std::size_t)`.
289 :
290 : @par Preconditions
291 : The wrapper must contain a valid sink (`has_value() == true`).
292 : */
293 : template<ConstBufferSequence CB>
294 : io_task<std::size_t>
295 : write_some(CB buffers);
296 :
297 : /** Write all data from a buffer sequence.
298 :
299 : Writes all data from the buffer sequence to the underlying
300 : sink. This method satisfies the @ref WriteSink concept.
301 :
302 : When the wrapped type provides native @ref WriteSink support,
303 : each window is forwarded directly. Otherwise the data is
304 : copied into the sink via @ref prepare and @ref commit.
305 :
306 : @param buffers The buffer sequence to write.
307 :
308 : @return An awaitable yielding `(error_code,std::size_t)`.
309 :
310 : @par Preconditions
311 : The wrapper must contain a valid sink (`has_value() == true`).
312 : */
313 : template<ConstBufferSequence CB>
314 : io_task<std::size_t>
315 : write(CB buffers);
316 :
317 : /** Atomically write data and signal end-of-stream.
318 :
319 : Writes all data from the buffer sequence to the underlying
320 : sink and then signals end-of-stream.
321 :
322 : When the wrapped type provides native @ref WriteSink support,
323 : the final window is sent atomically via the underlying
324 : `write_eof(buffers)`. Otherwise the data is synthesized
325 : through @ref prepare, @ref commit, and @ref commit_eof.
326 :
327 : @param buffers The buffer sequence to write.
328 :
329 : @return An awaitable yielding `(error_code,std::size_t)`.
330 :
331 : @par Preconditions
332 : The wrapper must contain a valid sink (`has_value() == true`).
333 : */
334 : template<ConstBufferSequence CB>
335 : io_task<std::size_t>
336 : write_eof(CB buffers);
337 :
338 : /** Signal end-of-stream.
339 :
340 : Indicates that no more data will be written to the sink.
341 : This method satisfies the @ref WriteSink concept.
342 :
343 : When the wrapped type provides native @ref WriteSink support,
344 : the underlying `write_eof()` is called. Otherwise the
345 : operation is implemented as `commit_eof(0)`.
346 :
347 : @return An awaitable yielding `(error_code)`.
348 :
349 : @par Preconditions
350 : The wrapper must contain a valid sink (`has_value() == true`).
351 : */
352 : auto
353 : write_eof();
354 :
355 : protected:
356 : /** Rebind to a new sink after move.
357 :
358 : Updates the internal pointer to reference a new sink object.
359 : Used by owning wrappers after move assignment when the owned
360 : object has moved to a new location.
361 :
362 : @param new_sink The new sink to bind to. Must be the same
363 : type as the original sink.
364 :
365 : @note Terminates if called with a sink of different type
366 : than the original.
367 : */
368 : template<BufferSink S>
369 : void
370 : rebind(S& new_sink) noexcept
371 : {
372 : if(vt_ != &vtable_for_impl<S>::value)
373 : std::terminate();
374 : sink_ = &new_sink;
375 : }
376 :
377 : private:
378 : /** Forward a partial write through the vtable.
379 :
380 : Constructs the underlying `write_some` awaitable in
381 : cached storage and returns a type-erased awaitable.
382 : */
383 : auto
384 : write_some_(std::span<const_buffer const> buffers);
385 :
386 : /** Forward a complete write through the vtable.
387 :
388 : Constructs the underlying `write` awaitable in
389 : cached storage and returns a type-erased awaitable.
390 : */
391 : auto
392 : write_(std::span<const_buffer const> buffers);
393 :
394 : /** Forward an atomic write-with-EOF through the vtable.
395 :
396 : Constructs the underlying `write_eof(buffers)` awaitable
397 : in cached storage and returns a type-erased awaitable.
398 : */
399 : auto
400 : write_eof_buffers_(std::span<const_buffer const> buffers);
401 : };
402 :
403 : //----------------------------------------------------------
404 :
405 : /** Type-erased ops for awaitables yielding `io_result<>`. */
406 : struct any_buffer_sink::awaitable_ops
407 : {
408 : bool (*await_ready)(void*);
409 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
410 : io_result<> (*await_resume)(void*);
411 : void (*destroy)(void*) noexcept;
412 : };
413 :
414 : /** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
415 : struct any_buffer_sink::write_awaitable_ops
416 : {
417 : bool (*await_ready)(void*);
418 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
419 : io_result<std::size_t> (*await_resume)(void*);
420 : void (*destroy)(void*) noexcept;
421 : };
422 :
423 : struct any_buffer_sink::vtable
424 : {
425 : void (*destroy)(void*) noexcept;
426 : std::span<mutable_buffer> (*do_prepare)(
427 : void* sink,
428 : std::span<mutable_buffer> dest);
429 : std::size_t awaitable_size;
430 : std::size_t awaitable_align;
431 : awaitable_ops const* (*construct_commit_awaitable)(
432 : void* sink,
433 : void* storage,
434 : std::size_t n);
435 : awaitable_ops const* (*construct_commit_eof_awaitable)(
436 : void* sink,
437 : void* storage,
438 : std::size_t n);
439 :
440 : // WriteSink forwarding (null when wrapped type is BufferSink-only)
441 : write_awaitable_ops const* (*construct_write_some_awaitable)(
442 : void* sink,
443 : void* storage,
444 : std::span<const_buffer const> buffers);
445 : write_awaitable_ops const* (*construct_write_awaitable)(
446 : void* sink,
447 : void* storage,
448 : std::span<const_buffer const> buffers);
449 : write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
450 : void* sink,
451 : void* storage,
452 : std::span<const_buffer const> buffers);
453 : awaitable_ops const* (*construct_write_eof_awaitable)(
454 : void* sink,
455 : void* storage);
456 : };
457 :
458 : template<BufferSink S>
459 : struct any_buffer_sink::vtable_for_impl
460 : {
461 : using CommitAwaitable = decltype(std::declval<S&>().commit(
462 : std::size_t{}));
463 : using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
464 : std::size_t{}));
465 :
466 : static void
467 18 : do_destroy_impl(void* sink) noexcept
468 : {
469 18 : static_cast<S*>(sink)->~S();
470 18 : }
471 :
472 : static std::span<mutable_buffer>
473 126 : do_prepare_impl(
474 : void* sink,
475 : std::span<mutable_buffer> dest)
476 : {
477 126 : auto& s = *static_cast<S*>(sink);
478 126 : return s.prepare(dest);
479 : }
480 :
481 : static awaitable_ops const*
482 96 : construct_commit_awaitable_impl(
483 : void* sink,
484 : void* storage,
485 : std::size_t n)
486 : {
487 96 : auto& s = *static_cast<S*>(sink);
488 96 : ::new(storage) CommitAwaitable(s.commit(n));
489 :
490 : static constexpr awaitable_ops ops = {
491 96 : +[](void* p) {
492 96 : return static_cast<CommitAwaitable*>(p)->await_ready();
493 : },
494 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
495 0 : return detail::call_await_suspend(
496 0 : static_cast<CommitAwaitable*>(p), h, env);
497 : },
498 96 : +[](void* p) {
499 96 : return static_cast<CommitAwaitable*>(p)->await_resume();
500 : },
501 96 : +[](void* p) noexcept {
502 96 : static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
503 : }
504 : };
505 96 : return &ops;
506 : }
507 :
508 : static awaitable_ops const*
509 70 : construct_commit_eof_awaitable_impl(
510 : void* sink,
511 : void* storage,
512 : std::size_t n)
513 : {
514 70 : auto& s = *static_cast<S*>(sink);
515 70 : ::new(storage) CommitEofAwaitable(s.commit_eof(n));
516 :
517 : static constexpr awaitable_ops ops = {
518 70 : +[](void* p) {
519 70 : return static_cast<CommitEofAwaitable*>(p)->await_ready();
520 : },
521 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
522 0 : return detail::call_await_suspend(
523 0 : static_cast<CommitEofAwaitable*>(p), h, env);
524 : },
525 70 : +[](void* p) {
526 70 : return static_cast<CommitEofAwaitable*>(p)->await_resume();
527 : },
528 70 : +[](void* p) noexcept {
529 70 : static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
530 : }
531 : };
532 70 : return &ops;
533 : }
534 :
535 : //------------------------------------------------------
536 : // WriteSink forwarding (only instantiated when WriteSink<S>)
537 :
538 : static write_awaitable_ops const*
539 6 : construct_write_some_awaitable_impl(
540 : void* sink,
541 : void* storage,
542 : std::span<const_buffer const> buffers)
543 : requires WriteSink<S>
544 : {
545 : using Aw = decltype(std::declval<S&>().write_some(
546 : std::span<const_buffer const>{}));
547 6 : auto& s = *static_cast<S*>(sink);
548 6 : ::new(storage) Aw(s.write_some(buffers));
549 :
550 : static constexpr write_awaitable_ops ops = {
551 6 : +[](void* p) {
552 6 : return static_cast<Aw*>(p)->await_ready();
553 : },
554 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
555 0 : return detail::call_await_suspend(
556 0 : static_cast<Aw*>(p), h, env);
557 : },
558 6 : +[](void* p) {
559 6 : return static_cast<Aw*>(p)->await_resume();
560 : },
561 6 : +[](void* p) noexcept {
562 6 : static_cast<Aw*>(p)->~Aw();
563 : }
564 : };
565 6 : return &ops;
566 : }
567 :
568 : static write_awaitable_ops const*
569 14 : construct_write_awaitable_impl(
570 : void* sink,
571 : void* storage,
572 : std::span<const_buffer const> buffers)
573 : requires WriteSink<S>
574 : {
575 : using Aw = decltype(std::declval<S&>().write(
576 : std::span<const_buffer const>{}));
577 14 : auto& s = *static_cast<S*>(sink);
578 14 : ::new(storage) Aw(s.write(buffers));
579 :
580 : static constexpr write_awaitable_ops ops = {
581 14 : +[](void* p) {
582 14 : return static_cast<Aw*>(p)->await_ready();
583 : },
584 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
585 0 : return detail::call_await_suspend(
586 0 : static_cast<Aw*>(p), h, env);
587 : },
588 14 : +[](void* p) {
589 14 : return static_cast<Aw*>(p)->await_resume();
590 : },
591 14 : +[](void* p) noexcept {
592 14 : static_cast<Aw*>(p)->~Aw();
593 : }
594 : };
595 14 : return &ops;
596 : }
597 :
598 : static write_awaitable_ops const*
599 12 : construct_write_eof_buffers_awaitable_impl(
600 : void* sink,
601 : void* storage,
602 : std::span<const_buffer const> buffers)
603 : requires WriteSink<S>
604 : {
605 : using Aw = decltype(std::declval<S&>().write_eof(
606 : std::span<const_buffer const>{}));
607 12 : auto& s = *static_cast<S*>(sink);
608 12 : ::new(storage) Aw(s.write_eof(buffers));
609 :
610 : static constexpr write_awaitable_ops ops = {
611 12 : +[](void* p) {
612 12 : return static_cast<Aw*>(p)->await_ready();
613 : },
614 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
615 0 : return detail::call_await_suspend(
616 0 : static_cast<Aw*>(p), h, env);
617 : },
618 12 : +[](void* p) {
619 12 : return static_cast<Aw*>(p)->await_resume();
620 : },
621 12 : +[](void* p) noexcept {
622 12 : static_cast<Aw*>(p)->~Aw();
623 : }
624 : };
625 12 : return &ops;
626 : }
627 :
628 : static awaitable_ops const*
629 16 : construct_write_eof_awaitable_impl(
630 : void* sink,
631 : void* storage)
632 : requires WriteSink<S>
633 : {
634 : using Aw = decltype(std::declval<S&>().write_eof());
635 16 : auto& s = *static_cast<S*>(sink);
636 16 : ::new(storage) Aw(s.write_eof());
637 :
638 : static constexpr awaitable_ops ops = {
639 16 : +[](void* p) {
640 16 : return static_cast<Aw*>(p)->await_ready();
641 : },
642 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
643 0 : return detail::call_await_suspend(
644 0 : static_cast<Aw*>(p), h, env);
645 : },
646 16 : +[](void* p) {
647 16 : return static_cast<Aw*>(p)->await_resume();
648 : },
649 16 : +[](void* p) noexcept {
650 16 : static_cast<Aw*>(p)->~Aw();
651 : }
652 : };
653 16 : return &ops;
654 : }
655 :
656 : //------------------------------------------------------
657 :
658 : static consteval std::size_t
659 : compute_max_size() noexcept
660 : {
661 : std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
662 : ? sizeof(CommitAwaitable)
663 : : sizeof(CommitEofAwaitable);
664 : if constexpr (WriteSink<S>)
665 : {
666 : using WS = decltype(std::declval<S&>().write_some(
667 : std::span<const_buffer const>{}));
668 : using W = decltype(std::declval<S&>().write(
669 : std::span<const_buffer const>{}));
670 : using WEB = decltype(std::declval<S&>().write_eof(
671 : std::span<const_buffer const>{}));
672 : using WE = decltype(std::declval<S&>().write_eof());
673 :
674 : if(sizeof(WS) > s) s = sizeof(WS);
675 : if(sizeof(W) > s) s = sizeof(W);
676 : if(sizeof(WEB) > s) s = sizeof(WEB);
677 : if(sizeof(WE) > s) s = sizeof(WE);
678 : }
679 : return s;
680 : }
681 :
682 : static consteval std::size_t
683 : compute_max_align() noexcept
684 : {
685 : std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
686 : ? alignof(CommitAwaitable)
687 : : alignof(CommitEofAwaitable);
688 : if constexpr (WriteSink<S>)
689 : {
690 : using WS = decltype(std::declval<S&>().write_some(
691 : std::span<const_buffer const>{}));
692 : using W = decltype(std::declval<S&>().write(
693 : std::span<const_buffer const>{}));
694 : using WEB = decltype(std::declval<S&>().write_eof(
695 : std::span<const_buffer const>{}));
696 : using WE = decltype(std::declval<S&>().write_eof());
697 :
698 : if(alignof(WS) > a) a = alignof(WS);
699 : if(alignof(W) > a) a = alignof(W);
700 : if(alignof(WEB) > a) a = alignof(WEB);
701 : if(alignof(WE) > a) a = alignof(WE);
702 : }
703 : return a;
704 : }
705 :
706 : static consteval vtable
707 : make_vtable() noexcept
708 : {
709 : vtable v{};
710 : v.destroy = &do_destroy_impl;
711 : v.do_prepare = &do_prepare_impl;
712 : v.awaitable_size = compute_max_size();
713 : v.awaitable_align = compute_max_align();
714 : v.construct_commit_awaitable = &construct_commit_awaitable_impl;
715 : v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
716 : v.construct_write_some_awaitable = nullptr;
717 : v.construct_write_awaitable = nullptr;
718 : v.construct_write_eof_buffers_awaitable = nullptr;
719 : v.construct_write_eof_awaitable = nullptr;
720 :
721 : if constexpr (WriteSink<S>)
722 : {
723 : v.construct_write_some_awaitable =
724 : &construct_write_some_awaitable_impl;
725 : v.construct_write_awaitable =
726 : &construct_write_awaitable_impl;
727 : v.construct_write_eof_buffers_awaitable =
728 : &construct_write_eof_buffers_awaitable_impl;
729 : v.construct_write_eof_awaitable =
730 : &construct_write_eof_awaitable_impl;
731 : }
732 : return v;
733 : }
734 :
735 : static constexpr vtable value = make_vtable();
736 : };
737 :
738 : //----------------------------------------------------------
739 :
740 : inline
741 215 : any_buffer_sink::~any_buffer_sink()
742 : {
743 215 : if(storage_)
744 : {
745 17 : vt_->destroy(sink_);
746 17 : ::operator delete(storage_);
747 : }
748 215 : if(cached_awaitable_)
749 208 : ::operator delete(cached_awaitable_);
750 215 : }
751 :
752 : inline any_buffer_sink&
753 5 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
754 : {
755 5 : if(this != &other)
756 : {
757 4 : if(storage_)
758 : {
759 1 : vt_->destroy(sink_);
760 1 : ::operator delete(storage_);
761 : }
762 4 : if(cached_awaitable_)
763 2 : ::operator delete(cached_awaitable_);
764 4 : sink_ = std::exchange(other.sink_, nullptr);
765 4 : vt_ = std::exchange(other.vt_, nullptr);
766 4 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
767 4 : storage_ = std::exchange(other.storage_, nullptr);
768 4 : active_ops_ = std::exchange(other.active_ops_, nullptr);
769 4 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
770 : }
771 5 : return *this;
772 : }
773 :
774 : template<BufferSink S>
775 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
776 18 : any_buffer_sink::any_buffer_sink(S s)
777 18 : : vt_(&vtable_for_impl<S>::value)
778 : {
779 : struct guard {
780 : any_buffer_sink* self;
781 : bool committed = false;
782 18 : ~guard() {
783 18 : if(!committed && self->storage_) {
784 0 : self->vt_->destroy(self->sink_);
785 0 : ::operator delete(self->storage_);
786 0 : self->storage_ = nullptr;
787 0 : self->sink_ = nullptr;
788 : }
789 18 : }
790 18 : } g{this};
791 :
792 18 : storage_ = ::operator new(sizeof(S));
793 18 : sink_ = ::new(storage_) S(std::move(s));
794 :
795 18 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
796 :
797 18 : g.committed = true;
798 18 : }
799 :
800 : template<BufferSink S>
801 192 : any_buffer_sink::any_buffer_sink(S* s)
802 192 : : sink_(s)
803 192 : , vt_(&vtable_for_impl<S>::value)
804 : {
805 192 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
806 192 : }
807 :
808 : //----------------------------------------------------------
809 :
810 : inline std::span<mutable_buffer>
811 126 : any_buffer_sink::prepare(std::span<mutable_buffer> dest)
812 : {
813 126 : return vt_->do_prepare(sink_, dest);
814 : }
815 :
816 : inline auto
817 96 : any_buffer_sink::commit(std::size_t n)
818 : {
819 : struct awaitable
820 : {
821 : any_buffer_sink* self_;
822 : std::size_t n_;
823 :
824 : bool
825 96 : await_ready()
826 : {
827 192 : self_->active_ops_ = self_->vt_->construct_commit_awaitable(
828 96 : self_->sink_,
829 96 : self_->cached_awaitable_,
830 : n_);
831 96 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
832 : }
833 :
834 : std::coroutine_handle<>
835 0 : await_suspend(std::coroutine_handle<> h, io_env const* env)
836 : {
837 0 : return self_->active_ops_->await_suspend(
838 0 : self_->cached_awaitable_, h, env);
839 : }
840 :
841 : io_result<>
842 96 : await_resume()
843 : {
844 : struct guard {
845 : any_buffer_sink* self;
846 96 : ~guard() {
847 96 : self->active_ops_->destroy(self->cached_awaitable_);
848 96 : self->active_ops_ = nullptr;
849 96 : }
850 96 : } g{self_};
851 96 : return self_->active_ops_->await_resume(
852 166 : self_->cached_awaitable_);
853 96 : }
854 : };
855 96 : return awaitable{this, n};
856 : }
857 :
858 : inline auto
859 54 : any_buffer_sink::commit_eof(std::size_t n)
860 : {
861 : struct awaitable
862 : {
863 : any_buffer_sink* self_;
864 : std::size_t n_;
865 :
866 : bool
867 54 : await_ready()
868 : {
869 108 : self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
870 54 : self_->sink_,
871 54 : self_->cached_awaitable_,
872 : n_);
873 54 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
874 : }
875 :
876 : std::coroutine_handle<>
877 0 : await_suspend(std::coroutine_handle<> h, io_env const* env)
878 : {
879 0 : return self_->active_ops_->await_suspend(
880 0 : self_->cached_awaitable_, h, env);
881 : }
882 :
883 : io_result<>
884 54 : await_resume()
885 : {
886 : struct guard {
887 : any_buffer_sink* self;
888 54 : ~guard() {
889 54 : self->active_ops_->destroy(self->cached_awaitable_);
890 54 : self->active_ops_ = nullptr;
891 54 : }
892 54 : } g{self_};
893 54 : return self_->active_ops_->await_resume(
894 92 : self_->cached_awaitable_);
895 54 : }
896 : };
897 54 : return awaitable{this, n};
898 : }
899 :
900 : //----------------------------------------------------------
901 : // Private helpers for native WriteSink forwarding
902 :
903 : inline auto
904 6 : any_buffer_sink::write_some_(
905 : std::span<const_buffer const> buffers)
906 : {
907 : struct awaitable
908 : {
909 : any_buffer_sink* self_;
910 : std::span<const_buffer const> buffers_;
911 :
912 : bool
913 6 : await_ready() const noexcept
914 : {
915 6 : return false;
916 : }
917 :
918 : std::coroutine_handle<>
919 6 : await_suspend(std::coroutine_handle<> h, io_env const* env)
920 : {
921 12 : self_->active_write_ops_ =
922 12 : self_->vt_->construct_write_some_awaitable(
923 6 : self_->sink_,
924 6 : self_->cached_awaitable_,
925 : buffers_);
926 :
927 6 : if(self_->active_write_ops_->await_ready(
928 6 : self_->cached_awaitable_))
929 6 : return h;
930 :
931 0 : return self_->active_write_ops_->await_suspend(
932 0 : self_->cached_awaitable_, h, env);
933 : }
934 :
935 : io_result<std::size_t>
936 6 : await_resume()
937 : {
938 : struct guard {
939 : any_buffer_sink* self;
940 6 : ~guard() {
941 6 : self->active_write_ops_->destroy(
942 6 : self->cached_awaitable_);
943 6 : self->active_write_ops_ = nullptr;
944 6 : }
945 6 : } g{self_};
946 6 : return self_->active_write_ops_->await_resume(
947 10 : self_->cached_awaitable_);
948 6 : }
949 : };
950 6 : return awaitable{this, buffers};
951 : }
952 :
953 : inline auto
954 14 : any_buffer_sink::write_(
955 : std::span<const_buffer const> buffers)
956 : {
957 : struct awaitable
958 : {
959 : any_buffer_sink* self_;
960 : std::span<const_buffer const> buffers_;
961 :
962 : bool
963 14 : await_ready() const noexcept
964 : {
965 14 : return false;
966 : }
967 :
968 : std::coroutine_handle<>
969 14 : await_suspend(std::coroutine_handle<> h, io_env const* env)
970 : {
971 28 : self_->active_write_ops_ =
972 28 : self_->vt_->construct_write_awaitable(
973 14 : self_->sink_,
974 14 : self_->cached_awaitable_,
975 : buffers_);
976 :
977 14 : if(self_->active_write_ops_->await_ready(
978 14 : self_->cached_awaitable_))
979 14 : return h;
980 :
981 0 : return self_->active_write_ops_->await_suspend(
982 0 : self_->cached_awaitable_, h, env);
983 : }
984 :
985 : io_result<std::size_t>
986 14 : await_resume()
987 : {
988 : struct guard {
989 : any_buffer_sink* self;
990 14 : ~guard() {
991 14 : self->active_write_ops_->destroy(
992 14 : self->cached_awaitable_);
993 14 : self->active_write_ops_ = nullptr;
994 14 : }
995 14 : } g{self_};
996 14 : return self_->active_write_ops_->await_resume(
997 24 : self_->cached_awaitable_);
998 14 : }
999 : };
1000 14 : return awaitable{this, buffers};
1001 : }
1002 :
1003 : inline auto
1004 12 : any_buffer_sink::write_eof_buffers_(
1005 : std::span<const_buffer const> buffers)
1006 : {
1007 : struct awaitable
1008 : {
1009 : any_buffer_sink* self_;
1010 : std::span<const_buffer const> buffers_;
1011 :
1012 : bool
1013 12 : await_ready() const noexcept
1014 : {
1015 12 : return false;
1016 : }
1017 :
1018 : std::coroutine_handle<>
1019 12 : await_suspend(std::coroutine_handle<> h, io_env const* env)
1020 : {
1021 24 : self_->active_write_ops_ =
1022 24 : self_->vt_->construct_write_eof_buffers_awaitable(
1023 12 : self_->sink_,
1024 12 : self_->cached_awaitable_,
1025 : buffers_);
1026 :
1027 12 : if(self_->active_write_ops_->await_ready(
1028 12 : self_->cached_awaitable_))
1029 12 : return h;
1030 :
1031 0 : return self_->active_write_ops_->await_suspend(
1032 0 : self_->cached_awaitable_, h, env);
1033 : }
1034 :
1035 : io_result<std::size_t>
1036 12 : await_resume()
1037 : {
1038 : struct guard {
1039 : any_buffer_sink* self;
1040 12 : ~guard() {
1041 12 : self->active_write_ops_->destroy(
1042 12 : self->cached_awaitable_);
1043 12 : self->active_write_ops_ = nullptr;
1044 12 : }
1045 12 : } g{self_};
1046 12 : return self_->active_write_ops_->await_resume(
1047 20 : self_->cached_awaitable_);
1048 12 : }
1049 : };
1050 12 : return awaitable{this, buffers};
1051 : }
1052 :
1053 : //----------------------------------------------------------
1054 : // Public WriteSink methods
1055 :
1056 : template<ConstBufferSequence CB>
1057 : io_task<std::size_t>
1058 22 : any_buffer_sink::write_some(CB buffers)
1059 : {
1060 : buffer_param<CB> bp(buffers);
1061 : auto src = bp.data();
1062 : if(src.empty())
1063 : co_return {{}, 0};
1064 :
1065 : // Native WriteSink path
1066 : if(vt_->construct_write_some_awaitable)
1067 : co_return co_await write_some_(src);
1068 :
1069 : // Synthesized path: prepare + buffer_copy + commit
1070 : mutable_buffer arr[detail::max_iovec_];
1071 : auto dst_bufs = prepare(arr);
1072 : if(dst_bufs.empty())
1073 : {
1074 : auto [ec] = co_await commit(0);
1075 : if(ec)
1076 : co_return {ec, 0};
1077 : dst_bufs = prepare(arr);
1078 : if(dst_bufs.empty())
1079 : co_return {{}, 0};
1080 : }
1081 :
1082 : auto n = buffer_copy(dst_bufs, src);
1083 : auto [ec] = co_await commit(n);
1084 : if(ec)
1085 : co_return {ec, 0};
1086 : co_return {{}, n};
1087 44 : }
1088 :
1089 : template<ConstBufferSequence CB>
1090 : io_task<std::size_t>
1091 38 : any_buffer_sink::write(CB buffers)
1092 : {
1093 : buffer_param<CB> bp(buffers);
1094 : std::size_t total = 0;
1095 :
1096 : // Native WriteSink path
1097 : if(vt_->construct_write_awaitable)
1098 : {
1099 : for(;;)
1100 : {
1101 : auto bufs = bp.data();
1102 : if(bufs.empty())
1103 : break;
1104 :
1105 : auto [ec, n] = co_await write_(bufs);
1106 : total += n;
1107 : if(ec)
1108 : co_return {ec, total};
1109 : bp.consume(n);
1110 : }
1111 : co_return {{}, total};
1112 : }
1113 :
1114 : // Synthesized path: prepare + buffer_copy + commit
1115 : for(;;)
1116 : {
1117 : auto src = bp.data();
1118 : if(src.empty())
1119 : break;
1120 :
1121 : mutable_buffer arr[detail::max_iovec_];
1122 : auto dst_bufs = prepare(arr);
1123 : if(dst_bufs.empty())
1124 : {
1125 : auto [ec] = co_await commit(0);
1126 : if(ec)
1127 : co_return {ec, total};
1128 : continue;
1129 : }
1130 :
1131 : auto n = buffer_copy(dst_bufs, src);
1132 : auto [ec] = co_await commit(n);
1133 : if(ec)
1134 : co_return {ec, total};
1135 : bp.consume(n);
1136 : total += n;
1137 : }
1138 :
1139 : co_return {{}, total};
1140 76 : }
1141 :
1142 : inline auto
1143 32 : any_buffer_sink::write_eof()
1144 : {
1145 : struct awaitable
1146 : {
1147 : any_buffer_sink* self_;
1148 :
1149 : bool
1150 32 : await_ready()
1151 : {
1152 32 : if(self_->vt_->construct_write_eof_awaitable)
1153 : {
1154 : // Native WriteSink: forward to underlying write_eof()
1155 32 : self_->active_ops_ =
1156 16 : self_->vt_->construct_write_eof_awaitable(
1157 16 : self_->sink_,
1158 16 : self_->cached_awaitable_);
1159 : }
1160 : else
1161 : {
1162 : // Synthesized: commit_eof(0)
1163 32 : self_->active_ops_ =
1164 16 : self_->vt_->construct_commit_eof_awaitable(
1165 16 : self_->sink_,
1166 16 : self_->cached_awaitable_,
1167 : 0);
1168 : }
1169 64 : return self_->active_ops_->await_ready(
1170 32 : self_->cached_awaitable_);
1171 : }
1172 :
1173 : std::coroutine_handle<>
1174 0 : await_suspend(std::coroutine_handle<> h, io_env const* env)
1175 : {
1176 0 : return self_->active_ops_->await_suspend(
1177 0 : self_->cached_awaitable_, h, env);
1178 : }
1179 :
1180 : io_result<>
1181 32 : await_resume()
1182 : {
1183 : struct guard {
1184 : any_buffer_sink* self;
1185 32 : ~guard() {
1186 32 : self->active_ops_->destroy(self->cached_awaitable_);
1187 32 : self->active_ops_ = nullptr;
1188 32 : }
1189 32 : } g{self_};
1190 32 : return self_->active_ops_->await_resume(
1191 54 : self_->cached_awaitable_);
1192 32 : }
1193 : };
1194 32 : return awaitable{this};
1195 : }
1196 :
1197 : template<ConstBufferSequence CB>
1198 : io_task<std::size_t>
1199 40 : any_buffer_sink::write_eof(CB buffers)
1200 : {
1201 : // Native WriteSink path
1202 : if(vt_->construct_write_eof_buffers_awaitable)
1203 : {
1204 : const_buffer_param<CB> bp(buffers);
1205 : std::size_t total = 0;
1206 :
1207 : for(;;)
1208 : {
1209 : auto bufs = bp.data();
1210 : if(bufs.empty())
1211 : {
1212 : auto [ec] = co_await write_eof();
1213 : co_return {ec, total};
1214 : }
1215 :
1216 : if(!bp.more())
1217 : {
1218 : // Last window: send atomically with EOF
1219 : auto [ec, n] = co_await write_eof_buffers_(bufs);
1220 : total += n;
1221 : co_return {ec, total};
1222 : }
1223 :
1224 : auto [ec, n] = co_await write_(bufs);
1225 : total += n;
1226 : if(ec)
1227 : co_return {ec, total};
1228 : bp.consume(n);
1229 : }
1230 : }
1231 :
1232 : // Synthesized path: prepare + buffer_copy + commit + commit_eof
1233 : buffer_param<CB> bp(buffers);
1234 : std::size_t total = 0;
1235 :
1236 : for(;;)
1237 : {
1238 : auto src = bp.data();
1239 : if(src.empty())
1240 : break;
1241 :
1242 : mutable_buffer arr[detail::max_iovec_];
1243 : auto dst_bufs = prepare(arr);
1244 : if(dst_bufs.empty())
1245 : {
1246 : auto [ec] = co_await commit(0);
1247 : if(ec)
1248 : co_return {ec, total};
1249 : continue;
1250 : }
1251 :
1252 : auto n = buffer_copy(dst_bufs, src);
1253 : auto [ec] = co_await commit(n);
1254 : if(ec)
1255 : co_return {ec, total};
1256 : bp.consume(n);
1257 : total += n;
1258 : }
1259 :
1260 : auto [ec] = co_await commit_eof(0);
1261 : if(ec)
1262 : co_return {ec, total};
1263 :
1264 : co_return {{}, total};
1265 80 : }
1266 :
1267 : //----------------------------------------------------------
1268 :
1269 : static_assert(BufferSink<any_buffer_sink>);
1270 : static_assert(WriteSink<any_buffer_sink>);
1271 :
1272 : } // namespace capy
1273 : } // namespace boost
1274 :
1275 : #endif
|