22#include "etlng/Models.hpp"
23#include "util/Assert.hpp"
24#include "util/Mutex.hpp"
27#include <condition_variable>
36namespace etlng::impl {
42 return lhs.seq > rhs.seq;
52 std::uint32_t expectedSequence;
53 std::priority_queue<model::LedgerData, std::vector<model::LedgerData>,
ReverseOrderComparator> forwardLoadQueue;
55 Data(std::uint32_t seq) : expectedSequence(seq)
61 std::uint32_t increment_;
64 std::condition_variable cv_;
65 std::atomic_bool stopping_ =
false;
69 std::uint32_t startSeq = 0u;
70 std::uint32_t increment = 1u;
71 std::optional<std::size_t> limit = std::nullopt;
80 : limit_(settings.limit.value_or(0uz)), increment_(settings.increment), data_(settings.startSeq)
86 ASSERT(stopping_,
"stop() must be called before destroying the TaskQueue");
99 auto lock = data_.
lock();
101 if (limit_ == 0uz or lock->forwardLoadQueue.size() < limit_) {
102 lock->forwardLoadQueue.push(std::move(item));
116 [[nodiscard]] std::optional<model::LedgerData>
119 auto lock = data_.
lock();
120 std::optional<model::LedgerData> out;
122 if (not lock->forwardLoadQueue.empty() && lock->forwardLoadQueue.top().seq == lock->expectedSequence) {
123 out.emplace(lock->forwardLoadQueue.top());
124 lock->forwardLoadQueue.pop();
125 lock->expectedSequence += increment_;
140 return data_.
lock()->forwardLoadQueue.empty();
153 auto lock = data_.
lock<std::unique_lock>();
154 cv_.wait(lock, [&] {
return stopping_ or not lock->forwardLoadQueue.empty(); });
A wrapper for std::priority_queue that serialises operations using a mutex.
Definition TaskQueue.hpp:50
bool enqueue(model::LedgerData item)
Enqueue a new item onto the queue if space is available.
Definition TaskQueue.hpp:97
bool empty()
Check if the queue is empty.
Definition TaskQueue.hpp:138
std::optional< model::LedgerData > dequeue()
Dequeue the next item from the queue, provided its sequence matches the expected one.
Definition TaskQueue.hpp:117
void awaitTask()
Waits for the queue to become non-empty or for the queue to be stopped.
Definition TaskQueue.hpp:148
TaskQueue(Settings settings)
Construct a new priority queue.
Definition TaskQueue.hpp:79
void stop()
Notify the queue that it's no longer needed.
Definition TaskQueue.hpp:162
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:101
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:139
Definition TaskQueue.hpp:38
Definition TaskQueue.hpp:68
Represents an entire ledger diff worth of transactions and objects.
Definition Models.hpp:143