22#include "util/Assert.hpp"
23#include "util/Mutex.hpp"
24#include "util/Spawn.hpp"
25#include "util/config/ConfigDefinition.hpp"
26#include "util/log/Logger.hpp"
27#include "util/prometheus/Counter.hpp"
28#include "util/prometheus/Gauge.hpp"
30#include <boost/asio.hpp>
31#include <boost/asio/spawn.hpp>
32#include <boost/asio/thread_pool.hpp>
33#include <boost/json.hpp>
34#include <boost/json/object.hpp>
50 std::reference_wrapper<util::prometheus::CounterInt> queued_;
51 std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
53 std::reference_wrapper<util::prometheus::GaugeInt> curSize_;
54 uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
57 boost::asio::thread_pool ioc_;
59 std::atomic_bool stopping_;
61 class OneTimeCallable {
62 std::function<void()> func_;
67 setCallable(std::function<
void()> func);
72 operator bool()
const;
83 WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
92 stop(std::function<
void()> onQueueEmpty);
113 template <
typename FnType>
118 LOG(log_.
warn()) <<
"Queue is stopping, rejecting incoming task.";
122 if (curSize_.get().value() >= maxSize_ && !isWhiteListed) {
123 LOG(log_.
warn()) <<
"Queue is full. rejecting job. current size = " << curSize_.get().value()
124 <<
"; max size = " << maxSize_;
134 [
this, func = std::forward<FnType>(func), start = std::chrono::system_clock::now()](
auto yield)
mutable {
135 auto const run = std::chrono::system_clock::now();
136 auto const wait = std::chrono::duration_cast<std::chrono::microseconds>(run - start).count();
139 durationUs_.get() += wait;
140 LOG(log_.
info()) <<
"WorkQueue wait time = " << wait <<
" queue size = " << curSize_.get().value();
144 if (curSize_.get().value() == 0 && stopping_) {
145 auto onTasksComplete = onQueueEmpty_.
lock();
146 ASSERT(onTasksComplete->operator
bool(),
"onTasksComplete must be set when stopping is true.");
147 onTasksComplete->operator()();
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:48
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize=0)
Create a new instance of the work queue.
Definition WorkQueue.cpp:55
bool postCoro(FnType &&func, bool isWhiteListed)
Submit a job to the work queue.
Definition WorkQueue.hpp:115
void join()
Wait until all the jobs in the queue are finished.
Definition WorkQueue.cpp:119
size_t size() const
Get the size of the queue.
Definition WorkQueue.cpp:125
void stop(std::function< void()> onQueueEmpty)
Put the work queue into a stopping state. This will prevent new jobs from being queued.
Definition WorkQueue.cpp:83
boost::json::object report() const
Generate a report of the work queue state.
Definition WorkQueue.cpp:106
static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const &config)
A factory function that creates the work queue based on a config.
Definition WorkQueue.cpp:94
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump warn(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::WRN severity.
Definition Logger.cpp:317
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:312
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:134
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn.
Definition Spawn.hpp:69