23#include "etlng/Models.hpp"
24#include "etlng/SchedulerInterface.hpp"
38namespace etlng::impl {
41concept SomeScheduler = std::is_base_of_v<SchedulerInterface, std::decay_t<T>>;
44 std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers_;
47 std::optional<uint32_t> maxSeq_;
48 std::atomic_uint32_t seq_;
52 : ledgers_(other.ledgers_), startSeq_(other.startSeq_), maxSeq_(other.maxSeq_), seq_(other.seq_.load())
57 std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers,
59 std::optional<uint32_t> maxSeq = std::nullopt
61 : ledgers_(ledgers), startSeq_(startSeq), maxSeq_(maxSeq), seq_(startSeq)
65 [[nodiscard]] std::optional<model::Task>
68 static constexpr auto kMAX = std::numeric_limits<uint32_t>::max();
69 uint32_t currentSeq = seq_;
71 if (ledgers_.get().getMostRecent() >= currentSeq) {
72 while (currentSeq < maxSeq_.value_or(kMAX)) {
73 if (seq_.compare_exchange_weak(currentSeq, currentSeq + 1u, std::memory_order_acq_rel)) {
74 return {{.priority = model::Task::Priority::Higher, .seq = currentSeq}};
85 uint32_t minSeq_ = 0u;
87 std::atomic_uint32_t seq_;
91 : startSeq_(other.startSeq_), minSeq_(other.minSeq_), seq_(other.seq_.load())
95 BackfillScheduler(uint32_t startSeq, std::optional<uint32_t> minSeq = std::nullopt)
96 : startSeq_(startSeq), minSeq_(minSeq.value_or(0)), seq_(startSeq)
100 [[nodiscard]] std::optional<model::Task>
103 uint32_t currentSeq = seq_;
104 while (currentSeq > minSeq_) {
105 if (seq_.compare_exchange_weak(currentSeq, currentSeq - 1u, std::memory_order_acq_rel)) {
106 return {{.priority = model::Task::Priority::Lower, .seq = currentSeq}};
114template <SomeScheduler... Schedulers>
116 std::tuple<Schedulers...> schedulers_;
120 requires(std::is_same_v<Ts, Schedulers> and ...)
121 SchedulerChain(Ts&&... schedulers) : schedulers_(std::forward<Ts>(schedulers)...)
125 [[nodiscard]] std::optional<model::Task>
128 std::optional<model::Task> task;
129 auto const expand = [&](
auto& s) {
130 if (task.has_value())
134 return task.has_value();
137 std::apply([&expand](
auto&&... xs) { (... || expand(xs)); }, schedulers_);
144makeScheduler(SomeScheduler
auto&&... schedulers)
146 return std::make_unique<SchedulerChain<std::decay_t<
decltype(schedulers)>...>>(
147 std::forward<
decltype(schedulers)>(schedulers)...
Definition Scheduling.hpp:83
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:101
Definition Scheduling.hpp:43
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:66
Definition Scheduling.hpp:115
std::optional< model::Task > next() override
Attempt to obtain the next task.
Definition Scheduling.hpp:126
Definition Scheduling.hpp:41
The interface of a scheduler for the extraction proccess.
Definition SchedulerInterface.hpp:31