22#include "util/Assert.hpp"
23#include "util/async/Concepts.hpp"
24#include "util/async/Error.hpp"
25#include "util/async/Operation.hpp"
26#include "util/async/Outcome.hpp"
27#include "util/async/context/impl/Cancellation.hpp"
28#include "util/async/context/impl/Execution.hpp"
29#include "util/async/context/impl/Strand.hpp"
30#include "util/async/context/impl/Timer.hpp"
31#include "util/async/context/impl/Utils.hpp"
32#include "util/async/impl/ErrorHandling.hpp"
34#include <boost/asio.hpp>
35#include <boost/asio/error.hpp>
36#include <boost/asio/strand.hpp>
37#include <boost/asio/thread_pool.hpp>
61 using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
74 using Executor = boost::asio::thread_pool;
78 AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
88 ASSERT(executor,
"Called after executor was moved from.");
89 return {boost::asio::make_strand(*executor)};
109 ASSERT(executor,
"Called after executor was moved from.");
113 std::unique_ptr<Executor> executor;
127 typename ContextType,
128 typename StopSourceType,
129 typename DispatcherType,
133 ContextType context_;
141 static constexpr bool kIS_NOEXCEPT =
noexcept(ErrorHandlerType::wrap([](
auto&) {
throw 0; }));
143 using ContextHolderType = ContextType;
145 using ExecutorType =
typename ContextHolderType::Executor;
147 template <
typename T>
148 using ValueType = std::expected<T, ExecutionError>;
150 using StopSource = StopSourceType;
152 using StopToken =
typename StopSourceType::Token;
154 template <
typename T>
157 template <
typename T>
160 using Strand = impl::
161 BasicStrand<BasicExecutionContext, StopSourceType, DispatcherType, TimerContextProvider, ErrorHandlerType>;
163 using Timer =
typename ContextHolderType::Timer;
166 template <
typename T>
204 std::optional<std::chrono::milliseconds> timeout = std::nullopt
207 if constexpr (not std::is_same_v<
decltype(TimerContextProvider::getContext(*
this)),
decltype(*this)>) {
208 return TimerContextProvider::getContext(*this).scheduleAfter(
209 delay, std::forward<
decltype(fn)>(fn), timeout
212 using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
213 return ScheduledOperation<FnRetType>(
214 impl::extractAssociatedExecutor(*
this),
216 [
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
auto)
mutable {
218 [fn = std::forward<
decltype(fn)>(fn)](
auto stopToken) {
219 if constexpr (std::is_void_v<FnRetType>) {
220 fn(std::move(stopToken));
222 return fn(std::move(stopToken));
244 std::optional<std::chrono::milliseconds> timeout = std::nullopt
247 if constexpr (not std::is_same_v<
decltype(TimerContextProvider::getContext(*
this)),
decltype(*this)>) {
248 return TimerContextProvider::getContext(*this).scheduleAfter(
249 delay, std::forward<
decltype(fn)>(fn), timeout
252 using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>(),
true))>;
253 return ScheduledOperation<FnRetType>(
254 impl::extractAssociatedExecutor(*
this),
256 [
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
auto ec)
mutable {
258 [fn = std::forward<
decltype(fn)>(fn),
259 isAborted = (ec == boost::asio::error::operation_aborted)](
auto stopToken) {
260 if constexpr (std::is_void_v<FnRetType>) {
261 fn(std::move(stopToken), isAborted);
263 return fn(std::move(stopToken), isAborted);
283 if constexpr (not std::is_same_v<
decltype(TimerContextProvider::getContext(*
this)),
decltype(*this)>) {
284 return TimerContextProvider::getContext(*this).executeRepeatedly(interval, std::forward<
decltype(fn)>(fn));
286 return RepeatedOperation(impl::extractAssociatedExecutor(*
this), interval, std::forward<
decltype(fn)>(fn));
300 std::optional<std::chrono::milliseconds> timeout = std::nullopt
303 return DispatcherType::dispatch(
305 impl::outcomeForHandler<StopSourceType>(fn),
306 ErrorHandlerType::wrap([
this, timeout, fn = std::forward<
decltype(fn)>(fn)](
307 auto& outcome,
auto& stopSource,
auto stopToken
309 [[maybe_unused]]
auto timeoutHandler =
310 impl::getTimeoutHandleIfNeeded(TimerContextProvider::getContext(*
this), timeout, stopSource);
312 using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
313 if constexpr (std::is_void_v<FnRetType>) {
314 fn(std::move(stopToken));
317 outcome.setValue(fn(std::move(stopToken)));
334 std::forward<
decltype(fn)>(fn),
335 std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
348 return DispatcherType::dispatch(
350 impl::outcomeForHandler<StopSourceType>(fn),
351 ErrorHandlerType::wrap([fn = std::forward<
decltype(fn)>(fn)](
auto& outcome)
mutable {
352 using FnRetType = std::decay_t<
decltype(fn())>;
353 if constexpr (std::is_void_v<FnRetType>) {
357 outcome.setValue(fn());
371 return Strand(*
this, context_.makeStrand());
A highly configurable execution context.
Definition BasicExecutionContext.hpp:132
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:298
~BasicExecutionContext()
Stops the underlying thread pool.
Definition BasicExecutionContext.hpp:184
static constexpr bool kIS_NOEXCEPT
Whether operations on this execution context are noexcept.
Definition BasicExecutionContext.hpp:141
Strand makeStrand()
Create a strand for this execution context.
Definition BasicExecutionContext.hpp:369
auto executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule a repeating operation on the execution context.
Definition BasicExecutionContext.hpp:281
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken, bool > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:241
auto execute(SomeHandlerWith< StopToken > auto &&fn, SomeStdDuration auto timeout) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:331
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:201
BasicExecutionContext(std::size_t numThreads=1) noexcept
Create a new execution context with the given number of threads.
Definition BasicExecutionContext.hpp:177
auto execute(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:346
void join() const noexcept
Block until all operations are completed.
Definition BasicExecutionContext.hpp:387
void stop() const noexcept
Stop the execution context as soon as possible.
Definition BasicExecutionContext.hpp:378
The future side of async operations that automatically repeat until aborted.
Definition Operation.hpp:228
The future side of async operations that can be stopped.
Definition Outcome.hpp:30
Definition Outcome.hpp:35
Specifies the interface for a handler that can be invoked with the specified args.
Definition Concepts.hpp:154
Specifies the interface for a handler that can be stopped.
Definition Concepts.hpp:146
Specifies that the type must be some std::duration.
Definition Concepts.hpp:162
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:36
Definition BasicExecutionContext.hpp:73
Definition BasicExecutionContext.hpp:60
Definition Operation.hpp:73
Definition ErrorHandling.hpp:34