Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WorkQueue.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/Mutex.hpp"
23#include "util/config/ConfigDefinition.hpp"
24#include "util/log/Logger.hpp"
25#include "util/prometheus/Counter.hpp"
26#include "util/prometheus/Gauge.hpp"
27
28#include <boost/asio/spawn.hpp>
29#include <boost/asio/thread_pool.hpp>
30#include <boost/json/object.hpp>
31
32#include <atomic>
33#include <chrono>
34#include <cstddef>
35#include <cstdint>
36#include <functional>
37#include <limits>
38#include <optional>
39#include <queue>
40#include <utility>
41
42namespace rpc {
43
47struct Reportable {
48 virtual ~Reportable() = default;
49
55 [[nodiscard]] virtual boost::json::object
56 report() const = 0;
57};
58
62class WorkQueue : public Reportable {
63 using TaskType = std::function<void(boost::asio::yield_context)>;
64
65 struct TaskWithTimestamp {
66 TaskType task;
67 std::chrono::system_clock::time_point queuedAt;
68 };
69
70 using QueueType = std::queue<TaskWithTimestamp>;
71
72public:
76 enum class Priority : uint8_t {
77 High,
78 Default,
79 };
80
81private:
82 struct QueueState {
83 QueueType high;
84 QueueType normal;
85
86 size_t highPriorityCounter = 0;
87
88 void
89 push(Priority priority, TaskType&& task)
90 {
91 auto& queue = [this, priority] -> QueueType& {
92 if (priority == Priority::High)
93 return high;
94 return normal;
95 }();
96 queue.push(
97 TaskWithTimestamp{
98 .task = std::move(task), .queuedAt = std::chrono::system_clock::now()
99 }
100 );
101 }
102
103 [[nodiscard]] bool
104 empty() const
105 {
106 return high.empty() and normal.empty();
107 }
108
109 [[nodiscard]] std::optional<TaskWithTimestamp>
110 popNext()
111 {
112 if (not high.empty() and (highPriorityCounter < kTAKE_HIGH_PRIO or normal.empty())) {
113 auto taskWithTimestamp = std::move(high.front());
114 high.pop();
115 ++highPriorityCounter;
116 return taskWithTimestamp;
117 }
118
119 if (not normal.empty()) {
120 auto taskWithTimestamp = std::move(normal.front());
121 normal.pop();
122 highPriorityCounter = 0;
123 return taskWithTimestamp;
124 }
125
126 return std::nullopt;
127 }
128 };
129
130private:
131 static constexpr auto kTAKE_HIGH_PRIO = 4uz;
132
133 // these are cumulative for the lifetime of the process
134 std::reference_wrapper<util::prometheus::CounterInt> queued_;
135 std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
136
137 std::reference_wrapper<util::prometheus::GaugeInt> curSize_;
138 uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
139
140 util::Logger log_{"RPC"};
141 boost::asio::thread_pool ioc_;
142
143 std::atomic_bool stopping_;
144 std::atomic_bool processingStarted_{false};
145
146 class OneTimeCallable {
147 std::function<void()> func_;
148 bool called_{false};
149
150 public:
151 void
152 setCallable(std::function<void()> func);
153
154 void
155 operator()();
156
157 explicit
158 operator bool() const;
159 };
160 util::Mutex<OneTimeCallable> onQueueEmpty_;
161 util::Mutex<QueueState> queueState_;
162
163public:
165 static constexpr DontStartProcessingTag kDONT_START_PROCESSING_TAG = {};
166
175 WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
176
185 WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize = 0);
186
187 ~WorkQueue() override;
188
192 void
194
201 void
202 requestStop(std::function<void()> onQueueEmpty = [] {});
203
207 void
208 stop();
209
216 [[nodiscard]] static WorkQueue
218
230 bool
231 postCoro(TaskType func, bool isWhiteListed, Priority priority = Priority::Default);
232
238 [[nodiscard]] boost::json::object
239 report() const override;
240
244 void
246
252 [[nodiscard]] size_t
253 size() const;
254
255private:
256 void
257 executeTask(boost::asio::yield_context yield);
258};
259
260} // namespace rpc
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:62
void stop()
Put the work queue into a stopping state and await workers to finish.
Definition WorkQueue.cpp:146
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize=0)
Create an instance of the work queue.
Definition WorkQueue.cpp:83
boost::json::object report() const override
Generate a report of the work queue state.
Definition WorkQueue.cpp:176
void startProcessing()
Start processing of the enqueued tasks.
Definition WorkQueue.cpp:95
bool postCoro(TaskType func, bool isWhiteListed, Priority priority=Priority::Default)
Submit a job to the work queue.
Definition WorkQueue.cpp:108
Priority
Represents a task scheduling priority.
Definition WorkQueue.hpp:76
void join()
Wait until all the jobs in the queue are finished.
size_t size() const
Get the size of the queue.
Definition WorkQueue.cpp:189
static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const &config)
A factory function that creates the work queue based on a config.
Definition WorkQueue.cpp:163
void requestStop(std::function< void()> onQueueEmpty=[] {})
Put the work queue into a stopping state. This will prevent new jobs from being queued.
Definition WorkQueue.cpp:137
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:50
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37
An interface for any class providing a report as json object.
Definition WorkQueue.hpp:47
virtual boost::json::object report() const =0
Generate a report of the work queue state.
Definition WorkQueue.hpp:164