Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
BasicExecutionContext.hpp
1#pragma once
2
3#include "util/Assert.hpp"
4#include "util/async/Concepts.hpp"
5#include "util/async/Error.hpp"
6#include "util/async/Operation.hpp"
7#include "util/async/Outcome.hpp"
8#include "util/async/context/impl/Cancellation.hpp"
9#include "util/async/context/impl/Execution.hpp"
10#include "util/async/context/impl/Strand.hpp"
11#include "util/async/context/impl/Timer.hpp"
12#include "util/async/context/impl/Utils.hpp"
13#include "util/async/impl/ErrorHandling.hpp"
14
15#include <boost/asio.hpp>
16#include <boost/asio/error.hpp>
17#include <boost/asio/strand.hpp>
18#include <boost/asio/thread_pool.hpp>
19
20#include <chrono>
21#include <cstddef>
22#include <expected>
23#include <memory>
24#include <optional>
25#include <type_traits>
26#include <utility>
27
39namespace util::async {
40namespace impl {
41
43 using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
44 using Timer = SteadyTimer<Executor>;
45
/**
 * @brief Access the strand executor stored in this context.
 *
 * NOTE(review): the opening line of the enclosing struct is not present in this listing;
 * `Executor` here is the strand-over-thread_pool alias declared just above.
 *
 * @return Mutable reference to the `executor` member
 */
46 Executor&
47 getExecutor()
48 {
49 return executor;
50 }
51
52 Executor executor;
53};
54
/**
 * @brief Owns a boost::asio::thread_pool through a std::unique_ptr.
 *
 * Copy construction is deleted and move construction is defaulted, so a moved-from
 * instance holds a null `executor`. Each member function below either asserts that
 * `executor` is non-null or silently does nothing when it is null.
 */
55struct AsioPoolContext {
56 using Executor = boost::asio::thread_pool;
57 using Timer = SteadyTimer<Executor>;
58 using Strand = AsioPoolStrandContext;
59
// Allocates the thread pool with `numThreads` threads.
// NOTE(review): this single-argument constructor is not `explicit`, so std::size_t converts
// implicitly to AsioPoolContext — confirm that is intended.
60 AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
61 {
62 }
63
64 AsioPoolContext(AsioPoolContext const&) = delete;
65 AsioPoolContext(AsioPoolContext&&) = default;
66
// Creates a strand context on top of the pool; asserts that the pool is still owned.
67 [[nodiscard]] Strand
68 makeStrand() const
69 {
70 ASSERT(executor, "Called after executor was moved from.");
71 return {boost::asio::make_strand(*executor)};
72 }
73
// Forwards to thread_pool::stop(); a no-op when `executor` is null.
74 void
75 stop() const
76 {
77 if (executor) // don't call if executor was moved from
78 executor->stop();
79 }
80
// Forwards to thread_pool::join(); a no-op when `executor` is null.
81 void
82 join() const
83 {
84 if (executor) // don't call if executor was moved from
85 executor->join();
86 }
87
// Returns the owned pool by mutable reference even though the function is const
// (the unique_ptr is const, the pointee is not); asserts that the pool is still owned.
88 [[nodiscard]] Executor&
89 getExecutor() const
90 {
91 ASSERT(executor, "Called after executor was moved from.");
92 return *executor;
93 }
94
// Null after this object has been moved from.
95 std::unique_ptr<Executor> executor;
96};
97
98} // namespace impl
99
/**
 * @brief A highly configurable execution context.
 *
 * NOTE(review): the class-head line that follows this template header is not present in
 * this listing.
 *
 * @tparam ContextType Type of the `context_` member; constructed from a thread count and
 * used for its `Executor`/`Timer` aliases and its `makeStrand()`, `stop()`, `join()` and
 * `getExecutor()` members
 * @tparam StopSourceType Stop source type; must expose a nested `Token` type
 * @tparam DispatcherType Provides the static `dispatch` and `post` functions used to run work
 * @tparam TimerContextProvider Provides static `getContext(*this)`, which selects the context
 * used for timers (defaults to impl::SelfContextProvider)
 * @tparam ErrorHandlerType Provides the static `wrap` and `catchAndAssert` handler wrappers
 * (defaults to impl::DefaultErrorHandler)
 */
