3#include "etl/Models.hpp"
5#include "etl/SchedulerInterface.hpp"
22concept SomeScheduler = std::is_base_of_v<SchedulerInterface, std::decay_t<T>>;
25 std::reference_wrapper<NetworkValidatedLedgersInterface> ledgers_;
28 std::optional<uint32_t> maxSeq_;
29 std::atomic_uint32_t seq_;
32 ForwardScheduler(ForwardScheduler
const& other)
33 : ledgers_(other.ledgers_)
34 , startSeq_(other.startSeq_)
35 , maxSeq_(other.maxSeq_)
36 , seq_(other.seq_.load())
41 std::reference_wrapper<NetworkValidatedLedgersInterface> ledgers,
43 std::optional<uint32_t> maxSeq = std::nullopt
45 : ledgers_(ledgers), startSeq_(startSeq), maxSeq_(maxSeq), seq_(startSeq)
49 [[nodiscard]] std::optional<model::Task>
52 static constexpr auto kMAX = std::numeric_limits<uint32_t>::max();
53 uint32_t currentSeq = seq_;
55 if (ledgers_.get().getMostRecent() >= currentSeq) {
56 while (currentSeq < maxSeq_.value_or(kMAX)) {
57 if (seq_.compare_exchange_weak(
58 currentSeq, currentSeq + 1u, std::memory_order_acq_rel
60 return {{.priority = model::Task::Priority::Higher, .seq = currentSeq}};
71 uint32_t minSeq_ = 0u;
73 std::atomic_uint32_t seq_;
76 BackfillScheduler(BackfillScheduler
const& other)
77 : startSeq_(other.startSeq_), minSeq_(other.minSeq_), seq_(other.seq_.load())
81 BackfillScheduler(uint32_t startSeq, std::optional<uint32_t> minSeq = std::nullopt)
82 : startSeq_(startSeq), minSeq_(minSeq.value_or(0)), seq_(startSeq)
86 [[nodiscard]] std::optional<model::Task>
89 uint32_t currentSeq = seq_;
90 while (currentSeq > minSeq_) {
91 if (seq_.compare_exchange_weak(
92 currentSeq, currentSeq - 1u, std::memory_order_acq_rel
94 return {{.priority = model::Task::Priority::Lower, .seq = currentSeq}};
102template <SomeScheduler... Schedulers>
104 std::tuple<Schedulers...> schedulers_;
108 requires(std::is_same_v<Ts, Schedulers> and ...)
109 SchedulerChain(Ts&&... schedulers) : schedulers_(std::forward<Ts>(schedulers)...)
113 [[nodiscard]] std::optional<model::Task>
116 std::optional<model::Task> task;
117 auto const expand = [&](
auto& s) {
118 if (task.has_value())
122 return task.has_value();
125 std::apply([&expand](
auto&&... xs) { (... || expand(xs)); }, schedulers_);
132makeScheduler(SomeScheduler
auto&&... schedulers)
134 return std::make_unique<SchedulerChain<std::decay_t<
decltype(schedulers)>...>>(
135 std::forward<
decltype(schedulers)>(schedulers)...
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:87
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:50
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:114
Definition Scheduling.hpp:22
The interface of a scheduler for the extraction process.
Definition SchedulerInterface.hpp:12