Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
TaskQueue.hpp
1#pragma once
2
3#include "etl/Models.hpp"
4#include "util/Assert.hpp"
5#include "util/Mutex.hpp"
6
7#include <atomic>
8#include <condition_variable>
9#include <cstddef>
10#include <cstdint>
11#include <mutex>
12#include <optional>
13#include <queue>
14#include <utility>
15#include <vector>
16
17namespace etl::impl {
18
20 [[nodiscard]] bool
21 operator()(model::LedgerData const& lhs, model::LedgerData const& rhs) const noexcept
22 {
23 return lhs.seq > rhs.seq;
24 }
25};
26
32class TaskQueue {
33 struct Data {
34 std::uint32_t expectedSequence;
35 std::priority_queue<
37 std::vector<model::LedgerData>,
39 forwardLoadQueue;
40
41 Data(std::uint32_t seq) : expectedSequence(seq)
42 {
43 }
44 };
45
46 std::size_t limit_;
47 std::uint32_t increment_;
49
50 std::condition_variable cv_;
51 std::atomic_bool stopping_ = false;
52
53public:
54 struct Settings {
55 std::uint32_t startSeq = 0u; // sequence to start from (for dequeue)
56 std::uint32_t increment =
57 1u; // increment sequence by this value once dequeue was successful
58 std::optional<std::size_t> limit = std::nullopt;
59 };
60
67 explicit TaskQueue(Settings settings)
68 : limit_(settings.limit.value_or(0uz))
69 , increment_(settings.increment)
70 , data_(settings.startSeq)
71 {
72 }
73
75 {
76 ASSERT(stopping_, "stop() must be called before destroying the TaskQueue");
77 }
78
86 [[nodiscard]] bool
88 {
89 auto lock = data_.lock();
90
91 if (limit_ == 0uz or lock->forwardLoadQueue.size() < limit_) {
92 lock->forwardLoadQueue.push(std::move(item));
93 cv_.notify_all();
94
95 return true;
96 }
97
98 return false;
99 }
100
106 [[nodiscard]] std::optional<model::LedgerData>
108 {
109 auto lock = data_.lock();
110 std::optional<model::LedgerData> out;
111
112 if (not lock->forwardLoadQueue.empty() &&
113 lock->forwardLoadQueue.top().seq == lock->expectedSequence) {
114 out.emplace(lock->forwardLoadQueue.top());
115 lock->forwardLoadQueue.pop();
116 lock->expectedSequence += increment_;
117 }
118
119 return out;
120 }
121
128 [[nodiscard]] bool
130 {
131 return data_.lock()->forwardLoadQueue.empty();
132 }
133
138 void
140 {
141 if (stopping_)
142 return;
143
144 auto lock = data_.lock<std::unique_lock>();
145 cv_.wait(lock, [&] { return stopping_ or not lock->forwardLoadQueue.empty(); });
146 }
147
152 void
154 {
155 // unblock all waiters
156 stopping_ = true;
157 cv_.notify_all();
158 }
159};
160
161} // namespace etl::impl
A wrapper for std::priority_queue that serialises operations using a mutex.
Definition TaskQueue.hpp:32
void stop()
Notify the queue that it's no longer needed.
Definition TaskQueue.hpp:153
TaskQueue(Settings settings)
Construct a new priority queue.
Definition TaskQueue.hpp:67
bool empty()
Check if the queue is empty.
Definition TaskQueue.hpp:129
void awaitTask()
Awaits for the queue to become non-empty.
Definition TaskQueue.hpp:139
std::optional< model::LedgerData > dequeue()
Dequeue the next available item out of the queue.
Definition TaskQueue.hpp:107
bool enqueue(model::LedgerData item)
Enqueue a new item onto the queue if space is available.
Definition TaskQueue.hpp:87
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:82
Definition TaskQueue.hpp:19
Definition TaskQueue.hpp:54
Represents an entire ledger diff worth of transactions and objects.
Definition Models.hpp:124