Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WorkQueue.hpp
1#pragma once
2
3#include "util/Mutex.hpp"
4#include "util/config/ConfigDefinition.hpp"
5#include "util/log/Logger.hpp"
6#include "util/prometheus/Counter.hpp"
7#include "util/prometheus/Gauge.hpp"
8
9#include <boost/asio/spawn.hpp>
10#include <boost/asio/thread_pool.hpp>
11#include <boost/json/object.hpp>
12
13#include <atomic>
14#include <chrono>
15#include <cstddef>
16#include <cstdint>
17#include <functional>
18#include <limits>
19#include <optional>
20#include <queue>
21#include <utility>
22
23namespace rpc {
24
28struct Reportable {
29 virtual ~Reportable() = default;
30
36 [[nodiscard]] virtual boost::json::object
37 report() const = 0;
38};
39
43class WorkQueue : public Reportable {
44 using TaskType = std::function<void(boost::asio::yield_context)>;
45
46 struct TaskWithTimestamp {
47 TaskType task;
48 std::chrono::system_clock::time_point queuedAt;
49 };
50
51 using QueueType = std::queue<TaskWithTimestamp>;
52
53public:
57 enum class Priority : uint8_t {
58 High,
59 Default,
60 };
61
62private:
63 struct QueueState {
64 QueueType high;
65 QueueType normal;
66
67 size_t highPriorityCounter = 0;
68
69 void
70 push(Priority priority, TaskType&& task)
71 {
72 auto& queue = [this, priority] -> QueueType& {
73 if (priority == Priority::High)
74 return high;
75 return normal;
76 }();
77 queue.push(
78 TaskWithTimestamp{
79 .task = std::move(task), .queuedAt = std::chrono::system_clock::now()
80 }
81 );
82 }
83
84 [[nodiscard]] bool
85 empty() const
86 {
87 return high.empty() and normal.empty();
88 }
89
90 [[nodiscard]] std::optional<TaskWithTimestamp>
91 popNext()
92 {
93 if (not high.empty() and (highPriorityCounter < kTAKE_HIGH_PRIO or normal.empty())) {
94 auto taskWithTimestamp = std::move(high.front());
95 high.pop();
96 ++highPriorityCounter;
97 return taskWithTimestamp;
98 }
99
100 if (not normal.empty()) {
101 auto taskWithTimestamp = std::move(normal.front());
102 normal.pop();
103 highPriorityCounter = 0;
104 return taskWithTimestamp;
105 }
106
107 return std::nullopt;
108 }
109 };
110
111private:
112 static constexpr auto kTAKE_HIGH_PRIO = 4uz;
113
114 // these are cumulative for the lifetime of the process
115 std::reference_wrapper<util::prometheus::CounterInt> queued_;
116 std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
117
118 std::reference_wrapper<util::prometheus::GaugeInt> curSize_;
119 uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
120
121 util::Logger log_{"RPC"};
122 boost::asio::thread_pool ioc_;
123
124 std::atomic_bool stopping_;
125 std::atomic_bool processingStarted_{false};
126
127 class OneTimeCallable {
128 std::function<void()> func_;
129 bool called_{false};
130
131 public:
132 void
133 setCallable(std::function<void()> func);
134
135 void
136 operator()();
137
138 explicit
139 operator bool() const;
140 };
141 util::Mutex<OneTimeCallable> onQueueEmpty_;
142 util::Mutex<QueueState> queueState_;
143
144public:
146 static constexpr DontStartProcessingTag kDONT_START_PROCESSING_TAG = {};
147
156 WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
157
166 WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize = 0);
167
168 ~WorkQueue() override;
169
173 void
175
182 void
183 requestStop(std::function<void()> onQueueEmpty = [] {});
184
188 void
189 stop();
190
197 [[nodiscard]] static WorkQueue
199
211 bool
212 postCoro(TaskType func, bool isWhiteListed, Priority priority = Priority::Default);
213
219 [[nodiscard]] boost::json::object
220 report() const override;
221
225 void
227
233 [[nodiscard]] size_t
234 size() const;
235
236private:
237 void
238 executeTask(boost::asio::yield_context yield);
239};
240
241} // namespace rpc
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:43
void stop()
Put the work queue into a stopping state and await workers to finish.
Definition WorkQueue.cpp:127
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0)
Create an instance of the work queue.
Definition WorkQueue.cpp:64
boost::json::object report() const override
Generate a report of the work queue state.
Definition WorkQueue.cpp:157
void startProcessing()
Start processing of the enqueued tasks.
Definition WorkQueue.cpp:76
bool postCoro(TaskType func, bool isWhiteListed, Priority priority = Priority::Default)
Submit a job to the work queue.
Definition WorkQueue.cpp:89
Priority
Represents a task scheduling priority.
Definition WorkQueue.hpp:57
void join()
Wait until all the jobs in the queue are finished.
size_t size() const
Get the size of the queue.
Definition WorkQueue.cpp:170
static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const& config)
A factory function that creates the work queue based on a config.
Definition WorkQueue.cpp:144
void requestStop(std::function<void()> onQueueEmpty = [] {})
Put the work queue into a stopping state. This will prevent new jobs from being queued.
Definition WorkQueue.cpp:118
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:31
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:18
An interface for any class providing a report as json object.
Definition WorkQueue.hpp:28
virtual boost::json::object report() const = 0
Generate a report of the work queue state.
Definition WorkQueue.hpp:145