Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
BasicExecutionContext.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/Assert.hpp"
23#include "util/async/Concepts.hpp"
24#include "util/async/Error.hpp"
25#include "util/async/Operation.hpp"
26#include "util/async/Outcome.hpp"
27#include "util/async/context/impl/Cancellation.hpp"
28#include "util/async/context/impl/Execution.hpp"
29#include "util/async/context/impl/Strand.hpp"
30#include "util/async/context/impl/Timer.hpp"
31#include "util/async/context/impl/Utils.hpp"
32#include "util/async/impl/ErrorHandling.hpp"
33
34#include <boost/asio.hpp>
35#include <boost/asio/error.hpp>
36#include <boost/asio/strand.hpp>
37#include <boost/asio/thread_pool.hpp>
38
39#include <chrono>
40#include <cstddef>
41#include <expected>
42#include <memory>
43#include <optional>
44#include <type_traits>
45#include <utility>
46
57namespace util::async {
58namespace impl {
59
61 using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
62 using Timer = SteadyTimer<Executor>;
63
64 Executor&
65 getExecutor()
66 {
67 return executor;
68 }
69
70 Executor executor;
71};
72
73struct AsioPoolContext {
74 using Executor = boost::asio::thread_pool;
75 using Timer = SteadyTimer<Executor>;
76 using Strand = AsioPoolStrandContext;
77
78 AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
79 {
80 }
81
82 AsioPoolContext(AsioPoolContext const&) = delete;
83 AsioPoolContext(AsioPoolContext&&) = default;
84
85 Strand
86 makeStrand() const
87 {
88 ASSERT(executor, "Called after executor was moved from.");
89 return {boost::asio::make_strand(*executor)};
90 }
91
92 void
93 stop() const
94 {
95 if (executor) // don't call if executor was moved from
96 executor->stop();
97 }
98
99 void
100 join() const
101 {
102 if (executor) // don't call if executor was moved from
103 executor->join();
104 }
105
106 Executor&
107 getExecutor() const
108 {
109 ASSERT(executor, "Called after executor was moved from.");
110 return *executor;
111 }
112
113 std::unique_ptr<Executor> executor;
114};
115
116} // namespace impl
117
126template <
127 typename ContextType,
128 typename StopSourceType,
129 typename DispatcherType,
130 typename TimerContextProvider = impl::SelfContextProvider,
131 typename ErrorHandlerType = impl::DefaultErrorHandler>
133 ContextType context_;
134
138
139public:
/**
 * @brief Whether the public operations of this context are declared noexcept.
 *
 * True only when the noexcept operator reports both ErrorHandlerType::wrap(...) and
 * ErrorHandlerType::catchAndAssert(...) as non-throwing expressions when given a throwing callable.
 * The member functions of this class use it as their noexcept(...) condition.
 */
141 static constexpr bool kIS_NOEXCEPT = noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; })) and
142 noexcept(ErrorHandlerType::catchAndAssert([] { throw 0; }));
143
/** @brief The context holder type this class was instantiated with. */
144 using ContextHolderType = ContextType;
145
/** @brief The Executor type exposed by the context holder. */
146 using ExecutorType = typename ContextHolderType::Executor;
147
/**
 * @brief Result wrapper: a std::expected holding either a T or an ExecutionError.
 *
 * @tparam T The type of the value on success
 */
148 template <typename T>
149 using ValueType = std::expected<T, ExecutionError>;
150
/** @brief The stop source type this class was instantiated with. */
151 using StopSource = StopSourceType;
152
/** @brief The Token type exposed by the stop source. */
153 using StopToken = typename StopSourceType::Token;
154
/**
 * @brief Stoppable operation type carrying a ValueType<T> and using StopSourceType.
 *
 * @tparam T The type of the value carried on success
 */
155 template <typename T>
156 using StoppableOperation = StoppableOperation<ValueType<T>, StopSourceType>;
157
158 template <typename T>
160
161 using Strand = impl::
162 BasicStrand<BasicExecutionContext, StopSourceType, DispatcherType, TimerContextProvider, ErrorHandlerType>;
163
164 using Timer = typename ContextHolderType::Timer;
165
166 // note: scheduled operations are always stoppable
167 template <typename T>
169
170 // note: repeating operations are always stoppable and must return void
171 using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
172
/**
 * @brief Create a new execution context with the given number of threads.
 *
 * The value is handed to the context holder via brace-initialization of context_.
 * NOTE(review): the constructor is noexcept, so an exception thrown while constructing
 * the context holder would end in std::terminate -- confirm this is intended.
 *
 * @param numThreads Number of threads to use; defaults to 1
 */
