Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WorkQueue.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/Mutex.hpp"
23#include "util/config/ConfigDefinition.hpp"
24#include "util/log/Logger.hpp"
25#include "util/prometheus/Counter.hpp"
26#include "util/prometheus/Gauge.hpp"
27
28#include <boost/asio.hpp>
29#include <boost/asio/spawn.hpp>
30#include <boost/asio/steady_timer.hpp>
31#include <boost/asio/strand.hpp>
32#include <boost/asio/thread_pool.hpp>
33#include <boost/json.hpp>
34#include <boost/json/object.hpp>
35
36#include <atomic>
37#include <cstddef>
38#include <cstdint>
39#include <functional>
40#include <limits>
41#include <queue>
42
43namespace rpc {
44
48struct Reportable {
49 virtual ~Reportable() = default;
50
56 [[nodiscard]] virtual boost::json::object
57 report() const = 0;
58};
59
63class WorkQueue : public Reportable {
64 using TaskType = std::function<void(boost::asio::yield_context)>;
65 using QueueType = std::queue<TaskType>;
66
67public:
71 enum class Priority : uint8_t {
72 High,
73 Default,
74 };
75
76private:
77 struct DispatcherState {
78 QueueType high;
79 QueueType normal;
80
81 bool isIdle = false;
82
83 void
84 push(Priority priority, auto&& task)
85 {
86 auto& queue = [this, priority] -> QueueType& {
87 if (priority == Priority::High)
88 return high;
89 return normal;
90 }();
91 queue.push(std::forward<decltype(task)>(task));
92 }
93
94 [[nodiscard]] bool
95 empty() const
96 {
97 return high.empty() and normal.empty();
98 }
99 };
100
101private:
102 static constexpr auto kTAKE_HIGH_PRIO = 4uz;
103
104 // these are cumulative for the lifetime of the process
105 std::reference_wrapper<util::prometheus::CounterInt> queued_;
106 std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
107
108 std::reference_wrapper<util::prometheus::GaugeInt> curSize_;
109 uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
110
111 util::Logger log_{"RPC"};
112 boost::asio::thread_pool ioc_;
113 boost::asio::strand<boost::asio::thread_pool::executor_type> strand_;
114 bool hasDispatcher_ = false;
115
116 std::atomic_bool stopping_;
117
118 util::Mutex<std::function<void()>> onQueueEmpty_;
119 util::Mutex<DispatcherState> dispatcherState_;
120 boost::asio::steady_timer waitTimer_;
121
122public:
124 static constexpr DontStartProcessingTag kDONT_START_PROCESSING_TAG = {};
125
134 WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
135
144 WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize = 0);
145
146 ~WorkQueue() override;
147
151 void
153
159 void
160 requestStop(std::function<void()> onQueueEmpty = [] {});
161
165 void
166 stop();
167
174 [[nodiscard]] static WorkQueue
176
187 bool
188 postCoro(TaskType func, bool isWhiteListed, Priority priority = Priority::Default);
189
195 [[nodiscard]] boost::json::object
196 report() const override;
197
201 void
203
209 [[nodiscard]] size_t
210 size() const;
211
212private:
213 void
214 dispatcherLoop(boost::asio::yield_context yield);
215};
216
217} // namespace rpc
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:63
void stop()
Put the work queue into a stopping state and await workers to finish.
Definition WorkQueue.cpp:203
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize=0)
Create an instance of the work queue.
Definition WorkQueue.cpp:66
boost::json::object report() const override
Generate a report of the work queue state.
Definition WorkQueue.cpp:224
void startProcessing()
Start processing of the enqueued tasks.
Definition WorkQueue.cpp:78
bool postCoro(TaskType func, bool isWhiteListed, Priority priority=Priority::Default)
Submit a job to the work queue.
Definition WorkQueue.cpp:89
Priority
Represents a task scheduling priority.
Definition WorkQueue.hpp:71
void join()
Wait until all the jobs in the queue are finished.
size_t size() const
Get the size of the queue.
Definition WorkQueue.cpp:237
static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const &config)
A factory function that creates the work queue based on a config.
Definition WorkQueue.cpp:212
void requestStop(std::function< void()> onQueueEmpty=[] {})
Put the work queue into a stopping state. This will prevent new jobs from being queued.
Definition WorkQueue.cpp:185
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:50
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37
An interface for any class that provides a report as a JSON object.
Definition WorkQueue.hpp:48
virtual boost::json::object report() const =0
Generate a report of the work queue state.
Definition WorkQueue.hpp:123