Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
BasicExecutionContext.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/Assert.hpp"
23#include "util/async/Concepts.hpp"
24#include "util/async/Error.hpp"
25#include "util/async/Operation.hpp"
26#include "util/async/Outcome.hpp"
27#include "util/async/context/impl/Cancellation.hpp"
28#include "util/async/context/impl/Execution.hpp"
29#include "util/async/context/impl/Strand.hpp"
30#include "util/async/context/impl/Timer.hpp"
31#include "util/async/context/impl/Utils.hpp"
32#include "util/async/impl/ErrorHandling.hpp"
33
34#include <boost/asio.hpp>
35#include <boost/asio/error.hpp>
36#include <boost/asio/strand.hpp>
37#include <boost/asio/thread_pool.hpp>
38
39#include <chrono>
40#include <cstddef>
41#include <expected>
42#include <memory>
43#include <optional>
44#include <type_traits>
45#include <utility>
46
57namespace util::async {
58namespace impl {
59
61 using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
63
/**
 * @brief Access the executor held by this context.
 *
 * @return A mutable reference to the `executor` member
 */
64 Executor&
65 getExecutor()
66 {
67 return executor;
68 }
69
70 Executor executor;
71};
72
74 using Executor = boost::asio::thread_pool;
77
/**
 * @brief Create the context together with its heap-allocated executor.
 *
 * The executor lives behind a std::unique_ptr, which is why the other members
 * check it for null before use.
 *
 * @param numThreads Forwarded to the Executor (boost::asio::thread_pool) constructor
 */
78 AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
79 {
80 }
81
82 AsioPoolContext(AsioPoolContext const&) = delete;
84
/**
 * @brief Create a strand on top of the owned executor.
 *
 * Asserts that the executor pointer is non-null, i.e. that it has not been moved from.
 *
 * @return A Strand brace-initialized from boost::asio::make_strand(*executor)
 */
85 Strand
86 makeStrand() const
87 {
88 ASSERT(executor, "Called after executor was moved from.");
89 return {boost::asio::make_strand(*executor)};
90 }
91
/**
 * @brief Forward a stop request to the owned executor.
 *
 * Does nothing when the executor pointer is null.
 */
92 void
93 stop() const
94 {
95 if (executor) // don't call if executor was moved from
96 executor->stop();
97 }
98
/**
 * @brief Forward a join request to the owned executor.
 *
 * Does nothing when the executor pointer is null.
 */
99 void
100 join() const
101 {
102 if (executor) // don't call if executor was moved from
103 executor->join();
104 }
105
/**
 * @brief Access the owned executor.
 *
 * Asserts that the executor pointer is non-null before dereferencing it.
 * The method is const but still hands out a mutable reference, because only
 * the owning pointer (not the pointee) is const here.
 *
 * @return A reference to the executor behind the `executor` pointer
 */
106 Executor&
107 getExecutor() const
108 {
109 ASSERT(executor, "Called after executor was moved from.");
110 return *executor;
111 }
112
113 std::unique_ptr<Executor> executor;
114};
115
116} // namespace impl
117
126template <
127 typename ContextType,
128 typename StopSourceType,
129 typename DispatcherType,
130 typename TimerContextProvider = impl::SelfContextProvider,
131 typename ErrorHandlerType = impl::DefaultErrorHandler>
133 ContextType context_;
134
139public:
// True when the expression ErrorHandlerType::wrap(handler) is non-throwing for a
// generic handler whose body throws. It is probed once at compile time and used
// as the noexcept-specifier of the scheduling and execution methods of this class.
141 static constexpr bool kIS_NOEXCEPT = noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; }));
142
143 using ContextHolderType = ContextType;
144
145 using ExecutorType = typename ContextHolderType::Executor;
146
147 template <typename T>
148 using ValueType = std::expected<T, ExecutionError>;
149
150 using StopSource = StopSourceType;
151
152 using StopToken = typename StopSourceType::Token;
153
154 template <typename T>
156
157 template <typename T>
159
160 using Strand = impl::
161 BasicStrand<BasicExecutionContext, StopSourceType, DispatcherType, TimerContextProvider, ErrorHandlerType>;
162
163 using Timer = typename ContextHolderType::Timer;
164
165 // note: scheduled operations are always stoppable
166 template <typename T>
168
169 // note: repeating operations are always stoppable and must return void
170 using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
171
/**
 * @brief Construct the execution context.
 *
 * The constructor only list-initializes `context_` from the argument.
 *
 * @param numThreads Passed to the ContextType constructor; defaults to 1
 *        (presumably the number of worker threads - confirm against ContextType)
 */
177 explicit BasicExecutionContext(std::size_t numThreads = 1) noexcept : context_{numThreads}
178 {
179 }
180
185 {
// Request a stop of the underlying context on destruction; join() is not called here.
186 stop();
187 }
188
191
/**
 * @brief Schedule `fn` to run after `delay`; the handler receives a stop token.
 *
 * If TimerContextProvider::getContext(*this) yields a type other than this
 * context, the call is delegated to that context's scheduleAfter. Otherwise a
 * ScheduledOperation is built on the executor associated with this context, and
 * its callback runs `fn` through execute() with the same `timeout`.
 *
 * @param delay Delay handed to the ScheduledOperation
 * @param fn Handler invoked with a StopToken; its decayed return type becomes the operation's value type
 * @param timeout Optional timeout forwarded to execute()
 * @return The ScheduledOperation, or whatever the delegated context returns
 */
200 [[nodiscard]] auto
202 SomeStdDuration auto delay,
204 std::optional<std::chrono::milliseconds> timeout = std::nullopt
205 ) noexcept(kIS_NOEXCEPT)
206 {
207 if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
208 return TimerContextProvider::getContext(*this).scheduleAfter(
209 delay, std::forward<decltype(fn)>(fn), timeout
210 );
211 } else {
212 using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
213 return ScheduledOperation<FnRetType>(
214 impl::extractAssociatedExecutor(*this),
215 delay,
// The callback's own argument is ignored in this overload; only the stop token reaches fn.
216 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto) mutable {
217 return this->execute(
218 [fn = std::forward<decltype(fn)>(fn)](auto stopToken) {
219 if constexpr (std::is_void_v<FnRetType>) {
220 fn(std::move(stopToken));
221 } else {
222 return fn(std::move(stopToken));
223 }
224 },
225 timeout
226 );
227 }
228 );
229 }
230 }
231
/**
 * @brief Schedule `fn` to run after `delay`; the handler receives a stop token and an "aborted" flag.
 *
 * If TimerContextProvider::getContext(*this) yields a type other than this
 * context, the call is delegated to that context's scheduleAfter. Otherwise a
 * ScheduledOperation is built on the executor associated with this context, and
 * its callback runs `fn` through execute() with the same `timeout`.
 *
 * @param delay Delay handed to the ScheduledOperation
 * @param fn Handler invoked with a StopToken and a bool; its decayed return type becomes the operation's value type
 * @param timeout Optional timeout forwarded to execute()
 * @return The ScheduledOperation, or whatever the delegated context returns
 */
240 [[nodiscard]] auto
242 SomeStdDuration auto delay,
244 std::optional<std::chrono::milliseconds> timeout = std::nullopt
245 ) noexcept(kIS_NOEXCEPT)
246 {
247 if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
248 return TimerContextProvider::getContext(*this).scheduleAfter(
249 delay, std::forward<decltype(fn)>(fn), timeout
250 );
251 } else {
252 using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>(), true))>;
253 return ScheduledOperation<FnRetType>(
254 impl::extractAssociatedExecutor(*this),
255 delay,
256 [this, timeout, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
257 return this->execute(
// isAborted is computed once from the callback's error code: true only for operation_aborted.
258 [fn = std::forward<decltype(fn)>(fn),
259 isAborted = (ec == boost::asio::error::operation_aborted)](auto stopToken) {
260 if constexpr (std::is_void_v<FnRetType>) {
261 fn(std::move(stopToken), isAborted);
262 } else {
263 return fn(std::move(stopToken), isAborted);
264 }
265 },
266 timeout
267 );
268 }
269 );
270 }
271 }
272
/**
 * @brief Schedule `fn` as a repeating operation driven by `interval`.
 *
 * If TimerContextProvider::getContext(*this) yields a type other than this
 * context, the call is delegated to that context's executeRepeatedly. Otherwise
 * a RepeatedOperation is constructed from the executor associated with this
 * context, `interval` and `fn`.
 *
 * @param interval Interval handed to the RepeatedOperation
 * @param fn Handler forwarded to the RepeatedOperation
 * @return The RepeatedOperation, or whatever the delegated context returns
 */
281 [[nodiscard]] auto
283 {
284 if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
285 return TimerContextProvider::getContext(*this).executeRepeatedly(interval, std::forward<decltype(fn)>(fn));
286 } else {
287 return RepeatedOperation(impl::extractAssociatedExecutor(*this), interval, std::forward<decltype(fn)>(fn));
288 }
289 }
290
/**
 * @brief Dispatch a stoppable handler onto the context, with an optional timeout.
 *
 * The handler is wrapped by ErrorHandlerType::wrap and handed to
 * DispatcherType::dispatch together with an outcome object created by
 * impl::outcomeForHandler. When the wrapped code runs, it calls `fn` with the
 * stop token and stores the result in the outcome.
 *
 * @param fn Handler invoked with a StopToken
 * @param timeout Optional timeout passed to impl::getTimeoutHandleIfNeeded
 * @return Whatever DispatcherType::dispatch returns
 */
298 [[nodiscard]] auto
301 std::optional<std::chrono::milliseconds> timeout = std::nullopt
302 ) noexcept(kIS_NOEXCEPT)
303 {
304 return DispatcherType::dispatch(
305 context_,
306 impl::outcomeForHandler<StopSourceType>(fn),
307 ErrorHandlerType::wrap([this, timeout, fn = std::forward<decltype(fn)>(fn)](
308 auto& outcome, auto& stopSource, auto stopToken
309 ) mutable {
// The handle is held in a local for the duration of the handler call and is otherwise unused.
310 [[maybe_unused]] auto timeoutHandler =
311 impl::getTimeoutHandleIfNeeded(TimerContextProvider::getContext(*this), timeout, stopSource);
312
313 using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
314 if constexpr (std::is_void_v<FnRetType>) {
315 fn(std::move(stopToken));
// A void handler still completes the outcome, just without a value.
316 outcome.setValue();
317 } else {
318 outcome.setValue(fn(std::move(stopToken)));
319 }
320 })
321 );
322 }
323
/**
 * @brief Convenience overload of execute() taking the timeout as a duration.
 *
 * Converts `timeout` to std::chrono::milliseconds with duration_cast, wraps it
 * in an optional and forwards to the overload that takes an optional timeout.
 *
 * @param fn Handler forwarded unchanged
 * @param timeout Timeout converted to milliseconds (duration_cast truncates finer units)
 * @return The result of the forwarded execute() call
 */
331 [[nodiscard]] auto
333 {
334 return execute(
335 std::forward<decltype(fn)>(fn),
336 std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
337 );
338 }
339
/**
 * @brief Dispatch a handler that takes no stop token onto the context.
 *
 * The handler is wrapped by ErrorHandlerType::wrap and handed to
 * DispatcherType::dispatch together with an outcome object created by
 * impl::outcomeForHandler. When the wrapped code runs, it calls `fn()` and
 * stores the result in the outcome.
 *
 * @param fn Handler invoked without arguments
 * @return Whatever DispatcherType::dispatch returns
 */
346 [[nodiscard]] auto
348 {
349 return DispatcherType::dispatch(
350 context_,
351 impl::outcomeForHandler<StopSourceType>(fn),
352 ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
353 using FnRetType = std::decay_t<decltype(fn())>;
354 if constexpr (std::is_void_v<FnRetType>) {
355 fn();
// A void handler still completes the outcome, just without a value.
356 outcome.setValue();
357 } else {
358 outcome.setValue(fn());
359 }
360 })
361 );
362 }
363
/**
 * @brief Create a strand bound to this execution context.
 *
 * @return A Strand constructed from *this and the strand produced by context_.makeStrand()
 */
369 [[nodiscard]] Strand
371 {
372 return Strand(*this, context_.makeStrand());
373 }
374
/**
 * @brief Forward a stop request to the underlying context (context_.stop()).
 */
378 void
379 stop() const noexcept
380 {
381 context_.stop();
382 }
383
/**
 * @brief Forward a join request to the underlying context (context_.join()).
 */
387 void
388 join() const noexcept
389 {
390 context_.join();
391 }
392};
393
406
416
417} // namespace util::async
A highly configurable execution context.
Definition BasicExecutionContext.hpp:132
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:299
~BasicExecutionContext()
Stops the underlying thread pool.
Definition BasicExecutionContext.hpp:184
static constexpr bool kIS_NOEXCEPT
Whether operations on this execution context are noexcept.
Definition BasicExecutionContext.hpp:141
Strand makeStrand()
Create a strand for this execution context.
Definition BasicExecutionContext.hpp:370
auto executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule a repeating operation on the execution context.
Definition BasicExecutionContext.hpp:282
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken, bool > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
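Schedule an operation on the execution context to run after the given delay; the handler also receives a flag telling whether the timer was aborted.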
Definition BasicExecutionContext.hpp:241
auto execute(SomeHandlerWith< StopToken > auto &&fn, SomeStdDuration auto timeout) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context with a timeout given as a std::chrono duration.
Definition BasicExecutionContext.hpp:332
auto scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context to run after the given delay.
Definition BasicExecutionContext.hpp:201
BasicExecutionContext(std::size_t numThreads=1) noexcept
Create a new execution context with the given number of threads.
Definition BasicExecutionContext.hpp:177
auto execute(SomeHandlerWithoutStopToken auto &&fn) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:347
void join() const noexcept
Block until all operations are completed.
Definition BasicExecutionContext.hpp:388
void stop() const noexcept
Stop the execution context as soon as possible.
Definition BasicExecutionContext.hpp:379
The future side of async operations that automatically repeat until aborted.
Definition Operation.hpp:229
The future side of async operations that can be stopped.
Definition Outcome.hpp:30
Definition Outcome.hpp:35
Definition Timer.hpp:27
Specifies the interface for a handler that can be invoked with the specified args.
Definition Concepts.hpp:162
Specifies the interface for a handler that can be stopped.
Definition Concepts.hpp:154
Specifies that the type must be some std::chrono::duration.
Definition Concepts.hpp:170
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:36
Definition BasicExecutionContext.hpp:73
Definition BasicExecutionContext.hpp:60
Definition ErrorHandling.hpp:34