Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Channel.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/async/Concepts.hpp"
23
24#include <boost/asio/any_io_executor.hpp>
25#include <boost/asio/experimental/channel.hpp>
26#include <boost/asio/experimental/concurrent_channel.hpp>
27#include <boost/asio/spawn.hpp>
28#include <boost/system/detail/error_code.hpp>
29
30#include <concepts>
31#include <cstddef>
32#include <memory>
33#include <optional>
34#include <type_traits>
35#include <utility>
36
37namespace util {
38
#ifdef __clang__
// Clang-only: declare the tag early so Channel::create() can name it in its static_assert.
// The primary template and its specializations are supplied further down in this header.
namespace detail {
template <class T>
struct ChannelInstantiated;
}  // namespace detail
#endif
46
/**
 * @brief Specifies the producer concurrency model for a Channel.
 *
 * Channel::Sender is copyable only when the channel is declared with ProducerType::Multi;
 * with ProducerType::Single its copy operations are deleted.
 */
enum class ProducerType {
    Single,
    Multi
};
54
/**
 * @brief Specifies the consumer concurrency model for a Channel.
 *
 * Channel::Receiver is copyable only when the channel is declared with ConsumerType::Multi;
 * otherwise its copy operations are deleted.
 */
enum class ConsumerType {
    // NOTE(review): `Single` is assumed to mirror ProducerType::Single - confirm against the repository.
    Single,
    Multi
};
62
72template <typename T, ProducerType P = ProducerType::Multi, ConsumerType C = ConsumerType::Multi>
73class Channel {
74 static constexpr bool kIS_MULTI_PRODUCER = (P == ProducerType::Multi);
75 static constexpr bool kIS_MULTI_CONSUMER = (C == ConsumerType::Multi);
76
77private:
78 class ControlBlock {
79 using InternalChannelType = boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)>;
80 boost::asio::any_io_executor executor_;
81 InternalChannelType ch_;
82
83 public:
84 template <typename ContextType>
86 ControlBlock(ContextType&& context, std::size_t capacity)
87 : executor_(context.get_executor()), ch_(context, capacity)
88 {
89 }
90
91 template <async::SomeExecutionContext ContextType>
92 ControlBlock(ContextType&& context, std::size_t capacity)
93 : executor_(context.getExecutor().get_executor()), ch_(context.getExecutor(), capacity)
94 {
95 }
96
97 [[nodiscard]] InternalChannelType&
98 channel()
99 {
100 return ch_;
101 }
102
103 void
104 close()
105 {
106 if (not isClosed()) {
107 ch_.close();
108 // Workaround for Boost bug: close() alone doesn't cancel pending async operations.
109 // We must call cancel() to unblock them. The bug also causes cancel() to return
110 // error_code 0 instead of channel_cancelled, so async operations must check
111 // isClosed() to detect this case.
112 // https://github.com/chriskohlhoff/asio/issues/1575
113 ch_.cancel();
114 }
115 }
116
117 [[nodiscard]] bool
118 isClosed() const
119 {
120 return not ch_.is_open();
121 }
122 };
123
127 struct Guard {
128 std::shared_ptr<ControlBlock> shared;
129
130 ~Guard()
131 {
132 shared->close();
133 }
134 };
135
136public:
144 class Sender {
145 std::shared_ptr<ControlBlock> shared_;
146 std::conditional_t<kIS_MULTI_PRODUCER, std::shared_ptr<Guard>, Guard> guard_;
147
148 friend class Channel<T, P, C>;
149
154 Sender(std::shared_ptr<ControlBlock> shared)
155 : shared_(shared), guard_([shared = std::move(shared)]() {
156 if constexpr (kIS_MULTI_PRODUCER) {
157 return std::make_shared<Guard>(std::move(shared));
158 } else {
159 return Guard{std::move(shared)};
160 }
161 }())
162 {
163 }
164
165 public:
166 Sender(Sender&&) = default;
167 Sender(Sender const&)
168 requires kIS_MULTI_PRODUCER
169 = default;
170 Sender(Sender const&)
171 requires(!kIS_MULTI_PRODUCER)
172 = delete;
173
174 Sender&
175 operator=(Sender&&) = default;
176 Sender&
177 operator=(Sender const&)
178 requires kIS_MULTI_PRODUCER
179 = default;
180 Sender&
181 operator=(Sender const&)
182 requires(!kIS_MULTI_PRODUCER)
183 = delete;
184
195 template <typename D>
196 bool
197 asyncSend(D&& data, boost::asio::yield_context yield)
198 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
199 {
200 boost::system::error_code const ecIn;
201 boost::system::error_code ecOut;
202 shared_->channel().async_send(ecIn, std::forward<D>(data), yield[ecOut]);
203
204 // Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
205 if (not ecOut and shared_->isClosed())
206 return false;
207
208 return not ecOut;
209 }
210
220 template <typename D>
221 void
222 asyncSend(D&& data, std::invocable<bool> auto&& fn)
223 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
224 {
225 boost::system::error_code const ecIn;
226 shared_->channel().async_send(
227 ecIn,
228 std::forward<D>(data),
229 [fn = std::forward<decltype(fn)>(fn), shared = shared_](boost::system::error_code ec) mutable {
230 // Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
231 if (not ec and shared->isClosed()) {
232 fn(false);
233 return;
234 }
235
236 fn(not ec);
237 }
238 );
239 }
240
248 template <typename D>
249 bool
251 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
252 {
253 boost::system::error_code ec;
254 return shared_->channel().try_send(ec, std::forward<D>(data));
255 }
256 };
257
265 class Receiver {
266 std::shared_ptr<ControlBlock> shared_;
267 std::conditional_t<kIS_MULTI_CONSUMER, std::shared_ptr<Guard>, Guard> guard_;
268
269 friend class Channel<T, P, C>;
270
275 Receiver(std::shared_ptr<ControlBlock> shared)
276 : shared_(shared), guard_([shared = std::move(shared)]() {
277 if constexpr (kIS_MULTI_CONSUMER) {
278 return std::make_shared<Guard>(std::move(shared));
279 } else {
280 return Guard{std::move(shared)};
281 }
282 }())
283 {
284 }
285
286 public:
287 Receiver(Receiver&&) = default;
288 Receiver(Receiver const&)
289 requires kIS_MULTI_CONSUMER
290 = default;
291 Receiver(Receiver const&)
292 requires(!kIS_MULTI_CONSUMER)
293 = delete;
294
295 Receiver&
296 operator=(Receiver&&) = default;
297 Receiver&
298 operator=(Receiver const&)
299 requires kIS_MULTI_CONSUMER
300 = default;
301 Receiver&
302 operator=(Receiver const&)
303 requires(!kIS_MULTI_CONSUMER)
304 = delete;
305
311 std::optional<T>
313 {
314 std::optional<T> result;
315 shared_->channel().try_receive([&result](boost::system::error_code ec, auto&& value) {
316 if (not ec)
317 result = std::forward<decltype(value)>(value);
318 });
319
320 return result;
321 }
322
331 [[nodiscard]] std::optional<T>
332 asyncReceive(boost::asio::yield_context yield)
333 {
334 boost::system::error_code ec;
335 auto value = shared_->channel().async_receive(yield[ec]);
336
337 if (ec)
338 return std::nullopt;
339
340 return value;
341 }
342
350 void
351 asyncReceive(std::invocable<std::optional<std::remove_cvref_t<T>>> auto&& fn)
352 {
353 shared_->channel().async_receive(
354 [fn = std::forward<decltype(fn)>(fn)](boost::system::error_code ec, T&& value) mutable {
355 if (ec) {
356 fn(std::optional<T>(std::nullopt));
357 return;
358 }
359
360 fn(std::make_optional<T>(std::move(value)));
361 }
362 );
363 }
364
372 [[nodiscard]] bool
373 isClosed() const
374 {
375 return shared_->isClosed();
376 }
377 };
378
385 static std::pair<Sender, Receiver>
386 create(auto&& context, std::size_t capacity)
387 {
388#ifdef __clang__
389 static_assert(
390 util::detail::ChannelInstantiated<T>::value,
391 "When using Channel<T> with Clang, you must add INSTANTIATE_CHANNEL_FOR_CLANG(T) "
392 "to one .cpp file. See documentation at the bottom of Channel.hpp for details."
393 );
394#endif
395 auto shared = std::make_shared<ControlBlock>(std::forward<decltype(context)>(context), capacity);
396 auto sender = Sender{shared};
397 auto receiver = Receiver{std::move(shared)};
398
399 return {std::move(sender), std::move(receiver)};
400 }
401};
402
403} // namespace util
404
405// ================================================================================================
406// Clang/Apple Clang Workaround for Boost.Asio Experimental Channels
407// ================================================================================================
408//
409// IMPORTANT: When using Channel<T> with Clang or Apple Clang, you MUST add the following line
410// to ONE .cpp file that uses Channel<T>:
411//
412// INSTANTIATE_CHANNEL_FOR_CLANG(YourType)
413//
414// Example:
415// // In ChannelTests.cpp or any .cpp file that uses Channel<int>:
416// #include "util/Channel.hpp"
417// INSTANTIATE_CHANNEL_FOR_CLANG(int)
418//
419// Why this is needed:
420// Boost.Asio's experimental concurrent_channel has a bug where close() doesn't properly cancel
421// pending async operations. When using cancellation signals (which we do in our workaround),
422// Clang generates vtable references for internal cancellation_handler types but Boost.Asio
423// doesn't provide the definitions, causing linker errors:
424//
425// Undefined symbols for architecture arm64:
426// "boost::asio::detail::cancellation_handler<...>::call(boost::asio::cancellation_type)"
427// "boost::asio::detail::cancellation_handler<...>::destroy()"
428//
429// This macro explicitly instantiates the required template specializations.
430//
431// See: https://github.com/chriskohlhoff/asio/issues/1575
432//
#ifdef __clang__

