Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Scheduling.hpp
1#pragma once
2
3#include "etl/Models.hpp"
5#include "etl/SchedulerInterface.hpp"
6
7#include <sys/types.h>
8
9#include <atomic>
10#include <cstdint>
11#include <functional>
12#include <limits>
13#include <memory>
14#include <optional>
15#include <tuple>
16#include <type_traits>
17#include <utility>
18
19namespace etl::impl {
20
21template <typename T>
22concept SomeScheduler = std::is_base_of_v<SchedulerInterface, std::decay_t<T>>;
23
24class ForwardScheduler : public SchedulerInterface {
25 std::reference_wrapper<NetworkValidatedLedgersInterface> ledgers_;
26
27 uint32_t startSeq_;
28 std::optional<uint32_t> maxSeq_;
29 std::atomic_uint32_t seq_;
30
31public:
32 ForwardScheduler(ForwardScheduler const& other)
33 : ledgers_(other.ledgers_)
34 , startSeq_(other.startSeq_)
35 , maxSeq_(other.maxSeq_)
36 , seq_(other.seq_.load())
37 {
38 }
39
40 ForwardScheduler(
41 std::reference_wrapper<NetworkValidatedLedgersInterface> ledgers,
42 uint32_t startSeq,
43 std::optional<uint32_t> maxSeq = std::nullopt
44 )
45 : ledgers_(ledgers), startSeq_(startSeq), maxSeq_(maxSeq), seq_(startSeq)
46 {
47 }
48
49 [[nodiscard]] std::optional<model::Task>
50 next() override
51 {
52 static constexpr auto kMAX = std::numeric_limits<uint32_t>::max();
53 uint32_t currentSeq = seq_;
54
55 if (ledgers_.get().getMostRecent() >= currentSeq) {
56 while (currentSeq < maxSeq_.value_or(kMAX)) {
57 if (seq_.compare_exchange_weak(
58 currentSeq, currentSeq + 1u, std::memory_order_acq_rel
59 )) {
60 return {{.priority = model::Task::Priority::Higher, .seq = currentSeq}};
61 }
62 }
63 }
64
65 return std::nullopt;
66 }
67};
68
69class BackfillScheduler : public SchedulerInterface {
70 uint32_t startSeq_;
71 uint32_t minSeq_ = 0u;
72
73 std::atomic_uint32_t seq_;
74
75public:
76 BackfillScheduler(BackfillScheduler const& other)
77 : startSeq_(other.startSeq_), minSeq_(other.minSeq_), seq_(other.seq_.load())
78 {
79 }
80
81 BackfillScheduler(uint32_t startSeq, std::optional<uint32_t> minSeq = std::nullopt)
82 : startSeq_(startSeq), minSeq_(minSeq.value_or(0)), seq_(startSeq)
83 {
84 }
85
86 [[nodiscard]] std::optional<model::Task>
87 next() override
88 {
89 uint32_t currentSeq = seq_;
90 while (currentSeq > minSeq_) {
91 if (seq_.compare_exchange_weak(
92 currentSeq, currentSeq - 1u, std::memory_order_acq_rel
93 )) {
94 return {{.priority = model::Task::Priority::Lower, .seq = currentSeq}};
95 }
96 }
97
98 return std::nullopt;
99 }
100};
101
102template <SomeScheduler... Schedulers>
103class SchedulerChain : public SchedulerInterface {
104 std::tuple<Schedulers...> schedulers_;
105
106public:
107 template <SomeScheduler... Ts>
108 requires(std::is_same_v<Ts, Schedulers> and ...)
109 SchedulerChain(Ts&&... schedulers) : schedulers_(std::forward<Ts>(schedulers)...)
110 {
111 }
112
113 [[nodiscard]] std::optional<model::Task>
114 next() override
115 {
116 std::optional<model::Task> task;
117 auto const expand = [&](auto& s) {
118 if (task.has_value())
119 return false;
120
121 task = s.next();
122 return task.has_value();
123 };
124
125 std::apply([&expand](auto&&... xs) { (... || expand(xs)); }, schedulers_);
126
127 return task;
128 }
129};
130
131static auto
132makeScheduler(SomeScheduler auto&&... schedulers)
133{
134 return std::make_unique<SchedulerChain<std::decay_t<decltype(schedulers)>...>>(
135 std::forward<decltype(schedulers)>(schedulers)...
136 );
137}
138
139} // namespace etl::impl
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:87
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:50
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:114
Definition Scheduling.hpp:22
The interface of a scheduler for the extraction process.
Definition SchedulerInterface.hpp:12