rippled
Loading...
Searching...
No Matches
JobQueue.cpp
1#include <xrpld/core/JobQueue.h>
2#include <xrpld/perflog/PerfLog.h>
3
4#include <xrpl/basics/contract.h>
5
6#include <mutex>
7
8namespace ripple {
9
11 int threadCount,
12 beast::insight::Collector::ptr const& collector,
13 beast::Journal journal,
14 Logs& logs,
15 perf::PerfLog& perfLog)
16 : m_journal(journal)
17 , m_lastJob(0)
// Fallback entry returned by getJobTypeData() for job types that have no
// entry in m_jobData.
18 , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
19 , m_processCount(0)
20 , m_workers(*this, &perfLog, "JobQueue", threadCount)
21 , perfLog_(perfLog)
22 , m_collector(collector)
23{
// Logs the worker thread count, registers JobQueue::collect as the
// collector's hook, creates the "job_count" gauge, and then creates one
// m_jobData entry for every job type in JobTypes::instance().
// NOTE(review): this listing omits source line 10 (start of the
// signature) and lines 30 and 38-40. Line 30 presumably locks m_mutex, and
// lines 38-40 presumably hold the remaining emplace arguments. Confirm
// against the original file.
24 JLOG(m_journal.info()) << "Using " << threadCount << " threads";
25
26 hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
27 job_count = m_collector->make_gauge("job_count");
28
29 {
31
32 for (auto const& x : JobTypes::instance())
33 {
34 JobTypeInfo const& jt = x.second;
35
36 // And create dynamic information for all jobs
37 auto const result(m_jobData.emplace(
// Each job type must be inserted exactly once.
41 XRPL_ASSERT(
42 result.second == true,
43 "ripple::JobQueue::JobQueue : jobs added");
// Presumably keeps `result` "used" in builds where XRPL_ASSERT expands
// to nothing.
44 (void)result.second;
45 }
46 }
47}
48
50{
// NOTE(review): the signature (source line 49) and the statement on
// source line 52 are missing from this listing. The existing comment
// suggests this is the destructor and that line 52 releases `hook`.
// Confirm against the original file.
51 // Must unhook before destroying
53}
54
// NOTE(review): source lines 56-60 (this function's name and body) are
// missing from this listing. The constructor binds &JobQueue::collect as
// the collector hook, so this may be that callback. Confirm.
55void
61
// Queues a job of the given type that will run `func`.
//
// If `type` has no entry in m_jobData, nothing is queued and the function
// returns false. Otherwise the job is inserted into m_jobSet with a fresh
// sequence number (++m_lastJob) and counted as waiting. If the type's
// waiting + running count has already reached getJobLimit(type), the job
// is also counted as deferred. Returns true once the job is queued.
//
// NOTE(review): this listing omits these source lines:
//   - 63: the function name.
//   - 87: the second operand of the thread-availability assert.
//   - 92: presumably the m_mutex lock for the inner scope.
//   - 110: the body of the below-limit branch, which appears empty here.
//     Workers::addTask is referenced in this file.
// Confirm against the original file.
62bool
64 JobType type,
65 std::string const& name,
66 JobFunction const& func)
67{
68 XRPL_ASSERT(
69 type != jtINVALID,
70 "ripple::JobQueue::addRefCountedJob : valid input job type");
71
72 auto iter(m_jobData.find(type));
73 XRPL_ASSERT(
74 iter != m_jobData.end(),
75 "ripple::JobQueue::addRefCountedJob : job type found in jobs");
76 if (iter == m_jobData.end())
77 return false;
78
79 JLOG(m_journal.debug())
80 << __func__ << " : Adding job : " << name << " : " << type;
81 JobTypeData& data(iter->second);
82
83 // FIXME: Workaround incorrect client shutdown ordering
84 // do not add jobs to a queue with no threads
85 XRPL_ASSERT(
86 (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) ||
88 "ripple::JobQueue::addRefCountedJob : threads available or job "
89 "requires no threads");
90
91 {
93 auto result =
94 m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func);
95 auto const& job = *result.first;
96
// Shadows the parameter `type` (and, below, the outer `data`). The value
// is read back from the job that was just inserted.
97 JobType const type(job.getType());
98 XRPL_ASSERT(
99 type != jtINVALID,
100 "ripple::JobQueue::addRefCountedJob : has valid job type");
101 XRPL_ASSERT(
102 m_jobSet.find(job) != m_jobSet.end(),
103 "ripple::JobQueue::addRefCountedJob : job found");
104 perfLog_.jobQueue(type);
105
106 JobTypeData& data(getJobTypeData(type));
107
// Below the per-type limit, the task is handled by source line 110 (not
// shown). At or above the limit, the job is counted as deferred.
108 if (data.waiting + data.running < getJobLimit(type))
109 {
111 }
112 else
113 {
114 // defer the task until we go below the limit
115 ++data.deferred;
116 }
// Deferred jobs are still counted as waiting.
117 ++data.waiting;
118 }
119 return true;
120}
121
// Returns how many jobs of type `t` are waiting (queued but not yet
// running), or 0 if `t` has no entry in m_jobData.
// NOTE(review): source line 123 (the signature, declared as
// `int getJobCount(JobType t) const`) and line 125 are missing from this
// listing. Line 125 presumably locks m_mutex. Confirm.
122int
124{
126
127 JobDataMap::const_iterator c = m_jobData.find(t);
128
129 return (c == m_jobData.end()) ? 0 : c->second.waiting;
130}
131
// Returns waiting + running for jobs of type `t`, or 0 if `t` has no entry
// in m_jobData.
// NOTE(review): source line 133 (the signature, declared as
// `int getJobCountTotal(JobType t) const`) and line 135 are missing from
// this listing. Line 135 presumably locks m_mutex. Confirm.
132int
134{
136
137 JobDataMap::const_iterator c = m_jobData.find(t);
138
139 return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
140}
141
// Sums the `waiting` count over every job type whose JobType value is
// greater than or equal to `t`. Running jobs are not included.
// NOTE(review): source line 143 (the signature, declared as
// `int getJobCountGE(JobType t) const`) and line 148 are missing from this
// listing. Line 148 presumably locks m_mutex. Confirm.
142int
144{
145 // return the number of jobs at this priority level or greater
146 int ret = 0;
147
149
150 for (auto const& x : m_jobData)
151 {
152 if (x.first >= t)
153 ret += x.second.waiting;
154 }
155
156 return ret;
157}
158
161{
162 JobDataMap::iterator iter(m_jobData.find(t));
163 XRPL_ASSERT(
164 iter != m_jobData.end(),
165 "ripple::JobQueue::makeLoadEvent : valid job type input");
166
167 if (iter == m_jobData.end())
168 return {};
169
170 return std::make_unique<LoadEvent>(iter->second.load(), name, true);
171}
172
173void
175{
176 if (isStopped())
177 LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
178
179 JobDataMap::iterator iter(m_jobData.find(t));
180 XRPL_ASSERT(
181 iter != m_jobData.end(),
182 "ripple::JobQueue::addLoadEvents : valid job type input");
183 iter->second.load().addSamples(count, elapsed);
184}
185
// Returns true if the load tracker of any job type in m_jobData reports
// isOver().
// NOTE(review): the function name (source line 187) is missing from this
// listing. Presumably this is JobQueue::isOverloaded(). Confirm.
186bool
188{
189 return std::any_of(m_jobData.begin(), m_jobData.end(), [](auto& entry) {
190 return entry.second.load().isOver();
191 });
192}
193
// Builds a JSON status report for the queue:
//   "threads":   m_workers.getNumberOfThreads()
//   "job_types": an array with one object per job type, excluding
//                jtGENERIC. A type is included only if its stats count,
//                waiting count, peak latency or running count is nonzero.
//                Each object has "job_type". It also has whichever of
//                these apply: "over_target" (only when overloaded),
//                "waiting", "per_second" (stats.count cast to int),
//                "peak_time", "avg_time" and "in_progress". Zero-valued
//                fields are omitted. peak_time and avg_time are the
//                .count() of millisecond durations.
// NOTE(review): this listing omits these source lines:
//   - 194-195: the signature, declared as `Json::Value getJson(int c=0)`.
//   - 198: presumably the declaration of `ret`.
//   - 204: presumably a lock on m_mutex.
196{
197 using namespace std::chrono_literals;
199
200 ret["threads"] = m_workers.getNumberOfThreads();
201
202 Json::Value priorities = Json::arrayValue;
203
205
206 for (auto& x : m_jobData)
207 {
208 XRPL_ASSERT(
209 x.first != jtINVALID, "ripple::JobQueue::getJson : valid job type");
210
211 if (x.first == jtGENERIC)
212 continue;
213
214 JobTypeData& data(x.second);
215
216 LoadMonitor::Stats stats(data.stats());
217
218 int waiting(data.waiting);
219 int running(data.running);
220
// A nonzero average latency alone does not cause an entry to be emitted.
221 if ((stats.count != 0) || (waiting != 0) ||
222 (stats.latencyPeak != 0ms) || (running != 0))
223 {
224 Json::Value& pri = priorities.append(Json::objectValue);
225
226 pri["job_type"] = data.name();
227
228 if (stats.isOverloaded)
229 pri["over_target"] = true;
230
231 if (waiting != 0)
232 pri["waiting"] = waiting;
233
234 if (stats.count != 0)
235 pri["per_second"] = static_cast<int>(stats.count);
236
237 if (stats.latencyPeak != 0ms)
238 pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
239
240 if (stats.latencyAvg != 0ms)
241 pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
242
243 if (running != 0)
244 pri["in_progress"] = running;
245 }
246 }
247
248 ret["job_types"] = priorities;
249
250 return ret;
251}
252
// Blocks until no task is in progress (m_processCount == 0) and m_jobSet
// is empty.
// NOTE(review): source line 254 (the signature, `void rendezvous()`) and
// line 256 (the declaration of `lock`) are missing from this listing.
253void
255{
257 cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
258}
259
262{
263 JobDataMap::iterator c(m_jobData.find(type));
264 XRPL_ASSERT(
265 c != m_jobData.end(),
266 "ripple::JobQueue::getJobTypeData : valid job type input");
267
268 // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
269 // and use something sane.
270 if (c == m_jobData.end())
271 return m_invalidJobData;
272
273 return c->second;
274}
275
// Shuts the queue down in these steps:
//   1. Sets stopping_.
//   2. Joins jobCounter_, which per its declaration returns once all
//      counted in-flight closures are destroyed.
//   3. Waits on cv_ until no task is in progress and m_jobSet is empty.
//   4. Asserts that no coroutines are suspended (nSuspend_ == 0) and sets
//      stopped_.
// NOTE(review): source line 277 (the signature) and line 288 (the
// declaration of `lock`) are missing from this listing.
276void
278{
279 stopping_ = true;
280 using namespace std::chrono_literals;
281 jobCounter_.join("JobQueue", 1s, m_journal);
282 {
283 // After the JobCounter is joined, all jobs have finished executing
284 // (i.e. returned from `Job::doJob`) and no more are being accepted,
285 // but there may still be some threads between the return of
286 // `Job::doJob` and the return of `JobQueue::processTask`. That is why
287 // we must wait on the condition variable to make these assertions.
289 cv_.wait(
290 lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
291 XRPL_ASSERT(
292 m_processCount == 0,
293 "ripple::JobQueue::stop : all processes completed");
294 XRPL_ASSERT(
295 m_jobSet.empty(), "ripple::JobQueue::stop : all jobs completed");
296 XRPL_ASSERT(
297 nSuspend_ == 0, "ripple::JobQueue::stop : no coros suspended");
298 stopped_ = true;
299 }
300}
301
302bool
304{
305 return stopped_;
306}
307
// Moves the next runnable job into `job` and removes it from m_jobSet.
// It walks m_jobSet in the set's order and takes the first job whose type
// is running below getJobLimit(). For that type, one unit moves from
// `waiting` to `running`.
//
// Preconditions (asserted only):
//   - m_jobSet is non-empty.
//   - m_jobSet holds at least one runnable job.
// If XRPL_ASSERT does not halt execution in a build, violating the second
// precondition dereferences m_jobSet.end().
//
// NOTE(review): source line 309 (the signature, `void getNextJob(Job &job)`)
// and line 314 (presumably the declaration of `iter`) are missing from
// this listing. Callers presumably hold m_mutex. Confirm.
308void
310{
311 XRPL_ASSERT(
312 !m_jobSet.empty(), "ripple::JobQueue::getNextJob : non-empty jobs");
313
315 for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
316 {
317 JobType const type = iter->getType();
318 XRPL_ASSERT(
319 type != jtINVALID, "ripple::JobQueue::getNextJob : valid job type");
320
321 JobTypeData& data(getJobTypeData(type));
322 XRPL_ASSERT(
323 data.running <= getJobLimit(type),
324 "ripple::JobQueue::getNextJob : maximum jobs running");
325
// NOTE(review): this check uses data.type(), while the assert above uses
// `type`. The two presumably match for any valid type.
326 // Run this job if we're running below the limit.
327 if (data.running < getJobLimit(data.type()))
328 {
329 XRPL_ASSERT(
330 data.waiting > 0,
331 "ripple::JobQueue::getNextJob : positive data waiting");
332 --data.waiting;
333 ++data.running;
334 break;
335 }
336 }
337
338 XRPL_ASSERT(
339 iter != m_jobSet.end(),
340 "ripple::JobQueue::getNextJob : found next job");
341 job = *iter;
342 m_jobSet.erase(iter);
343}
344
// Updates per-type bookkeeping after a job of `type` finishes. It
// decrements `running`, and if tasks of this type were deferred, it also
// decrements `deferred`.
// NOTE(review): source line 346 (the signature, `void finishJob(JobType
// type)`) and line 362 are missing from this listing. Per the comment
// below, line 362 presumably queues the deferred task (Workers::addTask is
// referenced in this file). Confirm.
345void
347{
348 XRPL_ASSERT(
349 type != jtINVALID,
350 "ripple::JobQueue::finishJob : valid input job type");
351
352 JobTypeData& data = getJobTypeData(type);
353
354 // Queue a deferred task if possible
355 if (data.deferred > 0)
356 {
357 XRPL_ASSERT(
358 data.running + data.waiting >= getJobLimit(type),
359 "ripple::JobQueue::finishJob : job limit");
360
361 --data.deferred;
363 }
364
365 --data.running;
366}
367
// Worker entry point, declared as `void processTask(int instance)
// override`. It takes the next job via getNextJob(), runs it, and reports
// timing:
//   - q_time is the time from the job's queue_time() to start_time,
//     rounded up to microseconds.
//   - x_time is the time from start_time until doJob() returns, rounded up
//     to microseconds. start_time is captured before getNextJob(), so
//     x_time also includes the time spent fetching the job.
//   - The type's dequeue and execute insight events are notified only when
//     either duration is at least 10ms. perfLog_ is notified for every job.
// After the Job object goes out of scope, finishJob(type) runs. cv_ is
// notified once m_processCount reaches 0 while m_jobSet is empty.
// NOTE(review): source lines 369 (the signature), 379, 381 and 408 are
// missing from this listing. Lines 379 and 408 presumably lock m_mutex,
// and line 381 presumably increments m_processCount. Confirm.
368void
370{
371 JobType type;
372
373 {
374 using namespace std::chrono;
375 Job::clock_type::time_point const start_time(Job::clock_type::now());
376 {
377 Job job;
378 {
380 getNextJob(job);
382 }
383 type = job.getType();
384 JobTypeData& data(getJobTypeData(type));
// NOTE(review): this trace message has no space before "job".
385 JLOG(m_journal.trace()) << "Doing " << data.name() << "job";
386
387 // The amount of time that the job was in the queue
388 auto const q_time =
389 ceil<microseconds>(start_time - job.queue_time());
390 perfLog_.jobStart(type, q_time, start_time, instance);
391
392 job.doJob();
393
394 // The amount of time it took to execute the job
395 auto const x_time =
396 ceil<microseconds>(Job::clock_type::now() - start_time);
397
398 if (x_time >= 10ms || q_time >= 10ms)
399 {
400 getJobTypeData(type).dequeue.notify(q_time);
401 getJobTypeData(type).execute.notify(x_time);
402 }
403 perfLog_.jobFinish(type, x_time, instance);
404 }
405 }
406
407 {
409 // Job should be destroyed before stopping
410 // otherwise destructors with side effects can access
411 // parent objects that are already destroyed.
412 finishJob(type);
413 if (--m_processCount == 0 && m_jobSet.empty())
414 cv_.notify_all();
415 }
416
417 // Note that when Job::~Job is called, the last reference
418 // to the associated LoadEvent object (in the Job) may be destroyed.
419}
420
421int
423{
424 JobTypeInfo const& j(JobTypes::instance().get(type));
425 XRPL_ASSERT(
426 j.type() != jtINVALID,
427 "ripple::JobQueue::getJobLimit : valid job type");
428
429 return j.limit();
430}
431
432} // namespace ripple
T any_of(T... args)
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Definition Journal.h:41
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition Event.h:45
A reference to a handler for performing polled collection.
Definition Hook.h:13
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
JobTypeData m_invalidJobData
Definition JobQueue.h:236
int getJobLimit(JobType type)
Definition JobQueue.cpp:422
std::atomic_bool stopped_
Definition JobQueue.h:234
std::uint64_t m_lastJob
Definition JobQueue.h:230
JobDataMap m_jobData
Definition JobQueue.h:235
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:254
JobCounter jobCounter_
Definition JobQueue.h:232
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:133
bool isStopped() const
Definition JobQueue.cpp:303
perf::PerfLog & perfLog_
Definition JobQueue.h:247
Workers m_workers
Definition JobQueue.h:244
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:261
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:143
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:174
Json::Value getJson(int c=0)
Definition JobQueue.cpp:195
beast::insight::Collector::ptr m_collector
Definition JobQueue.h:248
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:63
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:160
std::set< Job > m_jobSet
Definition JobQueue.h:231
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:123
beast::insight::Hook hook
Definition JobQueue.h:250
beast::Journal m_journal
Definition JobQueue.h:228
std::mutex m_mutex
Definition JobQueue.h:229
beast::insight::Gauge job_count
Definition JobQueue.h:249
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:369
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition JobQueue.cpp:10
std::atomic_bool stopping_
Definition JobQueue.h:233
void finishJob(JobType type)
Definition JobQueue.cpp:346
void getNextJob(Job &job)
Definition JobQueue.cpp:309
std::condition_variable cv_
Definition JobQueue.h:252
Holds all the 'static' information about a job, which does not change.
Definition JobTypeInfo.h:10
JobType type() const
Definition JobTypeInfo.h:45
int limit() const
Definition JobTypeInfo.h:57
static JobTypes const & instance()
Definition JobTypes.h:109
void doJob()
Definition Job.cpp:43
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition Job.cpp:37
JobType getType() const
Definition Job.cpp:31
Manages partitions for logging.
Definition Log.h:33
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:35
void addTask()
Add a task to be performed.
Definition Workers.cpp:110
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:33
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
virtual void jobQueue(JobType const type)=0
Log queued job.
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T forward_as_tuple(T... args)
T is_same_v
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
JobType
Definition Job.h:16
@ jtINVALID
Definition Job.h:18
@ jtGENERIC
Definition Job.h:68
@ jtCLIENT
Definition Job.h:26
@ jtCLIENT_WEBSOCKET
Definition Job.h:32
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
T piecewise_construct
beast::insight::Event execute
Definition JobTypeData.h:34
beast::insight::Event dequeue
Definition JobTypeData.h:33
std::chrono::milliseconds latencyAvg
Definition LoadMonitor.h:44
std::chrono::milliseconds latencyPeak
Definition LoadMonitor.h:45