22#include "etl/Models.hpp"
23#include "util/Assert.hpp"
24#include "util/Mutex.hpp"
27#include <condition_variable>
42 return lhs.seq > rhs.seq;
53 std::uint32_t expectedSequence;
56 std::vector<model::LedgerData>,
60 Data(std::uint32_t seq) : expectedSequence(seq)
66 std::uint32_t increment_;
69 std::condition_variable cv_;
70 std::atomic_bool stopping_ =
false;
74 std::uint32_t startSeq = 0u;
75 std::uint32_t increment =
77 std::optional<std::size_t> limit = std::nullopt;
87 : limit_(settings.limit.value_or(0uz))
88 , increment_(settings.increment)
89 , data_(settings.startSeq)
95 ASSERT(stopping_,
"stop() must be called before destroying the TaskQueue");
108 auto lock = data_.lock();
110 if (limit_ == 0uz or lock->forwardLoadQueue.size() < limit_) {
111 lock->forwardLoadQueue.push(std::move(item));
125 [[nodiscard]] std::optional<model::LedgerData>
128 auto lock = data_.lock();
129 std::optional<model::LedgerData> out;
131 if (not lock->forwardLoadQueue.empty() &&
132 lock->forwardLoadQueue.top().seq == lock->expectedSequence) {
133 out.emplace(lock->forwardLoadQueue.top());
134 lock->forwardLoadQueue.pop();
135 lock->expectedSequence += increment_;
150 return data_.lock()->forwardLoadQueue.empty();
163 auto lock = data_.lock<std::unique_lock>();
164 cv_.wait(lock, [&] {
return stopping_ or not lock->forwardLoadQueue.empty(); });
A wrapper for std::priority_queue that serialises operations using a mutex.
Definition TaskQueue.hpp:51
void stop()
Notify the queue that it's no longer needed.
Definition TaskQueue.hpp:172
TaskQueue(Settings settings)
Construct a new priority queue.
Definition TaskQueue.hpp:86
bool empty()
Check if the queue is empty.
Definition TaskQueue.hpp:148
void awaitTask()
Waits for the queue to become non-empty.
Definition TaskQueue.hpp:158
std::optional< model::LedgerData > dequeue()
Dequeue the next available item out of the queue.
Definition TaskQueue.hpp:126
bool enqueue(model::LedgerData item)
Enqueue a new item onto the queue if space is available.
Definition TaskQueue.hpp:106
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:101
Definition TaskQueue.hpp:38
Definition TaskQueue.hpp:73
Represents an entire ledger diff worth of transactions and objects.
Definition Models.hpp:143