1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
10  
#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11  
#define BOOST_CAPY_ASYNC_EVENT_HPP
11  
#define BOOST_CAPY_ASYNC_EVENT_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/intrusive.hpp>
14  
#include <boost/capy/detail/intrusive.hpp>
15  
#include <boost/capy/concept/executor.hpp>
15  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/error.hpp>
16  
#include <boost/capy/error.hpp>
17  
#include <boost/capy/ex/io_env.hpp>
17  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/io_result.hpp>
18  
#include <boost/capy/io_result.hpp>
19  

19  

20  
#include <stop_token>
20  
#include <stop_token>
21  

21  

22  
#include <atomic>
22  
#include <atomic>
23  
#include <coroutine>
23  
#include <coroutine>
24  
#include <new>
24  
#include <new>
25  
#include <utility>
25  
#include <utility>
26  

26  

27  
/*  async_event implementation notes
27  
/*  async_event implementation notes
28  
    =================================
28  
    =================================
29  

29  

30  
    Same cancellation pattern as async_mutex (see that file for the
30  
    Same cancellation pattern as async_mutex (see that file for the
31  
    full discussion on claimed_, stop_cb lifetime, member ordering,
31  
    full discussion on claimed_, stop_cb lifetime, member ordering,
32  
    and threading assumptions).
32  
    and threading assumptions).
33  

33  

34  
    Key difference: set() wakes ALL waiters (broadcast), not one.
34  
    Key difference: set() wakes ALL waiters (broadcast), not one.
35  
    It pops every waiter from the list and posts the ones it
35  
    It pops every waiter from the list and posts the ones it
36  
    claims. Waiters already claimed by a stop callback are skipped.
36  
    claims. Waiters already claimed by a stop callback are skipped.
37  

37  

38  
    Because set() pops all waiters, a canceled waiter may have been
38  
    Because set() pops all waiters, a canceled waiter may have been
39  
    removed from the list by set() before its await_resume runs.
39  
    removed from the list by set() before its await_resume runs.
40  
    This requires a separate in_list_ flag (unlike async_mutex where
40  
    This requires a separate in_list_ flag (unlike async_mutex where
41  
    active_ served double duty). await_resume only calls remove()
41  
    active_ served double duty). await_resume only calls remove()
42  
    when in_list_ is true.
42  
    when in_list_ is true.
43  
*/
43  
*/
44  

44  

