56 using InternalChannelType = boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)>;
57 boost::asio::any_io_executor executor_;
58 InternalChannelType ch_;
61 ControlBlock(
auto&& context, std::size_t capacity) : executor_(context.get_executor()), ch_(context, capacity)
65 [[nodiscard]] InternalChannelType&
88 return not ch_.is_open();
96 std::shared_ptr<ControlBlock> shared;
111 std::shared_ptr<ControlBlock> shared_;
112 std::shared_ptr<Guard> guard_;
119 Sender(std::shared_ptr<ControlBlock> shared)
120 : shared_(std::move(shared)), guard_(std::make_shared<Guard>(shared_)) {};
122 Sender(Sender&&) =
default;
123 Sender(Sender
const&) =
default;
125 operator=(Sender&&) =
default;
127 operator=(Sender
const&) =
default;
139 template <
typename D>
141 asyncSend(D&&
data, boost::asio::yield_context yield)
142 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
144 boost::system::error_code ecIn, ecOut;
145 shared_->channel().async_send(ecIn, std::forward<D>(
data), yield[ecOut]);
148 if (not ecOut and shared_->isClosed())
163 template <
typename D>
165 asyncSend(D&&
data, std::invocable<bool>
auto&& fn)
166 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
168 boost::system::error_code
const ecIn;
169 shared_->channel().async_send(
171 std::forward<D>(
data),
172 [fn = std::forward<
decltype(fn)>(fn), shared = shared_](boost::system::error_code ec)
mutable {
174 if (not ec and shared->isClosed()) {
191 template <
typename D>
194 requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
196 boost::system::error_code ec;
197 return shared_->channel().try_send(ec, std::forward<D>(
data));
208 std::shared_ptr<ControlBlock> shared_;
209 std::shared_ptr<Guard> guard_;
216 Receiver(std::shared_ptr<ControlBlock> shared)
217 : shared_(std::move(shared)), guard_(std::make_shared<Guard>(shared_)) {};
219 Receiver(Receiver&&) =
default;
220 Receiver(Receiver
const&) =
default;
222 operator=(Receiver&&) =
default;
224 operator=(Receiver
const&) =
default;
234 std::optional<T> result;
235 shared_->channel().try_receive([&result](boost::system::error_code ec,
auto&& value) {
237 result = std::forward<decltype(value)>(value);
251 [[nodiscard]] std::optional<T>
252 asyncReceive(boost::asio::yield_context yield)
254 boost::system::error_code ec;
255 auto value = shared_->channel().async_receive(yield[ec]);
271 asyncReceive(std::invocable<std::optional<std::remove_cvref_t<T>>>
auto&& fn)
273 shared_->channel().async_receive(
274 [fn = std::forward<
decltype(fn)>(fn)](boost::system::error_code ec, T&& value)
mutable {
276 fn(std::optional<T>(std::nullopt));
280 fn(std::make_optional<T>(std::move(value)));
295 return shared_->isClosed();
306 static std::pair<Sender, Receiver>
307 create(
auto&& context, std::size_t capacity)
311 util::detail::ChannelInstantiated<T>::value,
312 "When using Channel<T> with Clang, you must add INSTANTIATE_CHANNEL_FOR_CLANG(T) "
313 "to one .cpp file. See documentation at the bottom of Channel.hpp for details."
316 auto shared = std::make_shared<ControlBlock>(std::forward<
decltype(context)>(context), capacity);
317 auto sender = Sender{shared};
318 auto receiver = Receiver{std::move(shared)};
320 return {std::move(sender), std::move(receiver)};
static std::pair< Sender, Receiver > create(auto &&context, std::size_t capacity)
Factory function to create channel components.
Definition Channel.hpp:307