22#include "util/Assert.hpp"
23#include "util/async/Concepts.hpp"
24#include "util/async/Error.hpp"
25#include "util/async/Operation.hpp"
26#include "util/async/Outcome.hpp"
27#include "util/async/context/impl/Cancellation.hpp"
28#include "util/async/context/impl/Execution.hpp"
29#include "util/async/context/impl/Strand.hpp"
30#include "util/async/context/impl/Timer.hpp"
31#include "util/async/context/impl/Utils.hpp"
32#include "util/async/impl/ErrorHandling.hpp"
34#include <boost/asio.hpp>
35#include <boost/asio/error.hpp>
36#include <boost/asio/strand.hpp>
37#include <boost/asio/thread_pool.hpp>
62 using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
74struct AsioPoolContext {
75 using Executor = boost::asio::thread_pool;
79 AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
83 AsioPoolContext(AsioPoolContext
const&) =
delete;
84 AsioPoolContext(AsioPoolContext&&) =
default;
89 ASSERT(executor,
"Called after executor was moved from.");
90 return {boost::asio::make_strand(*executor)};
110 ASSERT(executor,
"Called after executor was moved from.");
114 std::unique_ptr<Executor> executor;
128 typename ContextType,
129 typename StopSourceType,
130 typename DispatcherType,
134 ContextType context_;
143 noexcept(ErrorHandlerType::wrap([](
auto&) {
throw 0; })) and
144 noexcept(ErrorHandlerType::catchAndAssert([] {
throw 0; }));
146 using ContextHolderType = ContextType;
148 using ExecutorType =
typename ContextHolderType::Executor;
150 template <
typename T>
151 using ValueType = std::expected<T, ExecutionError>;
153 using StopSource = StopSourceType;
155 using StopToken =
typename StopSourceType::Token;
157 template <
typename T>
158 using StoppableOperation = StoppableOperation<ValueType<T>, StopSourceType>;
160 template <
typename T>
163 using Strand = impl::BasicStrand<
167 TimerContextProvider,
170 using Timer =
typename ContextHolderType::Timer;
173 template <
typename T>
177 using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
211 std::optional<std::chrono::milliseconds> timeout = std::nullopt
214 if constexpr (not std::is_same_v<
215 decltype(TimerContextProvider::getContext(*
this)),
217 return TimerContextProvider::getContext(*this).scheduleAfter(
218 delay, std::forward<
decltype(fn)>(fn), timeout
221 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn), StopToken>>;
223 impl::extractAssociatedExecutor(*
this),
225 [
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
auto)
mutable {
227 [fn = std::forward<
decltype(fn)>(fn)](
auto stopToken)
mutable {
228 if constexpr (std::is_void_v<FnRetType>) {
229 std::invoke(std::forward<
decltype(fn)>(fn), std::move(stopToken));
232 std::forward<
decltype(fn)>(fn), std::move(stopToken)
256 std::optional<std::chrono::milliseconds> timeout = std::nullopt
259 if constexpr (not std::is_same_v<
260 decltype(TimerContextProvider::getContext(*
this)),
262 return TimerContextProvider::getContext(*this).scheduleAfter(
263 delay, std::forward<
decltype(fn)>(fn), timeout
266 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn), StopToken,
bool>>;
268 impl::extractAssociatedExecutor(*
this),
270 [
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
auto ec)
mutable {
272 [fn = std::forward<
decltype(fn)>(fn),
273 isAborted = (ec == boost::asio::error::operation_aborted)](
276 if constexpr (std::is_void_v<FnRetType>) {
278 std::forward<
decltype(fn)>(fn), std::move(stopToken), isAborted
282 std::forward<
decltype(fn)>(fn), std::move(stopToken), isAborted
307 if constexpr (not std::is_same_v<
308 decltype(TimerContextProvider::getContext(*
this)),
310 return TimerContextProvider::getContext(*this).executeRepeatedly(
311 interval, std::forward<
decltype(fn)>(fn)
314 return RepeatedOperation(
315 impl::extractAssociatedExecutor(*
this), interval, std::forward<
decltype(fn)>(fn)
330 std::optional<std::chrono::milliseconds> timeout = std::nullopt
333 return DispatcherType::dispatch(
335 impl::outcomeForHandler<StopSourceType>(fn),
336 ErrorHandlerType::wrap([
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
337 auto& outcome,
auto& stopSource,
auto stopToken
339 [[maybe_unused]]
auto timeoutHandler = impl::getTimeoutHandleIfNeeded(
340 TimerContextProvider::getContext(*
this), timeout, stopSource
343 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn), StopToken>>;
344 if constexpr (std::is_void_v<FnRetType>) {
345 std::invoke(std::forward<
decltype(fn)>(fn), std::move(stopToken));
349 std::invoke(std::forward<
decltype(fn)>(fn), std::move(stopToken))
369 std::forward<
decltype(fn)>(fn),
370 std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
384 return DispatcherType::dispatch(
386 impl::outcomeForHandler<StopSourceType>(fn),
387 ErrorHandlerType::wrap([fn = std::forward<
decltype(fn)>(fn)](
auto& outcome)
mutable {
388 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn)>>;
389 if constexpr (std::is_void_v<FnRetType>) {
390 std::invoke(std::forward<
decltype(fn)>(fn));
393 outcome.setValue(std::invoke(std::forward<
decltype(fn)>(fn)));
408 DispatcherType::post(context_, ErrorHandlerType::catchAndAssert(fn));
419 return Strand(*
this, context_.makeStrand());
448 typename ContextType::Executor&
451 return context_.getExecutor();
A highly configurable execution context.
Definition BasicExecutionContext.hpp:133
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:328
static constexpr bool kIS_NOEXCEPT
Definition BasicExecutionContext.hpp:142
Strand makeStrand()
Create a strand for this execution context.
Definition BasicExecutionContext.hpp:417
ContextType::Executor & getExecutor()
Get the underlying executor.
Definition BasicExecutionContext.hpp:449
auto executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule a repeating operation on the execution context.
Definition BasicExecutionContext.hpp:302
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken, bool > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:253
auto execute(SomeHandlerWith< StopToken > auto &&fn, SomeStdDuration auto timeout) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:364
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:208
BasicExecutionContext(std::size_t numThreads=1) noexcept
Create a new execution context with the given number of threads.
Definition BasicExecutionContext.hpp:184
auto execute(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:382
void join() const noexcept
Block until all operations are completed.
Definition BasicExecutionContext.hpp:435
void submit(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context without expectations of a result.
Definition BasicExecutionContext.hpp:406
void stop() const noexcept
Stop the execution context as soon as possible.
Definition BasicExecutionContext.hpp:426
~BasicExecutionContext() override
Stops the underlying thread pool.
Definition BasicExecutionContext.hpp:191
Definition Cancellation.hpp:53
Specifies the interface for a handler that can be invoked with the specified args.
Definition Concepts.hpp:184
Specifies the interface for a handler that can be stopped.
Definition Concepts.hpp:176
Specifies that the type must be some std::duration.
Definition Concepts.hpp:192
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:36
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
A asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:477
impl::BasicScheduledOperation< CtxType, OpType > ScheduledOperation
The future side of async operations that can be scheduled.
Definition Operation.hpp:222
impl::BasicOperation< Outcome< RetType > > Operation
The future side of async operations that cannot be stopped.
Definition Operation.hpp:213
BasicExecutionContext< impl::AsioPoolContext, impl::YieldContextStopSource, impl::SpawnDispatchStrategy > CoroExecutionContext
A Boost.Coroutine-based (asio yield_context) execution context.
Definition BasicExecutionContext.hpp:465
Tag type for identifying execution context types.
Definition Concepts.hpp:39
Definition BasicExecutionContext.hpp:74
Definition BasicExecutionContext.hpp:61
Definition ErrorHandling.hpp:35
Definition Execution.hpp:32