Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
RepeatedTask.hpp
1#pragma once
2
3#include "util/Assert.hpp"
4#include "util/Spawn.hpp"
5
6#include <boost/asio/bind_cancellation_slot.hpp>
7#include <boost/asio/cancellation_signal.hpp>
8#include <boost/asio/cancellation_type.hpp>
9#include <boost/asio/error.hpp>
10#include <boost/asio/executor.hpp>
11#include <boost/asio/spawn.hpp>
12#include <boost/asio/steady_timer.hpp>
13#include <boost/asio/strand.hpp>
14
15#include <atomic>
16#include <chrono>
17#include <concepts>
18#include <semaphore>
19
20namespace cluster::impl {
21
22// TODO: Try to replace util::Repeat by this. https://github.com/XRPLF/clio/issues/2926
23template <typename Context>
24class RepeatedTask {
25 std::chrono::steady_clock::duration interval_;
26 boost::asio::strand<typename Context::executor_type> strand_;
27
28 enum class State { Running, Stopped };
29 std::atomic<State> state_ = State::Stopped;
30
31 std::binary_semaphore semaphore_{0};
32 boost::asio::steady_timer timer_;
33
34public:
35 RepeatedTask(std::chrono::steady_clock::duration interval, Context& ctx)
36 : interval_(interval), strand_(boost::asio::make_strand(ctx)), timer_(strand_)
37 {
38 }
39
40 ~RepeatedTask()
41 {
42 stop();
43 }
44
45 template <typename Fn>
46 requires std::invocable<Fn, boost::asio::yield_context> or std::invocable<Fn>
47 void
48 run(Fn&& f)
49 {
50 ASSERT(state_ == State::Stopped, "Can only be ran once");
51 state_ = State::Running;
52 util::spawn(strand_, [this, f = std::forward<Fn>(f)](boost::asio::yield_context yield) {
53 boost::system::error_code ec;
54
55 while (state_ == State::Running) {
56 timer_.expires_after(interval_);
57 timer_.async_wait(yield[ec]);
58
59 if (ec or state_ != State::Running)
60 break;
61
62 if constexpr (std::invocable<decltype(f), boost::asio::yield_context>) {
63 f(yield);
64 } else {
65 f();
66 }
67 }
68
69 semaphore_.release();
70 });
71 }
72
73 void
74 stop()
75 {
76 if (auto expected = State::Running;
77 not state_.compare_exchange_strong(expected, State::Stopped))
78 return; // Already stopped or not started
79
80 std::binary_semaphore cancelSemaphore{0};
81 boost::asio::post(strand_, [this, &cancelSemaphore]() {
82 timer_.cancel();
83 cancelSemaphore.release();
84 });
85 cancelSemaphore.acquire();
86 semaphore_.acquire();
87 }
88};
89
90} // namespace cluster::impl
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn.
Definition Spawn.hpp:53