108template <
109 typename ContextType,
110 typename StopSourceType,
111 typename DispatcherType,
112 typename TimerContextProvider = impl::SelfContextProvider,
113 typename ErrorHandlerType = impl::DefaultErrorHandler>
115 ContextType context_;
116
120
121public:
// True when both ErrorHandlerType::wrap(...) and ErrorHandlerType::catchAndAssert(...) are
// non-throwing call expressions. Used as the noexcept-specification of the scheduling
// member functions of this class.
123 static constexpr bool kIsNoexcept = noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; })) and
124 noexcept(ErrorHandlerType::catchAndAssert([] { throw 0; }));
125
// The type that holds the underlying context (same as the ContextType template argument).
126 using ContextHolderType = ContextType;
127
// Executor type exposed by the context holder.
128 using ExecutorType = typename ContextHolderType::Executor;
129
// Result wrapper: either a T or an ExecutionError.
130 template <typename T>
131 using ValueType = std::expected<T, ExecutionError>;
132
133 using StopSource = StopSourceType;
134
// Token type handed to handlers that accept a stop token.
135 using StopToken = typename StopSourceType::Token;
136
137 template <typename T>
138 using StoppableOperation = StoppableOperation<ValueType<T>, StopSourceType>;
139
// NOTE(review): the alias declaration that belongs to this template header is not present
// in this listing.
140 template <typename T>
142
// NOTE(review): the first template argument of BasicStrand is not present in this listing.
143 using Strand = impl::BasicStrand<
145 StopSourceType,
146 DispatcherType,
147 TimerContextProvider,
148 ErrorHandlerType>;
149
// Timer type exposed by the context holder.
150 using Timer = typename ContextHolderType::Timer;
151
152 // note: scheduled operations are always stoppable
// NOTE(review): the alias declaration that belongs to this template header is not present
// in this listing.
153 template <typename T>
155
156 // note: repeating operations are always stoppable and must return void
157 using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
158
/**
 * @brief Create a new execution context with the given number of threads.
 *
 * @param numThreads Value used to initialize `context_`; defaults to 1
 */
164 explicit BasicExecutionContext(std::size_t numThreads = 1) noexcept : context_{numThreads}
165 {
166 }
167
172 {
// Destructor body (the declarator line is not present in this listing): requests the
// underlying context to stop. It does not call join().
173 stop();
174 }
175
178
/**
 * @brief Schedule an operation on the execution context after a delay.
 *
 * NOTE(review): the lines carrying the function name, the `fn` parameter and the start of
 * the final return expression are not present in this listing.
 * NOTE(review): std::invoke is declared in <functional>, which is not among the includes
 * visible at the top of this file — presumably it arrives transitively; consider including
 * it directly.
 *
 * @param delay How long to wait before the handler is executed
 * @param fn Handler invoked with a StopToken (forwarded into the scheduled callback)
 * @param timeout Optional timeout passed on to execute(); defaults to std::nullopt
 * @return The scheduled operation produced by the selected branch
 */
187 [[nodiscard]] auto
189 SomeStdDuration auto delay,
191 std::optional<std::chrono::milliseconds> timeout = std::nullopt
192 ) noexcept(kIsNoexcept)
193 {
// If the timer context provider yields a different context type than this one, delegate
// the whole call to that context.
194 if constexpr (not std::is_same_v<
195 decltype(TimerContextProvider::getContext(*this)),
196 decltype(*this)>) {
197 return TimerContextProvider::getContext(*this).scheduleAfter(
198 delay, std::forward<decltype(fn)>(fn), timeout
199 );
200 } else {
201 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
203 impl::extractAssociatedExecutor(*this),
204 delay,
// The callback ignores its argument and runs the user handler through execute(), so the
// handler gets the same stop-token and timeout treatment as a directly executed one.
// `this` is captured by pointer; presumably the context must outlive the scheduled
// operation — TODO confirm.
205 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto) mutable {
206 return this->execute(
207 [fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable {
// A void handler is invoked for its effect only; otherwise its result is returned.
208 if constexpr (std::is_void_v<FnRetType>) {
209 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
210 } else {
211 return std::invoke(
212 std::forward<decltype(fn)>(fn), std::move(stopToken)
213 );
214 }
215 },
216 timeout
217 );
218 }
219 );
220 }
221 }
222
/**
 * @brief Schedule an operation on the execution context after a delay; the handler also
 * learns whether the wait was aborted.
 *
 * NOTE(review): the lines carrying the function name, the `fn` parameter and the start of
 * the final return expression are not present in this listing.
 *
 * @param delay How long to wait before the handler is executed
 * @param fn Handler invoked with a StopToken and a bool; the bool is true when the callback
 * received boost::asio::error::operation_aborted
 * @param timeout Optional timeout passed on to execute(); defaults to std::nullopt
 * @return The scheduled operation produced by the selected branch
 */
