Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
BasicExecutionContext.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/Assert.hpp"
23#include "util/async/Concepts.hpp"
24#include "util/async/Error.hpp"
25#include "util/async/Operation.hpp"
26#include "util/async/Outcome.hpp"
27#include "util/async/context/impl/Cancellation.hpp"
28#include "util/async/context/impl/Execution.hpp"
29#include "util/async/context/impl/Strand.hpp"
30#include "util/async/context/impl/Timer.hpp"
31#include "util/async/context/impl/Utils.hpp"
32#include "util/async/impl/ErrorHandling.hpp"
33
34#include <boost/asio.hpp>
35#include <boost/asio/error.hpp>
36#include <boost/asio/strand.hpp>
37#include <boost/asio/thread_pool.hpp>
38
39#include <chrono>
40#include <cstddef>
41#include <expected>
42#include <memory>
43#include <optional>
44#include <type_traits>
45#include <utility>
46
57namespace util::async {
58namespace impl {
59
61 using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
63
// Accessor for the stored `executor` member. `Executor` is aliased above to
// boost::asio::strand<boost::asio::thread_pool::executor_type>.
// Returns a const reference, so no copy of the strand is made.
64 Executor const&
65 getExecutor() const
66 {
67 return executor;
68 }
69
70 Executor executor;
71};
72
74 using Executor = boost::asio::thread_pool;
77
// Constructs the context by heap-allocating the underlying Executor
// (aliased above to boost::asio::thread_pool), passing `numThreads` to its constructor.
// The pool is owned through the `executor` unique_ptr.
// NOTE(review): this single-argument constructor is not `explicit`; confirm implicit
// conversion from std::size_t is intended.
78 AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
79 {
80 }
81
82 AsioPoolContext(AsioPoolContext const&) = delete;
84
// Creates a new strand on top of the owned thread pool.
// Asserts that `executor` is non-null, then brace-initializes the returned Strand
// from boost::asio::make_strand(*executor).
// NOTE(review): the `Strand` alias is not visible in this listing; presumably it names
// the strand context type defined above — confirm.
85 Strand
86 makeStrand() const
87 {
88 ASSERT(executor, "Called after executor was moved from.");
89 return {boost::asio::make_strand(*executor)};
90 }
91
// Calls stop() on the owned thread pool.
// Does nothing when `executor` is null, so it can be called on a context whose
// executor has been moved away.
92 void
93 stop() const
94 {
95 if (executor) // don't call if executor was moved from
96 executor->stop();
97 }
98
// Calls join() on the owned thread pool.
// Does nothing when `executor` is null, mirroring the guard used by stop().
99 void
100 join() const
101 {
102 if (executor) // don't call if executor was moved from
103 executor->join();
104 }
105
// Accessor for the owned thread pool.
// Asserts that `executor` is non-null before dereferencing it.
// Although the method is const, it returns a non-const reference: constness of the
// unique_ptr member does not extend to the object it points to.
106 Executor&
107 getExecutor() const
108 {
109 ASSERT(executor, "Called after executor was moved from.");
110 return *executor;
111 }
112
113 std::unique_ptr<Executor> executor;
114};
115
116} // namespace impl
117
126template <
127 typename ContextType,
128 typename StopSourceType,
129 typename DispatcherType,
130 typename TimerContextProvider = impl::SelfContextProvider,
131 typename ErrorHandlerType = impl::DefaultErrorHandler>
133 ContextType context_;
134
139public:
141 static constexpr bool kIS_NOEXCEPT = noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; }));
142
143 using ContextHolderType = ContextType;
144
145 using ExecutorType = typename ContextHolderType::Executor;
146
147 template <typename T>
148 using ValueType = std::expected<T, ExecutionError>;
149
150 using StopSource = StopSourceType;
151
152 using StopToken = typename StopSourceType::Token;
153
154 template <typename T>
156
157 template <typename T>
159
160 using Strand = impl::
161 BasicStrand<BasicExecutionContext, StopSourceType, DispatcherType, TimerContextProvider, ErrorHandlerType>;
162
163 using Timer = typename ContextHolderType::Timer;
164
165 // note: scheduled operations are always stoppable
166 template <typename T>
168
169 // note: repeating operations are always stoppable and must return void
170 using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
171
// Creates the execution context by brace-initializing `context_` with `numThreads`.
// `numThreads` defaults to 1.
// NOTE(review): the constructor is declared noexcept, so an exception escaping the
// construction of `context_` would terminate the program — confirm this is intended.
177 explicit BasicExecutionContext(std::size_t numThreads = 1) noexcept : context_{numThreads}
178 {
179 }
180
185 {
186 stop();
187 }
188
191
// scheduleAfter(delay, fn, timeout): overload for handlers invoked with a StopToken only.
// The lines carrying the function name and the `fn` parameter are not present in this listing.
//
// delay:   some std::chrono duration passed on to the scheduled operation.
// fn:      handler invoked as fn(stopToken).
// timeout: optional timeout passed on to execute(); defaults to std::nullopt.
//
// The noexcept specification follows kIS_NOEXCEPT.
200 [[nodiscard]] auto
202 SomeStdDuration auto delay,
204 std::optional<std::chrono::milliseconds> timeout = std::nullopt
205 ) noexcept(kIS_NOEXCEPT)
206 {
// When the timer context provider yields a context of a different type than this one,
// the whole request is forwarded to that context.
207 if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
208 return TimerContextProvider::getContext(*this).scheduleAfter(
209 delay, std::forward<decltype(fn)>(fn), timeout
210 );
211 } else {
// Decayed result type of the handler; selects the ScheduledOperation instantiation
// and decides below whether a value has to be returned.
212 using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
213 return ScheduledOperation<FnRetType>(
214 impl::extractAssociatedExecutor(*this),
215 delay,
// The callback's (unnamed) argument is unused in this overload.
216 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto) mutable {
// Submit the handler through execute() so it runs with a stop token and the optional timeout.
217 return this->execute(
218 [fn = std::forward<decltype(fn)>(fn)](auto stopToken) {
219 if constexpr (std::is_void_v<FnRetType>) {
220 fn(std::move(stopToken));
221 } else {
222 return fn(std::move(stopToken));
223 }
224 },
225 timeout
226 );
227 }
228 );
229 }
230 }
231
// scheduleAfter(delay, fn, timeout): overload for handlers invoked with a StopToken and a bool.
// The lines carrying the function name and the `fn` parameter are not present in this listing.
//
// delay:   some std::chrono duration passed on to the scheduled operation.
// fn:      handler invoked as fn(stopToken, isAborted).
// timeout: optional timeout passed on to execute(); defaults to std::nullopt.
//
// The noexcept specification follows kIS_NOEXCEPT.
240 [[nodiscard]] auto
242 SomeStdDuration auto delay,
244 std::optional<std::chrono::milliseconds> timeout = std::nullopt
245 ) noexcept(kIS_NOEXCEPT)
246 {
// When the timer context provider yields a context of a different type than this one,
// the whole request is forwarded to that context.
247 if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
248 return TimerContextProvider::getContext(*this).scheduleAfter(
249 delay, std::forward<decltype(fn)>(fn), timeout
250 );
251 } else {
// Decayed result type of the handler when called with a stop token and a bool.
252 using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>(), true))>;
253 return ScheduledOperation<FnRetType>(
254 impl::extractAssociatedExecutor(*this),
255 delay,
256 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
257 return this->execute(
// isAborted is computed once here from the callback's error code and captured by value:
// true exactly when ec equals boost::asio::error::operation_aborted.
258 [fn = std::forward<decltype(fn)>(fn),
259 isAborted = (ec == boost::asio::error::operation_aborted)](auto stopToken) {
260 if constexpr (std::is_void_v<FnRetType>) {
261 fn(std::move(stopToken), isAborted);
262 } else {
263 return fn(std::move(stopToken), isAborted);
264 }
265 },
266 timeout
267 );
268 }
269 );
270 }
271 }
272
// executeRepeatedly(interval, fn): schedules a repeating operation.
// The line carrying the function name and parameters is not present in this listing.
//
// If the timer context provider yields a context of a different type than this one, the call
// is forwarded to that context. Otherwise a RepeatedOperation is constructed from this
// context's associated executor, the interval and the handler.
280 [[nodiscard]] auto
282 {
283 if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
284 return TimerContextProvider::getContext(*this).executeRepeatedly(interval, std::forward<decltype(fn)>(fn));
285 } else {
286 return RepeatedOperation(impl::extractAssociatedExecutor(*this), interval, std::forward<decltype(fn)>(fn));
287 }
288 }
289
// execute(fn, timeout): submits a handler that takes a StopToken, with an optional timeout.
// The lines carrying the function name and the `fn` parameter are not present in this listing.
//
// Returns whatever DispatcherType::dispatch returns for the given context, outcome and
// wrapped handler. The noexcept specification follows kIS_NOEXCEPT.
297 [[nodiscard]] auto
300 std::optional<std::chrono::milliseconds> timeout = std::nullopt
301 ) noexcept(kIS_NOEXCEPT)
302 {
303 return DispatcherType::dispatch(
304 context_,
// The outcome object is created from `fn` before `fn` is forwarded into the closure below.
305 impl::outcomeForHandler<StopSourceType>(fn),
306 ErrorHandlerType::wrap([this, timeout, fn = std::forward<decltype(fn)>(fn)](
307 auto& outcome, auto& stopSource, auto stopToken
308 ) mutable {
// Local handle whose scope spans the handler invocation below.
// NOTE(review): presumably requests a stop via stopSource when the timeout elapses — confirm
// against impl::getTimeoutHandleIfNeeded.
309 [[maybe_unused]] auto timeoutHandler =
310 impl::getTimeoutHandleIfNeeded(TimerContextProvider::getContext(*this), timeout, stopSource);
311
312 using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
// Void handlers complete the outcome without a value; others complete it with the result.
313 if constexpr (std::is_void_v<FnRetType>) {
314 fn(std::move(stopToken));
315 outcome.setValue();
316 } else {
317 outcome.setValue(fn(std::move(stopToken)));
318 }
319 })
320 );
321 }
322
// execute(fn, timeout): convenience overload taking the timeout as any std::chrono duration.
// The line carrying the function name and parameters is not present in this listing.
//
// The timeout is converted to std::chrono::milliseconds with duration_cast, wrapped in an
// optional, and passed to the execute overload that accepts an optional timeout.
330 [[nodiscard]] auto
332 {
333 return execute(
334 std::forward<decltype(fn)>(fn),
335 std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
336 );
337 }
338
// execute(fn): submits a handler that takes no stop token.
// The line carrying the function name and parameters is not present in this listing.
//
// Returns whatever DispatcherType::dispatch returns for the given context, outcome and
// wrapped handler.
345 [[nodiscard]] auto
347 {
348 return DispatcherType::dispatch(
349 context_,
// The outcome object is created from `fn` before `fn` is forwarded into the closure below.
350 impl::outcomeForHandler<StopSourceType>(fn),
351 ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
352 using FnRetType = std::decay_t<decltype(fn())>;
// Void handlers complete the outcome without a value; others complete it with the result.
353 if constexpr (std::is_void_v<FnRetType>) {
354 fn();
355 outcome.setValue();
356 } else {
357 outcome.setValue(fn());
358 }
359 })
360 );
361 }
362
// makeStrand(): creates a strand bound to this execution context.
// The line carrying the function name is not present in this listing.
// The Strand is constructed from a reference to this context and the result of
// context_.makeStrand().
368 [[nodiscard]] Strand
370 {
371 return Strand(*this, context_.makeStrand());
372 }
373
// Stops the execution context by delegating to context_.stop().
377 void
378 stop() const noexcept
379 {
380 context_.stop();
381 }
382
// Joins the execution context by delegating to context_.join().
386 void
387 join() const noexcept
388 {
389 context_.join();
390 }
391};
392
405
415
416} // namespace util::async
A highly configurable execution context.
Definition BasicExecutionContext.hpp:132
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:298
~BasicExecutionContext()
Stops the underlying thread pool.
Definition BasicExecutionContext.hpp:184
static constexpr bool kIS_NOEXCEPT
Whether operations on this execution context are noexcept.
Definition BasicExecutionContext.hpp:141
Strand makeStrand()
Create a strand for this execution context.
Definition BasicExecutionContext.hpp:369
auto executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule a repeating operation on the execution context.
Definition BasicExecutionContext.hpp:281
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken, bool > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
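Schedule an operation on the execution context to run after the given delay; the handler also receives a flag telling whether the wait was aborted.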
Definition BasicExecutionContext.hpp:241
auto execute(SomeHandlerWith< StopToken > auto &&fn, SomeStdDuration auto timeout) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context with a timeout given as any std::chrono duration.
Definition BasicExecutionContext.hpp:331
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context to run after the given delay.
Definition BasicExecutionContext.hpp:201
BasicExecutionContext(std::size_t numThreads=1) noexcept
Create a new execution context with the given number of threads.
Definition BasicExecutionContext.hpp:177
auto execute(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:346
void join() const noexcept
Block until all operations are completed.
Definition BasicExecutionContext.hpp:387
void stop() const noexcept
Stop the execution context as soon as possible.
Definition BasicExecutionContext.hpp:378
The future side of async operations that automatically repeat until aborted.
Definition Operation.hpp:228
The future side of async operations that can be stopped.
Definition Outcome.hpp:30
Definition Outcome.hpp:35
Definition Timer.hpp:27
Specifies the interface for a handler that can be invoked with the specified args.
Definition Concepts.hpp:154
Specifies the interface for a handler that can be stopped.
Definition Concepts.hpp:146
Specifies that the type must be some std::duration.
Definition Concepts.hpp:162
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:36
Definition BasicExecutionContext.hpp:73
Definition BasicExecutionContext.hpp:60
Definition ErrorHandling.hpp:34