Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Operation.hpp
1#pragma once
2
3#include "util/MoveTracker.hpp"
4#include "util/Repeat.hpp"
5#include "util/async/Concepts.hpp"
6#include "util/async/Outcome.hpp"
7#include "util/async/context/impl/Cancellation.hpp"
8#include "util/async/context/impl/Timer.hpp"
9
10#include <fmt/format.h>
11
12#include <chrono>
13#include <concepts>
14#include <condition_variable>
15#include <expected>
16#include <functional>
17#include <future>
18#include <memory>
19#include <mutex>
20#include <optional>
21
22namespace util::async {
23namespace impl {
24
/**
 * @brief Base for the future side of an asynchronous operation.
 *
 * Holds the `std::future` obtained from the outcome and exposes blocking access to it.
 * The type is move-only: both move operations are defaulted and both copy operations
 * are deleted, so that derived types can default their own move assignment.
 *
 * @tparam OutcomeType Outcome type; must provide a `DataType` alias and `getStdFuture()`
 */
template <typename OutcomeType>
class BasicOperation {
protected:
    std::future<typename OutcomeType::DataType> future_;

public:
    using DataType = typename OutcomeType::DataType;

    /**
     * @brief Construct the operation from its outcome.
     *
     * @param outcome Outcome to take the future from; it is dereferenced, so it must not be null
     */
    explicit BasicOperation(OutcomeType* outcome) : future_{outcome->getStdFuture()}
    {
    }

    BasicOperation(BasicOperation&&) = default;
    // Declared explicitly: with a user-declared move constructor the compiler would
    // otherwise not generate a move assignment operator at all.
    BasicOperation&
    operator=(BasicOperation&&) = default;

    BasicOperation(BasicOperation const&) = delete;
    BasicOperation&
    operator=(BasicOperation const&) = delete;

    /**
     * @brief Block until the result is available and return it.
     *
     * @return Whatever `std::future::get()` yields for `DataType`
     */
    [[nodiscard]] auto
    get()
    {
        return future_.get();
    }

    /**
     * @brief Block until the result is available without consuming it.
     */
    void
    wait()
    {
        future_.wait();
    }
};
53
54template <typename CtxType, typename OpType>
55struct BasicScheduledOperation : util::MoveTracker {
56 class State {
57 std::mutex m_;
58 std::condition_variable ready_;
59 std::optional<OpType> op_{std::nullopt};
60
61 public:
62 void
63 emplace(auto&& op)
64 {
65 std::lock_guard const lock{m_};
66 op_.emplace(std::forward<decltype(op)>(op));
67 ready_.notify_all();
68 }
69
70 [[nodiscard]] OpType&
71 get()
72 {
73 std::unique_lock lock{m_};
74 ready_.wait(lock, [this] { return op_.has_value(); });
75 return op_.value();
76 }
77 };
78
79 std::shared_ptr<State> state = std::make_shared<State>();
80 typename CtxType::Timer timer;
81
82 BasicScheduledOperation(auto& executor, auto delay, auto&& fn)
83 : timer(
84 executor,
85 delay,
86 [state = state, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
87 state->emplace(fn(ec));
88 }
89 )
90 {
91 }
92
93 ~BasicScheduledOperation() override
94 {
95 if (not wasMoved())
96 abort();
97 }
98
99 BasicScheduledOperation(BasicScheduledOperation const&) = default;
100 BasicScheduledOperation&
101 operator=(BasicScheduledOperation const&) = default;
102 BasicScheduledOperation(BasicScheduledOperation&&) = default;
103 BasicScheduledOperation&
104 operator=(BasicScheduledOperation&&) = default;
105
106 [[nodiscard]] auto
107 get()
108 {
109 return state->get().get();
110 }
111
112 void
113 wait() noexcept
114 {
115 state->get().wait();
116 }
117
118 void
119 cancel() noexcept
120 {
121 timer.cancel();
122 }
123
124 void
125 requestStop() noexcept
126 requires(SomeStoppableOperation<OpType>)
127 {
128 state->get().requestStop();
129 }
130
131 void
132 abort() noexcept
133 {
134 cancel();
135
136 if constexpr (SomeStoppableOperation<OpType>)
137 requestStop();
138 }
139};
140
141} // namespace impl
142
149template <typename RetType, typename StopSourceType>
150class StoppableOperation : public impl::BasicOperation<StoppableOutcome<RetType, StopSourceType>>,
151 public util::MoveTracker {
153
154 StopSourceType stopSource_;
155
156public:
162 explicit StoppableOperation(OutcomeType* outcome)
163 : impl::BasicOperation<OutcomeType>(outcome), stopSource_(outcome->getStopSource())
164 {
165 }
166
167 ~StoppableOperation() override
168 {
169 if (not wasMoved())
170 requestStop();
171 }
172
173 StoppableOperation(StoppableOperation const&) = delete;
174 StoppableOperation&
175 operator=(StoppableOperation const&) = delete;
176 StoppableOperation(StoppableOperation&&) = default;
177 StoppableOperation&
178 operator=(StoppableOperation&&) = default;
179
181 void
182 requestStop() noexcept
183 {
184 stopSource_.requestStop();
185 }
186};
187
193template <typename RetType>
195
202template <typename CtxType, typename OpType>
204
214template <typename CtxType>
216 util::Repeat repeat_;
217 std::function<void()> action_;
218
219public:
228 template <std::invocable FnType>
229 RepeatingOperation(auto& executor, std::chrono::steady_clock::duration interval, FnType&& fn)
230 : repeat_(executor)
231 , action_([fn = std::forward<FnType>(fn), &executor] { boost::asio::post(executor, fn); })
232 {
233 repeat_.start(interval, action_);
234 }
235
236 ~RepeatingOperation() override
237 {
238 if (not wasMoved())
239 abort();
240 }
241
242 RepeatingOperation(RepeatingOperation const&) = delete;
243 RepeatingOperation&
244 operator=(RepeatingOperation const&) = delete;
245 RepeatingOperation(RepeatingOperation&&) = default;
246 RepeatingOperation&
247 operator=(RepeatingOperation&&) = default;
248
254 void
255 abort() noexcept
256 {
257 repeat_.stop();
258 }
259
266 void
268 {
269 action_();
270 }
271};
272
273} // namespace util::async
A base-class that can be used to check whether the current instance was moved from.
Definition MoveTracker.hpp:10
A class to repeat some action at a regular interval.
Definition Repeat.hpp:25
void start(std::chrono::steady_clock::duration interval, Action &&action)
Start asynchronously repeating.
Definition Repeat.hpp:76
void invoke()
Force-invoke the operation.
Definition Operation.hpp:267
RepeatingOperation(auto &executor, std::chrono::steady_clock::duration interval, FnType &&fn)
Construct a new Repeating Operation object.
Definition Operation.hpp:229
void abort() noexcept
Aborts the operation and the repeating timer.
Definition Operation.hpp:255
The future side of async operations that can be stopped.
Definition Operation.hpp:151
StoppableOperation(OutcomeType *outcome)
Construct a new Stoppable Operation object.
Definition Operation.hpp:162
void requestStop() noexcept
Requests the operation to stop.
Definition Operation.hpp:182
Stoppable outcome.
Definition Outcome.hpp:97
Definition Operation.hpp:26
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:17
impl::BasicScheduledOperation< CtxType, OpType > ScheduledOperation
The future side of async operations that can be scheduled.
Definition Operation.hpp:203
impl::BasicOperation< Outcome< RetType > > Operation
The future side of async operations that cannot be stopped.
Definition Operation.hpp:194