Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Channel.hpp
1#pragma once
2
3#include "util/async/Concepts.hpp"
4
5#include <boost/asio/any_io_executor.hpp>
6#include <boost/asio/experimental/channel.hpp>
7#include <boost/asio/experimental/concurrent_channel.hpp>
8#include <boost/asio/spawn.hpp>
9#include <boost/system/detail/error_code.hpp>
10
11#include <concepts>
12#include <cstddef>
13#include <memory>
14#include <optional>
15#include <type_traits>
16#include <utility>
17
18namespace util {
19
20#ifdef __clang__
21namespace detail {
22// Forward declaration for compile-time check
23template <typename T>
24struct ChannelInstantiated;
25} // namespace detail
26#endif
27
31enum class ProducerType {
36};
37
41enum class ConsumerType {
46};
47
60template <typename T, ProducerType P = ProducerType::Multi, ConsumerType C = ConsumerType::Multi>
61class Channel {
62 static constexpr bool kIS_MULTI_PRODUCER = (P == ProducerType::Multi);
63 static constexpr bool kIS_MULTI_CONSUMER = (C == ConsumerType::Multi);
64
65private:
66 class ControlBlock {
67 using InternalChannelType =
68 boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)>;
69 boost::asio::any_io_executor executor_;
70 InternalChannelType ch_;
71
72 public:
73 template <typename ContextType>
75 ControlBlock(ContextType&& context, std::size_t capacity)
76 : executor_(context.get_executor()), ch_(context, capacity)
77 {
78 }
79
80 template <async::SomeExecutionContext ContextType>
81 ControlBlock(ContextType&& context, std::size_t capacity)
82 : executor_(context.getExecutor().get_executor()), ch_(context.getExecutor(), capacity)
83 {
84 }
85
86 [[nodiscard]] InternalChannelType&
87 channel()
88 {
89 return ch_;
90 }
91
92 void
93 close()
94 {
95 if (not isClosed()) {
96 ch_.close();
97 // Workaround for Boost bug: close() alone doesn't cancel pending async operations.
98 // We must call cancel() to unblock them. The bug also causes cancel() to return
99 // error_code 0 instead of channel_cancelled, so async operations must check
100 // isClosed() to detect this case.
101 // https://github.com/chriskohlhoff/asio/issues/1575
102 ch_.cancel();
103 }
104 }
105
106 [[nodiscard]] bool
107 isClosed() const
108 {
109 return not ch_.is_open();
110 }
111 };
112
117 struct Guard {
118 std::shared_ptr<ControlBlock> shared;
119
120 ~Guard()
121 {
122 shared->close();
123 }
124 };
125
126public:
135 class Sender {
136 std::shared_ptr<ControlBlock> shared_;
137 std::conditional_t<kIS_MULTI_PRODUCER, std::shared_ptr<Guard>, Guard> guard_;
138
139 friend class Channel<T, P, C>;
140
145 Sender(std::shared_ptr<ControlBlock> shared)
146 : shared_(shared), guard_([shared = std::move(shared)]() {
147 if constexpr (kIS_MULTI_PRODUCER) {
148 return std::make_shared<Guard>(std::move(shared));
149 } else {
150 return Guard{std::move(shared)};
151 }
152 }())
153 {
154 }
155
156 public:
157 Sender(Sender&&) = default;
158 Sender(Sender const&)
159 requires kIS_MULTI_PRODUCER
160 = default;
161 Sender(Sender const&)
162 requires(!kIS_MULTI_PRODUCER)
163 = delete;
164
165 Sender&
166 operator=(Sender&&) = default;
167 Sender&
168 operator=(Sender const&)
169 requires kIS_MULTI_PRODUCER
170 = default;
171 Sender&
172 operator=(Sender const&)
173 requires(!kIS_MULTI_PRODUCER)
174 = delete;
175
186 template <typename D>
187 bool
188 asyncSend(D&& data, boost::asio::yield_context yield)
189 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
190 {
191 boost::system::error_code const ecIn;
192 boost::system::error_code ecOut;
193 shared_->channel().async_send(ecIn, std::forward<D>(data), yield[ecOut]);
194
195 // Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
196 if (not ecOut and shared_->isClosed())
197 return false;
198
199 return not ecOut;
200 }
201
212 template <typename D>
213 void
214 asyncSend(D&& data, std::invocable<bool> auto&& fn)
215 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
216 {
217 boost::system::error_code const ecIn;
218 shared_->channel().async_send(
219 ecIn,
220 std::forward<D>(data),
221 [fn = std::forward<decltype(fn)>(fn),
222 shared = shared_](boost::system::error_code ec) mutable {
223 // Workaround: asio channels bug returns ec=0 on cancel, check isClosed()
224 // instead
225 if (not ec and shared->isClosed()) {
226 fn(false);
227 return;
228 }
229
230 fn(not ec);
231 }
232 );
233 }
234
242 template <typename D>
243 bool
245 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
246 {
247 boost::system::error_code ec;
248 return shared_->channel().try_send(ec, std::forward<D>(data));
249 }
250 };
251
260 class Receiver {
261 std::shared_ptr<ControlBlock> shared_;
262 std::conditional_t<kIS_MULTI_CONSUMER, std::shared_ptr<Guard>, Guard> guard_;
263
264 friend class Channel<T, P, C>;
265
270 Receiver(std::shared_ptr<ControlBlock> shared)
271 : shared_(shared), guard_([shared = std::move(shared)]() {
272 if constexpr (kIS_MULTI_CONSUMER) {
273 return std::make_shared<Guard>(std::move(shared));
274 } else {
275 return Guard{std::move(shared)};
276 }
277 }())
278 {
279 }
280
281 public:
282 Receiver(Receiver&&) = default;
283 Receiver(Receiver const&)
284 requires kIS_MULTI_CONSUMER
285 = default;
286 Receiver(Receiver const&)
287 requires(!kIS_MULTI_CONSUMER)
288 = delete;
289
290 Receiver&
291 operator=(Receiver&&) = default;
292 Receiver&
293 operator=(Receiver const&)
294 requires kIS_MULTI_CONSUMER
295 = default;
296 Receiver&
297 operator=(Receiver const&)
298 requires(!kIS_MULTI_CONSUMER)
299 = delete;
300
307 std::optional<T>
309 {
310 std::optional<T> result;
311 shared_->channel().try_receive([&result](boost::system::error_code ec, auto&& value) {
312 if (not ec)
313 result = std::forward<decltype(value)>(value);
314 });
315
316 return result;
317 }
318
328 [[nodiscard]] std::optional<T>
329 asyncReceive(boost::asio::yield_context yield)
330 {
331 boost::system::error_code ec;
332 auto value = shared_->channel().async_receive(yield[ec]);
333
334 if (ec)
335 return std::nullopt;
336
337 return value;
338 }
339
348 void
349 asyncReceive(std::invocable<std::optional<std::remove_cvref_t<T>>> auto&& fn)
350 {
351 shared_->channel().async_receive([fn = std::forward<decltype(fn)>(fn)](
352 boost::system::error_code ec, T&& value
353 ) mutable {
354 if (ec) {
355 fn(std::optional<T>(std::nullopt));
356 return;
357 }
358
359 fn(std::make_optional<T>(std::move(value)));
360 });
361 }
362
370 [[nodiscard]] bool
371 isClosed() const
372 {
373 return shared_->isClosed();
374 }
375 };
376
383 static std::pair<Sender, Receiver>
384 create(auto&& context, std::size_t capacity)
385 {
386#ifdef __clang__
387 static_assert(
388 util::detail::ChannelInstantiated<T>::value,
389 "When using Channel<T> with Clang, you must add INSTANTIATE_CHANNEL_FOR_CLANG(T) "
390 "to one .cpp file. See documentation at the bottom of Channel.hpp for details."
391 );
392#endif
393 auto shared =
394 std::make_shared<ControlBlock>(std::forward<decltype(context)>(context), capacity);
395 auto sender = Sender{shared};
396 auto receiver = Receiver{std::move(shared)};
397
398 return {std::move(sender), std::move(receiver)};
399 }
400};
401
402} // namespace util
403
404// ================================================================================================
405// Clang/Apple Clang Workaround for Boost.Asio Experimental Channels
406// ================================================================================================
407//
408// IMPORTANT: When using Channel<T> with Clang or Apple Clang, you MUST add the following line
409// to ONE .cpp file that uses Channel<T>:
410//
411// INSTANTIATE_CHANNEL_FOR_CLANG(YourType)
412//
413// Example:
414// // In ChannelTests.cpp or any .cpp file that uses Channel<int>:
415// #include "util/Channel.hpp"
416// INSTANTIATE_CHANNEL_FOR_CLANG(int)
417//
418// Why this is needed:
419// Boost.Asio's experimental concurrent_channel has a bug where close() doesn't properly cancel
420// pending async operations. When using cancellation signals (which we do in our workaround),
421// Clang generates vtable references for internal cancellation_handler types but Boost.Asio
422// doesn't provide the definitions, causing linker errors:
423//
424// Undefined symbols for architecture arm64:
425// "boost::asio::detail::cancellation_handler<...>::call(boost::asio::cancellation_type)"
426// "boost::asio::detail::cancellation_handler<...>::destroy()"
427//
428// This macro explicitly instantiates the required template specializations.
429//
430// See: https://github.com/chriskohlhoff/asio/issues/1575
431//
432#ifdef __clang__
433
434#include <boost/asio/cancellation_signal.hpp>
435#include <boost/asio/experimental/channel_traits.hpp>
436#include <boost/asio/experimental/detail/channel_service.hpp>
437
438namespace util::detail {
439// Tag type used to verify that INSTANTIATE_CHANNEL_FOR_CLANG was called for a given type
440template <typename T>
441struct ChannelInstantiated : std::false_type {};
442} // namespace util::detail
443
444#define INSTANTIATE_CHANNEL_FOR_CLANG(T) \
445 /* NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor) */ \
446 template class boost::asio::detail::cancellation_handler< \
447 boost::asio::experimental::detail::channel_service<boost::asio::detail::posix_mutex>:: \
448 op_cancellation< \
449 boost::asio::experimental::channel_traits<>, \
450 void(boost::system::error_code, T)>>; \
451 namespace util::detail { \
452 template <> \
453 struct ChannelInstantiated<T> : std::true_type {}; \
454 }
455
456#else
457
458// No workaround needed for non-Clang compilers
459#define INSTANTIATE_CHANNEL_FOR_CLANG(T)
460
461#endif
The receiving end of a channel.
Definition Channel.hpp:260
std::optional< T > tryReceive()
Attempts to receive data from the channel without blocking.
Definition Channel.hpp:308
std::optional< T > asyncReceive(boost::asio::yield_context yield)
Asynchronously receives data from the channel using a coroutine.
Definition Channel.hpp:329
void asyncReceive(std::invocable< std::optional< std::remove_cvref_t< T > > > auto &&fn)
Asynchronously receives data from the channel using a callback.
Definition Channel.hpp:349
bool isClosed() const
Checks if the channel is closed.
Definition Channel.hpp:371
The sending end of a channel.
Definition Channel.hpp:135
void asyncSend(D &&data, std::invocable< bool > auto &&fn)
Asynchronously sends data through the channel using a callback.
Definition Channel.hpp:214
bool trySend(D &&data)
Attempts to send data through the channel without blocking.
Definition Channel.hpp:244
bool asyncSend(D &&data, boost::asio::yield_context yield)
Asynchronously sends data through the channel using a coroutine.
Definition Channel.hpp:188
Represents a go-like channel, a multi-producer (Sender) multi-consumer (Receiver) thread-safe data pi...
Definition Channel.hpp:61
static std::pair< Sender, Receiver > create(auto &&context, std::size_t capacity)
Factory function to create channel components.
Definition Channel.hpp:384
Concept that identifies types derived from ExecutionContextTag.
Definition Concepts.hpp:33
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:56
This namespace contains various utilities.
Definition AccountUtils.hpp:11
ProducerType
Specifies the producer concurrency model for a Channel.
Definition Channel.hpp:31
@ Single
Definition Channel.hpp:32
@ Multi
Definition Channel.hpp:34
ConsumerType
Specifies the consumer concurrency model for a Channel.
Definition Channel.hpp:41
@ Multi
Definition Channel.hpp:44