Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Operation.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/MoveTracker.hpp"
23#include "util/Repeat.hpp"
24#include "util/async/Concepts.hpp"
25#include "util/async/Outcome.hpp"
26#include "util/async/context/impl/Cancellation.hpp"
27#include "util/async/context/impl/Timer.hpp"
28
29#include <fmt/core.h>
30
31#include <chrono>
32#include <concepts>
33#include <condition_variable>
34#include <expected>
35#include <functional>
36#include <future>
37#include <memory>
38#include <mutex>
39#include <optional>
40
41namespace util::async {
42namespace impl {
43
44template <typename OutcomeType>
45class BasicOperation {
46protected:
47 std::future<typename OutcomeType::DataType> future_;
48
49public:
50 using DataType = typename OutcomeType::DataType;
51
52 explicit BasicOperation(OutcomeType* outcome) : future_{outcome->getStdFuture()}
53 {
54 }
55
56 BasicOperation(BasicOperation&&) = default;
57
58 BasicOperation(BasicOperation const&) = delete;
59
60 [[nodiscard]] auto
61 get()
62 {
63 return future_.get();
64 }
65
66 void
67 wait()
68 {
69 future_.wait();
70 }
71};
72
73template <typename CtxType, typename OpType>
75 class State {
76 std::mutex m_;
77 std::condition_variable ready_;
78 std::optional<OpType> op_{std::nullopt};
79
80 public:
81 void
82 emplace(auto&& op)
83 {
84 std::lock_guard const lock{m_};
85 op_.emplace(std::forward<decltype(op)>(op));
86 ready_.notify_all();
87 }
88
89 [[nodiscard]] OpType&
90 get()
91 {
92 std::unique_lock lock{m_};
93 ready_.wait(lock, [this] { return op_.has_value(); });
94 return op_.value();
95 }
96 };
97
98 std::shared_ptr<State> state = std::make_shared<State>();
99 typename CtxType::Timer timer;
100
101 BasicScheduledOperation(auto& executor, auto delay, auto&& fn)
102 : timer(executor, delay, [state = state, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
103 state->emplace(fn(ec));
104 })
105 {
106 }
107
108 ~BasicScheduledOperation() override
109 {
110 if (not wasMoved())
111 abort();
112 }
113
114 BasicScheduledOperation(BasicScheduledOperation const&) = default;
115 BasicScheduledOperation&
116 operator=(BasicScheduledOperation const&) = default;
117 BasicScheduledOperation(BasicScheduledOperation&&) = default;
118 BasicScheduledOperation&
119 operator=(BasicScheduledOperation&&) = default;
120
121 [[nodiscard]] auto
122 get()
123 {
124 return state->get().get();
125 }
126
127 void
128 wait() noexcept
129 {
130 state->get().wait();
131 }
132
133 void
134 cancel() noexcept
135 {
136 timer.cancel();
137 }
138
139 void
140 requestStop() noexcept
141 requires(SomeStoppableOperation<OpType>)
142 {
143 state->get().requestStop();
144 }
145
146 void
147 abort() noexcept
148 {
149 cancel();
150
151 if constexpr (SomeStoppableOperation<OpType>)
152 requestStop();
153 }
154};
155
156} // namespace impl
157
164template <typename RetType, typename StopSourceType>
165class StoppableOperation : public impl::BasicOperation<StoppableOutcome<RetType, StopSourceType>>,
166 public util::MoveTracker {
167 using OutcomeType = StoppableOutcome<RetType, StopSourceType>;
168
169 StopSourceType stopSource_;
170
171public:
178 : impl::BasicOperation<OutcomeType>(outcome), stopSource_(outcome->getStopSource())
179 {
180 }
181
182 ~StoppableOperation() override
183 {
184 if (not wasMoved())
185 requestStop();
186 }
187
188 StoppableOperation(StoppableOperation const&) = delete;
189 StoppableOperation&
190 operator=(StoppableOperation const&) = delete;
191 StoppableOperation(StoppableOperation&&) = default;
192 StoppableOperation&
193 operator=(StoppableOperation&&) = default;
194
196 void
197 requestStop() noexcept
198 {
199 stopSource_.requestStop();
200 }
201};
202
208template <typename RetType>
210
217template <typename CtxType, typename OpType>
219
228template <typename CtxType>
230 util::Repeat repeat_;
231 std::function<void()> action_;
232
233public:
242 template <std::invocable FnType>
243 RepeatingOperation(auto& executor, std::chrono::steady_clock::duration interval, FnType&& fn)
244 : repeat_(executor), action_([fn = std::forward<FnType>(fn), &executor] { boost::asio::post(executor, fn); })
245 {
246 repeat_.start(interval, action_);
247 }
248
249 ~RepeatingOperation() override
250 {
251 if (not wasMoved())
252 abort();
253 }
254
255 RepeatingOperation(RepeatingOperation const&) = delete;
256 RepeatingOperation&
257 operator=(RepeatingOperation const&) = delete;
258 RepeatingOperation(RepeatingOperation&&) = default;
259 RepeatingOperation&
260 operator=(RepeatingOperation&&) = default;
261
267 void
268 abort() noexcept
269 {
270 repeat_.stop();
271 }
272
279 void
281 {
282 action_();
283 }
284};
285
286} // namespace util::async
A base-class that can be used to check whether the current instance was moved from.
Definition MoveTracker.hpp:29
MoveTracker & operator=(MoveTracker &&other)
Move operator sets the moved-from state on other and resets the state on this
Definition MoveTracker.hpp:63
bool wasMoved() const noexcept
The function to be used by clients in order to check whether the instance was moved from.
Definition MoveTracker.hpp:38
A class to repeat some action at a regular interval.
Definition Repeat.hpp:41
void stop()
Stop repeating.
Definition Repeat.cpp:25
void start(std::chrono::steady_clock::duration interval, Action &&action)
Start asynchronously repeating.
Definition Repeat.hpp:89
The future side of async operations that automatically repeat until aborted.
Definition Operation.hpp:229
void invoke()
Force-invoke the operation.
Definition Operation.hpp:280
RepeatingOperation(auto &executor, std::chrono::steady_clock::duration interval, FnType &&fn)
Construct a new Repeating Operation object.
Definition Operation.hpp:243
void abort() noexcept
Aborts the operation and the repeating timer.
Definition Operation.hpp:268
The future side of async operations that can be stopped.
Definition Outcome.hpp:30
StoppableOperation(OutcomeType *outcome)
Construct a new Stoppable Operation object.
Definition Operation.hpp:177
void requestStop() noexcept
Requests the operation to stop.
Definition Operation.hpp:197
Stoppable outcome.
Definition Outcome.hpp:116
Definition Outcome.hpp:35
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:36