Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Scheduling.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
23#include "etlng/Models.hpp"
24#include "etlng/SchedulerInterface.hpp"
25
26#include <sys/types.h>
27
28#include <atomic>
29#include <cstdint>
30#include <functional>
31#include <limits>
32#include <memory>
33#include <optional>
34#include <tuple>
35#include <type_traits>
36#include <utility>
37
38namespace etlng::impl {
39
40template <typename T>
41concept SomeScheduler = std::is_base_of_v<SchedulerInterface, std::decay_t<T>>;
42
44 std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers_;
45
46 uint32_t startSeq_;
47 std::optional<uint32_t> maxSeq_;
48 std::atomic_uint32_t seq_;
49
50public:
52 : ledgers_(other.ledgers_), startSeq_(other.startSeq_), maxSeq_(other.maxSeq_), seq_(other.seq_.load())
53 {
54 }
55
57 std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers,
58 uint32_t startSeq,
59 std::optional<uint32_t> maxSeq = std::nullopt
60 )
61 : ledgers_(ledgers), startSeq_(startSeq), maxSeq_(maxSeq), seq_(startSeq)
62 {
63 }
64
65 [[nodiscard]] std::optional<model::Task>
66 next() override
67 {
68 static constexpr auto kMAX = std::numeric_limits<uint32_t>::max();
69 uint32_t currentSeq = seq_;
70
71 if (ledgers_.get().getMostRecent() >= currentSeq) {
72 while (currentSeq < maxSeq_.value_or(kMAX)) {
73 if (seq_.compare_exchange_weak(currentSeq, currentSeq + 1u, std::memory_order_acq_rel)) {
74 return {{.priority = model::Task::Priority::Higher, .seq = currentSeq}};
75 }
76 }
77 }
78
79 return std::nullopt;
80 }
81};
82
84 uint32_t startSeq_;
85 uint32_t minSeq_ = 0u;
86
87 std::atomic_uint32_t seq_;
88
89public:
91 : startSeq_(other.startSeq_), minSeq_(other.minSeq_), seq_(other.seq_.load())
92 {
93 }
94
95 BackfillScheduler(uint32_t startSeq, std::optional<uint32_t> minSeq = std::nullopt)
96 : startSeq_(startSeq), minSeq_(minSeq.value_or(0)), seq_(startSeq)
97 {
98 }
99
100 [[nodiscard]] std::optional<model::Task>
101 next() override
102 {
103 uint32_t currentSeq = seq_;
104 while (currentSeq > minSeq_) {
105 if (seq_.compare_exchange_weak(currentSeq, currentSeq - 1u, std::memory_order_acq_rel)) {
106 return {{.priority = model::Task::Priority::Lower, .seq = currentSeq}};
107 }
108 }
109
110 return std::nullopt;
111 }
112};
113
114template <SomeScheduler... Schedulers>
116 std::tuple<Schedulers...> schedulers_;
117
118public:
119 template <SomeScheduler... Ts>
120 requires(std::is_same_v<Ts, Schedulers> and ...)
121 SchedulerChain(Ts&&... schedulers) : schedulers_(std::forward<Ts>(schedulers)...)
122 {
123 }
124
125 [[nodiscard]] std::optional<model::Task>
126 next() override
127 {
128 std::optional<model::Task> task;
129 auto const expand = [&](auto& s) {
130 if (task.has_value())
131 return false;
132
133 task = s.next();
134 return task.has_value();
135 };
136
137 std::apply([&expand](auto&&... xs) { (... || expand(xs)); }, schedulers_);
138
139 return task;
140 }
141};
142
143static auto
144makeScheduler(SomeScheduler auto&&... schedulers)
145{
146 return std::make_unique<SchedulerChain<std::decay_t<decltype(schedulers)>...>>(
147 std::forward<decltype(schedulers)>(schedulers)...
148 );
149}
150
151} // namespace etlng::impl
Definition Scheduling.hpp:83
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:101
Definition Scheduling.hpp:43
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:66
Definition Scheduling.hpp:115
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:126
Definition Scheduling.hpp:41
The interface of a scheduler for the extraction process.
Definition SchedulerInterface.hpp:31