rippled
Loading...
Searching...
No Matches
JobQueue.cpp
1#include <xrpl/basics/contract.h>
2#include <xrpl/core/JobQueue.h>
3#include <xrpl/core/PerfLog.h>
4
5#include <mutex>
6
7namespace xrpl {
8
10 int threadCount,
11 beast::insight::Collector::ptr const& collector,
12 beast::Journal journal,
13 Logs& logs,
14 perf::PerfLog& perfLog)
15 : m_journal(journal)
16 , m_lastJob(0)
17 , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
18 , m_processCount(0)
19 , m_workers(*this, &perfLog, "JobQueue", threadCount)
20 , perfLog_(perfLog)
21 , m_collector(collector)
22{
23 JLOG(m_journal.info()) << "Using " << threadCount << " threads";
24
25 hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
26 job_count = m_collector->make_gauge("job_count");
27
28 {
30
31 for (auto const& x : JobTypes::instance())
32 {
33 JobTypeInfo const& jt = x.second;
34
35 // And create dynamic information for all jobs
36 auto const result(m_jobData.emplace(
40 XRPL_ASSERT(
41 result.second == true, "xrpl::JobQueue::JobQueue : jobs added");
42 (void)result.second;
43 }
44 }
45}
46
48{
49 // Must unhook before destroying
51}
52
53void
59
60bool
62 JobType type,
63 std::string const& name,
64 JobFunction const& func)
65{
66 XRPL_ASSERT(
67 type != jtINVALID,
68 "xrpl::JobQueue::addRefCountedJob : valid input job type");
69
70 auto iter(m_jobData.find(type));
71 XRPL_ASSERT(
72 iter != m_jobData.end(),
73 "xrpl::JobQueue::addRefCountedJob : job type found in jobs");
74 if (iter == m_jobData.end())
75 return false;
76
77 JLOG(m_journal.debug())
78 << __func__ << " : Adding job : " << name << " : " << type;
79 JobTypeData& data(iter->second);
80
81 // FIXME: Workaround incorrect client shutdown ordering
82 // do not add jobs to a queue with no threads
83 XRPL_ASSERT(
84 (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) ||
86 "xrpl::JobQueue::addRefCountedJob : threads available or job "
87 "requires no threads");
88
89 {
91 auto result =
92 m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func);
93 auto const& job = *result.first;
94
95 JobType const type(job.getType());
96 XRPL_ASSERT(
97 type != jtINVALID,
98 "xrpl::JobQueue::addRefCountedJob : has valid job type");
99 XRPL_ASSERT(
100 m_jobSet.find(job) != m_jobSet.end(),
101 "xrpl::JobQueue::addRefCountedJob : job found");
102 perfLog_.jobQueue(type);
103
104 JobTypeData& data(getJobTypeData(type));
105
106 if (data.waiting + data.running < getJobLimit(type))
107 {
109 }
110 else
111 {
112 // defer the task until we go below the limit
113 ++data.deferred;
114 }
115 ++data.waiting;
116 }
117 return true;
118}
119
120int
122{
124
125 JobDataMap::const_iterator c = m_jobData.find(t);
126
127 return (c == m_jobData.end()) ? 0 : c->second.waiting;
128}
129
130int
132{
134
135 JobDataMap::const_iterator c = m_jobData.find(t);
136
137 return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
138}
139
140int
142{
143 // return the number of jobs at this priority level or greater
144 int ret = 0;
145
147
148 for (auto const& x : m_jobData)
149 {
150 if (x.first >= t)
151 ret += x.second.waiting;
152 }
153
154 return ret;
155}
156
159{
160 JobDataMap::iterator iter(m_jobData.find(t));
161 XRPL_ASSERT(
162 iter != m_jobData.end(),
163 "xrpl::JobQueue::makeLoadEvent : valid job type input");
164
165 if (iter == m_jobData.end())
166 return {};
167
168 return std::make_unique<LoadEvent>(iter->second.load(), name, true);
169}
170
171void
173{
174 if (isStopped())
175 LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
176
177 JobDataMap::iterator iter(m_jobData.find(t));
178 XRPL_ASSERT(
179 iter != m_jobData.end(),
180 "xrpl::JobQueue::addLoadEvents : valid job type input");
181 iter->second.load().addSamples(count, elapsed);
182}
183
184bool
186{
187 return std::any_of(m_jobData.begin(), m_jobData.end(), [](auto& entry) {
188 return entry.second.load().isOver();
189 });
190}
191
194{
195 using namespace std::chrono_literals;
197
198 ret["threads"] = m_workers.getNumberOfThreads();
199
200 Json::Value priorities = Json::arrayValue;
201
203
204 for (auto& x : m_jobData)
205 {
206 XRPL_ASSERT(
207 x.first != jtINVALID, "xrpl::JobQueue::getJson : valid job type");
208
209 if (x.first == jtGENERIC)
210 continue;
211
212 JobTypeData& data(x.second);
213
214 LoadMonitor::Stats stats(data.stats());
215
216 int waiting(data.waiting);
217 int running(data.running);
218
219 if ((stats.count != 0) || (waiting != 0) ||
220 (stats.latencyPeak != 0ms) || (running != 0))
221 {
222 Json::Value& pri = priorities.append(Json::objectValue);
223
224 pri["job_type"] = data.name();
225
226 if (stats.isOverloaded)
227 pri["over_target"] = true;
228
229 if (waiting != 0)
230 pri["waiting"] = waiting;
231
232 if (stats.count != 0)
233 pri["per_second"] = static_cast<int>(stats.count);
234
235 if (stats.latencyPeak != 0ms)
236 pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
237
238 if (stats.latencyAvg != 0ms)
239 pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
240
241 if (running != 0)
242 pri["in_progress"] = running;
243 }
244 }
245
246 ret["job_types"] = priorities;
247
248 return ret;
249}
250
251void
253{
255 cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
256}
257
260{
261 JobDataMap::iterator c(m_jobData.find(type));
262 XRPL_ASSERT(
263 c != m_jobData.end(),
264 "xrpl::JobQueue::getJobTypeData : valid job type input");
265
266 // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
267 // and use something sane.
268 if (c == m_jobData.end())
269 return m_invalidJobData;
270
271 return c->second;
272}
273
274void
276{
277 stopping_ = true;
278 using namespace std::chrono_literals;
279 jobCounter_.join("JobQueue", 1s, m_journal);
280 {
281 // After the JobCounter is joined, all jobs have finished executing
282 // (i.e. returned from `Job::doJob`) and no more are being accepted,
283 // but there may still be some threads between the return of
284 // `Job::doJob` and the return of `JobQueue::processTask`. That is why
285 // we must wait on the condition variable to make these assertions.
287 cv_.wait(
288 lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
289 XRPL_ASSERT(
290 m_processCount == 0,
291 "xrpl::JobQueue::stop : all processes completed");
292 XRPL_ASSERT(
293 m_jobSet.empty(), "xrpl::JobQueue::stop : all jobs completed");
294 XRPL_ASSERT(
295 nSuspend_ == 0, "xrpl::JobQueue::stop : no coros suspended");
296 stopped_ = true;
297 }
298}
299
300bool
302{
303 return stopped_;
304}
305
306void
308{
309 XRPL_ASSERT(
310 !m_jobSet.empty(), "xrpl::JobQueue::getNextJob : non-empty jobs");
311
313 for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
314 {
315 JobType const type = iter->getType();
316 XRPL_ASSERT(
317 type != jtINVALID, "xrpl::JobQueue::getNextJob : valid job type");
318
319 JobTypeData& data(getJobTypeData(type));
320 XRPL_ASSERT(
321 data.running <= getJobLimit(type),
322 "xrpl::JobQueue::getNextJob : maximum jobs running");
323
324 // Run this job if we're running below the limit.
325 if (data.running < getJobLimit(data.type()))
326 {
327 XRPL_ASSERT(
328 data.waiting > 0,
329 "xrpl::JobQueue::getNextJob : positive data waiting");
330 --data.waiting;
331 ++data.running;
332 break;
333 }
334 }
335
336 XRPL_ASSERT(
337 iter != m_jobSet.end(), "xrpl::JobQueue::getNextJob : found next job");
338 job = *iter;
339 m_jobSet.erase(iter);
340}
341
342void
344{
345 XRPL_ASSERT(
346 type != jtINVALID, "xrpl::JobQueue::finishJob : valid input job type");
347
348 JobTypeData& data = getJobTypeData(type);
349
350 // Queue a deferred task if possible
351 if (data.deferred > 0)
352 {
353 XRPL_ASSERT(
354 data.running + data.waiting >= getJobLimit(type),
355 "xrpl::JobQueue::finishJob : job limit");
356
357 --data.deferred;
359 }
360
361 --data.running;
362}
363
364void
366{
367 JobType type;
368
369 {
370 using namespace std::chrono;
371 Job::clock_type::time_point const start_time(Job::clock_type::now());
372 {
373 Job job;
374 {
376 getNextJob(job);
378 }
379 type = job.getType();
380 JobTypeData& data(getJobTypeData(type));
381 JLOG(m_journal.trace()) << "Doing " << data.name() << "job";
382
383 // The amount of time that the job was in the queue
384 auto const q_time =
385 ceil<microseconds>(start_time - job.queue_time());
386 perfLog_.jobStart(type, q_time, start_time, instance);
387
388 job.doJob();
389
390 // The amount of time it took to execute the job
391 auto const x_time =
392 ceil<microseconds>(Job::clock_type::now() - start_time);
393
394 if (x_time >= 10ms || q_time >= 10ms)
395 {
396 getJobTypeData(type).dequeue.notify(q_time);
397 getJobTypeData(type).execute.notify(x_time);
398 }
399 perfLog_.jobFinish(type, x_time, instance);
400 }
401 }
402
403 {
405 // Job should be destroyed before stopping
406 // otherwise destructors with side effects can access
407 // parent objects that are already destroyed.
408 finishJob(type);
409 if (--m_processCount == 0 && m_jobSet.empty())
410 cv_.notify_all();
411 }
412
413 // Note that when Job::~Job is called, the last reference
414 // to the associated LoadEvent object (in the Job) may be destroyed.
415}
416
417int
419{
420 JobTypeInfo const& j(JobTypes::instance().get(type));
421 XRPL_ASSERT(
422 j.type() != jtINVALID, "xrpl::JobQueue::getJobLimit : valid job type");
423
424 return j.limit();
425}
426
427} // namespace xrpl
T any_of(T... args)
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:131
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Definition Journal.h:41
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition Event.h:45
A reference to a handler for performing polled collection.
Definition Hook.h:13
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:141
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:365
beast::Journal m_journal
Definition JobQueue.h:227
std::mutex m_mutex
Definition JobQueue.h:228
JobCounter jobCounter_
Definition JobQueue.h:231
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:259
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:131
JobTypeData m_invalidJobData
Definition JobQueue.h:235
beast::insight::Gauge job_count
Definition JobQueue.h:248
bool isStopped() const
Definition JobQueue.cpp:301
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:252
Workers m_workers
Definition JobQueue.h:243
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition JobQueue.cpp:9
std::atomic_bool stopping_
Definition JobQueue.h:232
JobDataMap m_jobData
Definition JobQueue.h:234
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:121
std::set< Job > m_jobSet
Definition JobQueue.h:230
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:61
bool isOverloaded()
Definition JobQueue.cpp:185
std::atomic_bool stopped_
Definition JobQueue.h:233
std::condition_variable cv_
Definition JobQueue.h:251
int getJobLimit(JobType type)
Definition JobQueue.cpp:418
std::uint64_t m_lastJob
Definition JobQueue.h:229
beast::insight::Hook hook
Definition JobQueue.h:249
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:172
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:158
beast::insight::Collector::ptr m_collector
Definition JobQueue.h:247
perf::PerfLog & perfLog_
Definition JobQueue.h:246
void finishJob(JobType type)
Definition JobQueue.cpp:343
Json::Value getJson(int c=0)
Definition JobQueue.cpp:193
void getNextJob(Job &job)
Definition JobQueue.cpp:307
Holds all the 'static' information about a job, which does not change.
Definition JobTypeInfo.h:10
int limit() const
Definition JobTypeInfo.h:57
JobType type() const
Definition JobTypeInfo.h:45
static JobTypes const & instance()
Definition JobTypes.h:109
JobType getType() const
Definition Job.cpp:30
void doJob()
Definition Job.cpp:42
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition Job.cpp:36
Manages partitions for logging.
Definition Log.h:33
void addTask()
Add a task to be performed.
Definition Workers.cpp:109
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:34
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log file.
Definition PerfLog.h:32
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
virtual void jobQueue(JobType const type)=0
Log queued job.
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T forward_as_tuple(T... args)
T is_same_v
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
JobType
Definition Job.h:15
@ jtINVALID
Definition Job.h:17
@ jtCLIENT_WEBSOCKET
Definition Job.h:31
@ jtCLIENT
Definition Job.h:25
@ jtGENERIC
Definition Job.h:67
T piecewise_construct
beast::insight::Event dequeue
Definition JobTypeData.h:32
beast::insight::Event execute
Definition JobTypeData.h:33
std::chrono::milliseconds latencyPeak
Definition LoadMonitor.h:44
std::chrono::milliseconds latencyAvg
Definition LoadMonitor.h:43