232 [[nodiscard]] auto
234 SomeStdDuration auto delay,
236 std::optional<std::chrono::milliseconds> timeout = std::nullopt
237 ) noexcept(kIsNoexcept)
238 {
// If the timer context provider yields a different context type than this one, delegate
// the whole call to that context.
239 if constexpr (not std::is_same_v<
240 decltype(TimerContextProvider::getContext(*this)),
241 decltype(*this)>) {
242 return TimerContextProvider::getContext(*this).scheduleAfter(
243 delay, std::forward<decltype(fn)>(fn), timeout
244 );
245 } else {
246 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken, bool>>;
248 impl::extractAssociatedExecutor(*this),
249 delay,
// `ec` is the error code delivered to the callback; it is reduced to a single
// "was aborted" flag before the handler runs through execute().
250 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
251 return this->execute(
252 [fn = std::forward<decltype(fn)>(fn),
253 isAborted = (ec == boost::asio::error::operation_aborted)](
254 auto stopToken
255 ) mutable {
// A void handler is invoked for its effect only; otherwise its result is returned.
256 if constexpr (std::is_void_v<FnRetType>) {
257 std::invoke(
258 std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted
259 );
260 } else {
261 return std::invoke(
262 std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted
263 );
264 }
265 },
266 timeout
267 );
268 }
269 );
270 }
271 }
272
/**
 * @brief Schedule a repeating operation on the execution context.
 *
 * NOTE(review): the lines carrying the function name and the `fn` parameter are not present
 * in this listing.
 *
 * @param interval Interval handed to the repeating operation
 * @param fn Handler that takes no stop token (forwarded to the repeating operation)
 * @return A RepeatedOperation, or whatever the delegated context's executeRepeatedly returns
 */
281 [[nodiscard]] auto
283 SomeStdDuration auto interval,
285 ) noexcept(kIsNoexcept)
286 {
// If the timer context provider yields a different context type than this one, delegate
// the whole call to that context.
287 if constexpr (not std::is_same_v<
288 decltype(TimerContextProvider::getContext(*this)),
289 decltype(*this)>) {
290 return TimerContextProvider::getContext(*this).executeRepeatedly(
291 interval, std::forward<decltype(fn)>(fn)
292 );
293 } else {
294 return RepeatedOperation(
295 impl::extractAssociatedExecutor(*this), interval, std::forward<decltype(fn)>(fn)
296 );
297 }
298 }
299
/**
 * @brief Schedule an operation on the execution context.
 *
 * NOTE(review): the lines carrying the function name and the `fn` parameter are not present
 * in this listing.
 *
 * @param fn Handler invoked with a StopToken
 * @param timeout Optional timeout handed to impl::getTimeoutHandleIfNeeded; defaults to
 * std::nullopt
 * @return Whatever DispatcherType::dispatch returns for the created outcome
 */
307 [[nodiscard]] auto
310 std::optional<std::chrono::milliseconds> timeout = std::nullopt
311 ) noexcept(kIsNoexcept)
312 {
313 return DispatcherType::dispatch(
314 context_,
// NOTE(review): `fn` is passed here as an lvalue while the next argument forwards it into
// a lambda capture. The evaluation order of function arguments is unspecified, so this is
// only safe if outcomeForHandler does not read fn's state — TODO confirm.
315 impl::outcomeForHandler<StopSourceType>(fn),
316 ErrorHandlerType::wrap([this, timeout, fn = std::forward<decltype(fn)>(fn)](
317 auto& outcome, auto& stopSource, auto stopToken
318 ) mutable {
// The handle is held in a local for the duration of the handler call. Presumably it
// requests a stop through `stopSource` once `timeout` elapses — verify in the impl
// helpers.
319 [[maybe_unused]] auto timeoutHandler = impl::getTimeoutHandleIfNeeded(
320 TimerContextProvider::getContext(*this), timeout, stopSource
321 );
322
// Void handlers complete the outcome with no value; others store their result.
323 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
324 if constexpr (std::is_void_v<FnRetType>) {
325 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
326 outcome.setValue();
327 } else {
328 outcome.setValue(
329 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken))
330 );
331 }
332 })
333 );
334 }
335
/**
 * @brief Schedule an operation on the execution context, with the timeout given as any
 * std::chrono duration.
 *
 * NOTE(review): the operand of the noexcept-specification is not present in this listing.
 *
 * @param fn Handler invoked with a StopToken
 * @param timeout Timeout; converted to std::chrono::milliseconds with duration_cast, so
 * sub-millisecond precision is truncated
 * @return Result of the execute() overload taking an optional millisecond timeout
 */
343 [[nodiscard]] auto
344 execute(SomeHandlerWith<StopToken> auto&& fn, SomeStdDuration auto timeout) noexcept(
346 )
347 {
348 return execute(
349 std::forward<decltype(fn)>(fn),
350 std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
351 );
352 }
353
/**
 * @brief Schedule an operation on the execution context; the handler takes no stop token.
 *
 * NOTE(review): the line carrying the function name and parameter list is not present in
 * this listing.
 *
 * @param fn Handler invoked with no arguments
 * @return Whatever DispatcherType::dispatch returns for the created outcome
 */
