22#include "util/Mutex.hpp"
23#include "util/log/Logger.hpp"
24#include "util/newconfig/ConfigDefinition.hpp"
25#include "util/prometheus/Counter.hpp"
26#include "util/prometheus/Gauge.hpp"
28#include <boost/asio.hpp>
29#include <boost/asio/spawn.hpp>
30#include <boost/asio/thread_pool.hpp>
31#include <boost/json.hpp>
32#include <boost/json/object.hpp>
48 std::reference_wrapper<util::prometheus::CounterInt> queued_;
49 std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
51 std::reference_wrapper<util::prometheus::GaugeInt> curSize_;
52 uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
55 boost::asio::thread_pool ioc_;
57 std::atomic_bool stopping_;
59 class OneTimeCallable {
60 std::function<void()> func_;
65 setCallable(std::function<
void()> func);
70 operator bool()
const;
81 WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
90 stop(std::function<
void()> onQueueEmpty);
111 template <
typename FnType>
116 LOG(log_.
warn()) <<
"Queue is stopping, rejecting incoming task.";
120 if (curSize_.get().value() >= maxSize_ && !isWhiteListed) {
121 LOG(log_.
warn()) <<
"Queue is full. rejecting job. current size = " << curSize_.get().value()
122 <<
"; max size = " << maxSize_;
132 [
this, func = std::forward<FnType>(func), start = std::chrono::system_clock::now()](
auto yield)
mutable {
133 auto const run = std::chrono::system_clock::now();
134 auto const wait = std::chrono::duration_cast<std::chrono::microseconds>(run - start).count();
137 durationUs_.get() += wait;
138 LOG(log_.
info()) <<
"WorkQueue wait time = " << wait <<
" queue size = " << curSize_.get().value();
142 if (curSize_.get().value() == 0 && stopping_) {
143 auto onTasksComplete = onQueueEmpty_.
lock();
144 ASSERT(onTasksComplete->operator
bool(),
"onTasksComplete must be set when stopping is true.");
145 onTasksComplete->operator()();
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:46
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize=0)
Create an we instance of the work queue.
Definition WorkQueue.cpp:54
bool postCoro(FnType &&func, bool isWhiteListed)
Submit a job to the work queue.
Definition WorkQueue.hpp:113
void join()
Wait until all the jobs in the queue are finished.
Definition WorkQueue.cpp:118
size_t size() const
Get the size of the queue.
Definition WorkQueue.cpp:124
void stop(std::function< void()> onQueueEmpty)
Put the work queue into a stopping state. This will prevent new jobs from being queued.
Definition WorkQueue.cpp:82
boost::json::object report() const
Generate a report of the work queue state.
Definition WorkQueue.cpp:105
static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const &config)
A factory function that creates the work queue based on a config.
Definition WorkQueue.cpp:93
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump warn(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::WRN severity.
Definition Logger.cpp:210
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:134
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:36