22#include "util/async/Concepts.hpp"
24#include <boost/asio/any_io_executor.hpp>
25#include <boost/asio/experimental/channel.hpp>
26#include <boost/asio/experimental/concurrent_channel.hpp>
27#include <boost/asio/spawn.hpp>
28#include <boost/system/detail/error_code.hpp>
43struct ChannelInstantiated;
79template <
typename T, ProducerType P = ProducerType::Multi, ConsumerType C = ConsumerType::Multi>
86 using InternalChannelType =
87 boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)>;
88 boost::asio::any_io_executor executor_;
89 InternalChannelType ch_;
92 template <
typename ContextType>
94 ControlBlock(ContextType&& context, std::size_t capacity)
95 : executor_(context.get_executor()), ch_(context, capacity)
99 template <async::SomeExecutionContext ContextType>
100 ControlBlock(ContextType&& context, std::size_t capacity)
101 : executor_(context.getExecutor().get_executor()), ch_(context.getExecutor(), capacity)
105 [[nodiscard]] InternalChannelType&
114 if (not isClosed()) {
128 return not ch_.is_open();
137 std::shared_ptr<ControlBlock> shared;
155 std::shared_ptr<ControlBlock> shared_;
156 std::conditional_t<kIS_MULTI_PRODUCER, std::shared_ptr<Guard>, Guard> guard_;
164 Sender(std::shared_ptr<ControlBlock> shared)
165 : shared_(shared), guard_([shared = std::move(shared)]() {
166 if constexpr (kIS_MULTI_PRODUCER) {
167 return std::make_shared<Guard>(std::move(shared));
169 return Guard{std::move(shared)};
176 Sender(Sender&&) =
default;
177 Sender(Sender
const&)
178 requires kIS_MULTI_PRODUCER
180 Sender(Sender
const&)
181 requires(!kIS_MULTI_PRODUCER)
185 operator=(Sender&&) =
default;
187 operator=(Sender
const&)
188 requires kIS_MULTI_PRODUCER
191 operator=(Sender
const&)
192 requires(!kIS_MULTI_PRODUCER)
205 template <
typename D>
208 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
210 boost::system::error_code
const ecIn;
211 boost::system::error_code ecOut;
212 shared_->channel().async_send(ecIn, std::forward<D>(
data), yield[ecOut]);
215 if (not ecOut and shared_->isClosed())
231 template <
typename D>
234 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
236 boost::system::error_code
const ecIn;
237 shared_->channel().async_send(
239 std::forward<D>(
data),
240 [fn = std::forward<
decltype(fn)>(fn),
241 shared = shared_](boost::system::error_code ec)
mutable {
244 if (not ec and shared->isClosed()) {
261 template <
typename D>
264 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
266 boost::system::error_code ec;
267 return shared_->channel().try_send(ec, std::forward<D>(
data));
280 std::shared_ptr<ControlBlock> shared_;
281 std::conditional_t<kIS_MULTI_CONSUMER, std::shared_ptr<Guard>, Guard> guard_;
289 Receiver(std::shared_ptr<ControlBlock> shared)
290 : shared_(shared), guard_([shared = std::move(shared)]() {
291 if constexpr (kIS_MULTI_CONSUMER) {
292 return std::make_shared<Guard>(std::move(shared));
294 return Guard{std::move(shared)};
301 Receiver(Receiver&&) =
default;
302 Receiver(Receiver
const&)
303 requires kIS_MULTI_CONSUMER
305 Receiver(Receiver
const&)
306 requires(!kIS_MULTI_CONSUMER)
310 operator=(Receiver&&) =
default;
312 operator=(Receiver
const&)
313 requires kIS_MULTI_CONSUMER
316 operator=(Receiver
const&)
317 requires(!kIS_MULTI_CONSUMER)
329 std::optional<T> result;
330 shared_->channel().try_receive([&result](boost::system::error_code ec,
auto&& value) {
332 result = std::forward<decltype(value)>(value);
347 [[nodiscard]] std::optional<T>
350 boost::system::error_code ec;
351 auto value = shared_->channel().async_receive(yield[ec]);
368 asyncReceive(std::invocable<std::optional<std::remove_cvref_t<T>>>
auto&& fn)
370 shared_->channel().async_receive([fn = std::forward<
decltype(fn)>(fn)](
371 boost::system::error_code ec, T&& value
374 fn(std::optional<T>(std::nullopt));
378 fn(std::make_optional<T>(std::move(value)));
392 return shared_->isClosed();
402 static std::pair<Sender, Receiver>
403 create(
auto&& context, std::size_t capacity)
407 util::detail::ChannelInstantiated<T>::value,
408 "When using Channel<T> with Clang, you must add INSTANTIATE_CHANNEL_FOR_CLANG(T) "
409 "to one .cpp file. See documentation at the bottom of Channel.hpp for details."
413 std::make_shared<ControlBlock>(std::forward<
decltype(context)>(context), capacity);
414 auto sender =
Sender{shared};
415 auto receiver =
Receiver{std::move(shared)};
417 return {std::move(sender), std::move(receiver)};
453#include <boost/asio/cancellation_signal.hpp>
454#include <boost/asio/experimental/channel_traits.hpp>
455#include <boost/asio/experimental/detail/channel_service.hpp>
457namespace util::detail {
460struct ChannelInstantiated : std::false_type {};
463#define INSTANTIATE_CHANNEL_FOR_CLANG(T) \
465 template class boost::asio::detail::cancellation_handler< \
466 boost::asio::experimental::detail::channel_service<boost::asio::detail::posix_mutex>:: \
468 boost::asio::experimental::channel_traits<>, \
469 void(boost::system::error_code, T)>>; \
470 namespace util::detail { \
472 struct ChannelInstantiated<T> : std::true_type {}; \
478#define INSTANTIATE_CHANNEL_FOR_CLANG(T)
The receiving end of a channel.
Definition Channel.hpp:279
std::optional< T > tryReceive()
Attempts to receive data from the channel without blocking.
Definition Channel.hpp:327
std::optional< T > asyncReceive(boost::asio::yield_context yield)
Asynchronously receives data from the channel using a coroutine.
Definition Channel.hpp:348
void asyncReceive(std::invocable< std::optional< std::remove_cvref_t< T > > > auto &&fn)
Asynchronously receives data from the channel using a callback.
Definition Channel.hpp:368
bool isClosed() const
Checks if the channel is closed.
Definition Channel.hpp:390
The sending end of a channel.
Definition Channel.hpp:154
void asyncSend(D &&data, std::invocable< bool > auto &&fn)
Asynchronously sends data through the channel using a callback.
Definition Channel.hpp:233
bool trySend(D &&data)
Attempts to send data through the channel without blocking.
Definition Channel.hpp:263
bool asyncSend(D &&data, boost::asio::yield_context yield)
Asynchronously sends data through the channel using a coroutine.
Definition Channel.hpp:207
Represents a go-like channel, a multi-producer (Sender) multi-consumer (Receiver) thread-safe data pi...
Definition Channel.hpp:80
static std::pair< Sender, Receiver > create(auto &&context, std::size_t capacity)
Factory function to create channel components.
Definition Channel.hpp:403
Concept that identifies types derived from ExecutionContextTag.
Definition Concepts.hpp:52
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:75
This namespace contains various utilities.
Definition AccountUtils.hpp:30
ProducerType
Specifies the producer concurrency model for a Channel.
Definition Channel.hpp:50
@ Single
Definition Channel.hpp:51
@ Multi
Definition Channel.hpp:53
ConsumerType
Specifies the consumer concurrency model for a Channel.
Definition Channel.hpp:60
@ Multi
Definition Channel.hpp:63