Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WorkQueue.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "util/Mutex.hpp"
23#include "util/log/Logger.hpp"
24#include "util/newconfig/ConfigDefinition.hpp"
25#include "util/prometheus/Counter.hpp"
26#include "util/prometheus/Gauge.hpp"
27
28#include <boost/asio.hpp>
29#include <boost/asio/spawn.hpp>
30#include <boost/asio/thread_pool.hpp>
31#include <boost/json.hpp>
32#include <boost/json/object.hpp>
33
34#include <atomic>
35#include <chrono>
36#include <cstddef>
37#include <cstdint>
38#include <functional>
39#include <limits>
40
41namespace rpc {
42
46class WorkQueue {
47 // these are cumulative for the lifetime of the process
48 std::reference_wrapper<util::prometheus::CounterInt> queued_;
49 std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
50
51 std::reference_wrapper<util::prometheus::GaugeInt> curSize_;
52 uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
53
54 util::Logger log_{"RPC"};
55 boost::asio::thread_pool ioc_;
56
57 std::atomic_bool stopping_;
58
59 class OneTimeCallable {
60 std::function<void()> func_;
61 bool called_{false};
62
63 public:
64 void
65 setCallable(std::function<void()> func);
66
67 void
68 operator()();
69
70 operator bool() const;
71 };
72 util::Mutex<OneTimeCallable> onQueueEmpty_;
73
74public:
81 WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
82 ~WorkQueue();
83
89 void
90 stop(std::function<void()> onQueueEmpty);
91
98 static WorkQueue
100
111 template <typename FnType>
112 bool
113 postCoro(FnType&& func, bool isWhiteListed)
114 {
115 if (stopping_) {
116 LOG(log_.warn()) << "Queue is stopping, rejecting incoming task.";
117 return false;
118 }
119
120 if (curSize_.get().value() >= maxSize_ && !isWhiteListed) {
121 LOG(log_.warn()) << "Queue is full. rejecting job. current size = " << curSize_.get().value()
122 << "; max size = " << maxSize_;
123 return false;
124 }
125
126 ++curSize_.get();
127
128 // Each time we enqueue a job, we want to post a symmetrical job that will dequeue and run the job at the front
129 // of the job queue.
130 boost::asio::spawn(
131 ioc_,
132 [this, func = std::forward<FnType>(func), start = std::chrono::system_clock::now()](auto yield) mutable {
133 auto const run = std::chrono::system_clock::now();
134 auto const wait = std::chrono::duration_cast<std::chrono::microseconds>(run - start).count();
135
136 ++queued_.get();
137 durationUs_.get() += wait;
138 LOG(log_.info()) << "WorkQueue wait time = " << wait << " queue size = " << curSize_.get().value();
139
140 func(yield);
141 --curSize_.get();
142 if (curSize_.get().value() == 0 && stopping_) {
143 auto onTasksComplete = onQueueEmpty_.lock();
144 ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true.");
145 onTasksComplete->operator()();
146 }
147 }
148 );
149
150 return true;
151 }
152
158 boost::json::object
159 report() const;
160
164 void
165 join();
166
172 size_t
173 size() const;
174};
175
176} // namespace rpc
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:46
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize=0)
Create a new instance of the work queue.
Definition WorkQueue.cpp:54
bool postCoro(FnType &&func, bool isWhiteListed)
Submit a job to the work queue.
Definition WorkQueue.hpp:113
void join()
Wait until all the jobs in the queue are finished.
Definition WorkQueue.cpp:118
size_t size() const
Get the size of the queue.
Definition WorkQueue.cpp:124
void stop(std::function< void()> onQueueEmpty)
Put the work queue into a stopping state. This will prevent new jobs from being queued.
Definition WorkQueue.cpp:82
boost::json::object report() const
Generate a report of the work queue state.
Definition WorkQueue.cpp:105
static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const &config)
A factory function that creates the work queue based on a config.
Definition WorkQueue.cpp:93
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump warn(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::WRN severity.
Definition Logger.cpp:210
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:134
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:36