Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Channel.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/async/Concepts.hpp"
23
24#include <boost/asio/any_io_executor.hpp>
25#include <boost/asio/experimental/channel.hpp>
26#include <boost/asio/experimental/concurrent_channel.hpp>
27#include <boost/asio/spawn.hpp>
28#include <boost/system/detail/error_code.hpp>
29
30#include <concepts>
31#include <cstddef>
32#include <memory>
33#include <optional>
34#include <type_traits>
35#include <utility>
36
37namespace util {
38
39#ifdef __clang__
40namespace detail {
41// Forward declaration for compile-time check
42template <typename T>
43struct ChannelInstantiated;
44} // namespace detail
45#endif
46
50enum class ProducerType {
55};
56
60enum class ConsumerType {
65};
66
79template <typename T, ProducerType P = ProducerType::Multi, ConsumerType C = ConsumerType::Multi>
80class Channel {
81 static constexpr bool kIS_MULTI_PRODUCER = (P == ProducerType::Multi);
82 static constexpr bool kIS_MULTI_CONSUMER = (C == ConsumerType::Multi);
83
84private:
85 class ControlBlock {
86 using InternalChannelType =
87 boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)>;
88 boost::asio::any_io_executor executor_;
89 InternalChannelType ch_;
90
91 public:
92 template <typename ContextType>
94 ControlBlock(ContextType&& context, std::size_t capacity)
95 : executor_(context.get_executor()), ch_(context, capacity)
96 {
97 }
98
99 template <async::SomeExecutionContext ContextType>
100 ControlBlock(ContextType&& context, std::size_t capacity)
101 : executor_(context.getExecutor().get_executor()), ch_(context.getExecutor(), capacity)
102 {
103 }
104
105 [[nodiscard]] InternalChannelType&
106 channel()
107 {
108 return ch_;
109 }
110
111 void
112 close()
113 {
114 if (not isClosed()) {
115 ch_.close();
116 // Workaround for Boost bug: close() alone doesn't cancel pending async operations.
117 // We must call cancel() to unblock them. The bug also causes cancel() to return
118 // error_code 0 instead of channel_cancelled, so async operations must check
119 // isClosed() to detect this case.
120 // https://github.com/chriskohlhoff/asio/issues/1575
121 ch_.cancel();
122 }
123 }
124
125 [[nodiscard]] bool
126 isClosed() const
127 {
128 return not ch_.is_open();
129 }
130 };
131
136 struct Guard {
137 std::shared_ptr<ControlBlock> shared;
138
139 ~Guard()
140 {
141 shared->close();
142 }
143 };
144
145public:
154 class Sender {
155 std::shared_ptr<ControlBlock> shared_;
156 std::conditional_t<kIS_MULTI_PRODUCER, std::shared_ptr<Guard>, Guard> guard_;
157
158 friend class Channel<T, P, C>;
159
164 Sender(std::shared_ptr<ControlBlock> shared)
165 : shared_(shared), guard_([shared = std::move(shared)]() {
166 if constexpr (kIS_MULTI_PRODUCER) {
167 return std::make_shared<Guard>(std::move(shared));
168 } else {
169 return Guard{std::move(shared)};
170 }
171 }())
172 {
173 }
174
175 public:
176 Sender(Sender&&) = default;
177 Sender(Sender const&)
178 requires kIS_MULTI_PRODUCER
179 = default;
180 Sender(Sender const&)
181 requires(!kIS_MULTI_PRODUCER)
182 = delete;
183
184 Sender&
185 operator=(Sender&&) = default;
186 Sender&
187 operator=(Sender const&)
188 requires kIS_MULTI_PRODUCER
189 = default;
190 Sender&
191 operator=(Sender const&)
192 requires(!kIS_MULTI_PRODUCER)
193 = delete;
194
205 template <typename D>
206 bool
207 asyncSend(D&& data, boost::asio::yield_context yield)
208 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
209 {
210 boost::system::error_code const ecIn;
211 boost::system::error_code ecOut;
212 shared_->channel().async_send(ecIn, std::forward<D>(data), yield[ecOut]);
213
214 // Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
215 if (not ecOut and shared_->isClosed())
216 return false;
217
218 return not ecOut;
219 }
220
231 template <typename D>
232 void
233 asyncSend(D&& data, std::invocable<bool> auto&& fn)
234 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
235 {
236 boost::system::error_code const ecIn;
237 shared_->channel().async_send(
238 ecIn,
239 std::forward<D>(data),
240 [fn = std::forward<decltype(fn)>(fn),
241 shared = shared_](boost::system::error_code ec) mutable {
242 // Workaround: asio channels bug returns ec=0 on cancel, check isClosed()
243 // instead
244 if (not ec and shared->isClosed()) {
245 fn(false);
246 return;
247 }
248
249 fn(not ec);
250 }
251 );
252 }
253
261 template <typename D>
262 bool
264 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
265 {
266 boost::system::error_code ec;
267 return shared_->channel().try_send(ec, std::forward<D>(data));
268 }
269 };
270
279 class Receiver {
280 std::shared_ptr<ControlBlock> shared_;
281 std::conditional_t<kIS_MULTI_CONSUMER, std::shared_ptr<Guard>, Guard> guard_;
282
283 friend class Channel<T, P, C>;
284
289 Receiver(std::shared_ptr<ControlBlock> shared)
290 : shared_(shared), guard_([shared = std::move(shared)]() {
291 if constexpr (kIS_MULTI_CONSUMER) {
292 return std::make_shared<Guard>(std::move(shared));
293 } else {
294 return Guard{std::move(shared)};
295 }
296 }())
297 {
298 }
299
300 public:
301 Receiver(Receiver&&) = default;
302 Receiver(Receiver const&)
303 requires kIS_MULTI_CONSUMER
304 = default;
305 Receiver(Receiver const&)
306 requires(!kIS_MULTI_CONSUMER)
307 = delete;
308
309 Receiver&
310 operator=(Receiver&&) = default;
311 Receiver&
312 operator=(Receiver const&)
313 requires kIS_MULTI_CONSUMER
314 = default;
315 Receiver&
316 operator=(Receiver const&)
317 requires(!kIS_MULTI_CONSUMER)
318 = delete;
319
326 std::optional<T>
328 {
329 std::optional<T> result;
330 shared_->channel().try_receive([&result](boost::system::error_code ec, auto&& value) {
331 if (not ec)
332 result = std::forward<decltype(value)>(value);
333 });
334
335 return result;
336 }
337
347 [[nodiscard]] std::optional<T>
348 asyncReceive(boost::asio::yield_context yield)
349 {
350 boost::system::error_code ec;
351 auto value = shared_->channel().async_receive(yield[ec]);
352
353 if (ec)
354 return std::nullopt;
355
356 return value;
357 }
358
367 void
368 asyncReceive(std::invocable<std::optional<std::remove_cvref_t<T>>> auto&& fn)
369 {
370 shared_->channel().async_receive([fn = std::forward<decltype(fn)>(fn)](
371 boost::system::error_code ec, T&& value
372 ) mutable {
373 if (ec) {
374 fn(std::optional<T>(std::nullopt));
375 return;
376 }
377
378 fn(std::make_optional<T>(std::move(value)));
379 });
380 }
381
389 [[nodiscard]] bool
390 isClosed() const
391 {
392 return shared_->isClosed();
393 }
394 };
395
402 static std::pair<Sender, Receiver>
403 create(auto&& context, std::size_t capacity)
404 {
405#ifdef __clang__
406 static_assert(
407 util::detail::ChannelInstantiated<T>::value,
408 "When using Channel<T> with Clang, you must add INSTANTIATE_CHANNEL_FOR_CLANG(T) "
409 "to one .cpp file. See documentation at the bottom of Channel.hpp for details."
410 );
411#endif
412 auto shared =
413 std::make_shared<ControlBlock>(std::forward<decltype(context)>(context), capacity);
414 auto sender = Sender{shared};
415 auto receiver = Receiver{std::move(shared)};
416
417 return {std::move(sender), std::move(receiver)};
418 }
419};
420
421} // namespace util
422
423// ================================================================================================
424// Clang/Apple Clang Workaround for Boost.Asio Experimental Channels
425// ================================================================================================
426//
427// IMPORTANT: When using Channel<T> with Clang or Apple Clang, you MUST add the following line
428// to ONE .cpp file that uses Channel<T>:
429//
430// INSTANTIATE_CHANNEL_FOR_CLANG(YourType)
431//
432// Example:
433// // In ChannelTests.cpp or any .cpp file that uses Channel<int>:
434// #include "util/Channel.hpp"
435// INSTANTIATE_CHANNEL_FOR_CLANG(int)
436//
437// Why this is needed:
438// Boost.Asio's experimental concurrent_channel has a bug where close() doesn't properly cancel
439// pending async operations. When using cancellation signals (which we do in our workaround),
440// Clang generates vtable references for internal cancellation_handler types but Boost.Asio
441// doesn't provide the definitions, causing linker errors:
442//
443// Undefined symbols for architecture arm64:
444// "boost::asio::detail::cancellation_handler<...>::call(boost::asio::cancellation_type)"
445// "boost::asio::detail::cancellation_handler<...>::destroy()"
446//
447// This macro explicitly instantiates the required template specializations.
448//
449// See: https://github.com/chriskohlhoff/asio/issues/1575
450//
451#ifdef __clang__
452
453#include <boost/asio/cancellation_signal.hpp>
454#include <boost/asio/experimental/channel_traits.hpp>
455#include <boost/asio/experimental/detail/channel_service.hpp>
456
457namespace util::detail {
458// Tag type used to verify that INSTANTIATE_CHANNEL_FOR_CLANG was called for a given type
459template <typename T>
460struct ChannelInstantiated : std::false_type {};
461} // namespace util::detail
462
463#define INSTANTIATE_CHANNEL_FOR_CLANG(T) \
464 /* NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor) */ \
465 template class boost::asio::detail::cancellation_handler< \
466 boost::asio::experimental::detail::channel_service<boost::asio::detail::posix_mutex>:: \
467 op_cancellation< \
468 boost::asio::experimental::channel_traits<>, \
469 void(boost::system::error_code, T)>>; \
470 namespace util::detail { \
471 template <> \
472 struct ChannelInstantiated<T> : std::true_type {}; \
473 }
474
475#else
476
477// No workaround needed for non-Clang compilers
478#define INSTANTIATE_CHANNEL_FOR_CLANG(T)
479
480#endif
The receiving end of a channel.
Definition Channel.hpp:279
std::optional< T > tryReceive()
Attempts to receive data from the channel without blocking.
Definition Channel.hpp:327
std::optional< T > asyncReceive(boost::asio::yield_context yield)
Asynchronously receives data from the channel using a coroutine.
Definition Channel.hpp:348
void asyncReceive(std::invocable< std::optional< std::remove_cvref_t< T > > > auto &&fn)
Asynchronously receives data from the channel using a callback.
Definition Channel.hpp:368
bool isClosed() const
Checks if the channel is closed.
Definition Channel.hpp:390
The sending end of a channel.
Definition Channel.hpp:154
void asyncSend(D &&data, std::invocable< bool > auto &&fn)
Asynchronously sends data through the channel using a callback.
Definition Channel.hpp:233
bool trySend(D &&data)
Attempts to send data through the channel without blocking.
Definition Channel.hpp:263
bool asyncSend(D &&data, boost::asio::yield_context yield)
Asynchronously sends data through the channel using a coroutine.
Definition Channel.hpp:207
Represents a go-like channel, a multi-producer (Sender) multi-consumer (Receiver) thread-safe data pi...
Definition Channel.hpp:80
static std::pair< Sender, Receiver > create(auto &&context, std::size_t capacity)
Factory function to create channel components.
Definition Channel.hpp:403
Concept that identifies types derived from ExecutionContextTag.
Definition Concepts.hpp:52
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:75
This namespace contains various utilities.
Definition AccountUtils.hpp:30
ProducerType
Specifies the producer concurrency model for a Channel.
Definition Channel.hpp:50
@ Single
Definition Channel.hpp:51
@ Multi
Definition Channel.hpp:53
ConsumerType
Specifies the consumer concurrency model for a Channel.
Definition Channel.hpp:60
@ Multi
Definition Channel.hpp:63