xrpld
Loading...
Searching...
No Matches
JobQueue.cpp
1#include <xrpl/core/JobQueue.h>
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/contract.h>
5#include <xrpl/beast/insight/Collector.h>
6#include <xrpl/beast/utility/instrumentation.h>
7#include <xrpl/core/Job.h>
8#include <xrpl/core/JobTypeInfo.h>
9#include <xrpl/core/LoadEvent.h>
10#include <xrpl/core/PerfLog.h>
11#include <xrpl/json/json_value.h>
12
13#include <algorithm>
14#include <chrono>
15#include <functional>
16#include <memory>
17#include <mutex>
18#include <set>
19#include <tuple>
20#include <utility>
21
22namespace xrpl {
23
// JobQueue constructor.
//
// NOTE(review): the opening `JobQueue::JobQueue(` line (listing line 24) and
// listing lines 50-52 (the arguments passed to `jobData_.emplace`) are absent
// from this listing; restore them from the real source before compiling.
//
// The member initializers copy the journal, build the fallback
// `invalidJobData_` from `JobTypes::instance().getInvalid()`, construct
// `workers_` with `threadCount`, and keep references to the PerfLog and
// collector.
25 int threadCount,
26 beast::insight::Collector::ptr const& collector,
27 beast::Journal journal,
28 Logs& logs,
29 perf::PerfLog& perfLog)
30 : journal_(journal)
31 , invalidJobData_(JobTypes::instance().getInvalid(), collector, logs)
32 , workers_(*this, &perfLog, "JobQueue", threadCount)
33 , perfLog_(perfLog)
34 , collector_(collector)
35{
36 JLOG(journal_.info()) << "Using " << threadCount << " threads";
37
// Register `collect` as the collector hook and create the "job_count" gauge
// that `collect` writes to.
38 hook_ = collector_->makeHook(std::bind(&JobQueue::collect, this));
39 jobCount_ = collector_->makeGauge("job_count");
40
41 {
42 std::scoped_lock const lock(mutex_);
43
// One `jobData_` entry is created per job type known to `JobTypes`.
44 for (auto const& x : JobTypes::instance())
45 {
46 JobTypeInfo const& jt = x.second;
47
48 // And create dynamic information for all jobs
49 auto const result(jobData_.emplace(
// `result.second` is the "inserted" flag returned by emplace; each job type is
// expected to be inserted exactly once.
53 XRPL_ASSERT(result.second == true, "xrpl::JobQueue::JobQueue : jobs added");
// Presumably keeps `result` referenced when the assertion compiles to nothing.
54 (void)result.second;
55 }
56 }
57}
58
// JobQueue destructor body.
//
// NOTE(review): the signature line (listing line 59; the symbol index gives
// `~JobQueue() override`) and the single statement on listing line 62 are
// absent from this listing. Per the comment below, that statement presumably
// releases `hook_` so the collector can no longer call `collect` — confirm
// against the real source.
60{
61 // Must unhook before destroying
63}
64
65void
67{
68 std::scoped_lock const lock(mutex_);
69 jobCount_ = jobSet_.size();
70}
71
72bool
74{
75 XRPL_ASSERT(type != JtInvalid, "xrpl::JobQueue::addRefCountedJob : valid input job type");
76
77 auto iter(jobData_.find(type));
78 XRPL_ASSERT(
79 iter != jobData_.end(), "xrpl::JobQueue::addRefCountedJob : job type found in jobs");
80 if (iter == jobData_.end())
81 return false;
82
83 JLOG(journal_.debug()) << __func__ << " : Adding job : " << name << " : " << type;
84 JobTypeData& data(iter->second);
85
86 // FIXME: Workaround incorrect client shutdown ordering
87 // do not add jobs to a queue with no threads
88 XRPL_ASSERT(
89 (type >= JtClient && type <= JtClientWebsocket) || workers_.getNumberOfThreads() > 0,
90 "xrpl::JobQueue::addRefCountedJob : threads available or job "
91 "requires no threads");
92
93 {
94 std::scoped_lock const lock(mutex_);
95 auto result = jobSet_.emplace(type, name, ++lastJob_, data.load(), func);
96 auto const& job = *result.first;
97
98 JobType const type(job.getType());
99 XRPL_ASSERT(type != JtInvalid, "xrpl::JobQueue::addRefCountedJob : has valid job type");
100 XRPL_ASSERT(jobSet_.contains(job), "xrpl::JobQueue::addRefCountedJob : job found");
101 perfLog_.jobQueue(type);
102
103 JobTypeData& data(getJobTypeData(type));
104
105 if (data.waiting + data.running < getJobLimit(type))
106 {
107 workers_.addTask();
108 }
109 else
110 {
111 // defer the task until we go below the limit
112 ++data.deferred;
113 }
114 ++data.waiting;
115 }
116 return true;
117}
118
119int
121{
122 std::scoped_lock const lock(mutex_);
123
124 JobDataMap::const_iterator const c = jobData_.find(t);
125
126 return (c == jobData_.end()) ? 0 : c->second.waiting;
127}
128
129int
131{
132 std::scoped_lock const lock(mutex_);
133
134 JobDataMap::const_iterator const c = jobData_.find(t);
135
136 return (c == jobData_.end()) ? 0 : (c->second.waiting + c->second.running);
137}
138
139int
141{
142 // return the number of jobs at this priority level or greater
143 int ret = 0;
144
145 std::scoped_lock const lock(mutex_);
146
147 for (auto const& x : jobData_)
148 {
149 if (x.first >= t)
150 ret += x.second.waiting;
151 }
152
153 return ret;
154}
155
158{
159 JobDataMap::iterator const iter(jobData_.find(t));
160 XRPL_ASSERT(iter != jobData_.end(), "xrpl::JobQueue::makeLoadEvent : valid job type input");
161
162 if (iter == jobData_.end())
163 return {};
164
165 return std::make_unique<LoadEvent>(iter->second.load(), name, true);
166}
167
168void
170{
171 if (isStopped())
172 logicError("JobQueue::addLoadEvents() called after JobQueue stopped");
173
174 JobDataMap::iterator const iter(jobData_.find(t));
175 XRPL_ASSERT(iter != jobData_.end(), "xrpl::JobQueue::addLoadEvents : valid job type input");
176 iter->second.load().addSamples(count, elapsed);
177}
178
179bool
181{
182 return std::ranges::any_of(jobData_, [](auto& entry) { return entry.second.load().isOver(); });
183}
184
// Build a JSON report of the queue: the worker thread count under "threads"
// and one entry per active job type under "job_types".
//
// NOTE(review): several lines are absent from this listing — the return type
// and signature (listing lines 185-186; the symbol index gives
// `json::Value getJson(int c=0)`) and the declarations of `ret` (line 189) and
// `priorities` (line 193). From their use below, `ret` is presumably an object
// value and `priorities` an array value — confirm against the real source.
// The parameter `c` is not referenced in the visible body.
187{
188 using namespace std::chrono_literals;
190
191 ret["threads"] = workers_.getNumberOfThreads();
192
194
// `mutex_` is held for the whole scan of `jobData_`.
195 std::scoped_lock const lock(mutex_);
196
197 for (auto& x : jobData_)
198 {
199 XRPL_ASSERT(x.first != JtInvalid, "xrpl::JobQueue::getJson : valid job type");
200
// JtGeneric is never reported.
201 if (x.first == JtGeneric)
202 continue;
203
204 JobTypeData& data(x.second);
205
206 LoadMonitor::Stats const stats(data.stats());
207
208 int const waiting(data.waiting);
209 int const running(data.running);
210
// Emit an entry only for job types that show some activity; inside the entry
// each field is written only when it is non-zero / true.
211 if ((stats.count != 0) || (waiting != 0) || (stats.latencyPeak != 0ms) || (running != 0))
212 {
213 json::Value& pri = priorities.append(json::ValueType::Object);
214
215 pri["job_type"] = data.name();
216
217 if (stats.isOverloaded)
218 pri["over_target"] = true;
219
220 if (waiting != 0)
221 pri["waiting"] = waiting;
222
223 if (stats.count != 0)
224 pri["per_second"] = static_cast<int>(stats.count);
225
// latencyPeak / latencyAvg are std::chrono::milliseconds, so `count()` yields
// a millisecond tick count.
226 if (stats.latencyPeak != 0ms)
227 pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
228
229 if (stats.latencyAvg != 0ms)
230 pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
231
232 if (running != 0)
233 pri["in_progress"] = running;
234 }
235 }
236
237 ret["job_types"] = priorities;
238
239 return ret;
240}
241
242void
244{
246 cv_.wait(lock, [this] { return processCount_ == 0 && jobSet_.empty(); });
247}
248
251{
252 JobDataMap::iterator const c(jobData_.find(type));
253 XRPL_ASSERT(c != jobData_.end(), "xrpl::JobQueue::getJobTypeData : valid job type input");
254
255 // NIKB: This is ugly and I hate it. We must remove JtInvalid completely
256 // and use something sane.
257 if (c == jobData_.end())
258 return invalidJobData_;
259
260 return c->second;
261}
262
263void
265{
266 stopping_ = true;
267 using namespace std::chrono_literals;
268 jobCounter_.join("JobQueue", 1s, journal_);
269 {
270 // After the JobCounter is joined, all jobs have finished executing
271 // (i.e. returned from `Job::doJob`) and no more are being accepted,
272 // but there may still be some threads between the return of
273 // `Job::doJob` and the return of `JobQueue::processTask`. That is why
274 // we must wait on the condition variable to make these assertions.
276 cv_.wait(lock, [this] { return processCount_ == 0 && jobSet_.empty(); });
277 XRPL_ASSERT(processCount_ == 0, "xrpl::JobQueue::stop : all processes completed");
278 XRPL_ASSERT(jobSet_.empty(), "xrpl::JobQueue::stop : all jobs completed");
279 XRPL_ASSERT(nSuspend_ == 0, "xrpl::JobQueue::stop : no coros suspended");
280 stopped_ = true;
281 }
282}
283
284bool
286{
287 return stopped_;
288}
289
290void
292{
293 XRPL_ASSERT(!jobSet_.empty(), "xrpl::JobQueue::getNextJob : non-empty jobs");
294
295 std::set<Job>::const_iterator iter;
296 for (iter = jobSet_.begin(); iter != jobSet_.end(); ++iter)
297 {
298 JobType const type = iter->getType();
299 XRPL_ASSERT(type != JtInvalid, "xrpl::JobQueue::getNextJob : valid job type");
300
301 JobTypeData& data(getJobTypeData(type));
302 XRPL_ASSERT(
303 data.running <= getJobLimit(type), "xrpl::JobQueue::getNextJob : maximum jobs running");
304
305 // Run this job if we're running below the limit.
306 if (data.running < getJobLimit(data.type()))
307 {
308 XRPL_ASSERT(data.waiting > 0, "xrpl::JobQueue::getNextJob : positive data waiting");
309 --data.waiting;
310 ++data.running;
311 break;
312 }
313 }
314
315 XRPL_ASSERT(iter != jobSet_.end(), "xrpl::JobQueue::getNextJob : found next job");
316 job = *iter;
317 jobSet_.erase(iter);
318}
319
320void
322{
323 XRPL_ASSERT(type != JtInvalid, "xrpl::JobQueue::finishJob : valid input job type");
324
325 JobTypeData& data = getJobTypeData(type);
326
327 // Queue a deferred task if possible
328 if (data.deferred > 0)
329 {
330 XRPL_ASSERT(
331 data.running + data.waiting >= getJobLimit(type),
332 "xrpl::JobQueue::finishJob : job limit");
333
334 --data.deferred;
335 workers_.addTask();
336 }
337
338 --data.running;
339}
340
// Worker callback: take the next job from the queue, run it, record timing,
// then update the per-type counters.
//
// NOTE(review): several lines are absent from this listing — the signature
// (listing line 342; the symbol index gives
// `void processTask(int instance) override`) and the statements on listing
// lines 352, 354 and 379. Lines 352 and 379 open the two scoped regions around
// `getNextJob` and `finishJob` and presumably acquire `mutex_`; line 354
// presumably increments `processCount_`, which is decremented below — confirm
// against the real source.
341void
343{
344 JobType type = JtInvalid;
345
346 {
347 using namespace std::chrono;
// Taken before the job is dequeued; used for both queue time and run time.
348 Job::clock_type::time_point const startTime(Job::clock_type::now());
349 {
350 Job job;
351 {
353 getNextJob(job);
355 }
356 type = job.getType();
357 JobTypeData const& data(getJobTypeData(type));
358 JLOG(journal_.trace()) << "Doing " << data.name() << "job";
359
360 // The amount of time that the job was in the queue
361 auto const qTime = ceil<microseconds>(startTime - job.queueTime());
362 perfLog_.jobStart(type, qTime, startTime, instance);
363
364 job.doJob();
365
366 // The amount of time it took to execute the job
367 auto const xTime = ceil<microseconds>(Job::clock_type::now() - startTime);
368
// Both events are notified only when the job ran or queued for at least 10ms.
369 if (xTime >= 10ms || qTime >= 10ms)
370 {
371 getJobTypeData(type).dequeue.notify(qTime);
372 getJobTypeData(type).execute.notify(xTime);
373 }
374 perfLog_.jobFinish(type, xTime, instance);
// `job` goes out of scope here, before `finishJob` runs below.
375 }
376 }
377
378 {
380 // Job should be destroyed before stopping
381 // otherwise destructors with side effects can access
382 // parent objects that are already destroyed.
383 finishJob(type);
// Wake waiters on `cv_` once nothing is in flight and nothing is queued.
384 if (--processCount_ == 0 && jobSet_.empty())
385 cv_.notify_all();
386 }
387
388 // Note that when Job::~Job is called, the last reference
389 // to the associated LoadEvent object (in the Job) may be destroyed.
390}
391
392int
394{
395 JobTypeInfo const& j(JobTypes::instance().get(type));
396 XRPL_ASSERT(j.type() != JtInvalid, "xrpl::JobQueue::getJobLimit : valid job type");
397
398 return j.limit();
399}
400
401} // namespace xrpl
T any_of(T... args)
T bind(T... args)
T ceil(T... args)
A generic endpoint for log messages.
Definition Journal.h:38
std::shared_ptr< Collector > ptr
Definition Collector.h:26
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition Event.h:42
A reference to a handler for performing polled collection.
Definition Hook.h:12
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:140
json::Value getJson(int c=0)
Definition JobQueue.cpp:186
std::function< void()> JobFunction
Definition JobQueue.h:127
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:342
JobCounter jobCounter_
Definition JobQueue.h:232
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:250
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:130
bool isStopped() const
Definition JobQueue.cpp:285
~JobQueue() override
Definition JobQueue.cpp:59
Workers workers_
Definition JobQueue.h:244
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:243
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition JobQueue.cpp:24
std::atomic_bool stopping_
Definition JobQueue.h:233
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:120
beast::insight::Collector::ptr collector_
Definition JobQueue.h:248
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:73
bool isOverloaded()
Definition JobQueue.cpp:180
beast::Journal journal_
Definition JobQueue.h:228
JobDataMap jobData_
Definition JobQueue.h:235
std::atomic_bool stopped_
Definition JobQueue.h:234
beast::insight::Gauge jobCount_
Definition JobQueue.h:249
std::condition_variable cv_
Definition JobQueue.h:252
static int getJobLimit(JobType type)
Definition JobQueue.cpp:393
JobTypeData invalidJobData_
Definition JobQueue.h:236
std::mutex mutex_
Definition JobQueue.h:229
beast::insight::Hook hook_
Definition JobQueue.h:250
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:169
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:157
std::uint64_t lastJob_
Definition JobQueue.h:230
perf::PerfLog & perfLog_
Definition JobQueue.h:247
void finishJob(JobType type)
Definition JobQueue.cpp:321
void getNextJob(Job &job)
Definition JobQueue.cpp:291
std::set< Job > jobSet_
Definition JobQueue.h:231
Holds all the 'static' information about a job, which does not change.
Definition JobTypeInfo.h:9
int limit() const
Definition JobTypeInfo.h:56
JobType type() const
Definition JobTypeInfo.h:44
static JobTypes const & instance()
Definition JobTypes.h:102
JobType getType() const
Definition Job.cpp:34
void doJob()
Definition Job.cpp:46
clock_type::time_point const & queueTime() const
Returns the time when the job was queued.
Definition Job.cpp:40
Manages partitions for logging.
Definition Log.h:20
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log.
Definition PerfLog.h:31
T data(T... args)
T forward_as_tuple(T... args)
T lock(T... args)
T make_unique(T... args)
@ Array
array value (ordered list)
Definition json_value.h:25
@ Object
object value (collection of name/value pairs).
Definition json_value.h:26
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
void logicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
JobType
Definition Job.h:16
@ JtGeneric
Definition Job.h:68
@ JtClient
Definition Job.h:26
@ JtInvalid
Definition Job.h:18
@ JtClientWebsocket
Definition Job.h:32
T piecewise_construct
beast::insight::Event dequeue
Definition JobTypeData.h:33
beast::insight::Event execute
Definition JobTypeData.h:34
std::chrono::milliseconds latencyPeak
Definition LoadMonitor.h:41
std::chrono::milliseconds latencyAvg
Definition LoadMonitor.h:40