Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WorkQueue.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/Mutex.hpp"
23#include "util/config/ConfigDefinition.hpp"
24#include "util/log/Logger.hpp"
25#include "util/prometheus/Counter.hpp"
26#include "util/prometheus/Gauge.hpp"
27
28#include <boost/asio/spawn.hpp>
29#include <boost/asio/thread_pool.hpp>
30#include <boost/json/object.hpp>
31
32#include <atomic>
33#include <chrono>
34#include <cstddef>
35#include <cstdint>
36#include <functional>
37#include <limits>
38#include <optional>
39#include <queue>
40#include <utility>
41
42namespace rpc {
43
47struct Reportable {
48 virtual ~Reportable() = default;
49
55 [[nodiscard]] virtual boost::json::object
56 report() const = 0;
57};
58
62class WorkQueue : public Reportable {
63 using TaskType = std::function<void(boost::asio::yield_context)>;
64
65 struct TaskWithTimestamp {
66 TaskType task;
67 std::chrono::system_clock::time_point queuedAt;
68 };
69
70 using QueueType = std::queue<TaskWithTimestamp>;
71
72public:
76 enum class Priority : uint8_t {
77 High,
78 Default,
79 };
80
81private:
82 struct QueueState {
83 QueueType high;
84 QueueType normal;
85
86 size_t highPriorityCounter = 0;
87
88 void
89 push(Priority priority, TaskType&& task)
90 {
91 auto& queue = [this, priority] -> QueueType& {
92 if (priority == Priority::High)
93 return high;
94 return normal;
95 }();
96 queue.push(TaskWithTimestamp{.task = std::move(task), .queuedAt = std::chrono::system_clock::now()});
97 }
98
99 [[nodiscard]] bool
100 empty() const
101 {
102 return high.empty() and normal.empty();
103 }
104
105 [[nodiscard]] std::optional<TaskWithTimestamp>
106 popNext()
107 {
108 if (not high.empty() and (highPriorityCounter < kTAKE_HIGH_PRIO or normal.empty())) {
109 auto taskWithTimestamp = std::move(high.front());
110 high.pop();
111 ++highPriorityCounter;
112 return taskWithTimestamp;
113 }
114
115 if (not normal.empty()) {
116 auto taskWithTimestamp = std::move(normal.front());
117 normal.pop();
118 highPriorityCounter = 0;
119 return taskWithTimestamp;
120 }
121
122 return std::nullopt;
123 }
124 };
125
126private:
127 static constexpr auto kTAKE_HIGH_PRIO = 4uz;
128
129 // these are cumulative for the lifetime of the process
130 std::reference_wrapper<util::prometheus::CounterInt> queued_;
131 std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
132
133 std::reference_wrapper<util::prometheus::GaugeInt> curSize_;
134 uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
135
136 util::Logger log_{"RPC"};
137 boost::asio::thread_pool ioc_;
138
139 std::atomic_bool stopping_;
140 std::atomic_bool processingStarted_{false};
141
142 class OneTimeCallable {
143 std::function<void()> func_;
144 bool called_{false};
145
146 public:
147 void
148 setCallable(std::function<void()> func);
149
150 void
151 operator()();
152
153 explicit
154 operator bool() const;
155 };
156 util::Mutex<OneTimeCallable> onQueueEmpty_;
157 util::Mutex<QueueState> queueState_;
158
159public:
161 static constexpr DontStartProcessingTag kDONT_START_PROCESSING_TAG = {};
162
171 WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
172
181 WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize = 0);
182
183 ~WorkQueue() override;
184
188 void
190
196 void
197 requestStop(std::function<void()> onQueueEmpty = [] {});
198
202 void
203 stop();
204
211 [[nodiscard]] static WorkQueue
213
224 bool
225 postCoro(TaskType func, bool isWhiteListed, Priority priority = Priority::Default);
226
232 [[nodiscard]] boost::json::object
233 report() const override;
234
238 void
240
246 [[nodiscard]] size_t
247 size() const;
248
249private:
250 void
251 executeTask(boost::asio::yield_context yield);
252};
253
254} // namespace rpc
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:62
void stop()
Put the work queue into a stopping state and await workers to finish.
Definition WorkQueue.cpp:145
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize=0)
Create an instance of the work queue.
Definition WorkQueue.cpp:83
boost::json::object report() const override
Generate a report of the work queue state.
Definition WorkQueue.cpp:172
void startProcessing()
Start processing of the enqueued tasks.
Definition WorkQueue.cpp:95
bool postCoro(TaskType func, bool isWhiteListed, Priority priority=Priority::Default)
Submit a job to the work queue.
Definition WorkQueue.cpp:108
Priority
Represents a task scheduling priority.
Definition WorkQueue.hpp:76
void join()
Wait until all the jobs in the queue are finished.
size_t size() const
Get the size of the queue.
Definition WorkQueue.cpp:185
static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const &config)
A factory function that creates the work queue based on a config.
Definition WorkQueue.cpp:160
void requestStop(std::function< void()> onQueueEmpty=[] {})
Put the work queue into a stopping state. This will prevent new jobs from being queued.
Definition WorkQueue.cpp:136
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:50
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37
An interface for any class providing a report as json object.
Definition WorkQueue.hpp:47
virtual boost::json::object report() const =0
Generate a report of the work queue state.
Definition WorkQueue.hpp:160