LCOV - code coverage report
Current view: top level - capy/io - any_read_stream.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 87.4 % 95 83
Test Date: 2026-02-12 14:50:59 Functions: 79.6 % 54 43

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
      11              : #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_array.hpp>
      17              : #include <boost/capy/concept/io_awaitable.hpp>
      18              : #include <boost/capy/concept/read_stream.hpp>
      19              : #include <boost/capy/ex/io_env.hpp>
      20              : #include <boost/capy/io_result.hpp>
      21              : 
      22              : #include <concepts>
      23              : #include <coroutine>
      24              : #include <cstddef>
      25              : #include <new>
      26              : #include <span>
      27              : #include <stop_token>
      28              : #include <system_error>
      29              : #include <utility>
      30              : 
      31              : namespace boost {
      32              : namespace capy {
      33              : 
      34              : /** Type-erased wrapper for any ReadStream.
      35              : 
      36              :     This class provides type erasure for any type satisfying the
      37              :     @ref ReadStream concept, enabling runtime polymorphism for
      38              :     read operations. It uses cached awaitable storage to achieve
      39              :     zero steady-state allocation after construction.
      40              : 
      41              :     The wrapper supports two construction modes:
      42              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      43              :       allocates storage and owns the stream.
      44              :     - **Reference**: Pass a pointer to wrap without ownership. The
      45              :       pointed-to stream must outlive this wrapper.
      46              : 
      47              :     @par Awaitable Preallocation
      48              :     The constructor preallocates storage for the type-erased awaitable.
      49              :     This reserves all virtual address space at server startup
      50              :     so memory usage can be measured up front, rather than
      51              :     allocating piecemeal as traffic arrives.
      52              : 
      53              :     @par Immediate Completion
      54              :     When the underlying stream's awaitable reports ready immediately
      55              :     (e.g. buffered data already available), the wrapper skips
      56              :     coroutine suspension entirely and returns the result inline.
      57              : 
      58              :     @par Thread Safety
      59              :     Not thread-safe. Concurrent operations on the same wrapper
      60              :     are undefined behavior.
      61              : 
      62              :     @par Example
      63              :     @code
      64              :     // Owning - takes ownership of the stream
      65              :     any_read_stream stream(socket{ioc});
      66              : 
      67              :     // Reference - wraps without ownership
      68              :     socket sock(ioc);
      69              :     any_read_stream stream(&sock);
      70              : 
      71              :     mutable_buffer buf(data, size);
      72              :     auto [ec, n] = co_await stream.read_some(buf);
      73              :     @endcode
      74              : 
      75              :     @see any_write_stream, any_stream, ReadStream
      76              : */
      77              : class any_read_stream
      78              : {
      79              :     struct vtable;
      80              : 
      81              :     template<ReadStream S>
      82              :     struct vtable_for_impl;
      83              : 
      84              :     // ordered for cache line coherence
      85              :     void* stream_ = nullptr;
      86              :     vtable const* vt_ = nullptr;
      87              :     void* cached_awaitable_ = nullptr;
      88              :     void* storage_ = nullptr;
      89              :     bool awaitable_active_ = false;
      90              : 
      91              : public:
      92              :     /** Destructor.
      93              : 
      94              :         Destroys the owned stream (if any) and releases the cached
      95              :         awaitable storage.
      96              :     */
      97              :     ~any_read_stream();
      98              : 
      99              :     /** Default constructor.
     100              : 
     101              :         Constructs an empty wrapper. Operations on a default-constructed
     102              :         wrapper result in undefined behavior.
     103              :     */
     104            1 :     any_read_stream() = default;
     105              : 
     106              :     /** Non-copyable.
     107              : 
     108              :         The awaitable cache is per-instance and cannot be shared.
     109              :     */
     110              :     any_read_stream(any_read_stream const&) = delete;
     111              :     any_read_stream& operator=(any_read_stream const&) = delete;
     112              : 
     113              :     /** Move constructor.
     114              : 
     115              :         Transfers ownership of the wrapped stream (if owned) and
     116              :         cached awaitable storage from `other`. After the move, `other` is
     117              :         in a default-constructed state.
     118              : 
     119              :         @param other The wrapper to move from.
     120              :     */
     121            2 :     any_read_stream(any_read_stream&& other) noexcept
     122            2 :         : stream_(std::exchange(other.stream_, nullptr))
     123            2 :         , vt_(std::exchange(other.vt_, nullptr))
     124            2 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     125            2 :         , storage_(std::exchange(other.storage_, nullptr))
     126            2 :         , awaitable_active_(std::exchange(other.awaitable_active_, false))
     127              :     {
     128            2 :     }
     129              : 
     130              :     /** Move assignment operator.
     131              : 
     132              :         Destroys any owned stream and releases existing resources,
     133              :         then transfers ownership from `other`.
     134              : 
     135              :         @param other The wrapper to move from.
     136              :         @return Reference to this wrapper.
     137              :     */
     138              :     any_read_stream&
     139              :     operator=(any_read_stream&& other) noexcept;
     140              : 
     141              :     /** Construct by taking ownership of a ReadStream.
     142              : 
     143              :         Allocates storage and moves the stream into this wrapper.
     144              :         The wrapper owns the stream and will destroy it.
     145              : 
     146              :         @param s The stream to take ownership of.
     147              :     */
     148              :     template<ReadStream S>
     149              :         requires (!std::same_as<std::decay_t<S>, any_read_stream>)
     150              :     any_read_stream(S s);
     151              : 
     152              :     /** Construct by wrapping a ReadStream without ownership.
     153              : 
     154              :         Wraps the given stream by pointer. The stream must remain
     155              :         valid for the lifetime of this wrapper.
     156              : 
     157              :         @param s Pointer to the stream to wrap.
     158              :     */
     159              :     template<ReadStream S>
     160              :     any_read_stream(S* s);
     161              : 
     162              :     /** Check if the wrapper contains a valid stream.
     163              : 
     164              :         @return `true` if wrapping a stream, `false` if default-constructed
     165              :             or moved-from.
     166              :     */
     167              :     bool
     168           25 :     has_value() const noexcept
     169              :     {
     170           25 :         return stream_ != nullptr;
     171              :     }
     172              : 
     173              :     /** Check if the wrapper contains a valid stream.
     174              : 
     175              :         @return `true` if wrapping a stream, `false` if default-constructed
     176              :             or moved-from.
     177              :     */
     178              :     explicit
     179            3 :     operator bool() const noexcept
     180              :     {
     181            3 :         return has_value();
     182              :     }
     183              : 
     184              :     /** Initiate an asynchronous read operation.
     185              : 
     186              :         Reads data into the provided buffer sequence. The operation
     187              :         completes when at least one byte has been read, or an error
     188              :         occurs.
     189              : 
     190              :         @param buffers The buffer sequence to read into. Passed by
     191              :             value to ensure the sequence lives in the coroutine frame
     192              :             across suspension points.
     193              : 
     194              :         @return An awaitable yielding `(error_code,std::size_t)`.
     195              : 
     196              :         @par Immediate Completion
     197              :         The operation completes immediately without suspending
     198              :         the calling coroutine when the underlying stream's
     199              :         awaitable reports immediate readiness via `await_ready`.
     200              : 
     201              :         @note This is a partial operation and may not process the
     202              :         entire buffer sequence. Use the composed @ref read algorithm
     203              :         for guaranteed complete transfer.
     204              : 
     205              :         @par Preconditions
     206              :         The wrapper must contain a valid stream (`has_value() == true`).
     207              :         The caller must not call this function again after a prior
     208              :         call returned an error (including EOF).
     209              :     */
     210              :     template<MutableBufferSequence MB>
     211              :     auto
     212              :     read_some(MB buffers);
     213              : 
     214              : protected:
     215              :     /** Rebind to a new stream after move.
     216              : 
     217              :         Updates the internal pointer to reference a new stream object.
     218              :         Used by owning wrappers after move assignment when the owned
     219              :         object has moved to a new location.
     220              : 
     221              :         @param new_stream The new stream to bind to. Must be the same
     222              :             type as the original stream.
     223              : 
     224              :         @note Terminates if called with a stream of different type
     225              :             than the original.
     226              :     */
     227              :     template<ReadStream S>
     228              :     void
     229              :     rebind(S& new_stream) noexcept
     230              :     {
     231              :         if(vt_ != &vtable_for_impl<S>::value)
     232              :             std::terminate();
     233              :         stream_ = &new_stream;
     234              :     }
     235              : };
     236              : 
     237              : //----------------------------------------------------------
     238              : 
     239              : struct any_read_stream::vtable
     240              : {
     241              :     // ordered by call frequency for cache line coherence
     242              :     void (*construct_awaitable)(
     243              :         void* stream,
     244              :         void* storage,
     245              :         std::span<mutable_buffer const> buffers);
     246              :     bool (*await_ready)(void*);
     247              :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     248              :     io_result<std::size_t> (*await_resume)(void*);
     249              :     void (*destroy_awaitable)(void*) noexcept;
     250              :     std::size_t awaitable_size;
     251              :     std::size_t awaitable_align;
     252              :     void (*destroy)(void*) noexcept;
     253              : };
     254              : 
     255              : template<ReadStream S>
     256              : struct any_read_stream::vtable_for_impl
     257              : {
     258              :     using Awaitable = decltype(std::declval<S&>().read_some(
     259              :         std::span<mutable_buffer const>{}));
     260              : 
     261              :     static void
     262            1 :     do_destroy_impl(void* stream) noexcept
     263              :     {
     264            1 :         static_cast<S*>(stream)->~S();
     265            1 :     }
     266              : 
     267              :     static void
     268           91 :     construct_awaitable_impl(
     269              :         void* stream,
     270              :         void* storage,
     271              :         std::span<mutable_buffer const> buffers)
     272              :     {
     273           91 :         auto& s = *static_cast<S*>(stream);
     274           91 :         ::new(storage) Awaitable(s.read_some(buffers));
     275           91 :     }
     276              : 
     277              :     static constexpr vtable value = {
     278              :         &construct_awaitable_impl,
     279           91 :         +[](void* p) {
     280           91 :             return static_cast<Awaitable*>(p)->await_ready();
     281              :         },
     282            0 :         +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     283            0 :             return detail::call_await_suspend(
     284            0 :                 static_cast<Awaitable*>(p), h, env);
     285              :         },
     286           89 :         +[](void* p) {
     287           89 :             return static_cast<Awaitable*>(p)->await_resume();
     288              :         },
     289           93 :         +[](void* p) noexcept {
     290           16 :             static_cast<Awaitable*>(p)->~Awaitable();
     291              :         },
     292              :         sizeof(Awaitable),
     293              :         alignof(Awaitable),
     294              :         &do_destroy_impl
     295              :     };
     296              : };
     297              : 
     298              : //----------------------------------------------------------
     299              : 
     300              : inline
     301          101 : any_read_stream::~any_read_stream()
     302              : {
     303          101 :     if(storage_)
     304              :     {
     305            1 :         vt_->destroy(stream_);
     306            1 :         ::operator delete(storage_);
     307              :     }
     308          101 :     if(cached_awaitable_)
     309              :     {
     310           91 :         if(awaitable_active_)
     311            1 :             vt_->destroy_awaitable(cached_awaitable_);
     312           91 :         ::operator delete(cached_awaitable_);
     313              :     }
     314          101 : }
     315              : 
     316              : inline any_read_stream&
     317            5 : any_read_stream::operator=(any_read_stream&& other) noexcept
     318              : {
     319            5 :     if(this != &other)
     320              :     {
     321            5 :         if(storage_)
     322              :         {
     323            0 :             vt_->destroy(stream_);
     324            0 :             ::operator delete(storage_);
     325              :         }
     326            5 :         if(cached_awaitable_)
     327              :         {
     328            2 :             if(awaitable_active_)
     329            1 :                 vt_->destroy_awaitable(cached_awaitable_);
     330            2 :             ::operator delete(cached_awaitable_);
     331              :         }
     332            5 :         stream_ = std::exchange(other.stream_, nullptr);
     333            5 :         vt_ = std::exchange(other.vt_, nullptr);
     334            5 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     335            5 :         storage_ = std::exchange(other.storage_, nullptr);
     336            5 :         awaitable_active_ = std::exchange(other.awaitable_active_, false);
     337              :     }
     338            5 :     return *this;
     339              : }
     340              : 
     341              : template<ReadStream S>
     342              :     requires (!std::same_as<std::decay_t<S>, any_read_stream>)
     343            1 : any_read_stream::any_read_stream(S s)
     344            1 :     : vt_(&vtable_for_impl<S>::value)
     345              : {
     346              :     struct guard {
     347              :         any_read_stream* self;
     348              :         bool committed = false;
     349            1 :         ~guard() {
     350            1 :             if(!committed && self->storage_) {
     351            0 :                 self->vt_->destroy(self->stream_);
     352            0 :                 ::operator delete(self->storage_);
     353            0 :                 self->storage_ = nullptr;
     354            0 :                 self->stream_ = nullptr;
     355              :             }
     356            1 :         }
     357            1 :     } g{this};
     358              : 
     359            1 :     storage_ = ::operator new(sizeof(S));
     360            1 :     stream_ = ::new(storage_) S(std::move(s));
     361              : 
     362              :     // Preallocate the awaitable storage
     363            1 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     364              : 
     365            1 :     g.committed = true;
     366            1 : }
     367              : 
     368              : template<ReadStream S>
     369           92 : any_read_stream::any_read_stream(S* s)
     370           92 :     : stream_(s)
     371           92 :     , vt_(&vtable_for_impl<S>::value)
     372              : {
     373              :     // Preallocate the awaitable storage
     374           92 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     375           92 : }
     376              : 
     377              : //----------------------------------------------------------
     378              : 
     379              : template<MutableBufferSequence MB>
     380              : auto
     381           91 : any_read_stream::read_some(MB buffers)
     382              : {
     383              :     // VFALCO in theory, we could use if constexpr to detect a
     384              :     // span and then pass that through to read_some without the array
     385              :     struct awaitable
     386              :     {
     387              :         any_read_stream* self_;
     388              :         mutable_buffer_array<detail::max_iovec_> ba_;
     389              : 
     390              :         bool
     391           91 :         await_ready()
     392              :         {
     393           91 :             self_->vt_->construct_awaitable(
     394           91 :                 self_->stream_,
     395           91 :                 self_->cached_awaitable_,
     396           91 :                 ba_.to_span());
     397           91 :             self_->awaitable_active_ = true;
     398              : 
     399          182 :             return self_->vt_->await_ready(
     400           91 :                 self_->cached_awaitable_);
     401              :         }
     402              : 
     403              :         std::coroutine_handle<>
     404            0 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     405              :         {
     406            0 :             return self_->vt_->await_suspend(
     407            0 :                 self_->cached_awaitable_, h, env);
     408              :         }
     409              : 
     410              :         io_result<std::size_t>
     411           89 :         await_resume()
     412              :         {
     413              :             struct guard {
     414              :                 any_read_stream* self;
     415           89 :                 ~guard() {
     416           89 :                     self->vt_->destroy_awaitable(self->cached_awaitable_);
     417           89 :                     self->awaitable_active_ = false;
     418           89 :                 }
     419           89 :             } g{self_};
     420           89 :             return self_->vt_->await_resume(
     421          154 :                 self_->cached_awaitable_);
     422           89 :         }
     423              :     };
     424              :     return awaitable{this,
     425           91 :         mutable_buffer_array<detail::max_iovec_>(buffers)};
     426           91 : }
     427              : 
     428              : } // namespace capy
     429              : } // namespace boost
     430              : 
     431              : #endif
        

Generated by: LCOV version 2.3