45  
namespace boost {
45  
namespace boost {
46  
namespace capy {
46  
namespace capy {
47  

47  

48  
/** An asynchronous event for coroutines.
48  
/** An asynchronous event for coroutines.
49  

49  

50  
    This event provides a way to notify multiple coroutines that some
50  
    This event provides a way to notify multiple coroutines that some
51  
    condition has occurred. When a coroutine awaits an unset event, it
51  
    condition has occurred. When a coroutine awaits an unset event, it
52  
    suspends and is added to a wait queue. When the event is set, all
52  
    suspends and is added to a wait queue. When the event is set, all
53  
    waiting coroutines are resumed.
53  
    waiting coroutines are resumed.
54  

54  

55  
    @par Cancellation
55  
    @par Cancellation
56  

56  

57  
    When a coroutine is suspended waiting for the event and its stop
57  
    When a coroutine is suspended waiting for the event and its stop
58  
    token is triggered, the waiter completes with `error::canceled`
58  
    token is triggered, the waiter completes with `error::canceled`
59  
    instead of waiting for `set()`.
59  
    instead of waiting for `set()`.
60  

60  

61  
    Cancellation only applies while the coroutine is suspended in the
61  
    Cancellation only applies while the coroutine is suspended in the
62  
    wait queue. If the event is already set when `wait()` is called,
62  
    wait queue. If the event is already set when `wait()` is called,
63  
    the wait completes immediately even if the stop token is already
63  
    the wait completes immediately even if the stop token is already
64  
    signaled.
64  
    signaled.
65  

65  

66  
    @par Zero Allocation
66  
    @par Zero Allocation
67  

67  

68  
    No heap allocation occurs for wait operations.
68  
    No heap allocation occurs for wait operations.
69  

69  

70  
    @par Thread Safety
70  
    @par Thread Safety
71  

71  

72  
    The event operations are designed for single-threaded use on one
72  
    The event operations are designed for single-threaded use on one
73  
    executor. The stop callback may fire from any thread.
73  
    executor. The stop callback may fire from any thread.
74  

74  

75  
    @par Example
75  
    @par Example
76  
    @code
76  
    @code
77  
    async_event event;
77  
    async_event event;
78  

78  

79  
    task<> waiter() {
79  
    task<> waiter() {
80  
        auto [ec] = co_await event.wait();
80  
        auto [ec] = co_await event.wait();
81  
        if(ec)
81  
        if(ec)
82  
            co_return;
82  
            co_return;
83  
        // ... event was set ...
83  
        // ... event was set ...
84  
    }
84  
    }
85  

85  

86  
    task<> notifier() {
86  
    task<> notifier() {
87  
        // ... do some work ...
87  
        // ... do some work ...
88  
        event.set();  // Wake all waiters
88  
        event.set();  // Wake all waiters
89  
    }
89  
    }
90  
    @endcode
90  
    @endcode
91  
*/
91  
*/
92  
class async_event
92  
class async_event
93  
{
93  
{
94  
public:
94  
public:
95  
    class wait_awaiter;
95  
    class wait_awaiter;
96  

96  

97  
private:
97  
private:
98  
    bool set_ = false;
98  
    bool set_ = false;
99  
    detail::intrusive_list<wait_awaiter> waiters_;
99  
    detail::intrusive_list<wait_awaiter> waiters_;
100  

100  

101  
public:
101  
public:
102  
    /** Awaiter returned by wait().
102  
    /** Awaiter returned by wait().
103  
    */
103  
    */
104  
    class wait_awaiter
104  
    class wait_awaiter
105  
        : public detail::intrusive_list<wait_awaiter>::node
105  
        : public detail::intrusive_list<wait_awaiter>::node
106  
    {
106  
    {
107  
        friend class async_event;
107  
        friend class async_event;
108  

108  

109  
        async_event* e_;
109  
        async_event* e_;
110  
        std::coroutine_handle<> h_;
110  
        std::coroutine_handle<> h_;
111  
        executor_ref ex_;
111  
        executor_ref ex_;
112  

112  

113  
        // Declared before stop_cb_buf_: the callback
113  
        // Declared before stop_cb_buf_: the callback
114  
        // accesses these members, so they must still be
114  
        // accesses these members, so they must still be
115  
        // alive if the stop_cb_ destructor blocks.
115  
        // alive if the stop_cb_ destructor blocks.
116  
        std::atomic<bool> claimed_{false};
116  
        std::atomic<bool> claimed_{false};
117  
        bool canceled_ = false;
117  
        bool canceled_ = false;
118  
        bool active_ = false;
118  
        bool active_ = false;
119  
        bool in_list_ = false;
119  
        bool in_list_ = false;
120  

120  

121  
        struct cancel_fn
121  
        struct cancel_fn
122  
        {
122  
        {
123  
            wait_awaiter* self_;
123  
            wait_awaiter* self_;
124  

124  

125  
            void operator()() const noexcept
125  
            void operator()() const noexcept
126  
            {
126  
            {
127  
                if(!self_->claimed_.exchange(
127  
                if(!self_->claimed_.exchange(
128  
                    true, std::memory_order_acq_rel))
128  
                    true, std::memory_order_acq_rel))
129  
                {
129  
                {
130  
                    self_->canceled_ = true;
130  
                    self_->canceled_ = true;
131  
                    self_->ex_.post(self_->h_);
131  
                    self_->ex_.post(self_->h_);
132  
                }
132  
                }
133  
            }
133  
            }
134  
        };
134  
        };
135  

135  

136  
        using stop_cb_t =
136  
        using stop_cb_t =
137  
            std::stop_callback<cancel_fn>;
137  
            std::stop_callback<cancel_fn>;
138  

138  

139  
        // Aligned storage for stop_cb_t. Declared last:
139  
        // Aligned storage for stop_cb_t. Declared last:
140  
        // its destructor may block while the callback
140  
        // its destructor may block while the callback
141  
        // accesses the members above.
141  
        // accesses the members above.
142  
#ifdef _MSC_VER
142  
#ifdef _MSC_VER
143  
# pragma warning(push)
143  
# pragma warning(push)
144  
# pragma warning(disable: 4324) // padded due to alignas
144  
# pragma warning(disable: 4324) // padded due to alignas
145  
#endif
145  
#endif
146  
        alignas(stop_cb_t)
146  
        alignas(stop_cb_t)
147  
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
147  
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
148  
#ifdef _MSC_VER
148  
#ifdef _MSC_VER
149  
# pragma warning(pop)
149  
# pragma warning(pop)
150  
#endif
150  
#endif
151  

151  

152  
        stop_cb_t& stop_cb_() noexcept
152  
        stop_cb_t& stop_cb_() noexcept
153  
        {
153  
        {
154  
            return *reinterpret_cast<stop_cb_t*>(
154  
            return *reinterpret_cast<stop_cb_t*>(
155  
                stop_cb_buf_);
155  
                stop_cb_buf_);
156  
        }
156  
        }
157  

157  

158  
    public:
158  
    public:
159  
        ~wait_awaiter()
159  
        ~wait_awaiter()
160  
        {
160  
        {
161  
            if(active_)
161  
            if(active_)
162  
                stop_cb_().~stop_cb_t();
162  
                stop_cb_().~stop_cb_t();
163  
            if(in_list_)
163  
            if(in_list_)
164  
                e_->waiters_.remove(this);
164  
                e_->waiters_.remove(this);
165  
        }
165  
        }
166  

166  

167  
        explicit wait_awaiter(async_event* e) noexcept
167  
        explicit wait_awaiter(async_event* e) noexcept
168  
            : e_(e)
168  
            : e_(e)
169  
        {
169  
        {
170  
        }
170  
        }
171  

171  

172  
        wait_awaiter(wait_awaiter&& o) noexcept
172  
        wait_awaiter(wait_awaiter&& o) noexcept
173  
            : e_(o.e_)
173  
            : e_(o.e_)
174  
            , h_(o.h_)
174  
            , h_(o.h_)
175  
            , ex_(o.ex_)
175  
            , ex_(o.ex_)
176  
            , claimed_(o.claimed_.load(
176  
            , claimed_(o.claimed_.load(
177  
                std::memory_order_relaxed))
177  
                std::memory_order_relaxed))
178  
            , canceled_(o.canceled_)
178  
            , canceled_(o.canceled_)
179  
            , active_(std::exchange(o.active_, false))
179  
            , active_(std::exchange(o.active_, false))
180  
            , in_list_(std::exchange(o.in_list_, false))
180  
            , in_list_(std::exchange(o.in_list_, false))
181  
        {
181  
        {
182  
        }
182  
        }
183  

183  

184  
        wait_awaiter(wait_awaiter const&) = delete;
184  
        wait_awaiter(wait_awaiter const&) = delete;
185  
        wait_awaiter& operator=(wait_awaiter const&) = delete;
185  
        wait_awaiter& operator=(wait_awaiter const&) = delete;
186  
        wait_awaiter& operator=(wait_awaiter&&) = delete;
186  
        wait_awaiter& operator=(wait_awaiter&&) = delete;
187  

187  

188  
        bool await_ready() const noexcept
188  
        bool await_ready() const noexcept
189  
        {
189  
        {
190  
            return e_->set_;
190  
            return e_->set_;
191  
        }
191  
        }
192  

192  

193  
        /** IoAwaitable protocol overload. */
193  
        /** IoAwaitable protocol overload. */
194  
        std::coroutine_handle<>
194  
        std::coroutine_handle<>
195  
        await_suspend(
195  
        await_suspend(
196  
            std::coroutine_handle<> h,
196  
            std::coroutine_handle<> h,
197  
            io_env const* env) noexcept
197  
            io_env const* env) noexcept
198  
        {
198  
        {
199  
            if(env->stop_token.stop_requested())
199  
            if(env->stop_token.stop_requested())
200  
            {
200  
            {
201  
                canceled_ = true;
201  
                canceled_ = true;
202  
                return h;
202  
                return h;
203  
            }
203  
            }
204  
            h_ = h;
204  
            h_ = h;
205  
            ex_ = env->executor;
205  
            ex_ = env->executor;
206  
            e_->waiters_.push_back(this);
206  
            e_->waiters_.push_back(this);
207  
            in_list_ = true;
207  
            in_list_ = true;
208  
            ::new(stop_cb_buf_) stop_cb_t(
208  
            ::new(stop_cb_buf_) stop_cb_t(
209  
                env->stop_token, cancel_fn{this});
209  
                env->stop_token, cancel_fn{this});
210  
            active_ = true;
210  
            active_ = true;
211  
            return std::noop_coroutine();
211  
            return std::noop_coroutine();
212  
        }
212  
        }
213  

213  

214  
        io_result<> await_resume() noexcept
214  
        io_result<> await_resume() noexcept
215  
        {
215  
        {
216  
            if(active_)
216  
            if(active_)
217  
            {
217  
            {
218  
                stop_cb_().~stop_cb_t();
218  
                stop_cb_().~stop_cb_t();
219  
                active_ = false;
219  
                active_ = false;
220  
            }
220  
            }
221  
            if(canceled_)
221  
            if(canceled_)
222  
            {
222  
            {
223  
                if(in_list_)
223  
                if(in_list_)
224  
                {
224  
                {
225  
                    e_->waiters_.remove(this);
225  
                    e_->waiters_.remove(this);
226  
                    in_list_ = false;
226  
                    in_list_ = false;
227  
                }
227  
                }
228  
                return {make_error_code(
228  
                return {make_error_code(
229  
                    error::canceled)};
229  
                    error::canceled)};
230  
            }
230  
            }
231  
            return {{}};
231  
            return {{}};
232  
        }
232  
        }
233  
    };
233  
    };
234  

234  

235  
    async_event() = default;
235  
    async_event() = default;
236  

236  

237  
    // Non-copyable, non-movable
237  
    // Non-copyable, non-movable
238  
    async_event(async_event const&) = delete;
238  
    async_event(async_event const&) = delete;
239  
    async_event& operator=(async_event const&) = delete;
239  
    async_event& operator=(async_event const&) = delete;
240  

240  

241  
    /** Returns an awaiter that waits until the event is set.
241  
    /** Returns an awaiter that waits until the event is set.
242  

242  

243  
        If the event is already set, completes immediately.
243  
        If the event is already set, completes immediately.
244  

244  

245  
        @return An awaitable yielding `(error_code)`.
245  
        @return An awaitable yielding `(error_code)`.
246  
    */
246  
    */
247  
    wait_awaiter wait() noexcept
247  
    wait_awaiter wait() noexcept
248  
    {
248  
    {
249  
        return wait_awaiter{this};
249  
        return wait_awaiter{this};
250  
    }
250  
    }
251  

251  

252  
    /** Sets the event.
252  
    /** Sets the event.
253  

253  

254  
        All waiting coroutines are resumed. Canceled waiters
254  
        All waiting coroutines are resumed. Canceled waiters
255  
        are skipped. Subsequent calls to wait() complete
255  
        are skipped. Subsequent calls to wait() complete
256  
        immediately until clear() is called.
256  
        immediately until clear() is called.
257  
    */
257  
    */
258  
    void set()
258  
    void set()
259  
    {
259  
    {
260  
        set_ = true;
260  
        set_ = true;
261  
        for(;;)
261  
        for(;;)
262  
        {
262  
        {
263  
            auto* w = waiters_.pop_front();
263  
            auto* w = waiters_.pop_front();
264  
            if(!w)
264  
            if(!w)
265  
                break;
265  
                break;
266  
            w->in_list_ = false;
266  
            w->in_list_ = false;
267  
            if(!w->claimed_.exchange(
267  
            if(!w->claimed_.exchange(
268  
                true, std::memory_order_acq_rel))
268  
                true, std::memory_order_acq_rel))
269  
            {
269  
            {
270  
                w->ex_.post(w->h_);
270  
                w->ex_.post(w->h_);
271  
            }
271  
            }
272  
        }
272  
        }
273  
    }
273  
    }
274  

274  

275  
    /** Clears the event.
275  
    /** Clears the event.
276  

276  

277  
        Subsequent calls to wait() will suspend until
277  
        Subsequent calls to wait() will suspend until
278  
        set() is called again.
278  
        set() is called again.
279  
    */
279  
    */
280  
    void clear() noexcept
280  
    void clear() noexcept
281  
    {
281  
    {
282  
        set_ = false;
282  
        set_ = false;
283  
    }
283  
    }
284  

284  

285  
    /** Returns true if the event is currently set.
285  
    /** Returns true if the event is currently set.
286  
    */
286  
    */
287  
    bool is_set() const noexcept
287  
    bool is_set() const noexcept
288  
    {
288  
    {
289  
        return set_;
289  
        return set_;
290  
    }
290  
    }
291  
};
291  
};
292  

292  

293  
} // namespace capy
293  
} // namespace capy
294  
} // namespace boost
294  
} // namespace boost
295  

295  

296  
#endif
296  
#endif