rippled
Loading...
Searching...
No Matches
JobQueue.cpp
1#include <xrpl/basics/contract.h>
2#include <xrpl/core/JobQueue.h>
3#include <xrpl/core/PerfLog.h>
4
5#include <mutex>
6
7namespace xrpl {
8
10 int threadCount,
11 beast::insight::Collector::ptr const& collector,
12 beast::Journal journal,
13 Logs& logs,
14 perf::PerfLog& perfLog)
15 : m_journal(journal)
16 , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
17 , m_workers(*this, &perfLog, "JobQueue", threadCount)
18 , perfLog_(perfLog)
19 , m_collector(collector)
20{
21 JLOG(m_journal.info()) << "Using " << threadCount << " threads";
22
23 hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
24 job_count = m_collector->make_gauge("job_count");
25
26 {
27 std::lock_guard const lock(m_mutex);
28
29 for (auto const& x : JobTypes::instance())
30 {
31 JobTypeInfo const& jt = x.second;
32
33 // And create dynamic information for all jobs
34 auto const result(m_jobData.emplace(
38 XRPL_ASSERT(result.second == true, "xrpl::JobQueue::JobQueue : jobs added");
39 (void)result.second;
40 }
41 }
42}
43
45{
46 // Must unhook before destroying
48}
49
50void
52{
53 std::lock_guard const lock(m_mutex);
54 job_count = m_jobSet.size();
55}
56
57bool
59{
60 XRPL_ASSERT(type != jtINVALID, "xrpl::JobQueue::addRefCountedJob : valid input job type");
61
62 auto iter(m_jobData.find(type));
63 XRPL_ASSERT(
64 iter != m_jobData.end(), "xrpl::JobQueue::addRefCountedJob : job type found in jobs");
65 if (iter == m_jobData.end())
66 return false;
67
68 JLOG(m_journal.debug()) << __func__ << " : Adding job : " << name << " : " << type;
69 JobTypeData& data(iter->second);
70
71 // FIXME: Workaround incorrect client shutdown ordering
72 // do not add jobs to a queue with no threads
73 XRPL_ASSERT(
74 (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) || m_workers.getNumberOfThreads() > 0,
75 "xrpl::JobQueue::addRefCountedJob : threads available or job "
76 "requires no threads");
77
78 {
79 std::lock_guard const lock(m_mutex);
80 auto result = m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func);
81 auto const& job = *result.first;
82
83 JobType const type(job.getType());
84 XRPL_ASSERT(type != jtINVALID, "xrpl::JobQueue::addRefCountedJob : has valid job type");
85 XRPL_ASSERT(m_jobSet.contains(job), "xrpl::JobQueue::addRefCountedJob : job found");
86 perfLog_.jobQueue(type);
87
88 JobTypeData& data(getJobTypeData(type));
89
90 if (data.waiting + data.running < getJobLimit(type))
91 {
93 }
94 else
95 {
96 // defer the task until we go below the limit
97 ++data.deferred;
98 }
99 ++data.waiting;
100 }
101 return true;
102}
103
104int
106{
107 std::lock_guard const lock(m_mutex);
108
109 JobDataMap::const_iterator const c = m_jobData.find(t);
110
111 return (c == m_jobData.end()) ? 0 : c->second.waiting;
112}
113
114int
116{
117 std::lock_guard const lock(m_mutex);
118
119 JobDataMap::const_iterator const c = m_jobData.find(t);
120
121 return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
122}
123
124int
126{
127 // return the number of jobs at this priority level or greater
128 int ret = 0;
129
130 std::lock_guard const lock(m_mutex);
131
132 for (auto const& x : m_jobData)
133 {
134 if (x.first >= t)
135 ret += x.second.waiting;
136 }
137
138 return ret;
139}
140
143{
144 JobDataMap::iterator const iter(m_jobData.find(t));
145 XRPL_ASSERT(iter != m_jobData.end(), "xrpl::JobQueue::makeLoadEvent : valid job type input");
146
147 if (iter == m_jobData.end())
148 return {};
149
150 return std::make_unique<LoadEvent>(iter->second.load(), name, true);
151}
152
153void
155{
156 if (isStopped())
157 LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
158
159 JobDataMap::iterator const iter(m_jobData.find(t));
160 XRPL_ASSERT(iter != m_jobData.end(), "xrpl::JobQueue::addLoadEvents : valid job type input");
161 iter->second.load().addSamples(count, elapsed);
162}
163
164bool
166{
167 return std::any_of(m_jobData.begin(), m_jobData.end(), [](auto& entry) {
168 return entry.second.load().isOver();
169 });
170}
171
174{
175 using namespace std::chrono_literals;
177
178 ret["threads"] = m_workers.getNumberOfThreads();
179
180 Json::Value priorities = Json::arrayValue;
181
182 std::lock_guard const lock(m_mutex);
183
184 for (auto& x : m_jobData)
185 {
186 XRPL_ASSERT(x.first != jtINVALID, "xrpl::JobQueue::getJson : valid job type");
187
188 if (x.first == jtGENERIC)
189 continue;
190
191 JobTypeData& data(x.second);
192
193 LoadMonitor::Stats const stats(data.stats());
194
195 int const waiting(data.waiting);
196 int const running(data.running);
197
198 if ((stats.count != 0) || (waiting != 0) || (stats.latencyPeak != 0ms) || (running != 0))
199 {
200 Json::Value& pri = priorities.append(Json::objectValue);
201
202 pri["job_type"] = data.name();
203
204 if (stats.isOverloaded)
205 pri["over_target"] = true;
206
207 if (waiting != 0)
208 pri["waiting"] = waiting;
209
210 if (stats.count != 0)
211 pri["per_second"] = static_cast<int>(stats.count);
212
213 if (stats.latencyPeak != 0ms)
214 pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
215
216 if (stats.latencyAvg != 0ms)
217 pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
218
219 if (running != 0)
220 pri["in_progress"] = running;
221 }
222 }
223
224 ret["job_types"] = priorities;
225
226 return ret;
227}
228
229void
231{
233 cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
234}
235
238{
239 JobDataMap::iterator const c(m_jobData.find(type));
240 XRPL_ASSERT(c != m_jobData.end(), "xrpl::JobQueue::getJobTypeData : valid job type input");
241
242 // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
243 // and use something sane.
244 if (c == m_jobData.end())
245 return m_invalidJobData;
246
247 return c->second;
248}
249
250void
252{
253 stopping_ = true;
254 using namespace std::chrono_literals;
255 jobCounter_.join("JobQueue", 1s, m_journal);
256 {
257 // After the JobCounter is joined, all jobs have finished executing
258 // (i.e. returned from `Job::doJob`) and no more are being accepted,
259 // but there may still be some threads between the return of
260 // `Job::doJob` and the return of `JobQueue::processTask`. That is why
261 // we must wait on the condition variable to make these assertions.
263 cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
264 XRPL_ASSERT(m_processCount == 0, "xrpl::JobQueue::stop : all processes completed");
265 XRPL_ASSERT(m_jobSet.empty(), "xrpl::JobQueue::stop : all jobs completed");
266 XRPL_ASSERT(nSuspend_ == 0, "xrpl::JobQueue::stop : no coros suspended");
267 stopped_ = true;
268 }
269}
270
271bool
273{
274 return stopped_;
275}
276
277void
279{
280 XRPL_ASSERT(!m_jobSet.empty(), "xrpl::JobQueue::getNextJob : non-empty jobs");
281
283 for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
284 {
285 JobType const type = iter->getType();
286 XRPL_ASSERT(type != jtINVALID, "xrpl::JobQueue::getNextJob : valid job type");
287
288 JobTypeData& data(getJobTypeData(type));
289 XRPL_ASSERT(
290 data.running <= getJobLimit(type), "xrpl::JobQueue::getNextJob : maximum jobs running");
291
292 // Run this job if we're running below the limit.
293 if (data.running < getJobLimit(data.type()))
294 {
295 XRPL_ASSERT(data.waiting > 0, "xrpl::JobQueue::getNextJob : positive data waiting");
296 --data.waiting;
297 ++data.running;
298 break;
299 }
300 }
301
302 XRPL_ASSERT(iter != m_jobSet.end(), "xrpl::JobQueue::getNextJob : found next job");
303 job = *iter;
304 m_jobSet.erase(iter);
305}
306
307void
309{
310 XRPL_ASSERT(type != jtINVALID, "xrpl::JobQueue::finishJob : valid input job type");
311
312 JobTypeData& data = getJobTypeData(type);
313
314 // Queue a deferred task if possible
315 if (data.deferred > 0)
316 {
317 XRPL_ASSERT(
318 data.running + data.waiting >= getJobLimit(type),
319 "xrpl::JobQueue::finishJob : job limit");
320
321 --data.deferred;
323 }
324
325 --data.running;
326}
327
328void
330{
331 JobType type = jtINVALID;
332
333 {
334 using namespace std::chrono;
335 Job::clock_type::time_point const start_time(Job::clock_type::now());
336 {
337 Job job;
338 {
339 std::lock_guard const lock(m_mutex);
340 getNextJob(job);
342 }
343 type = job.getType();
344 JobTypeData const& data(getJobTypeData(type));
345 JLOG(m_journal.trace()) << "Doing " << data.name() << "job";
346
347 // The amount of time that the job was in the queue
348 auto const q_time = ceil<microseconds>(start_time - job.queue_time());
349 perfLog_.jobStart(type, q_time, start_time, instance);
350
351 job.doJob();
352
353 // The amount of time it took to execute the job
354 auto const x_time = ceil<microseconds>(Job::clock_type::now() - start_time);
355
356 if (x_time >= 10ms || q_time >= 10ms)
357 {
358 getJobTypeData(type).dequeue.notify(q_time);
359 getJobTypeData(type).execute.notify(x_time);
360 }
361 perfLog_.jobFinish(type, x_time, instance);
362 }
363 }
364
365 {
366 std::lock_guard const lock(m_mutex);
367 // Job should be destroyed before stopping
368 // otherwise destructors with side effects can access
369 // parent objects that are already destroyed.
370 finishJob(type);
371 if (--m_processCount == 0 && m_jobSet.empty())
372 cv_.notify_all();
373 }
374
375 // Note that when Job::~Job is called, the last reference
376 // to the associated LoadEvent object (in the Job) may be destroyed.
377}
378
379int
381{
382 JobTypeInfo const& j(JobTypes::instance().get(type));
383 XRPL_ASSERT(j.type() != jtINVALID, "xrpl::JobQueue::getJobLimit : valid job type");
384
385 return j.limit();
386}
387
388} // namespace xrpl
T any_of(T... args)
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Definition Journal.h:40
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition Event.h:44
A reference to a handler for performing polled collection.
Definition Hook.h:12
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:125
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:329
beast::Journal m_journal
Definition JobQueue.h:225
std::mutex m_mutex
Definition JobQueue.h:226
JobCounter jobCounter_
Definition JobQueue.h:229
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:237
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:115
JobTypeData m_invalidJobData
Definition JobQueue.h:233
beast::insight::Gauge job_count
Definition JobQueue.h:246
bool isStopped() const
Definition JobQueue.cpp:272
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:230
Workers m_workers
Definition JobQueue.h:241
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition JobQueue.cpp:9
std::atomic_bool stopping_
Definition JobQueue.h:230
JobDataMap m_jobData
Definition JobQueue.h:232
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:105
std::set< Job > m_jobSet
Definition JobQueue.h:228
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:58
bool isOverloaded()
Definition JobQueue.cpp:165
std::atomic_bool stopped_
Definition JobQueue.h:231
std::condition_variable cv_
Definition JobQueue.h:249
static int getJobLimit(JobType type)
Definition JobQueue.cpp:380
std::uint64_t m_lastJob
Definition JobQueue.h:227
beast::insight::Hook hook
Definition JobQueue.h:247
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:154
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:142
beast::insight::Collector::ptr m_collector
Definition JobQueue.h:245
perf::PerfLog & perfLog_
Definition JobQueue.h:244
void finishJob(JobType type)
Definition JobQueue.cpp:308
Json::Value getJson(int c=0)
Definition JobQueue.cpp:173
void getNextJob(Job &job)
Definition JobQueue.cpp:278
Holds all the 'static' information about a job, which does not change.
Definition JobTypeInfo.h:9
int limit() const
Definition JobTypeInfo.h:56
JobType type() const
Definition JobTypeInfo.h:44
static JobTypes const & instance()
Definition JobTypes.h:105
JobType getType() const
Definition Job.cpp:26
void doJob()
Definition Job.cpp:38
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition Job.cpp:32
Manages partitions for logging.
Definition Log.h:32
void addTask()
Add a task to be performed.
Definition Workers.cpp:106
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:31
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:31
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
virtual void jobQueue(JobType const type)=0
Log queued job.
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T forward_as_tuple(T... args)
T is_same_v
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
JobType
Definition Job.h:14
@ jtINVALID
Definition Job.h:16
@ jtCLIENT_WEBSOCKET
Definition Job.h:30
@ jtCLIENT
Definition Job.h:24
@ jtGENERIC
Definition Job.h:66
T piecewise_construct
beast::insight::Event dequeue
Definition JobTypeData.h:31
beast::insight::Event execute
Definition JobTypeData.h:32
std::chrono::milliseconds latencyPeak
Definition LoadMonitor.h:41
std::chrono::milliseconds latencyAvg
Definition LoadMonitor.h:40