178 explicit BasicExecutionContext(std::size_t numThreads = 1) noexcept : context_{numThreads}
179 {
180 }
181
186 {
187 stop();
188 }
189
192
201 [[nodiscard]] auto
203 SomeStdDuration auto delay,
205 std::optional<std::chrono::milliseconds> timeout = std::nullopt
206 ) noexcept(kIS_NOEXCEPT)
207 {
208 if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
209 return TimerContextProvider::getContext(*this).scheduleAfter(
210 delay, std::forward<decltype(fn)>(fn), timeout
211 );
212 } else {
213 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
215 impl::extractAssociatedExecutor(*this),
216 delay,
217 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto) mutable {
218 return this->execute(
219 [fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable {
220 if constexpr (std::is_void_v<FnRetType>) {
221 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
222 } else {
223 return std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
224 }
225 },
226 timeout
227 );
228 }
229 );
230 }
231 }
232
241 [[nodiscard]] auto
243 SomeStdDuration auto delay,
245 std::optional<std::chrono::milliseconds> timeout = std::nullopt
246 ) noexcept(kIS_NOEXCEPT)
247 {
248 if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
249 return TimerContextProvider::getContext(*this).scheduleAfter(
250 delay, std::forward<decltype(fn)>(fn), timeout
251 );
252 } else {
253 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken, bool>>;
255 impl::extractAssociatedExecutor(*this),
256 delay,
257 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
258 return this->execute(
259 [fn = std::forward<decltype(fn)>(fn),
260 isAborted = (ec == boost::asio::error::operation_aborted)](auto stopToken) mutable {
261 if constexpr (std::is_void_v<FnRetType>) {
262 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted);
263 } else {
264 return std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted);
265 }
266 },
267 timeout
268 );
269 }
270 );
271 }
272 }
273
282 [[nodiscard]] auto
284 {
285 if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
286 return TimerContextProvider::getContext(*this).executeRepeatedly(interval, std::forward<decltype(fn)>(fn));
287 } else {
288 return RepeatedOperation(impl::extractAssociatedExecutor(*this), interval, std::forward<decltype(fn)>(fn));
289 }
290 }
291
299 [[nodiscard]] auto
302 std::optional<std::chrono::milliseconds> timeout = std::nullopt
303 ) noexcept(kIS_NOEXCEPT)
304 {
305 return DispatcherType::dispatch(
306 context_,
307 impl::outcomeForHandler<StopSourceType>(fn),
308 ErrorHandlerType::wrap([this, timeout, fn = std::forward<decltype(fn)>(fn)](
309 auto& outcome, auto& stopSource, auto stopToken
310 ) mutable {
311 [[maybe_unused]] auto timeoutHandler =
312 impl::getTimeoutHandleIfNeeded(TimerContextProvider::getContext(*this), timeout, stopSource);
313
314 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
315 if constexpr (std::is_void_v<FnRetType>) {
316 std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
317 outcome.setValue();
318 } else {
319 outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
320 }
321 })
322 );
323 }
324
332 [[nodiscard]] auto
334 {
335 return execute(
336 std::forward<decltype(fn)>(fn),
337 std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
338 );
339 }
340
347 [[nodiscard]] auto
349 {
350 return DispatcherType::dispatch(
351 context_,
352 impl::outcomeForHandler<StopSourceType>(fn),
353 ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
354 using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
355 if constexpr (std::is_void_v<FnRetType>) {
356 std::invoke(std::forward<decltype(fn)>(fn));
357 outcome.setValue();
358 } else {
359 outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn)));
360 }
361 })
362 );
363 }
364
371 void
373 {
374 DispatcherType::post(context_, ErrorHandlerType::catchAndAssert(fn));
375 }
376
382 [[nodiscard]] Strand
384 {
385 return Strand(*this, context_.makeStrand());
386 }
387
/**
 * @brief Stop the execution context as soon as possible.
 *
 * Forwards to stop() on the context holder.
 */
391 void
392 stop() const noexcept
393 {
394 context_.stop();
395 }
396
/**
 * @brief Block until all operations are completed.
 *
 * Forwards to join() on the context holder.
 */
400 void
401 join() const noexcept
402 {
403 context_.join();
404 }
405};
406
419
429
430} // namespace util::async
A highly configurable execution context.
Definition BasicExecutionContext.hpp:132
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:300
~BasicExecutionContext()
Stops the underlying thread pool.
Definition BasicExecutionContext.hpp:185
Strand makeStrand()
Create a strand for this execution context.
Definition BasicExecutionContext.hpp:383
auto executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule a repeating operation on the execution context.
Definition BasicExecutionContext.hpp:283
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken, bool > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:242
auto execute(SomeHandlerWith< StopToken > auto &&fn, SomeStdDuration auto timeout) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:333
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:202
BasicExecutionContext(std::size_t numThreads=1) noexcept
Create a new execution context with the given number of threads.
Definition BasicExecutionContext.hpp:178
auto execute(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:348
void join() const noexcept
Block until all operations are completed.
Definition BasicExecutionContext.hpp:401
void submit(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context without expectations of a result.
Definition BasicExecutionContext.hpp:372
void stop() const noexcept
Stop the execution context as soon as possible.
Definition BasicExecutionContext.hpp:392
Definition Timer.hpp:27
Specifies the interface for a handler that can be invoked with the specified args.
Definition Concepts.hpp:162
Specifies the interface for a handler that can be stopped.
Definition Concepts.hpp:154
Specifies that the type must be some std::duration.
Definition Concepts.hpp:170
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:36
impl::BasicScheduledOperation< CtxType, OpType > ScheduledOperation
The future side of async operations that can be scheduled.
Definition Operation.hpp:218
BasicExecutionContext< impl::AsioPoolContext, impl::YieldContextStopSource, impl::SpawnDispatchStrategy > CoroExecutionContext
A Boost.Coroutine-based (asio yield_context) execution context.
Definition BasicExecutionContext.hpp:417
impl::BasicOperation< Outcome< RetType > > Operation
The future side of async operations that cannot be stopped.
Definition Operation.hpp:209
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
An asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:427
Definition BasicExecutionContext.hpp:60
Definition ErrorHandling.hpp:35