Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WorkQueue.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/Assert.hpp"
23#include "util/Mutex.hpp"
24#include "util/Spawn.hpp"
25#include "util/config/ConfigDefinition.hpp"
26#include "util/log/Logger.hpp"
27#include "util/prometheus/Counter.hpp"
28#include "util/prometheus/Gauge.hpp"
29
30#include <boost/asio.hpp>
31#include <boost/asio/spawn.hpp>
32#include <boost/asio/thread_pool.hpp>
33#include <boost/json.hpp>
34#include <boost/json/object.hpp>
35
36#include <atomic>
37#include <chrono>
38#include <cstddef>
39#include <cstdint>
40#include <functional>
41#include <limits>
42
43namespace rpc {
44
/**
 * @brief An asynchronous, thread-safe queue for RPC requests.
 *
 * Jobs are submitted through postCoro(). Each accepted job increments the current-size gauge, is handed to the
 * internal thread pool, and decrements the gauge again once the job function returns.
 *
 * NOTE(review): this listing is a documentation extract; the original doc comments and a few source lines
 * (e.g. the tail of the makeWorkQueue declaration and the call that opens the job submission in postCoro)
 * are not present here. Consult the real header before editing the code.
 */
48class WorkQueue {
49 // these are cumulative for the lifetime of the process
    // queued_ is incremented once per job at the moment the job starts running (see postCoro).
50 std::reference_wrapper<util::prometheus::CounterInt> queued_;
    // durationUs_ accumulates, in microseconds, the time each job spent between postCoro() and starting to run.
51 std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
52
    // Number of jobs accepted by postCoro() whose job function has not yet returned.
53 std::reference_wrapper<util::prometheus::GaugeInt> curSize_;
    // Threshold compared against curSize_ in postCoro(); non-whitelisted jobs are rejected at or above it.
    // NOTE(review): how the constructor's maxSize argument (default 0) maps onto this member is defined in
    // WorkQueue.cpp and is not visible here — confirm before relying on it.
54 uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
55
56 util::Logger log_{"RPC"};
    // Thread pool passed as the execution context when a job is submitted in postCoro().
57 boost::asio::thread_pool ioc_;
58
    // When true, postCoro() rejects every new job. Presumably set by stop() — confirm in WorkQueue.cpp.
59 std::atomic_bool stopping_;
60
    // Wrapper around a stored callback together with a "called" flag.
    // NOTE(review): the member definitions live outside this listing; judging by the names, operator() is
    // expected to run func_ at most once and operator bool to report whether a callable was set — confirm.
61 class OneTimeCallable {
62 std::function<void()> func_;
63 bool called_{false};
64
65 public:
66 void
67 setCallable(std::function<void()> func);
68
69 void
70 operator()();
71
72 operator bool() const;
73 };
    // Mutex-protected callback; a finishing job invokes it when it observes an empty queue while stopping_ is true.
74 util::Mutex<OneTimeCallable> onQueueEmpty_;
75
76public:
    /**
     * @brief Create a new instance of the work queue.
     *
     * @param numWorkers Worker count for the queue (presumably the thread pool size — confirm in WorkQueue.cpp)
     * @param maxSize Size limit for the queue; see the note on maxSize_ regarding the default of 0
     */
83 WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
84 ~WorkQueue();
85
    /**
     * @brief Put the work queue into a stopping state. This will prevent new jobs from being queued.
     *
     * @param onQueueEmpty Callback stored for invocation once the queue has drained (see postCoro)
     */
91 void
92 stop(std::function<void()> onQueueEmpty);
93
    /**
     * @brief A factory function that creates the work queue based on a config.
     *
     * NOTE(review): the remainder of this declaration (makeWorkQueue(util::config::ClioConfigDefinition const&))
     * is missing from this listing.
     */
100 static WorkQueue
102
    /**
     * @brief Submit a job to the work queue.
     *
     * The job is rejected when the queue is stopping, or when the current size has reached maxSize_ and the
     * caller is not whitelisted.
     *
     * @tparam FnType Type of the job function; it is invoked as func(yield)
     * @param func The job to run
     * @param isWhiteListed When true, the size limit check is bypassed (the stopping check still applies)
     * @return true if the job was accepted; false if it was rejected
     */
113 template <typename FnType>
114 bool
115 postCoro(FnType&& func, bool isWhiteListed)
116 {
117 if (stopping_) {
118 LOG(log_.warn()) << "Queue is stopping, rejecting incoming task.";
119 return false;
120 }
121
122 if (curSize_.get().value() >= maxSize_ && !isWhiteListed) {
123 LOG(log_.warn()) << "Queue is full. rejecting job. current size = " << curSize_.get().value()
124 << "; max size = " << maxSize_;
125 return false;
126 }
127
    // Count the job as queued before it is handed to the pool.
128 ++curSize_.get();
129
130 // Each time we enqueue a job, we want to post a symmetrical job that will dequeue and run the job at the front
131 // of the job queue.
    // NOTE(review): the line that opens this call (listing line 132) is absent from this extract; presumably
    // it is util::spawn( from util/Spawn.hpp — confirm against the real header.
133 ioc_,
134 [this, func = std::forward<FnType>(func), start = std::chrono::system_clock::now()](auto yield) mutable {
    // Time spent waiting between submission (start) and the moment the job begins to run.
135 auto const run = std::chrono::system_clock::now();
136 auto const wait = std::chrono::duration_cast<std::chrono::microseconds>(run - start).count();
137
138 ++queued_.get();
139 durationUs_.get() += wait;
140 LOG(log_.info()) << "WorkQueue wait time = " << wait << " queue size = " << curSize_.get().value();
141
    // NOTE(review): no exception handling is visible here; if func exits via an exception the decrement
    // below is skipped and curSize_ stays inflated — confirm func cannot throw.
142 func(yield);
143 --curSize_.get();
    // NOTE(review): the decrement above and the read below are separate operations on the gauge — confirm
    // that this is acceptable with several worker threads finishing at the same time.
144 if (curSize_.get().value() == 0 && stopping_) {
145 auto onTasksComplete = onQueueEmpty_.lock();
146 ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true.");
147 onTasksComplete->operator()();
148 }
149 }
150 );
151
152 return true;
153 }
154
    /**
     * @brief Generate a report of the work queue state.
     *
     * @return A JSON object describing the queue state
     */
160 boost::json::object
161 report() const;
162
    /**
     * @brief Wait until all the jobs in the queue are finished.
     */
166 void
167 join();
168
    /**
     * @brief Get the size of the queue.
     *
     * @return The size of the queue
     */
174 size_t
175 size() const;
176};
177
178} // namespace rpc
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:48
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize=0)
Create a new instance of the work queue.
Definition WorkQueue.cpp:55
bool postCoro(FnType &&func, bool isWhiteListed)
Submit a job to the work queue.
Definition WorkQueue.hpp:115
void join()
Wait until all the jobs in the queue are finished.
Definition WorkQueue.cpp:119
size_t size() const
Get the size of the queue.
Definition WorkQueue.cpp:125
void stop(std::function< void()> onQueueEmpty)
Put the work queue into a stopping state. This will prevent new jobs from being queued.
Definition WorkQueue.cpp:83
boost::json::object report() const
Generate a report of the work queue state.
Definition WorkQueue.cpp:106
static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const &config)
A factory function that creates the work queue based on a config.
Definition WorkQueue.cpp:94
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump warn(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::WRN severity.
Definition Logger.cpp:317
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:312
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:134
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn
Definition Spawn.hpp:69