3#include "etl/Models.hpp"
4#include "util/Assert.hpp"
5#include "util/Mutex.hpp"
8#include <condition_variable>
23 return lhs.seq > rhs.seq;
34 std::uint32_t expectedSequence;
37 std::vector<model::LedgerData>,
41 Data(std::uint32_t seq) : expectedSequence(seq)
47 std::uint32_t increment_;
50 std::condition_variable cv_;
51 std::atomic_bool stopping_ =
false;
55 std::uint32_t startSeq = 0u;
56 std::uint32_t increment =
58 std::optional<std::size_t> limit = std::nullopt;
68 : limit_(settings.limit.value_or(0uz))
69 , increment_(settings.increment)
70 , data_(settings.startSeq)
76 ASSERT(stopping_,
"stop() must be called before destroying the TaskQueue");
89 auto lock = data_.lock();
91 if (limit_ == 0uz or lock->forwardLoadQueue.size() < limit_) {
92 lock->forwardLoadQueue.push(std::move(item));
106 [[nodiscard]] std::optional<model::LedgerData>
109 auto lock = data_.lock();
110 std::optional<model::LedgerData> out;
112 if (not lock->forwardLoadQueue.empty() &&
113 lock->forwardLoadQueue.top().seq == lock->expectedSequence) {
114 out.emplace(lock->forwardLoadQueue.top());
115 lock->forwardLoadQueue.pop();
116 lock->expectedSequence += increment_;
131 return data_.lock()->forwardLoadQueue.empty();
144 auto lock = data_.lock<std::unique_lock>();
145 cv_.wait(lock, [&] {
return stopping_ or not lock->forwardLoadQueue.empty(); });
A wrapper for std::priority_queue that serialises operations using a mutex.
Definition TaskQueue.hpp:32
void stop()
Notify the queue that it's no longer needed.
Definition TaskQueue.hpp:153
TaskQueue(Settings settings)
Construct a new priority queue.
Definition TaskQueue.hpp:67
bool empty()
Check if the queue is empty.
Definition TaskQueue.hpp:129
void awaitTask()
Waits for the queue to become non-empty.
Definition TaskQueue.hpp:139
std::optional< model::LedgerData > dequeue()
Dequeue the next available item out of the queue.
Definition TaskQueue.hpp:107
bool enqueue(model::LedgerData item)
Enqueue a new item onto the queue if space is available.
Definition TaskQueue.hpp:87
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:82
Definition TaskQueue.hpp:19
Definition TaskQueue.hpp:54
Represents an entire ledger diff's worth of transactions and objects.
Definition Models.hpp:124