22#include "util/Mutex.hpp"
23#include "util/config/ConfigDefinition.hpp"
24#include "util/log/Logger.hpp"
25#include "util/prometheus/Counter.hpp"
26#include "util/prometheus/Gauge.hpp"
28#include <boost/asio.hpp>
29#include <boost/asio/spawn.hpp>
30#include <boost/asio/steady_timer.hpp>
31#include <boost/asio/strand.hpp>
32#include <boost/asio/thread_pool.hpp>
33#include <boost/json.hpp>
34#include <boost/json/object.hpp>
56 [[nodiscard]]
virtual boost::json::object
64 using TaskType = std::function<void(boost::asio::yield_context)>;
65 using QueueType = std::queue<TaskType>;
77 struct DispatcherState {
86 auto& queue = [
this, priority] -> QueueType& {
87 if (priority == Priority::High)
91 queue.push(std::forward<
decltype(task)>(task));
97 return high.empty() and normal.empty();
102 static constexpr auto kTAKE_HIGH_PRIO = 4uz;
105 std::reference_wrapper<util::prometheus::CounterInt> queued_;
106 std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
108 std::reference_wrapper<util::prometheus::GaugeInt> curSize_;
109 uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
111 util::Logger log_{
"RPC"};
112 boost::asio::thread_pool ioc_;
113 boost::asio::strand<boost::asio::thread_pool::executor_type> strand_;
114 bool hasDispatcher_ =
false;
116 std::atomic_bool stopping_;
118 util::Mutex<std::function<void()>> onQueueEmpty_;
119 util::Mutex<DispatcherState> dispatcherState_;
120 boost::asio::steady_timer waitTimer_;
134 WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
144 WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize = 0);
160 requestStop(std::function<
void()> onQueueEmpty = [] {});
188 postCoro(TaskType func,
bool isWhiteListed,
Priority priority = Priority::Default);
195 [[nodiscard]] boost::json::object
214 dispatcherLoop(boost::asio::yield_context yield);
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:63
void stop()
Put the work queue into a stopping state and await workers to finish.
Definition WorkQueue.cpp:203
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize=0)
Create an instance of the work queue.
Definition WorkQueue.cpp:66
boost::json::object report() const override
Generate a report of the work queue state.
Definition WorkQueue.cpp:224
void startProcessing()
Start processing of the enqueued tasks.
Definition WorkQueue.cpp:78
bool postCoro(TaskType func, bool isWhiteListed, Priority priority=Priority::Default)
Submit a job to the work queue.
Definition WorkQueue.cpp:89
Priority
Represents a task scheduling priority.
Definition WorkQueue.hpp:71
void join()
Wait until all the jobs in the queue are finished.
size_t size() const
Get the size of the queue.
Definition WorkQueue.cpp:237
static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const &config)
A factory function that creates the work queue based on a config.
Definition WorkQueue.cpp:212
void requestStop(std::function< void()> onQueueEmpty=[] {})
Put the work queue into a stopping state. This will prevent new jobs from being queued.
Definition WorkQueue.cpp:185
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:50
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37
An interface for any class providing a report as json object.
Definition WorkQueue.hpp:48
virtual boost::json::object report() const =0
Generate a report of the work queue state.
Definition WorkQueue.hpp:123