1#include <xrpl/core/JobQueue.h>
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/contract.h>
5#include <xrpl/beast/insight/Collector.h>
6#include <xrpl/beast/utility/instrumentation.h>
7#include <xrpl/core/Job.h>
8#include <xrpl/core/JobTypeInfo.h>
9#include <xrpl/core/LoadEvent.h>
10#include <xrpl/core/PerfLog.h>
11#include <xrpl/json/json_value.h>
32 ,
workers_(*this, &perfLog,
"JobQueue", threadCount)
36 JLOG(
journal_.info()) <<
"Using " << threadCount <<
" threads";
53 XRPL_ASSERT(result.second ==
true,
"xrpl::JobQueue::JobQueue : jobs added");
75 XRPL_ASSERT(type !=
JtInvalid,
"xrpl::JobQueue::addRefCountedJob : valid input job type");
79 iter !=
jobData_.end(),
"xrpl::JobQueue::addRefCountedJob : job type found in jobs");
83 JLOG(
journal_.debug()) << __func__ <<
" : Adding job : " << name <<
" : " << type;
90 "xrpl::JobQueue::addRefCountedJob : threads available or job "
91 "requires no threads");
95 auto result =
jobSet_.emplace(type, name, ++
lastJob_, data.load(), func);
96 auto const& job = *result.first;
98 JobType const type(job.getType());
99 XRPL_ASSERT(type !=
JtInvalid,
"xrpl::JobQueue::addRefCountedJob : has valid job type");
100 XRPL_ASSERT(
jobSet_.contains(job),
"xrpl::JobQueue::addRefCountedJob : job found");
105 if (data.waiting + data.running <
getJobLimit(type))
124 JobDataMap::const_iterator
const c =
jobData_.find(t);
126 return (c ==
jobData_.end()) ? 0 : c->second.waiting;
134 JobDataMap::const_iterator
const c =
jobData_.find(t);
136 return (c ==
jobData_.end()) ? 0 : (c->second.waiting + c->second.running);
150 ret += x.second.waiting;
159 JobDataMap::iterator
const iter(
jobData_.find(t));
160 XRPL_ASSERT(iter !=
jobData_.end(),
"xrpl::JobQueue::makeLoadEvent : valid job type input");
172 logicError(
"JobQueue::addLoadEvents() called after JobQueue stopped");
174 JobDataMap::iterator
const iter(
jobData_.find(t));
175 XRPL_ASSERT(iter !=
jobData_.end(),
"xrpl::JobQueue::addLoadEvents : valid job type input");
176 iter->second.load().addSamples(count, elapsed);
188 using namespace std::chrono_literals;
191 ret[
"threads"] =
workers_.getNumberOfThreads();
199 XRPL_ASSERT(x.first !=
JtInvalid,
"xrpl::JobQueue::getJson : valid job type");
208 int const waiting(data.waiting);
209 int const running(data.running);
211 if ((stats.
count != 0) || (waiting != 0) || (stats.
latencyPeak != 0ms) || (running != 0))
215 pri[
"job_type"] = data.name();
218 pri[
"over_target"] =
true;
221 pri[
"waiting"] = waiting;
223 if (stats.
count != 0)
224 pri[
"per_second"] =
static_cast<int>(stats.
count);
233 pri[
"in_progress"] = running;
237 ret[
"job_types"] = priorities;
252 JobDataMap::iterator
const c(
jobData_.find(type));
253 XRPL_ASSERT(c !=
jobData_.end(),
"xrpl::JobQueue::getJobTypeData : valid job type input");
267 using namespace std::chrono_literals;
277 XRPL_ASSERT(
processCount_ == 0,
"xrpl::JobQueue::stop : all processes completed");
278 XRPL_ASSERT(
jobSet_.empty(),
"xrpl::JobQueue::stop : all jobs completed");
279 XRPL_ASSERT(
nSuspend_ == 0,
"xrpl::JobQueue::stop : no coros suspended");
293 XRPL_ASSERT(!
jobSet_.empty(),
"xrpl::JobQueue::getNextJob : non-empty jobs");
295 std::set<Job>::const_iterator iter;
298 JobType const type = iter->getType();
299 XRPL_ASSERT(type !=
JtInvalid,
"xrpl::JobQueue::getNextJob : valid job type");
303 data.running <=
getJobLimit(type),
"xrpl::JobQueue::getNextJob : maximum jobs running");
308 XRPL_ASSERT(data.waiting > 0,
"xrpl::JobQueue::getNextJob : positive data waiting");
315 XRPL_ASSERT(iter !=
jobSet_.end(),
"xrpl::JobQueue::getNextJob : found next job");
323 XRPL_ASSERT(type !=
JtInvalid,
"xrpl::JobQueue::finishJob : valid input job type");
328 if (data.deferred > 0)
332 "xrpl::JobQueue::finishJob : job limit");
358 JLOG(
journal_.trace()) <<
"Doing " <<
data.name() <<
"job";
362 perfLog_.jobStart(type, qTime, startTime, instance);
369 if (xTime >= 10ms || qTime >= 10ms)
374 perfLog_.jobFinish(type, xTime, instance);
396 XRPL_ASSERT(j.
type() !=
JtInvalid,
"xrpl::JobQueue::getJobLimit : valid job type");
A generic endpoint for log messages.
std::shared_ptr< Collector > ptr
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
A reference to a handler for performing polled collection.
Value & append(Value const &value)
Append value to array at the end.
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
json::Value getJson(int c=0)
std::function< void()> JobFunction
void processTask(int instance) override
Perform a task.
JobTypeData & getJobTypeData(JobType type)
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
void rendezvous()
Block until no jobs running.
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
std::atomic_bool stopping_
int getJobCount(JobType t) const
Jobs waiting at this priority.
beast::insight::Collector::ptr collector_
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
std::atomic_bool stopped_
beast::insight::Gauge jobCount_
std::condition_variable cv_
static int getJobLimit(JobType type)
JobTypeData invalidJobData_
beast::insight::Hook hook_
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
void finishJob(JobType type)
void getNextJob(Job &job)
Holds all the 'static' information about a job, which does not change.
static JobTypes const & instance()
clock_type::time_point const & queueTime() const
Returns the time when the job was queued.
Manages partitions for logging.
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
T forward_as_tuple(T... args)
@ Array
array value (ordered list)
@ Object
object value (collection of name/value pairs).
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
void logicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
beast::insight::Event dequeue
beast::insight::Event execute
std::chrono::milliseconds latencyPeak
std::chrono::milliseconds latencyAvg