Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WorkQueue.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/Mutex.hpp"
23#include "util/config/ConfigDefinition.hpp"
24#include "util/log/Logger.hpp"
25#include "util/prometheus/Counter.hpp"
26#include "util/prometheus/Gauge.hpp"
27
28#include <boost/asio.hpp>
29#include <boost/asio/spawn.hpp>
30#include <boost/asio/steady_timer.hpp>
31#include <boost/asio/strand.hpp>
32#include <boost/asio/thread_pool.hpp>
33#include <boost/json.hpp>
34#include <boost/json/object.hpp>
35
36#include <atomic>
37#include <cstddef>
38#include <cstdint>
39#include <functional>
40#include <limits>
41#include <optional>
42#include <queue>
43#include <utility>
44
45namespace rpc {
46
50struct Reportable {
51 virtual ~Reportable() = default;
52
58 [[nodiscard]] virtual boost::json::object
59 report() const = 0;
60};
61
65class WorkQueue : public Reportable {
66 using TaskType = std::function<void(boost::asio::yield_context)>;
67 using QueueType = std::queue<TaskType>;
68
69public:
73 enum class Priority : uint8_t {
74 High,
75 Default,
76 };
77
78private:
79 struct DispatcherState {
80 QueueType high;
81 QueueType normal;
82
83 bool isIdle = false;
84 size_t highPriorityCounter = 0;
85
86 void
87 push(Priority priority, auto&& task)
88 {
89 auto& queue = [this, priority] -> QueueType& {
90 if (priority == Priority::High)
91 return high;
92 return normal;
93 }();
94 queue.push(std::forward<decltype(task)>(task));
95 }
96
97 [[nodiscard]] bool
98 empty() const
99 {
100 return high.empty() and normal.empty();
101 }
102
103 [[nodiscard]] std::optional<TaskType>
104 popNext()
105 {
106 if (not high.empty() and (highPriorityCounter < kTAKE_HIGH_PRIO or normal.empty())) {
107 auto task = std::move(high.front());
108 high.pop();
109 ++highPriorityCounter;
110 return task;
111 }
112
113 if (not normal.empty()) {
114 auto task = std::move(normal.front());
115 normal.pop();
116 highPriorityCounter = 0;
117 return task;
118 }
119
120 return std::nullopt;
121 }
122 };
123
124private:
125 static constexpr auto kTAKE_HIGH_PRIO = 4uz;
126
127 // these are cumulative for the lifetime of the process
128 std::reference_wrapper<util::prometheus::CounterInt> queued_;
129 std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
130
131 std::reference_wrapper<util::prometheus::GaugeInt> curSize_;
132 uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
133
134 util::Logger log_{"RPC"};
135 boost::asio::thread_pool ioc_;
136 boost::asio::strand<boost::asio::thread_pool::executor_type> strand_;
137 bool hasDispatcher_ = false;
138
139 std::atomic_bool stopping_;
140
141 util::Mutex<std::function<void()>> onQueueEmpty_;
142 util::Mutex<DispatcherState> dispatcherState_;
143 boost::asio::steady_timer waitTimer_;
144
145public:
147 static constexpr DontStartProcessingTag kDONT_START_PROCESSING_TAG = {};
148
157 WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
158
167 WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize = 0);
168
169 ~WorkQueue() override;
170
174 void
176
182 void
183 requestStop(std::function<void()> onQueueEmpty = [] {});
184
188 void
189 stop();
190
197 [[nodiscard]] static WorkQueue
199
210 bool
211 postCoro(TaskType func, bool isWhiteListed, Priority priority = Priority::Default);
212
218 [[nodiscard]] boost::json::object
219 report() const override;
220
224 void
226
232 [[nodiscard]] size_t
233 size() const;
234
235private:
236 void
237 dispatcherLoop(boost::asio::yield_context yield);
238};
239
240} // namespace rpc
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:65
void stop()
Put the work queue into a stopping state and await workers to finish.
Definition WorkQueue.cpp:191
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize=0)
Create an instance of the work queue.
Definition WorkQueue.cpp:66
boost::json::object report() const override
Generate a report of the work queue state.
Definition WorkQueue.cpp:212
void startProcessing()
Start processing of the enqueued tasks.
Definition WorkQueue.cpp:78
bool postCoro(TaskType func, bool isWhiteListed, Priority priority=Priority::Default)
Submit a job to the work queue.
Definition WorkQueue.cpp:89
Priority
Represents a task scheduling priority.
Definition WorkQueue.hpp:73
void join()
Wait until all the jobs in the queue are finished.
size_t size() const
Get the size of the queue.
Definition WorkQueue.cpp:225
static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const &config)
A factory function that creates the work queue based on a config.
Definition WorkQueue.cpp:200
void requestStop(std::function< void()> onQueueEmpty=[] {})
Put the work queue into a stopping state. This will prevent new jobs from being queued.
Definition WorkQueue.cpp:173
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:50
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37
An interface for any class providing a report as json object.
Definition WorkQueue.hpp:50
virtual boost::json::object report() const =0
Generate a report of the work queue state.
Definition WorkQueue.hpp:146