Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
BasicExecutionContext.hpp
1#pragma once
2
3#include "util/Assert.hpp"
4#include "util/async/Concepts.hpp"
5#include "util/async/Error.hpp"
6#include "util/async/Operation.hpp"
7#include "util/async/Outcome.hpp"
8#include "util/async/context/impl/Cancellation.hpp"
9#include "util/async/context/impl/Execution.hpp"
10#include "util/async/context/impl/Strand.hpp"
11#include "util/async/context/impl/Timer.hpp"
12#include "util/async/context/impl/Utils.hpp"
13#include "util/async/impl/ErrorHandling.hpp"
14
15#include <boost/asio.hpp>
16#include <boost/asio/error.hpp>
17#include <boost/asio/strand.hpp>
18#include <boost/asio/thread_pool.hpp>
19
20#include <chrono>
21#include <cstddef>
22#include <expected>
23#include <memory>
24#include <optional>
25#include <type_traits>
26#include <utility>
27
39namespace util::async {
40namespace impl {
41
43 using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
44 using Timer = SteadyTimer<Executor>;
45
46 Executor&
47 getExecutor()
48 {
49 return executor;
50 }
51
52 Executor executor;
53};
54
55struct AsioPoolContext {
56 using Executor = boost::asio::thread_pool;
57 using Timer = SteadyTimer<Executor>;
58 using Strand = AsioPoolStrandContext;
59
60 AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
61 {
62 }
63
64 AsioPoolContext(AsioPoolContext const&) = delete;
65 AsioPoolContext(AsioPoolContext&&) = default;
66
67 Strand
68 makeStrand() const
69 {
70 ASSERT(executor, "Called after executor was moved from.");
71 return {boost::asio::make_strand(*executor)};
72 }
73
74 void
75 stop() const
76 {
77 if (executor) // don't call if executor was moved from
78 executor->stop();
79 }
80
81 void
82 join() const
83 {
84 if (executor) // don't call if executor was moved from
85 executor->join();
86 }
87
88 Executor&
89 getExecutor() const
90 {
91 ASSERT(executor, "Called after executor was moved from.");
92 return *executor;
93 }
94
95 std::unique_ptr<Executor> executor;
96};
97
98} // namespace impl
99
108template <
109 typename ContextType,
110 typename StopSourceType,
111 typename DispatcherType,
112 typename TimerContextProvider = impl::SelfContextProvider,
113 typename ErrorHandlerType = impl::DefaultErrorHandler>
115 ContextType context_;
116
120
121public:
123 static constexpr bool kIS_NOEXCEPT =
124 noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; })) and
125 noexcept(ErrorHandlerType::catchAndAssert([] { throw 0; }));
126
127 using ContextHolderType = ContextType;
128
129 using ExecutorType = typename ContextHolderType::Executor;
130
131 template <typename T>
132 using ValueType = std::expected<T, ExecutionError>;
133
134 using StopSource = StopSourceType;
135
136 using StopToken = typename StopSourceType::Token;
137
138 template <typename T>
139 using StoppableOperation = StoppableOperation<ValueType<T>, StopSourceType>;
140
141 template <typename T>
143
144 using Strand = impl::BasicStrand<
146 StopSourceType,
147 DispatcherType,
148 TimerContextProvider,
149 ErrorHandlerType>;
150
151 using Timer = typename ContextHolderType::Timer;
152
153 // note: scheduled operations are always stoppable
154 template <typename T>
156
157 // note: repeating operations are always stoppable and must return void
158 using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
159
165 explicit BasicExecutionContext(std::size_t numThreads = 1) noexcept : context_{numThreads}
166 {
167 }
168
173 {
174 stop();
175 }
176
179
188 [[nodiscard]] auto
190 SomeStdDuration auto delay,
192 std::optional<std::chrono::milliseconds> timeout = std::nullopt
193 ) noexcept(kIS_NOEXCEPT)
194 {
195 if constexpr (not std::is_same_v<
196 decltype(TimerContextProvider::getContext(*this)),
197 decltype(*this)>) {
198 return TimerContextProvider::getContext(*this).scheduleAfter(
199 delay, std::forward<decltype(fn)>(fn), timeout
200 );
201 } else {
202 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
204 impl::extractAssociatedExecutor(*this),
205 delay,
206 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto) mutable {
207 return this->execute(
208 [fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable {
209 if constexpr (std::is_void_v<FnRetType>) {
210 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
211 } else {
212 return std::invoke(
213 std::forward<decltype(fn)>(fn), std::move(stopToken)
214 );
215 }
216 },
217 timeout
218 );
219 }
220 );
221 }
222 }
223
233 [[nodiscard]] auto
235 SomeStdDuration auto delay,
237 std::optional<std::chrono::milliseconds> timeout = std::nullopt
238 ) noexcept(kIS_NOEXCEPT)
239 {
240 if constexpr (not std::is_same_v<
241 decltype(TimerContextProvider::getContext(*this)),
242 decltype(*this)>) {
243 return TimerContextProvider::getContext(*this).scheduleAfter(
244 delay, std::forward<decltype(fn)>(fn), timeout
245 );
246 } else {
247 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken, bool>>;
249 impl::extractAssociatedExecutor(*this),
250 delay,
251 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
252 return this->execute(
253 [fn = std::forward<decltype(fn)>(fn),
254 isAborted = (ec == boost::asio::error::operation_aborted)](
255 auto stopToken
256 ) mutable {
257 if constexpr (std::is_void_v<FnRetType>) {
258 std::invoke(
259 std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted
260 );
261 } else {
262 return std::invoke(
263 std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted
264 );
265 }
266 },
267 timeout
268 );
269 }
270 );
271 }
272 }
273
282 [[nodiscard]] auto
284 SomeStdDuration auto interval,
286 ) noexcept(kIS_NOEXCEPT)
287 {
288 if constexpr (not std::is_same_v<
289 decltype(TimerContextProvider::getContext(*this)),
290 decltype(*this)>) {
291 return TimerContextProvider::getContext(*this).executeRepeatedly(
292 interval, std::forward<decltype(fn)>(fn)
293 );
294 } else {
295 return RepeatedOperation(
296 impl::extractAssociatedExecutor(*this), interval, std::forward<decltype(fn)>(fn)
297 );
298 }
299 }
300
308 [[nodiscard]] auto
311 std::optional<std::chrono::milliseconds> timeout = std::nullopt
312 ) noexcept(kIS_NOEXCEPT)
313 {
314 return DispatcherType::dispatch(
315 context_,
316 impl::outcomeForHandler<StopSourceType>(fn),
317 ErrorHandlerType::wrap([this, timeout, fn = std::forward<decltype(fn)>(fn)](
318 auto& outcome, auto& stopSource, auto stopToken
319 ) mutable {
320 [[maybe_unused]] auto timeoutHandler = impl::getTimeoutHandleIfNeeded(
321 TimerContextProvider::getContext(*this), timeout, stopSource
322 );
323
324 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
325 if constexpr (std::is_void_v<FnRetType>) {
326 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
327 outcome.setValue();
328 } else {
329 outcome.setValue(
330 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken))
331 );
332 }
333 })
334 );
335 }
336
344 [[nodiscard]] auto
345 execute(SomeHandlerWith<StopToken> auto&& fn, SomeStdDuration auto timeout) noexcept(
347 )
348 {
349 return execute(
350 std::forward<decltype(fn)>(fn),
351 std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
352 );
353 }
354
362 [[nodiscard]] auto
364 {
365 return DispatcherType::dispatch(
366 context_,
367 impl::outcomeForHandler<StopSourceType>(fn),
368 ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
369 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
370 if constexpr (std::is_void_v<FnRetType>) {
371 std::invoke(std::forward<decltype(fn)>(fn));
372 outcome.setValue();
373 } else {
374 outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn)));
375 }
376 })
377 );
378 }
379
386 void
388 {
389 DispatcherType::post(context_, ErrorHandlerType::catchAndAssert(fn));
390 }
391
397 [[nodiscard]] Strand
399 {
400 return Strand(*this, context_.makeStrand());
401 }
402
406 void
407 stop() const noexcept
408 {
409 context_.stop();
410 }
411
415 void
416 join() const noexcept
417 {
418 context_.join();
419 }
420
429 typename ContextType::Executor&
431 {
432 return context_.getExecutor();
433 }
434};
435
450
460
461} // namespace util::async
A highly configurable execution context.
Definition BasicExecutionContext.hpp:114
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:309
Strand makeStrand()
Create a strand for this execution context.
Definition BasicExecutionContext.hpp:398
ContextType::Executor & getExecutor()
Get the underlying executor.
Definition BasicExecutionContext.hpp:430
auto executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule a repeating operation on the execution context.
Definition BasicExecutionContext.hpp:283
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken, bool > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:234
auto execute(SomeHandlerWith< StopToken > auto &&fn, SomeStdDuration auto timeout) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:345
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:189
BasicExecutionContext(std::size_t numThreads=1) noexcept
Create a new execution context with the given number of threads.
Definition BasicExecutionContext.hpp:165
auto execute(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:363
void join() const noexcept
Block until all operations are completed.
Definition BasicExecutionContext.hpp:416
void submit(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context without expectations of a result.
Definition BasicExecutionContext.hpp:387
void stop() const noexcept
Stop the execution context as soon as possible.
Definition BasicExecutionContext.hpp:407
~BasicExecutionContext() override
Stops the underlying thread pool.
Definition BasicExecutionContext.hpp:172
Definition Timer.hpp:8
Definition Cancellation.hpp:34
Specifies the interface for a handler that can be invoked with the specified args.
Definition Concepts.hpp:165
Specifies the interface for a handler that can be stopped.
Definition Concepts.hpp:157
Specifies that the type must be some std::duration.
Definition Concepts.hpp:173
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:17
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
An asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:458
impl::BasicScheduledOperation< CtxType, OpType > ScheduledOperation
The future side of async operations that can be scheduled.
Definition Operation.hpp:203
impl::BasicOperation< Outcome< RetType > > Operation
The future side of async operations that cannot be stopped.
Definition Operation.hpp:194
BasicExecutionContext< impl::AsioPoolContext, impl::YieldContextStopSource, impl::SpawnDispatchStrategy > CoroExecutionContext
A Boost.Coroutine-based (asio yield_context) execution context.
Definition BasicExecutionContext.hpp:446
Tag type for identifying execution context types.
Definition Concepts.hpp:20
Definition BasicExecutionContext.hpp:55
Definition BasicExecutionContext.hpp:42
Definition ErrorHandling.hpp:16
Definition Execution.hpp:13