Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Operation.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/MoveTracker.hpp"
23#include "util/Repeat.hpp"
24#include "util/async/Concepts.hpp"
25#include "util/async/Outcome.hpp"
26#include "util/async/context/impl/Cancellation.hpp"
27#include "util/async/context/impl/Timer.hpp"
28
29#include <fmt/format.h>
30
31#include <chrono>
32#include <concepts>
33#include <condition_variable>
34#include <expected>
35#include <functional>
36#include <future>
37#include <memory>
38#include <mutex>
39#include <optional>
40
41namespace util::async {
42namespace impl {
43
44template <typename OutcomeType>
45class BasicOperation {
46protected:
47 std::future<typename OutcomeType::DataType> future_;
48
49public:
50 using DataType = typename OutcomeType::DataType;
51
52 explicit BasicOperation(OutcomeType* outcome) : future_{outcome->getStdFuture()}
53 {
54 }
55
56 BasicOperation(BasicOperation&&) = default;
57
58 BasicOperation(BasicOperation const&) = delete;
59
60 [[nodiscard]] auto
61 get()
62 {
63 return future_.get();
64 }
65
66 void
67 wait()
68 {
69 future_.wait();
70 }
71};
72
73template <typename CtxType, typename OpType>
74struct BasicScheduledOperation : util::MoveTracker {
75 class State {
76 std::mutex m_;
77 std::condition_variable ready_;
78 std::optional<OpType> op_{std::nullopt};
79
80 public:
81 void
82 emplace(auto&& op)
83 {
84 std::lock_guard const lock{m_};
85 op_.emplace(std::forward<decltype(op)>(op));
86 ready_.notify_all();
87 }
88
89 [[nodiscard]] OpType&
90 get()
91 {
92 std::unique_lock lock{m_};
93 ready_.wait(lock, [this] { return op_.has_value(); });
94 return op_.value();
95 }
96 };
97
98 std::shared_ptr<State> state = std::make_shared<State>();
99 typename CtxType::Timer timer;
100
101 BasicScheduledOperation(auto& executor, auto delay, auto&& fn)
102 : timer(
103 executor,
104 delay,
105 [state = state, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
106 state->emplace(fn(ec));
107 }
108 )
109 {
110 }
111
112 ~BasicScheduledOperation() override
113 {
114 if (not wasMoved())
115 abort();
116 }
117
118 BasicScheduledOperation(BasicScheduledOperation const&) = default;
119 BasicScheduledOperation&
120 operator=(BasicScheduledOperation const&) = default;
121 BasicScheduledOperation(BasicScheduledOperation&&) = default;
122 BasicScheduledOperation&
123 operator=(BasicScheduledOperation&&) = default;
124
125 [[nodiscard]] auto
126 get()
127 {
128 return state->get().get();
129 }
130
131 void
132 wait() noexcept
133 {
134 state->get().wait();
135 }
136
137 void
138 cancel() noexcept
139 {
140 timer.cancel();
141 }
142
143 void
144 requestStop() noexcept
145 requires(SomeStoppableOperation<OpType>)
146 {
147 state->get().requestStop();
148 }
149
150 void
151 abort() noexcept
152 {
153 cancel();
154
155 if constexpr (SomeStoppableOperation<OpType>)
156 requestStop();
157 }
158};
159
160} // namespace impl
161
168template <typename RetType, typename StopSourceType>
169class StoppableOperation : public impl::BasicOperation<StoppableOutcome<RetType, StopSourceType>>,
170 public util::MoveTracker {
172
173 StopSourceType stopSource_;
174
175public:
181 explicit StoppableOperation(OutcomeType* outcome)
182 : impl::BasicOperation<OutcomeType>(outcome), stopSource_(outcome->getStopSource())
183 {
184 }
185
186 ~StoppableOperation() override
187 {
188 if (not wasMoved())
189 requestStop();
190 }
191
192 StoppableOperation(StoppableOperation const&) = delete;
193 StoppableOperation&
194 operator=(StoppableOperation const&) = delete;
195 StoppableOperation(StoppableOperation&&) = default;
196 StoppableOperation&
197 operator=(StoppableOperation&&) = default;
198
200 void
201 requestStop() noexcept
202 {
203 stopSource_.requestStop();
204 }
205};
206
212template <typename RetType>
214
221template <typename CtxType, typename OpType>
223
233template <typename CtxType>
235 util::Repeat repeat_;
236 std::function<void()> action_;
237
238public:
247 template <std::invocable FnType>
248 RepeatingOperation(auto& executor, std::chrono::steady_clock::duration interval, FnType&& fn)
249 : repeat_(executor)
250 , action_([fn = std::forward<FnType>(fn), &executor] { boost::asio::post(executor, fn); })
251 {
252 repeat_.start(interval, action_);
253 }
254
255 ~RepeatingOperation() override
256 {
257 if (not wasMoved())
258 abort();
259 }
260
261 RepeatingOperation(RepeatingOperation const&) = delete;
262 RepeatingOperation&
263 operator=(RepeatingOperation const&) = delete;
264 RepeatingOperation(RepeatingOperation&&) = default;
265 RepeatingOperation&
266 operator=(RepeatingOperation&&) = default;
267
273 void
274 abort() noexcept
275 {
276 repeat_.stop();
277 }
278
285 void
287 {
288 action_();
289 }
290};
291
292} // namespace util::async
A base-class that can be used to check whether the current instance was moved from.
Definition MoveTracker.hpp:29
A class to repeat some action at a regular interval.
Definition Repeat.hpp:44
void start(std::chrono::steady_clock::duration interval, Action &&action)
Start asynchronously repeating.
Definition Repeat.hpp:95
void invoke()
Force-invoke the operation.
Definition Operation.hpp:286
RepeatingOperation(auto &executor, std::chrono::steady_clock::duration interval, FnType &&fn)
Construct a new Repeating Operation object.
Definition Operation.hpp:248
void abort() noexcept
Aborts the operation and the repeating timer.
Definition Operation.hpp:274
The future side of async operations that can be stopped.
Definition Operation.hpp:170
StoppableOperation(OutcomeType *outcome)
Construct a new Stoppable Operation object.
Definition Operation.hpp:181
void requestStop() noexcept
Requests the operation to stop.
Definition Operation.hpp:201
Stoppable outcome.
Definition Outcome.hpp:116
Definition Operation.hpp:45
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:36
impl::BasicScheduledOperation< CtxType, OpType > ScheduledOperation
The future side of async operations that can be scheduled.
Definition Operation.hpp:222
impl::BasicOperation< Outcome< RetType > > Operation
The future side of async operations that cannot be stopped.
Definition Operation.hpp:213