3#include "util/Assert.hpp"
4#include "util/async/Concepts.hpp"
5#include "util/async/Error.hpp"
6#include "util/async/Operation.hpp"
7#include "util/async/Outcome.hpp"
8#include "util/async/context/impl/Cancellation.hpp"
9#include "util/async/context/impl/Execution.hpp"
10#include "util/async/context/impl/Strand.hpp"
11#include "util/async/context/impl/Timer.hpp"
12#include "util/async/context/impl/Utils.hpp"
13#include "util/async/impl/ErrorHandling.hpp"
15#include <boost/asio.hpp>
16#include <boost/asio/error.hpp>
17#include <boost/asio/strand.hpp>
18#include <boost/asio/thread_pool.hpp>
43 using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
55struct AsioPoolContext {
56 using Executor = boost::asio::thread_pool;
60 AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
64 AsioPoolContext(AsioPoolContext
const&) =
delete;
65 AsioPoolContext(AsioPoolContext&&) =
default;
70 ASSERT(executor,
"Called after executor was moved from.");
71 return {boost::asio::make_strand(*executor)};
91 ASSERT(executor,
"Called after executor was moved from.");
95 std::unique_ptr<Executor> executor;
109 typename ContextType,
110 typename StopSourceType,
111 typename DispatcherType,
115 ContextType context_;
124 noexcept(ErrorHandlerType::wrap([](
auto&) {
throw 0; })) and
125 noexcept(ErrorHandlerType::catchAndAssert([] {
throw 0; }));
127 using ContextHolderType = ContextType;
129 using ExecutorType =
typename ContextHolderType::Executor;
131 template <
typename T>
132 using ValueType = std::expected<T, ExecutionError>;
134 using StopSource = StopSourceType;
136 using StopToken =
typename StopSourceType::Token;
138 template <
typename T>
139 using StoppableOperation = StoppableOperation<ValueType<T>, StopSourceType>;
141 template <
typename T>
144 using Strand = impl::BasicStrand<
148 TimerContextProvider,
151 using Timer =
typename ContextHolderType::Timer;
154 template <
typename T>
158 using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
192 std::optional<std::chrono::milliseconds> timeout = std::nullopt
195 if constexpr (not std::is_same_v<
196 decltype(TimerContextProvider::getContext(*
this)),
198 return TimerContextProvider::getContext(*this).scheduleAfter(
199 delay, std::forward<
decltype(fn)>(fn), timeout
202 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn), StopToken>>;
204 impl::extractAssociatedExecutor(*
this),
206 [
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
auto)
mutable {
208 [fn = std::forward<
decltype(fn)>(fn)](
auto stopToken)
mutable {
209 if constexpr (std::is_void_v<FnRetType>) {
210 std::invoke(std::forward<
decltype(fn)>(fn), std::move(stopToken));
213 std::forward<
decltype(fn)>(fn), std::move(stopToken)
237 std::optional<std::chrono::milliseconds> timeout = std::nullopt
240 if constexpr (not std::is_same_v<
241 decltype(TimerContextProvider::getContext(*
this)),
243 return TimerContextProvider::getContext(*this).scheduleAfter(
244 delay, std::forward<
decltype(fn)>(fn), timeout
247 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn), StopToken,
bool>>;
249 impl::extractAssociatedExecutor(*
this),
251 [
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
auto ec)
mutable {
253 [fn = std::forward<
decltype(fn)>(fn),
254 isAborted = (ec == boost::asio::error::operation_aborted)](
257 if constexpr (std::is_void_v<FnRetType>) {
259 std::forward<
decltype(fn)>(fn), std::move(stopToken), isAborted
263 std::forward<
decltype(fn)>(fn), std::move(stopToken), isAborted
288 if constexpr (not std::is_same_v<
289 decltype(TimerContextProvider::getContext(*
this)),
291 return TimerContextProvider::getContext(*this).executeRepeatedly(
292 interval, std::forward<
decltype(fn)>(fn)
295 return RepeatedOperation(
296 impl::extractAssociatedExecutor(*
this), interval, std::forward<
decltype(fn)>(fn)
311 std::optional<std::chrono::milliseconds> timeout = std::nullopt
314 return DispatcherType::dispatch(
316 impl::outcomeForHandler<StopSourceType>(fn),
317 ErrorHandlerType::wrap([
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
318 auto& outcome,
auto& stopSource,
auto stopToken
320 [[maybe_unused]]
auto timeoutHandler = impl::getTimeoutHandleIfNeeded(
321 TimerContextProvider::getContext(*
this), timeout, stopSource
324 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn), StopToken>>;
325 if constexpr (std::is_void_v<FnRetType>) {
326 std::invoke(std::forward<
decltype(fn)>(fn), std::move(stopToken));
330 std::invoke(std::forward<
decltype(fn)>(fn), std::move(stopToken))
350 std::forward<
decltype(fn)>(fn),
351 std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
365 return DispatcherType::dispatch(
367 impl::outcomeForHandler<StopSourceType>(fn),
368 ErrorHandlerType::wrap([fn = std::forward<
decltype(fn)>(fn)](
auto& outcome)
mutable {
369 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn)>>;
370 if constexpr (std::is_void_v<FnRetType>) {
371 std::invoke(std::forward<
decltype(fn)>(fn));
374 outcome.setValue(std::invoke(std::forward<
decltype(fn)>(fn)));
389 DispatcherType::post(context_, ErrorHandlerType::catchAndAssert(fn));
400 return Strand(*
this, context_.makeStrand());
429 typename ContextType::Executor&
432 return context_.getExecutor();
A highly configurable execution context.
Definition BasicExecutionContext.hpp:114
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:309
static constexpr bool kIS_NOEXCEPT
Definition BasicExecutionContext.hpp:123
Strand makeStrand()
Create a strand for this execution context.
Definition BasicExecutionContext.hpp:398
ContextType::Executor & getExecutor()
Get the underlying executor.
Definition BasicExecutionContext.hpp:430
auto executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule a repeating operation on the execution context.
Definition BasicExecutionContext.hpp:283
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken, bool > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
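Schedule an operation on the execution context to run after the given delay; the handler additionally receives a bool telling whether the underlying wait was aborted.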
Definition BasicExecutionContext.hpp:234
auto execute(SomeHandlerWith< StopToken > auto &&fn, SomeStdDuration auto timeout) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context with a timeout given as any std::chrono duration (converted to milliseconds).
Definition BasicExecutionContext.hpp:345
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context to run after the given delay.
Definition BasicExecutionContext.hpp:189
BasicExecutionContext(std::size_t numThreads=1) noexcept
Create a new execution context with the given number of threads.
Definition BasicExecutionContext.hpp:165
auto execute(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:363
void join() const noexcept
Block until all operations are completed.
Definition BasicExecutionContext.hpp:416
void submit(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context without expecting a result.
Definition BasicExecutionContext.hpp:387
void stop() const noexcept
Stop the execution context as soon as possible.
Definition BasicExecutionContext.hpp:407
~BasicExecutionContext() override
Stops the underlying thread pool.
Definition BasicExecutionContext.hpp:172
Definition Cancellation.hpp:34
Specifies the interface for a handler that can be invoked with the specified args.
Definition Concepts.hpp:165
Specifies the interface for a handler that can be stopped.
Definition Concepts.hpp:157
Specifies that the type must be some std::duration.
Definition Concepts.hpp:173
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:17
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
An asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:458
impl::BasicScheduledOperation< CtxType, OpType > ScheduledOperation
The future side of async operations that can be scheduled.
Definition Operation.hpp:203
impl::BasicOperation< Outcome< RetType > > Operation
The future side of async operations that cannot be stopped.
Definition Operation.hpp:194
BasicExecutionContext< impl::AsioPoolContext, impl::YieldContextStopSource, impl::SpawnDispatchStrategy > CoroExecutionContext
A Boost.Coroutine-based (asio yield_context) execution context.
Definition BasicExecutionContext.hpp:446
Tag type for identifying execution context types.
Definition Concepts.hpp:20
Definition BasicExecutionContext.hpp:55
Definition BasicExecutionContext.hpp:42
Definition ErrorHandling.hpp:16
Definition Execution.hpp:13