3#include "util/Assert.hpp"
4#include "util/async/Concepts.hpp"
5#include "util/async/Error.hpp"
6#include "util/async/Operation.hpp"
7#include "util/async/Outcome.hpp"
8#include "util/async/context/impl/Cancellation.hpp"
9#include "util/async/context/impl/Execution.hpp"
10#include "util/async/context/impl/Strand.hpp"
11#include "util/async/context/impl/Timer.hpp"
12#include "util/async/context/impl/Utils.hpp"
13#include "util/async/impl/ErrorHandling.hpp"
15#include <boost/asio.hpp>
16#include <boost/asio/error.hpp>
17#include <boost/asio/strand.hpp>
18#include <boost/asio/thread_pool.hpp>
43 using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
55struct AsioPoolContext {
56 using Executor = boost::asio::thread_pool;
60 AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
64 AsioPoolContext(AsioPoolContext
const&) =
delete;
65 AsioPoolContext(AsioPoolContext&&) =
default;
70 ASSERT(executor,
"Called after executor was moved from.");
71 return {boost::asio::make_strand(*executor)};
88 [[nodiscard]] Executor&
91 ASSERT(executor,
"Called after executor was moved from.");
95 std::unique_ptr<Executor> executor;
109 typename ContextType,
110 typename StopSourceType,
111 typename DispatcherType,
115 ContextType context_;
123 static constexpr bool kIsNoexcept =
noexcept(ErrorHandlerType::wrap([](
auto&) {
throw 0; })) and
124 noexcept(ErrorHandlerType::catchAndAssert([] {
throw 0; }));
126 using ContextHolderType = ContextType;
128 using ExecutorType =
typename ContextHolderType::Executor;
130 template <
typename T>
131 using ValueType = std::expected<T, ExecutionError>;
133 using StopSource = StopSourceType;
135 using StopToken =
typename StopSourceType::Token;
137 template <
typename T>
138 using StoppableOperation = StoppableOperation<ValueType<T>, StopSourceType>;
140 template <
typename T>
143 using Strand = impl::BasicStrand<
147 TimerContextProvider,
150 using Timer =
typename ContextHolderType::Timer;
153 template <
typename T>
157 using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
191 std::optional<std::chrono::milliseconds> timeout = std::nullopt
194 if constexpr (not std::is_same_v<
195 decltype(TimerContextProvider::getContext(*
this)),
197 return TimerContextProvider::getContext(*this).scheduleAfter(
198 delay, std::forward<
decltype(fn)>(fn), timeout
201 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn), StopToken>>;
203 impl::extractAssociatedExecutor(*
this),
205 [
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
auto)
mutable {
207 [fn = std::forward<
decltype(fn)>(fn)](
auto stopToken)
mutable {
208 if constexpr (std::is_void_v<FnRetType>) {
209 std::invoke(std::forward<
decltype(fn)>(fn), std::move(stopToken));
212 std::forward<
decltype(fn)>(fn), std::move(stopToken)
236 std::optional<std::chrono::milliseconds> timeout = std::nullopt
239 if constexpr (not std::is_same_v<
240 decltype(TimerContextProvider::getContext(*
this)),
242 return TimerContextProvider::getContext(*this).scheduleAfter(
243 delay, std::forward<
decltype(fn)>(fn), timeout
246 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn), StopToken,
bool>>;
248 impl::extractAssociatedExecutor(*
this),
250 [
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
auto ec)
mutable {
252 [fn = std::forward<
decltype(fn)>(fn),
253 isAborted = (ec == boost::asio::error::operation_aborted)](
256 if constexpr (std::is_void_v<FnRetType>) {
258 std::forward<
decltype(fn)>(fn), std::move(stopToken), isAborted
262 std::forward<
decltype(fn)>(fn), std::move(stopToken), isAborted
287 if constexpr (not std::is_same_v<
288 decltype(TimerContextProvider::getContext(*
this)),
290 return TimerContextProvider::getContext(*this).executeRepeatedly(
291 interval, std::forward<
decltype(fn)>(fn)
294 return RepeatedOperation(
295 impl::extractAssociatedExecutor(*
this), interval, std::forward<
decltype(fn)>(fn)
310 std::optional<std::chrono::milliseconds> timeout = std::nullopt
313 return DispatcherType::dispatch(
315 impl::outcomeForHandler<StopSourceType>(fn),
316 ErrorHandlerType::wrap([
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
317 auto& outcome,
auto& stopSource,
auto stopToken
319 [[maybe_unused]]
auto timeoutHandler = impl::getTimeoutHandleIfNeeded(
320 TimerContextProvider::getContext(*
this), timeout, stopSource
323 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn), StopToken>>;
324 if constexpr (std::is_void_v<FnRetType>) {
325 std::invoke(std::forward<
decltype(fn)>(fn), std::move(stopToken));
329 std::invoke(std::forward<
decltype(fn)>(fn), std::move(stopToken))
349 std::forward<
decltype(fn)>(fn),
350 std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
364 return DispatcherType::dispatch(
366 impl::outcomeForHandler<StopSourceType>(fn),
367 ErrorHandlerType::wrap([fn = std::forward<
decltype(fn)>(fn)](
auto& outcome)
mutable {
368 using FnRetType = std::decay_t<std::invoke_result_t<
decltype(fn)>>;
369 if constexpr (std::is_void_v<FnRetType>) {
370 std::invoke(std::forward<
decltype(fn)>(fn));
373 outcome.setValue(std::invoke(std::forward<
decltype(fn)>(fn)));
388 DispatcherType::post(context_, ErrorHandlerType::catchAndAssert(fn));
399 return Strand(*
this, context_.makeStrand());
428 typename ContextType::Executor&
431 return context_.getExecutor();
A highly configurable execution context.
Definition BasicExecutionContext.hpp:114
void submit(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIsNoexcept)
Schedule an operation on the execution context without expectations of a result.
Definition BasicExecutionContext.hpp:386
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIsNoexcept)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:308
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIsNoexcept)
Schedule an operation on the execution context to run after the given delay.
Definition BasicExecutionContext.hpp:188
Strand makeStrand()
Create a strand for this execution context.
Definition BasicExecutionContext.hpp:397
auto execute(SomeHandlerWith< StopToken > auto &&fn, SomeStdDuration auto timeout) noexcept(kIsNoexcept)
Schedule an operation on the execution context with the given timeout.
Definition BasicExecutionContext.hpp:344
ContextType::Executor & getExecutor()
Get the underlying executor.
Definition BasicExecutionContext.hpp:429
auto executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto &&fn) noexcept(kIsNoexcept)
Schedule a repeating operation on the execution context.
Definition BasicExecutionContext.hpp:282
BasicExecutionContext(std::size_t numThreads=1) noexcept
Create a new execution context with the given number of threads.
Definition BasicExecutionContext.hpp:164
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken, bool > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIsNoexcept)
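Schedule an operation on the execution context to run after the given delay; the handler additionally receives a bool that is true when the wait completed with boost::asio::error::operation_aborted.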
Definition BasicExecutionContext.hpp:233
static constexpr bool kIsNoexcept
Definition BasicExecutionContext.hpp:123
auto execute(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIsNoexcept)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:362
void join() const noexcept
Block until all operations are completed.
Definition BasicExecutionContext.hpp:415
void stop() const noexcept
Stop the execution context as soon as possible.
Definition BasicExecutionContext.hpp:406
~BasicExecutionContext() override
Stops the underlying thread pool.
Definition BasicExecutionContext.hpp:171
Definition Cancellation.hpp:34
Specifies the interface for a handler that can be invoked with the specified args.
Definition Concepts.hpp:165
Specifies the interface for a handler that can be stopped.
Definition Concepts.hpp:157
Specifies that the type must be some std::duration.
Definition Concepts.hpp:173
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:17
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
An asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:457
impl::BasicScheduledOperation< CtxType, OpType > ScheduledOperation
The future side of async operations that can be scheduled.
Definition Operation.hpp:203
impl::BasicOperation< Outcome< RetType > > Operation
The future side of async operations that cannot be stopped.
Definition Operation.hpp:194
BasicExecutionContext< impl::AsioPoolContext, impl::YieldContextStopSource, impl::SpawnDispatchStrategy > CoroExecutionContext
A Boost.Coroutine-based (asio yield_context) execution context.
Definition BasicExecutionContext.hpp:445
Tag type for identifying execution context types.
Definition Concepts.hpp:20
Definition BasicExecutionContext.hpp:55
Definition BasicExecutionContext.hpp:42
Definition ErrorHandling.hpp:16
Definition Execution.hpp:13