22#include "util/async/Concepts.hpp"
24#include <boost/asio/any_io_executor.hpp>
25#include <boost/asio/experimental/channel.hpp>
26#include <boost/asio/experimental/concurrent_channel.hpp>
27#include <boost/asio/spawn.hpp>
28#include <boost/system/detail/error_code.hpp>
43struct ChannelInstantiated;
72template <
typename T, ProducerType P = ProducerType::Multi, ConsumerType C = ConsumerType::Multi>
79 using InternalChannelType = boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)>;
80 boost::asio::any_io_executor executor_;
81 InternalChannelType ch_;
84 template <
typename ContextType>
86 ControlBlock(ContextType&& context, std::size_t capacity)
87 : executor_(context.get_executor()), ch_(context, capacity)
91 template <async::SomeExecutionContext ContextType>
92 ControlBlock(ContextType&& context, std::size_t capacity)
93 : executor_(context.getExecutor().get_executor()), ch_(context.getExecutor(), capacity)
97 [[nodiscard]] InternalChannelType&
106 if (not isClosed()) {
120 return not ch_.is_open();
128 std::shared_ptr<ControlBlock> shared;
145 std::shared_ptr<ControlBlock> shared_;
146 std::conditional_t<kIS_MULTI_PRODUCER, std::shared_ptr<Guard>, Guard> guard_;
154 Sender(std::shared_ptr<ControlBlock> shared)
155 : shared_(shared), guard_([shared = std::move(shared)]() {
156 if constexpr (kIS_MULTI_PRODUCER) {
157 return std::make_shared<Guard>(std::move(shared));
159 return Guard{std::move(shared)};
166 Sender(Sender&&) =
default;
167 Sender(Sender
const&)
168 requires kIS_MULTI_PRODUCER
170 Sender(Sender
const&)
171 requires(!kIS_MULTI_PRODUCER)
175 operator=(Sender&&) =
default;
177 operator=(Sender
const&)
178 requires kIS_MULTI_PRODUCER
181 operator=(Sender
const&)
182 requires(!kIS_MULTI_PRODUCER)
195 template <
typename D>
198 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
200 boost::system::error_code
const ecIn;
201 boost::system::error_code ecOut;
202 shared_->channel().async_send(ecIn, std::forward<D>(
data), yield[ecOut]);
205 if (not ecOut and shared_->isClosed())
220 template <
typename D>
223 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
225 boost::system::error_code
const ecIn;
226 shared_->channel().async_send(
228 std::forward<D>(
data),
229 [fn = std::forward<
decltype(fn)>(fn), shared = shared_](boost::system::error_code ec)
mutable {
231 if (not ec and shared->isClosed()) {
248 template <
typename D>
251 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
253 boost::system::error_code ec;
254 return shared_->channel().try_send(ec, std::forward<D>(
data));
266 std::shared_ptr<ControlBlock> shared_;
267 std::conditional_t<kIS_MULTI_CONSUMER, std::shared_ptr<Guard>, Guard> guard_;
275 Receiver(std::shared_ptr<ControlBlock> shared)
276 : shared_(shared), guard_([shared = std::move(shared)]() {
277 if constexpr (kIS_MULTI_CONSUMER) {
278 return std::make_shared<Guard>(std::move(shared));
280 return Guard{std::move(shared)};
287 Receiver(Receiver&&) =
default;
288 Receiver(Receiver
const&)
289 requires kIS_MULTI_CONSUMER
291 Receiver(Receiver
const&)
292 requires(!kIS_MULTI_CONSUMER)
296 operator=(Receiver&&) =
default;
298 operator=(Receiver
const&)
299 requires kIS_MULTI_CONSUMER
302 operator=(Receiver
const&)
303 requires(!kIS_MULTI_CONSUMER)
314 std::optional<T> result;
315 shared_->channel().try_receive([&result](boost::system::error_code ec,
auto&& value) {
317 result = std::forward<decltype(value)>(value);
331 [[nodiscard]] std::optional<T>
334 boost::system::error_code ec;
335 auto value = shared_->channel().async_receive(yield[ec]);
351 asyncReceive(std::invocable<std::optional<std::remove_cvref_t<T>>>
auto&& fn)
353 shared_->channel().async_receive(
354 [fn = std::forward<
decltype(fn)>(fn)](boost::system::error_code ec, T&& value)
mutable {
356 fn(std::optional<T>(std::nullopt));
360 fn(std::make_optional<T>(std::move(value)));
375 return shared_->isClosed();
385 static std::pair<Sender, Receiver>
386 create(
auto&& context, std::size_t capacity)
390 util::detail::ChannelInstantiated<T>::value,
391 "When using Channel<T> with Clang, you must add INSTANTIATE_CHANNEL_FOR_CLANG(T) "
392 "to one .cpp file. See documentation at the bottom of Channel.hpp for details."
395 auto shared = std::make_shared<ControlBlock>(std::forward<
decltype(context)>(context), capacity);
396 auto sender =
Sender{shared};
397 auto receiver =
Receiver{std::move(shared)};
399 return {std::move(sender), std::move(receiver)};
435#include <boost/asio/cancellation_signal.hpp>
436#include <boost/asio/experimental/channel_traits.hpp>
437#include <boost/asio/experimental/detail/channel_service.hpp>
439namespace util::detail {
442struct ChannelInstantiated : std::false_type {};
445#define INSTANTIATE_CHANNEL_FOR_CLANG(T) \
447 template class boost::asio::detail::cancellation_handler< \
448 boost::asio::experimental::detail::channel_service<boost::asio::detail::posix_mutex>:: \
449 op_cancellation<boost::asio::experimental::channel_traits<>, void(boost::system::error_code, T)>>; \
450 namespace util::detail { \
452 struct ChannelInstantiated<T> : std::true_type {}; \
458#define INSTANTIATE_CHANNEL_FOR_CLANG(T)
The receiving end of a channel.
Definition Channel.hpp:265
std::optional< T > tryReceive()
Attempts to receive data from the channel without blocking.
Definition Channel.hpp:312
std::optional< T > asyncReceive(boost::asio::yield_context yield)
Asynchronously receives data from the channel using a coroutine.
Definition Channel.hpp:332
void asyncReceive(std::invocable< std::optional< std::remove_cvref_t< T > > > auto &&fn)
Asynchronously receives data from the channel using a callback.
Definition Channel.hpp:351
bool isClosed() const
Checks if the channel is closed.
Definition Channel.hpp:373
The sending end of a channel.
Definition Channel.hpp:144
void asyncSend(D &&data, std::invocable< bool > auto &&fn)
Asynchronously sends data through the channel using a callback.
Definition Channel.hpp:222
bool trySend(D &&data)
Attempts to send data through the channel without blocking.
Definition Channel.hpp:250
bool asyncSend(D &&data, boost::asio::yield_context yield)
Asynchronously sends data through the channel using a coroutine.
Definition Channel.hpp:197
Represents a go-like channel, a multi-producer (Sender) multi-consumer (Receiver) thread-safe data pi...
Definition Channel.hpp:73
static std::pair< Sender, Receiver > create(auto &&context, std::size_t capacity)
Factory function to create channel components.
Definition Channel.hpp:386
Concept that identifies types derived from ExecutionContextTag.
Definition Concepts.hpp:51
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
This namespace contains various utilities.
Definition AccountUtils.hpp:30
ProducerType
Specifies the producer concurrency model for a Channel.
Definition Channel.hpp:50
@ Single
Definition Channel.hpp:51
@ Multi
Definition Channel.hpp:52
ConsumerType
Specifies the consumer concurrency model for a Channel.
Definition Channel.hpp:58
@ Multi
Definition Channel.hpp:60