22#include "etl/Models.hpp"
24#include "etl/SchedulerInterface.hpp"
41concept SomeScheduler = std::is_base_of_v<SchedulerInterface, std::decay_t<T>>;
44 std::reference_wrapper<NetworkValidatedLedgersInterface> ledgers_;
47 std::optional<uint32_t> maxSeq_;
48 std::atomic_uint32_t seq_;
51 ForwardScheduler(ForwardScheduler
const& other)
52 : ledgers_(other.ledgers_)
53 , startSeq_(other.startSeq_)
54 , maxSeq_(other.maxSeq_)
55 , seq_(other.seq_.load())
60 std::reference_wrapper<NetworkValidatedLedgersInterface> ledgers,
62 std::optional<uint32_t> maxSeq = std::nullopt
64 : ledgers_(ledgers), startSeq_(startSeq), maxSeq_(maxSeq), seq_(startSeq)
68 [[nodiscard]] std::optional<model::Task>
71 static constexpr auto kMAX = std::numeric_limits<uint32_t>::max();
72 uint32_t currentSeq = seq_;
74 if (ledgers_.get().getMostRecent() >= currentSeq) {
75 while (currentSeq < maxSeq_.value_or(kMAX)) {
76 if (seq_.compare_exchange_weak(
77 currentSeq, currentSeq + 1u, std::memory_order_acq_rel
79 return {{.priority = model::Task::Priority::Higher, .seq = currentSeq}};
90 uint32_t minSeq_ = 0u;
92 std::atomic_uint32_t seq_;
95 BackfillScheduler(BackfillScheduler
const& other)
96 : startSeq_(other.startSeq_), minSeq_(other.minSeq_), seq_(other.seq_.load())
100 BackfillScheduler(uint32_t startSeq, std::optional<uint32_t> minSeq = std::nullopt)
101 : startSeq_(startSeq), minSeq_(minSeq.value_or(0)), seq_(startSeq)
105 [[nodiscard]] std::optional<model::Task>
108 uint32_t currentSeq = seq_;
109 while (currentSeq > minSeq_) {
110 if (seq_.compare_exchange_weak(
111 currentSeq, currentSeq - 1u, std::memory_order_acq_rel
113 return {{.priority = model::Task::Priority::Lower, .seq = currentSeq}};
121template <SomeScheduler... Schedulers>
123 std::tuple<Schedulers...> schedulers_;
127 requires(std::is_same_v<Ts, Schedulers> and ...)
128 SchedulerChain(Ts&&... schedulers) : schedulers_(std::forward<Ts>(schedulers)...)
132 [[nodiscard]] std::optional<model::Task>
135 std::optional<model::Task> task;
136 auto const expand = [&](
auto& s) {
137 if (task.has_value())
141 return task.has_value();
144 std::apply([&expand](
auto&&... xs) { (... || expand(xs)); }, schedulers_);
151makeScheduler(SomeScheduler
auto&&... schedulers)
153 return std::make_unique<SchedulerChain<std::decay_t<
decltype(schedulers)>...>>(
154 std::forward<
decltype(schedulers)>(schedulers)...
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:106
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:69
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:133
Definition Scheduling.hpp:41
The interface of a scheduler for the extraction process.
Definition SchedulerInterface.hpp:31