LCOV - code coverage report
Current view: top level - capy/ex - async_mutex.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 98.9 % 94 93
Test Date: 2026-02-12 14:50:59 Functions: 100.0 % 20 20

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
      11              : #define BOOST_CAPY_ASYNC_MUTEX_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/intrusive.hpp>
      15              : #include <boost/capy/concept/executor.hpp>
      16              : #include <boost/capy/error.hpp>
      17              : #include <boost/capy/ex/io_env.hpp>
      18              : #include <boost/capy/io_result.hpp>
      19              : 
      20              : #include <stop_token>
      21              : 
      22              : #include <atomic>
      23              : #include <coroutine>
      24              : #include <new>
      25              : #include <utility>
      26              : 
      27              : /*  async_mutex implementation notes
      28              :     ================================
      29              : 
      30              :     Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
      31              :     inherits intrusive_list<lock_awaiter>::node; the list is owned by
      32              :     async_mutex::waiters_.
      33              : 
      34              :     Cancellation via stop_token
      35              :     ---------------------------
      36              :     A std::stop_callback is registered in await_suspend. Two actors can
      37              :     race to resume the suspended coroutine: unlock() and the stop callback.
      38              :     An atomic bool `claimed_` resolves the race -- whoever does
      39              :     claimed_.exchange(true) and reads false wins. The loser does nothing.
      40              : 
      41              :     The stop callback calls ex_.post(h_). The stop_callback is
      42              :     destroyed later in await_resume. cancel_fn touches no members
      43              :     after post returns (same pattern as delete-this).
      44              : 
      45              :     unlock() pops waiters from the front. If the popped waiter was
      46              :     already claimed by the stop callback, unlock() skips it and tries
      47              :     the next. await_resume removes the (still-linked) canceled waiter
      48              :     via waiters_.remove(this).
      49              : 
      50              :     The stop_callback lives in a union to suppress automatic
      51              :     construction/destruction. Placement new in await_suspend, explicit
      52              :     destructor call in await_resume and ~lock_awaiter.
      53              : 
      54              :     Member ordering constraint
      55              :     --------------------------
      56              :     The union containing stop_cb_ must be declared AFTER the members
      57              :     the callback accesses (h_, ex_, claimed_, canceled_). If the
      58              :     stop_cb_ destructor blocks waiting for a concurrent callback, those
      59              :     members must still be alive (C++ destroys in reverse declaration
      60              :     order).
      61              : 
      62              :     active_ flag
      63              :     ------------
      64              :     Tracks both list membership and stop_cb_ lifetime (they are always
      65              :     set and cleared together). Used by the destructor to clean up if the
      66              :     coroutine is destroyed while suspended (e.g. execution_context
      67              :     shutdown).
      68              : 
      69              :     Cancellation scope
      70              :     ------------------
      71              :     Cancellation only takes effect while the coroutine is suspended in
      72              :     the wait queue. If the mutex is unlocked, await_ready acquires it
      73              :     immediately without checking the stop token. This is intentional:
      74              :     the fast path has no token access and no overhead.
      75              : 
      76              :     Threading assumptions
      77              :     ---------------------
      78              :     - All list mutations happen on the executor thread (await_suspend,
      79              :       await_resume, unlock, ~lock_awaiter).
      80              :     - The stop callback may fire from any thread, but only touches
      81              :       claimed_ (atomic) and then calls post. It never touches the
      82              :       list.
      83              :     - ~lock_awaiter must be called from the executor thread. This is
      84              :       guaranteed during normal shutdown but NOT if the coroutine frame
      85              :       is destroyed from another thread while a stop callback could
      86              :       fire (precondition violation, same as cppcoro/folly).
      87              : */
      88              : 
      89              : namespace boost {
      90              : namespace capy {
      91              : 
      92              : /** An asynchronous mutex for coroutines.
      93              : 
      94              :     This mutex provides mutual exclusion for coroutines without blocking.
      95              :     When a coroutine attempts to acquire a locked mutex, it suspends and
      96              :     is added to an intrusive wait queue. When the holder unlocks, the next
      97              :     waiter is resumed with the lock held.
      98              : 
      99              :     @par Cancellation
     100              : 
     101              :     When a coroutine is suspended waiting for the mutex and its stop
     102              :     token is triggered, the waiter completes with `error::canceled`
     103              :     instead of acquiring the lock.
     104              : 
     105              :     Cancellation only applies while the coroutine is suspended in the
     106              :     wait queue. If the mutex is unlocked when `lock()` is called, the
     107              :     lock is acquired immediately even if the stop token is already
     108              :     signaled.
     109              : 
     110              :     @par Zero Allocation
     111              : 
     112              :     No heap allocation occurs for lock operations.
     113              : 
     114              :     @par Thread Safety
     115              : 
     116              :     The mutex operations are designed for single-threaded use on one
     117              :     executor. The stop callback may fire from any thread.
     118              : 
     119              :     @par Example
     120              :     @code
     121              :     async_mutex cm;
     122              : 
     123              :     task<> protected_operation() {
     124              :         auto [ec] = co_await cm.lock();
     125              :         if(ec)
     126              :             co_return;
     127              :         // ... critical section ...
     128              :         cm.unlock();
     129              :     }
     130              : 
     131              :     // Or with RAII:
     132              :     task<> protected_operation() {
     133              :         auto [ec, guard] = co_await cm.scoped_lock();
     134              :         if(ec)
     135              :             co_return;
     136              :         // ... critical section ...
     137              :         // unlocks automatically
     138              :     }
     139              :     @endcode
     140              : */
     141              : class async_mutex
     142              : {
     143              : public:
     144              :     class lock_awaiter;
     145              :     class lock_guard;
     146              :     class lock_guard_awaiter;
     147              : 
     148              : private:
     149              :     bool locked_ = false;
     150              :     detail::intrusive_list<lock_awaiter> waiters_;
     151              : 
     152              : public:
     153              :     /** Awaiter returned by lock().
     154              :     */
     155              :     class lock_awaiter
     156              :         : public detail::intrusive_list<lock_awaiter>::node
     157              :     {
     158              :         friend class async_mutex;
     159              : 
     160              :         async_mutex* m_;
     161              :         std::coroutine_handle<> h_;
     162              :         executor_ref ex_;
     163              : 
     164              :         // These members must be declared before stop_cb_
     165              :         // (see comment on the union below).
     166              :         std::atomic<bool> claimed_{false};
     167              :         bool canceled_ = false;
     168              :         bool active_ = false;
     169              : 
     170              :         struct cancel_fn
     171              :         {
     172              :             lock_awaiter* self_;
     173              : 
     174            6 :             void operator()() const noexcept
     175              :             {
     176            6 :                 if(!self_->claimed_.exchange(
     177              :                     true, std::memory_order_acq_rel))
     178              :                 {
     179            6 :                     self_->canceled_ = true;
     180            6 :                     self_->ex_.post(self_->h_);
     181              :                 }
     182            6 :             }
     183              :         };
     184              : 
     185              :         using stop_cb_t =
     186              :             std::stop_callback<cancel_fn>;
     187              : 
     188              :         // Aligned storage for stop_cb_t. Declared last:
     189              :         // its destructor may block while the callback
     190              :         // accesses the members above.
     191              : #ifdef _MSC_VER
     192              : # pragma warning(push)
     193              : # pragma warning(disable: 4324) // padded due to alignas
     194              : #endif
     195              :         alignas(stop_cb_t)
     196              :             unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
     197              : #ifdef _MSC_VER
     198              : # pragma warning(pop)
     199              : #endif
     200              : 
     201           17 :         stop_cb_t& stop_cb_() noexcept
     202              :         {
     203              :             return *reinterpret_cast<stop_cb_t*>(
     204           17 :                 stop_cb_buf_);
     205              :         }
     206              : 
     207              :     public:
     208           70 :         ~lock_awaiter()
     209              :         {
     210           70 :             if(active_)
     211              :             {
     212            3 :                 stop_cb_().~stop_cb_t();
     213            3 :                 m_->waiters_.remove(this);
     214              :             }
     215           70 :         }
     216              : 
     217           35 :         explicit lock_awaiter(async_mutex* m) noexcept
     218           35 :             : m_(m)
     219              :         {
     220           35 :         }
     221              : 
     222           35 :         lock_awaiter(lock_awaiter&& o) noexcept
     223           35 :             : m_(o.m_)
     224           35 :             , h_(o.h_)
     225           35 :             , ex_(o.ex_)
     226           35 :             , claimed_(o.claimed_.load(
     227              :                 std::memory_order_relaxed))
     228           35 :             , canceled_(o.canceled_)
     229           35 :             , active_(std::exchange(o.active_, false))
     230              :         {
     231           35 :         }
     232              : 
     233              :         lock_awaiter(lock_awaiter const&) = delete;
     234              :         lock_awaiter& operator=(lock_awaiter const&) = delete;
     235              :         lock_awaiter& operator=(lock_awaiter&&) = delete;
     236              : 
     237           35 :         bool await_ready() const noexcept
     238              :         {
     239           35 :             if(!m_->locked_)
     240              :             {
     241           16 :                 m_->locked_ = true;
     242           16 :                 return true;
     243              :             }
     244           19 :             return false;
     245              :         }
     246              : 
     247              :         /** IoAwaitable protocol overload. */
     248              :         std::coroutine_handle<>
     249           19 :         await_suspend(
     250              :             std::coroutine_handle<> h,
     251              :             io_env const* env) noexcept
     252              :         {
     253           19 :             if(env->stop_token.stop_requested())
     254              :             {
     255            2 :                 canceled_ = true;
     256            2 :                 return h;
     257              :             }
     258           17 :             h_ = h;
     259           17 :             ex_ = env->executor;
     260           17 :             m_->waiters_.push_back(this);
     261           51 :             ::new(stop_cb_buf_) stop_cb_t(
     262           17 :                 env->stop_token, cancel_fn{this});
     263           17 :             active_ = true;
     264           17 :             return std::noop_coroutine();
     265              :         }
     266              : 
     267           32 :         io_result<> await_resume() noexcept
     268              :         {
     269           32 :             if(active_)
     270              :             {
     271           14 :                 stop_cb_().~stop_cb_t();
     272           14 :                 if(canceled_)
     273              :                 {
     274            6 :                     m_->waiters_.remove(this);
     275            6 :                     active_ = false;
     276            6 :                     return {make_error_code(
     277            6 :                         error::canceled)};
     278              :                 }
     279            8 :                 active_ = false;
     280              :             }
     281           26 :             if(canceled_)
     282            2 :                 return {make_error_code(
     283            2 :                     error::canceled)};
     284           24 :             return {{}};
     285              :         }
     286              :     };
     287              : 
     288              :     /** RAII lock guard for async_mutex.
     289              : 
     290              :         Automatically unlocks the mutex when destroyed.
     291              :     */
     292              :     class [[nodiscard]] lock_guard
     293              :     {
     294              :         async_mutex* m_;
     295              : 
     296              :     public:
     297            5 :         ~lock_guard()
     298              :         {
     299            5 :             if(m_)
     300            2 :                 m_->unlock();
     301            5 :         }
     302              : 
     303            2 :         lock_guard() noexcept
     304            2 :             : m_(nullptr)
     305              :         {
     306            2 :         }
     307              : 
     308            2 :         explicit lock_guard(async_mutex* m) noexcept
     309            2 :             : m_(m)
     310              :         {
     311            2 :         }
     312              : 
     313            1 :         lock_guard(lock_guard&& o) noexcept
     314            1 :             : m_(std::exchange(o.m_, nullptr))
     315              :         {
     316            1 :         }
     317              : 
     318              :         lock_guard& operator=(lock_guard&& o) noexcept
     319              :         {
     320              :             if(this != &o)
     321              :             {
     322              :                 if(m_)
     323              :                     m_->unlock();
     324              :                 m_ = std::exchange(o.m_, nullptr);
     325              :             }
     326              :             return *this;
     327              :         }
     328              : 
     329              :         lock_guard(lock_guard const&) = delete;
     330              :         lock_guard& operator=(lock_guard const&) = delete;
     331              :     };
     332              : 
     333              :     /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
     334              :     */
     335              :     class lock_guard_awaiter
     336              :     {
     337              :         async_mutex* m_;
     338              :         lock_awaiter inner_;
     339              : 
     340              :     public:
     341            4 :         explicit lock_guard_awaiter(async_mutex* m) noexcept
     342            4 :             : m_(m)
     343            4 :             , inner_(m)
     344              :         {
     345            4 :         }
     346              : 
     347            4 :         bool await_ready() const noexcept
     348              :         {
     349            4 :             return inner_.await_ready();
     350              :         }
     351              : 
     352              :         /** IoAwaitable protocol overload. */
     353              :         std::coroutine_handle<>
     354            2 :         await_suspend(
     355              :             std::coroutine_handle<> h,
     356              :             io_env const* env) noexcept
     357              :         {
     358            2 :             return inner_.await_suspend(h, env);
     359              :         }
     360              : 
     361            4 :         io_result<lock_guard> await_resume() noexcept
     362              :         {
     363            4 :             auto r = inner_.await_resume();
     364            4 :             if(r.ec)
     365            2 :                 return {r.ec, {}};
     366            2 :             return {{}, lock_guard(m_)};
     367            4 :         }
     368              :     };
     369              : 
     370              :     async_mutex() = default;
     371              : 
     372              :     // Non-copyable, non-movable
     373              :     async_mutex(async_mutex const&) = delete;
     374              :     async_mutex& operator=(async_mutex const&) = delete;
     375              : 
     376              :     /** Returns an awaiter that acquires the mutex.
     377              : 
     378              :         @return An awaitable yielding `(error_code)`.
     379              :     */
     380           31 :     lock_awaiter lock() noexcept
     381              :     {
     382           31 :         return lock_awaiter{this};
     383              :     }
     384              : 
     385              :     /** Returns an awaiter that acquires the mutex with RAII.
     386              : 
     387              :         @return An awaitable yielding `(error_code,lock_guard)`.
     388              :     */
     389            4 :     lock_guard_awaiter scoped_lock() noexcept
     390              :     {
     391            4 :         return lock_guard_awaiter(this);
     392              :     }
     393              : 
     394              :     /** Releases the mutex.
     395              : 
     396              :         If waiters are queued, the next eligible waiter is
     397              :         resumed with the lock held. Canceled waiters are
     398              :         skipped. If no eligible waiter remains, the mutex
     399              :         becomes unlocked.
     400              :     */
     401           24 :     void unlock() noexcept
     402              :     {
     403              :         for(;;)
     404              :         {
     405           24 :             auto* waiter = waiters_.pop_front();
     406           24 :             if(!waiter)
     407              :             {
     408           16 :                 locked_ = false;
     409           16 :                 return;
     410              :             }
     411            8 :             if(!waiter->claimed_.exchange(
     412              :                 true, std::memory_order_acq_rel))
     413              :             {
     414            8 :                 waiter->ex_.post(waiter->h_);
     415            8 :                 return;
     416              :             }
     417            0 :         }
     418              :     }
     419              : 
     420              :     /** Returns true if the mutex is currently locked.
     421              :     */
     422           26 :     bool is_locked() const noexcept
     423              :     {
     424           26 :         return locked_;
     425              :     }
     426              : };
     427              : 
     428              : } // namespace capy
     429              : } // namespace boost
     430              : 
     431              : #endif
        

Generated by: LCOV version 2.3