361 [[nodiscard]] auto
363 {
364 return DispatcherType::dispatch(
365 context_,
366 impl::outcomeForHandler<StopSourceType>(fn),
367 ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
// Void handlers complete the outcome with no value; others store their result.
368 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
369 if constexpr (std::is_void_v<FnRetType>) {
370 std::invoke(std::forward<decltype(fn)>(fn));
371 outcome.setValue();
372 } else {
373 outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn)));
374 }
375 })
376 );
377 }
378
/**
 * @brief Schedule an operation on the execution context without expectations of a result.
 *
 * NOTE(review): the line carrying the function name and parameter list is not present in
 * this listing.
 *
 * @param fn Handler that takes no stop token; wrapped by ErrorHandlerType::catchAndAssert
 * and handed to DispatcherType::post
 */
385 void
387 {
// NOTE(review): `fn` is passed as an lvalue here, whereas the execute() overloads
// std::forward it. Consider forwarding, after confirming that catchAndAssert accepts
// rvalues.
388 DispatcherType::post(context_, ErrorHandlerType::catchAndAssert(fn));
389 }
390
/**
 * @brief Create a strand for this execution context.
 *
 * NOTE(review): the line carrying the function name is not present in this listing.
 *
 * @return A Strand constructed from this context and a strand made by `context_`
 */
396 [[nodiscard]] Strand
398 {
399 return Strand(*this, context_.makeStrand());
400 }
401
/**
 * @brief Stop the execution context as soon as possible.
 *
 * Forwards to `context_.stop()`.
 */
405 void
406 stop() const noexcept
407 {
408 context_.stop();
409 }
410
/**
 * @brief Block until all operations are completed.
 *
 * Forwards to `context_.join()`.
 */
414 void
415 join() const noexcept
416 {
417 context_.join();
418 }
419
/**
 * @brief Get the underlying executor.
 *
 * NOTE(review): the line carrying the function name is not present in this listing.
 *
 * @return Reference to the executor returned by `context_.getExecutor()`
 */
428 typename ContextType::Executor&
430 {
431 return context_.getExecutor();
432 }
433};
434
449
459
460} // namespace util::async
A highly configurable execution context.
Definition BasicExecutionContext.hpp:114
void submit(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIsNoexcept)
Schedule an operation on the execution context without expectations of a result.
Definition BasicExecutionContext.hpp:386
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIsNoexcept)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:308
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIsNoexcept)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:188
Strand makeStrand()
Create a strand for this execution context.
Definition BasicExecutionContext.hpp:397
auto execute(SomeHandlerWith< StopToken > auto &&fn, SomeStdDuration auto timeout) noexcept(kIsNoexcept)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:344
ContextType::Executor & getExecutor()
Get the underlying executor.
Definition BasicExecutionContext.hpp:429
auto executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto &&fn) noexcept(kIsNoexcept)
Schedule a repeating operation on the execution context.
Definition BasicExecutionContext.hpp:282
BasicExecutionContext(std::size_t numThreads=1) noexcept
Create a new execution context with the given number of threads.
Definition BasicExecutionContext.hpp:164
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken, bool > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIsNoexcept)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:233
auto execute(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIsNoexcept)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:362
void join() const noexcept
Block until all operations are completed.
Definition BasicExecutionContext.hpp:415
void stop() const noexcept
Stop the execution context as soon as possible.
Definition BasicExecutionContext.hpp:406
~BasicExecutionContext() override
Stops the underlying thread pool.
Definition BasicExecutionContext.hpp:171
Definition Timer.hpp:8
Definition Cancellation.hpp:34
Specifies the interface for a handler that can be invoked with the specified args.
Definition Concepts.hpp:165
Specifies the interface for a handler that can be stopped.
Definition Concepts.hpp:157
Specifies that the type must be some std::duration.
Definition Concepts.hpp:173
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:17
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
An asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:457
impl::BasicScheduledOperation< CtxType, OpType > ScheduledOperation
The future side of async operations that can be scheduled.
Definition Operation.hpp:203
impl::BasicOperation< Outcome< RetType > > Operation
The future side of async operations that cannot be stopped.
Definition Operation.hpp:194
BasicExecutionContext< impl::AsioPoolContext, impl::YieldContextStopSource, impl::SpawnDispatchStrategy > CoroExecutionContext
A Boost.Coroutine-based (asio yield_context) execution context.
Definition BasicExecutionContext.hpp:445
Tag type for identifying execution context types.
Definition Concepts.hpp:20
Definition BasicExecutionContext.hpp:55
Definition BasicExecutionContext.hpp:42
Definition ErrorHandling.hpp:16
Definition Execution.hpp:13