Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
BasicExecutionContext.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/Assert.hpp"
23#include "util/async/Concepts.hpp"
24#include "util/async/Error.hpp"
25#include "util/async/Operation.hpp"
26#include "util/async/Outcome.hpp"
27#include "util/async/context/impl/Cancellation.hpp"
28#include "util/async/context/impl/Execution.hpp"
29#include "util/async/context/impl/Strand.hpp"
30#include "util/async/context/impl/Timer.hpp"
31#include "util/async/context/impl/Utils.hpp"
32#include "util/async/impl/ErrorHandling.hpp"
33
34#include <boost/asio.hpp>
35#include <boost/asio/error.hpp>
36#include <boost/asio/strand.hpp>
37#include <boost/asio/thread_pool.hpp>
38
39#include <chrono>
40#include <cstddef>
41#include <expected>
42#include <memory>
43#include <optional>
44#include <type_traits>
45#include <utility>
46
58namespace util::async {
59namespace impl {
60
62 using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
63 using Timer = SteadyTimer<Executor>;
64
65 Executor&
66 getExecutor()
67 {
68 return executor;
69 }
70
71 Executor executor;
72};
73
74struct AsioPoolContext {
75 using Executor = boost::asio::thread_pool;
76 using Timer = SteadyTimer<Executor>;
77 using Strand = AsioPoolStrandContext;
78
79 AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
80 {
81 }
82
83 AsioPoolContext(AsioPoolContext const&) = delete;
84 AsioPoolContext(AsioPoolContext&&) = default;
85
86 Strand
87 makeStrand() const
88 {
89 ASSERT(executor, "Called after executor was moved from.");
90 return {boost::asio::make_strand(*executor)};
91 }
92
93 void
94 stop() const
95 {
96 if (executor) // don't call if executor was moved from
97 executor->stop();
98 }
99
100 void
101 join() const
102 {
103 if (executor) // don't call if executor was moved from
104 executor->join();
105 }
106
107 Executor&
108 getExecutor() const
109 {
110 ASSERT(executor, "Called after executor was moved from.");
111 return *executor;
112 }
113
114 std::unique_ptr<Executor> executor;
115};
116
117} // namespace impl
118
127template <
128 typename ContextType,
129 typename StopSourceType,
130 typename DispatcherType,
131 typename TimerContextProvider = impl::SelfContextProvider,
132 typename ErrorHandlerType = impl::DefaultErrorHandler>
134 ContextType context_;
135
139
140public:
142 static constexpr bool kIS_NOEXCEPT =
143 noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; })) and
144 noexcept(ErrorHandlerType::catchAndAssert([] { throw 0; }));
145
146 using ContextHolderType = ContextType;
147
148 using ExecutorType = typename ContextHolderType::Executor;
149
150 template <typename T>
151 using ValueType = std::expected<T, ExecutionError>;
152
153 using StopSource = StopSourceType;
154
155 using StopToken = typename StopSourceType::Token;
156
157 template <typename T>
158 using StoppableOperation = StoppableOperation<ValueType<T>, StopSourceType>;
159
160 template <typename T>
162
163 using Strand = impl::BasicStrand<
165 StopSourceType,
166 DispatcherType,
167 TimerContextProvider,
168 ErrorHandlerType>;
169
170 using Timer = typename ContextHolderType::Timer;
171
172 // note: scheduled operations are always stoppable
173 template <typename T>
175
176 // note: repeating operations are always stoppable and must return void
177 using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
178
184 explicit BasicExecutionContext(std::size_t numThreads = 1) noexcept : context_{numThreads}
185 {
186 }
187
192 {
193 stop();
194 }
195
198
207 [[nodiscard]] auto
209 SomeStdDuration auto delay,
211 std::optional<std::chrono::milliseconds> timeout = std::nullopt
212 ) noexcept(kIS_NOEXCEPT)
213 {
214 if constexpr (not std::is_same_v<
215 decltype(TimerContextProvider::getContext(*this)),
216 decltype(*this)>) {
217 return TimerContextProvider::getContext(*this).scheduleAfter(
218 delay, std::forward<decltype(fn)>(fn), timeout
219 );
220 } else {
221 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
223 impl::extractAssociatedExecutor(*this),
224 delay,
225 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto) mutable {
226 return this->execute(
227 [fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable {
228 if constexpr (std::is_void_v<FnRetType>) {
229 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
230 } else {
231 return std::invoke(
232 std::forward<decltype(fn)>(fn), std::move(stopToken)
233 );
234 }
235 },
236 timeout
237 );
238 }
239 );
240 }
241 }
242
252 [[nodiscard]] auto
254 SomeStdDuration auto delay,
256 std::optional<std::chrono::milliseconds> timeout = std::nullopt
257 ) noexcept(kIS_NOEXCEPT)
258 {
259 if constexpr (not std::is_same_v<
260 decltype(TimerContextProvider::getContext(*this)),
261 decltype(*this)>) {
262 return TimerContextProvider::getContext(*this).scheduleAfter(
263 delay, std::forward<decltype(fn)>(fn), timeout
264 );
265 } else {
266 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken, bool>>;
268 impl::extractAssociatedExecutor(*this),
269 delay,
270 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
271 return this->execute(
272 [fn = std::forward<decltype(fn)>(fn),
273 isAborted = (ec == boost::asio::error::operation_aborted)](
274 auto stopToken
275 ) mutable {
276 if constexpr (std::is_void_v<FnRetType>) {
277 std::invoke(
278 std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted
279 );
280 } else {
281 return std::invoke(
282 std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted
283 );
284 }
285 },
286 timeout
287 );
288 }
289 );
290 }
291 }
292
301 [[nodiscard]] auto
303 SomeStdDuration auto interval,
305 ) noexcept(kIS_NOEXCEPT)
306 {
307 if constexpr (not std::is_same_v<
308 decltype(TimerContextProvider::getContext(*this)),
309 decltype(*this)>) {
310 return TimerContextProvider::getContext(*this).executeRepeatedly(
311 interval, std::forward<decltype(fn)>(fn)
312 );
313 } else {
314 return RepeatedOperation(
315 impl::extractAssociatedExecutor(*this), interval, std::forward<decltype(fn)>(fn)
316 );
317 }
318 }
319
327 [[nodiscard]] auto
330 std::optional<std::chrono::milliseconds> timeout = std::nullopt
331 ) noexcept(kIS_NOEXCEPT)
332 {
333 return DispatcherType::dispatch(
334 context_,
335 impl::outcomeForHandler<StopSourceType>(fn),
336 ErrorHandlerType::wrap([this, timeout, fn = std::forward<decltype(fn)>(fn)](
337 auto& outcome, auto& stopSource, auto stopToken
338 ) mutable {
339 [[maybe_unused]] auto timeoutHandler = impl::getTimeoutHandleIfNeeded(
340 TimerContextProvider::getContext(*this), timeout, stopSource
341 );
342
343 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
344 if constexpr (std::is_void_v<FnRetType>) {
345 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
346 outcome.setValue();
347 } else {
348 outcome.setValue(
349 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken))
350 );
351 }
352 })
353 );
354 }
355
363 [[nodiscard]] auto
364 execute(SomeHandlerWith<StopToken> auto&& fn, SomeStdDuration auto timeout) noexcept(
366 )
367 {
368 return execute(
369 std::forward<decltype(fn)>(fn),
370 std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
371 );
372 }
373
381 [[nodiscard]] auto
383 {
384 return DispatcherType::dispatch(
385 context_,
386 impl::outcomeForHandler<StopSourceType>(fn),
387 ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
388 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
389 if constexpr (std::is_void_v<FnRetType>) {
390 std::invoke(std::forward<decltype(fn)>(fn));
391 outcome.setValue();
392 } else {
393 outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn)));
394 }
395 })
396 );
397 }
398
405 void
407 {
408 DispatcherType::post(context_, ErrorHandlerType::catchAndAssert(fn));
409 }
410
416 [[nodiscard]] Strand
418 {
419 return Strand(*this, context_.makeStrand());
420 }
421
425 void
426 stop() const noexcept
427 {
428 context_.stop();
429 }
430
434 void
435 join() const noexcept
436 {
437 context_.join();
438 }
439
448 typename ContextType::Executor&
450 {
451 return context_.getExecutor();
452 }
453};
454
469
479
480} // namespace util::async
A highly configurable execution context.
Definition BasicExecutionContext.hpp:133
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:328
Strand makeStrand()
Create a strand for this execution context.
Definition BasicExecutionContext.hpp:417
ContextType::Executor & getExecutor()
Get the underlying executor.
Definition BasicExecutionContext.hpp:449
auto executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule a repeating operation on the execution context.
Definition BasicExecutionContext.hpp:302
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken, bool > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:253
auto execute(SomeHandlerWith< StopToken > auto &&fn, SomeStdDuration auto timeout) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:364
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:208
BasicExecutionContext(std::size_t numThreads=1) noexcept
Create a new execution context with the given number of threads.
Definition BasicExecutionContext.hpp:184
auto execute(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:382
void join() const noexcept
Block until all operations are completed.
Definition BasicExecutionContext.hpp:435
void submit(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context without expectations of a result.
Definition BasicExecutionContext.hpp:406
void stop() const noexcept
Stop the execution context as soon as possible.
Definition BasicExecutionContext.hpp:426
~BasicExecutionContext() override
Stops the underlying thread pool.
Definition BasicExecutionContext.hpp:191
Definition Timer.hpp:27
Definition Cancellation.hpp:53
Specifies the interface for a handler that can be invoked with the specified args.
Definition Concepts.hpp:184
Specifies the interface for a handler that can be stopped.
Definition Concepts.hpp:176
Specifies that the type must be some std::duration.
Definition Concepts.hpp:192
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:36
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
An asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:477
impl::BasicScheduledOperation< CtxType, OpType > ScheduledOperation
The future side of async operations that can be scheduled.
Definition Operation.hpp:222
impl::BasicOperation< Outcome< RetType > > Operation
The future side of async operations that cannot be stopped.
Definition Operation.hpp:213
BasicExecutionContext< impl::AsioPoolContext, impl::YieldContextStopSource, impl::SpawnDispatchStrategy > CoroExecutionContext
A Boost.Coroutine-based (asio yield_context) execution context.
Definition BasicExecutionContext.hpp:465
Tag type for identifying execution context types.
Definition Concepts.hpp:39
Definition BasicExecutionContext.hpp:74
Definition BasicExecutionContext.hpp:61
Definition ErrorHandling.hpp:35
Definition Execution.hpp:32