Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Operation.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/MoveTracker.hpp"
23#include "util/Repeat.hpp"
24#include "util/async/Concepts.hpp"
25#include "util/async/Outcome.hpp"
26#include "util/async/context/impl/Cancellation.hpp"
27#include "util/async/context/impl/Timer.hpp"
28
29#include <fmt/core.h>
30
31#include <chrono>
32#include <concepts>
33#include <condition_variable>
34#include <expected>
35#include <future>
36#include <memory>
37#include <mutex>
38#include <optional>
39
40namespace util::async {
41namespace impl {
42
/**
 * @brief Move-only wrapper around the `std::future` obtained from an outcome.
 *
 * @tparam OutcomeType Outcome type; must provide a nested `DataType` and a `getStdFuture()` member
 */
template <typename OutcomeType>
class BasicOperation {
protected:
    std::future<typename OutcomeType::DataType> future_;

public:
    /** @brief The type of data carried by the wrapped future */
    using DataType = typename OutcomeType::DataType;

    /**
     * @brief Construct the operation from the given outcome.
     *
     * @param outcome The outcome to take the future from; it is dereferenced unconditionally
     */
    explicit BasicOperation(OutcomeType* outcome) : future_{outcome->getStdFuture()}
    {
    }

    BasicOperation(BasicOperation&&) = default;

    // Declared explicitly: a user-declared move constructor suppresses the implicit move assignment and
    // deletes copy assignment. Without this line a derived class that defaults its own move assignment
    // (e.g. StoppableOperation) ends up with a deleted operator.
    BasicOperation&
    operator=(BasicOperation&&) = default;

    BasicOperation(BasicOperation const&) = delete;

    BasicOperation&
    operator=(BasicOperation const&) = delete;

    /**
     * @brief Retrieve the result from the wrapped future (forwards to `std::future::get`).
     *
     * @return Whatever the wrapped future yields
     */
    [[nodiscard]] auto
    get()
    {
        return future_.get();
    }

    /**
     * @brief Wait for the wrapped future to become ready (forwards to `std::future::wait`).
     */
    void
    wait()
    {
        future_.wait();
    }
};
71
72template <typename CtxType, typename OpType>
74 class State {
75 std::mutex m_;
76 std::condition_variable ready_;
77 std::optional<OpType> op_{std::nullopt};
78
79 public:
80 void
81 emplace(auto&& op)
82 {
83 std::lock_guard const lock{m_};
84 op_.emplace(std::forward<decltype(op)>(op));
85 ready_.notify_all();
86 }
87
88 [[nodiscard]] OpType&
89 get()
90 {
91 std::unique_lock lock{m_};
92 ready_.wait(lock, [this] { return op_.has_value(); });
93 return op_.value();
94 }
95 };
96
97 std::shared_ptr<State> state = std::make_shared<State>();
98 typename CtxType::Timer timer;
99
100 BasicScheduledOperation(auto& executor, auto delay, auto&& fn)
101 : timer(executor, delay, [state = state, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
102 state->emplace(fn(ec));
103 })
104 {
105 }
106
107 ~BasicScheduledOperation() override
108 {
109 if (not wasMoved())
110 abort();
111 }
112
113 BasicScheduledOperation(BasicScheduledOperation const&) = default;
114 BasicScheduledOperation&
115 operator=(BasicScheduledOperation const&) = default;
116 BasicScheduledOperation(BasicScheduledOperation&&) = default;
117 BasicScheduledOperation&
118 operator=(BasicScheduledOperation&&) = default;
119
120 [[nodiscard]] auto
121 get()
122 {
123 return state->get().get();
124 }
125
126 void
127 wait() noexcept
128 {
129 state->get().wait();
130 }
131
132 void
133 cancel() noexcept
134 {
135 timer.cancel();
136 }
137
138 void
139 requestStop() noexcept
140 requires(SomeStoppableOperation<OpType>)
141 {
142 state->get().requestStop();
143 }
144
145 void
146 abort() noexcept
147 {
148 cancel();
149
150 if constexpr (SomeStoppableOperation<OpType>)
151 requestStop();
152 }
153};
154
155} // namespace impl
156
163template <typename RetType, typename StopSourceType>
164class StoppableOperation : public impl::BasicOperation<StoppableOutcome<RetType, StopSourceType>>,
165 public util::MoveTracker {
166 using OutcomeType = StoppableOutcome<RetType, StopSourceType>;
167
168 StopSourceType stopSource_;
169
170public:
177 : impl::BasicOperation<OutcomeType>(outcome), stopSource_(outcome->getStopSource())
178 {
179 }
180
181 ~StoppableOperation() override
182 {
183 if (not wasMoved())
184 requestStop();
185 }
186
187 StoppableOperation(StoppableOperation const&) = delete;
188 StoppableOperation&
189 operator=(StoppableOperation const&) = delete;
190 StoppableOperation(StoppableOperation&&) = default;
191 StoppableOperation&
192 operator=(StoppableOperation&&) = default;
193
195 void
196 requestStop() noexcept
197 {
198 stopSource_.requestStop();
199 }
200};
201
207template <typename RetType>
209
216template <typename CtxType, typename OpType>
218
227template <typename CtxType>
229 util::Repeat repeat_;
230
231public:
240 RepeatingOperation(auto& executor, std::chrono::steady_clock::duration interval, std::invocable auto&& fn)
241 : repeat_(executor)
242 {
243 repeat_.start(interval, std::forward<decltype(fn)>(fn));
244 }
245
246 ~RepeatingOperation() override
247 {
248 if (not wasMoved())
249 abort();
250 }
251
252 RepeatingOperation(RepeatingOperation const&) = delete;
253 RepeatingOperation&
254 operator=(RepeatingOperation const&) = delete;
255 RepeatingOperation(RepeatingOperation&&) = default;
256 RepeatingOperation&
257 operator=(RepeatingOperation&&) = default;
258
264 void
265 abort() noexcept
266 {
267 repeat_.stop();
268 }
269};
270
271} // namespace util::async
A base class that can be used to check whether the current instance was moved from.
Definition MoveTracker.hpp:29
MoveTracker & operator=(MoveTracker &&other)
Move assignment operator: sets the moved-from state on other and resets the state on this instance.
Definition MoveTracker.hpp:63
bool wasMoved() const noexcept
The function to be used by clients in order to check whether the instance was moved from.
Definition MoveTracker.hpp:38
A class to repeat some action at a regular interval.
Definition Repeat.hpp:41
void stop()
Stop repeating.
Definition Repeat.cpp:25
void start(std::chrono::steady_clock::duration interval, Action &&action)
Start asynchronously repeating.
Definition Repeat.hpp:89
The future side of async operations that automatically repeat until aborted.
Definition Operation.hpp:228
RepeatingOperation(auto &executor, std::chrono::steady_clock::duration interval, std::invocable auto &&fn)
Construct a new Repeating Operation object.
Definition Operation.hpp:240
void abort() noexcept
Aborts the operation and the repeating timer.
Definition Operation.hpp:265
The future side of async operations that can be stopped.
Definition Outcome.hpp:30
StoppableOperation(OutcomeType *outcome)
Construct a new Stoppable Operation object.
Definition Operation.hpp:176
void requestStop() noexcept
Requests the operation to stop.
Definition Operation.hpp:196
Stoppable outcome.
Definition Outcome.hpp:116
Definition Outcome.hpp:35
This namespace implements an async framework built on top of execution contexts.
Definition AnyExecutionContext.hpp:36