#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/detail/mutex.hpp>
#include <boost/asio/experimental/channel_traits.hpp>
#include <boost/asio/experimental/detail/channel_service.hpp>

namespace util::detail {
// Tag type used to verify that INSTANTIATE_CHANNEL_FOR_CLANG was called for a given type.
// The primary template is false; the macro below specializes it to true for the given type.
template <typename T>
struct ChannelInstantiated : std::false_type {};
}  // namespace util::detail

// Explicitly instantiates the cancellation handler used by the channel for element type T and
// marks T as instantiated. Must be used at global namespace scope in exactly one .cpp file.
//
// boost::asio::detail::mutex is Asio's platform-selected mutex alias (posix_mutex on pthreads
// builds), so no POSIX-only type is named here.
// NOTE(review): assumes concurrent_channel's service is channel_service<boost::asio::detail::mutex>
// - confirm against the Boost version in use.
#define INSTANTIATE_CHANNEL_FOR_CLANG(T)                                                                       \
    /* NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor) */                                           \
    template class boost::asio::detail::cancellation_handler<                                                  \
        boost::asio::experimental::detail::channel_service<boost::asio::detail::mutex>::                       \
            op_cancellation<boost::asio::experimental::channel_traits<>, void(boost::system::error_code, T)>>; \
    namespace util::detail {                                                                                   \
    template <>                                                                                                \
    struct ChannelInstantiated<T> : std::true_type {};                                                         \
    }

