Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Channel.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include <boost/asio/any_io_executor.hpp>
23#include <boost/asio/experimental/channel.hpp>
24#include <boost/asio/experimental/concurrent_channel.hpp>
25#include <boost/asio/spawn.hpp>
26#include <boost/system/detail/error_code.hpp>
27
28#include <concepts>
29#include <cstddef>
30#include <memory>
31#include <optional>
32#include <type_traits>
33#include <utility>
34
35namespace util {
36
37#ifdef __clang__
38namespace detail {
39// Forward declaration for compile-time check
40template <typename T>
41struct ChannelInstantiated;
42} // namespace detail
43#endif
44
52template <typename T>
53class Channel {
54private:
55 class ControlBlock {
56 using InternalChannelType = boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)>;
57 boost::asio::any_io_executor executor_;
58 InternalChannelType ch_;
59
60 public:
61 ControlBlock(auto&& context, std::size_t capacity) : executor_(context.get_executor()), ch_(context, capacity)
62 {
63 }
64
65 [[nodiscard]] InternalChannelType&
66 channel()
67 {
68 return ch_;
69 }
70
71 void
72 close()
73 {
74 if (not isClosed()) {
75 ch_.close();
76 // Workaround for Boost bug: close() alone doesn't cancel pending async operations.
77 // We must call cancel() to unblock them. The bug also causes cancel() to return
78 // error_code 0 instead of channel_cancelled, so async operations must check
79 // isClosed() to detect this case.
80 // https://github.com/chriskohlhoff/asio/issues/1575
81 ch_.cancel();
82 }
83 }
84
85 [[nodiscard]] bool
86 isClosed() const
87 {
88 return not ch_.is_open();
89 }
90 };
91
95 struct Guard {
96 std::shared_ptr<ControlBlock> shared;
97
98 ~Guard()
99 {
100 shared->close();
101 }
102 };
103
110 class Sender {
111 std::shared_ptr<ControlBlock> shared_;
112 std::shared_ptr<Guard> guard_;
113
114 public:
119 Sender(std::shared_ptr<ControlBlock> shared)
120 : shared_(std::move(shared)), guard_(std::make_shared<Guard>(shared_)) {};
121
122 Sender(Sender&&) = default;
123 Sender(Sender const&) = default;
124 Sender&
125 operator=(Sender&&) = default;
126 Sender&
127 operator=(Sender const&) = default;
128
139 template <typename D>
140 bool
141 asyncSend(D&& data, boost::asio::yield_context yield)
142 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
143 {
144 boost::system::error_code ecIn, ecOut;
145 shared_->channel().async_send(ecIn, std::forward<D>(data), yield[ecOut]);
146
147 // Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
148 if (not ecOut and shared_->isClosed())
149 return false;
150
151 return not ecOut;
152 }
153
163 template <typename D>
164 void
165 asyncSend(D&& data, std::invocable<bool> auto&& fn)
166 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
167 {
168 boost::system::error_code const ecIn;
169 shared_->channel().async_send(
170 ecIn,
171 std::forward<D>(data),
172 [fn = std::forward<decltype(fn)>(fn), shared = shared_](boost::system::error_code ec) mutable {
173 // Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
174 if (not ec and shared->isClosed()) {
175 fn(false);
176 return;
177 }
178
179 fn(not ec);
180 }
181 );
182 }
183
191 template <typename D>
192 bool
193 trySend(D&& data)
194 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
195 {
196 boost::system::error_code ec;
197 return shared_->channel().try_send(ec, std::forward<D>(data));
198 }
199 };
200
207 class Receiver {
208 std::shared_ptr<ControlBlock> shared_;
209 std::shared_ptr<Guard> guard_;
210
211 public:
216 Receiver(std::shared_ptr<ControlBlock> shared)
217 : shared_(std::move(shared)), guard_(std::make_shared<Guard>(shared_)) {};
218
219 Receiver(Receiver&&) = default;
220 Receiver(Receiver const&) = default;
221 Receiver&
222 operator=(Receiver&&) = default;
223 Receiver&
224 operator=(Receiver const&) = default;
225
231 std::optional<T>
232 tryReceive()
233 {
234 std::optional<T> result;
235 shared_->channel().try_receive([&result](boost::system::error_code ec, auto&& value) {
236 if (not ec)
237 result = std::forward<decltype(value)>(value);
238 });
239
240 return result;
241 }
242
251 [[nodiscard]] std::optional<T>
252 asyncReceive(boost::asio::yield_context yield)
253 {
254 boost::system::error_code ec;
255 auto value = shared_->channel().async_receive(yield[ec]);
256
257 if (ec)
258 return std::nullopt;
259
260 return value;
261 }
262
270 void
271 asyncReceive(std::invocable<std::optional<std::remove_cvref_t<T>>> auto&& fn)
272 {
273 shared_->channel().async_receive(
274 [fn = std::forward<decltype(fn)>(fn)](boost::system::error_code ec, T&& value) mutable {
275 if (ec) {
276 fn(std::optional<T>(std::nullopt));
277 return;
278 }
279
280 fn(std::make_optional<T>(std::move(value)));
281 }
282 );
283 }
284
292 [[nodiscard]] bool
293 isClosed() const
294 {
295 return shared_->isClosed();
296 }
297 };
298
299public:
306 static std::pair<Sender, Receiver>
307 create(auto&& context, std::size_t capacity)
308 {
309#ifdef __clang__
310 static_assert(
311 util::detail::ChannelInstantiated<T>::value,
312 "When using Channel<T> with Clang, you must add INSTANTIATE_CHANNEL_FOR_CLANG(T) "
313 "to one .cpp file. See documentation at the bottom of Channel.hpp for details."
314 );
315#endif
316 auto shared = std::make_shared<ControlBlock>(std::forward<decltype(context)>(context), capacity);
317 auto sender = Sender{shared};
318 auto receiver = Receiver{std::move(shared)};
319
320 return {std::move(sender), std::move(receiver)};
321 }
322};
323
324} // namespace util
325
326// ================================================================================================
327// Clang/Apple Clang Workaround for Boost.Asio Experimental Channels
328// ================================================================================================
329//
330// IMPORTANT: When using Channel<T> with Clang or Apple Clang, you MUST add the following line
331// to ONE .cpp file that uses Channel<T>:
332//
333// INSTANTIATE_CHANNEL_FOR_CLANG(YourType)
334//
335// Example:
336// // In ChannelTests.cpp or any .cpp file that uses Channel<int>:
337// #include "util/Channel.hpp"
338// INSTANTIATE_CHANNEL_FOR_CLANG(int)
339//
340// Why this is needed:
341// Boost.Asio's experimental concurrent_channel has a bug where close() doesn't properly cancel
342// pending async operations. When using cancellation signals (which we do in our workaround),
343// Clang generates vtable references for internal cancellation_handler types but Boost.Asio
344// doesn't provide the definitions, causing linker errors:
345//
346// Undefined symbols for architecture arm64:
347// "boost::asio::detail::cancellation_handler<...>::call(boost::asio::cancellation_type)"
348// "boost::asio::detail::cancellation_handler<...>::destroy()"
349//
350// This macro explicitly instantiates the required template specializations.
351//
352// See: https://github.com/chriskohlhoff/asio/issues/1575
353//
354#ifdef __clang__
355
356#include <boost/asio/cancellation_signal.hpp>
357#include <boost/asio/experimental/channel_traits.hpp>
358#include <boost/asio/experimental/detail/channel_service.hpp>
359
360namespace util::detail {
361// Tag type used to verify that INSTANTIATE_CHANNEL_FOR_CLANG was called for a given type
362template <typename T>
363struct ChannelInstantiated : std::false_type {};
364} // namespace util::detail
365
366#define INSTANTIATE_CHANNEL_FOR_CLANG(T) \
367 /* NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor) */ \
368 template class boost::asio::detail::cancellation_handler< \
369 boost::asio::experimental::detail::channel_service<boost::asio::detail::posix_mutex>:: \
370 op_cancellation<boost::asio::experimental::channel_traits<>, void(boost::system::error_code, T)>>; \
371 namespace util::detail { \
372 template <> \
373 struct ChannelInstantiated<T> : std::true_type {}; \
374 }
375
376#else
377
378// No workaround needed for non-Clang compilers
379#define INSTANTIATE_CHANNEL_FOR_CLANG(T)
380
381#endif
Represents a go-like channel, a multi-producer (Sender) multi-consumer (Receiver) thread-safe data pi...
Definition Channel.hpp:53
static std::pair< Sender, Receiver > create(auto &&context, std::size_t capacity)
Factory function to create channel components.
Definition Channel.hpp:307
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
This namespace contains various utilities.
Definition AccountUtils.hpp:30