#else

// No workaround needed for non-Clang compilers
#define INSTANTIATE_CHANNEL_FOR_CLANG(T)

#endif
The receiving end of a channel.
Definition Channel.hpp:265
std::optional< T > tryReceive()
Attempts to receive data from the channel without blocking.
Definition Channel.hpp:312
std::optional< T > asyncReceive(boost::asio::yield_context yield)
Asynchronously receives data from the channel using a coroutine.
Definition Channel.hpp:332
void asyncReceive(std::invocable< std::optional< std::remove_cvref_t< T > > > auto &&fn)
Asynchronously receives data from the channel using a callback.
Definition Channel.hpp:351
bool isClosed() const
Checks if the channel is closed.
Definition Channel.hpp:373
The sending end of a channel.
Definition Channel.hpp:144
void asyncSend(D &&data, std::invocable< bool > auto &&fn)
Asynchronously sends data through the channel using a callback.
Definition Channel.hpp:222
bool trySend(D &&data)
Attempts to send data through the channel without blocking.
Definition Channel.hpp:250
bool asyncSend(D &&data, boost::asio::yield_context yield)
Asynchronously sends data through the channel using a coroutine.
Definition Channel.hpp:197
Represents a go-like channel, a multi-producer (Sender) multi-consumer (Receiver) thread-safe data pipe.
Definition Channel.hpp:73
static std::pair< Sender, Receiver > create(auto &&context, std::size_t capacity)
Factory function to create channel components.
Definition Channel.hpp:386
Concept that identifies types derived from ExecutionContextTag.
Definition Concepts.hpp:51
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
This namespace contains various utilities.
Definition AccountUtils.hpp:30
ProducerType
Specifies the producer concurrency model for a Channel.
Definition Channel.hpp:50
@ Single
Definition Channel.hpp:51
@ Multi
Definition Channel.hpp:52
ConsumerType
Specifies the consumer concurrency model for a Channel.
Definition Channel.hpp:58
@ Multi
Definition Channel.hpp:60