1#include <xrpl/basics/contract.h>
2#include <xrpl/core/JobQueue.h>
3#include <xrpl/core/PerfLog.h>
17 , m_invalidJobData(
JobTypes::instance().getInvalid(), collector, logs)
19 , m_workers(*this, &perfLog,
"JobQueue", threadCount)
21 , m_collector(collector)
23 JLOG(
m_journal.
info()) <<
"Using " << threadCount <<
" threads";
41 result.second ==
true,
"xrpl::JobQueue::JobQueue : jobs added");
68 "xrpl::JobQueue::addRefCountedJob : valid input job type");
73 "xrpl::JobQueue::addRefCountedJob : job type found in jobs");
78 << __func__ <<
" : Adding job : " << name <<
" : " << type;
86 "xrpl::JobQueue::addRefCountedJob : threads available or job "
87 "requires no threads");
93 auto const& job = *result.first;
95 JobType const type(job.getType());
98 "xrpl::JobQueue::addRefCountedJob : has valid job type");
101 "xrpl::JobQueue::addRefCountedJob : job found");
106 if (data.waiting + data.running <
getJobLimit(type))
137 return (c ==
m_jobData.
end()) ? 0 : (c->second.waiting + c->second.running);
151 ret += x.second.waiting;
163 "xrpl::JobQueue::makeLoadEvent : valid job type input");
175 LogicError(
"JobQueue::addLoadEvents() called after JobQueue stopped");
180 "xrpl::JobQueue::addLoadEvents : valid job type input");
181 iter->second.load().addSamples(count, elapsed);
188 return entry.second.load().isOver();
195 using namespace std::chrono_literals;
207 x.first !=
jtINVALID,
"xrpl::JobQueue::getJson : valid job type");
216 int waiting(data.waiting);
217 int running(data.running);
219 if ((stats.
count != 0) || (waiting != 0) ||
224 pri[
"job_type"] = data.name();
227 pri[
"over_target"] =
true;
230 pri[
"waiting"] = waiting;
232 if (stats.
count != 0)
233 pri[
"per_second"] =
static_cast<int>(stats.
count);
242 pri[
"in_progress"] = running;
246 ret[
"job_types"] = priorities;
264 "xrpl::JobQueue::getJobTypeData : valid job type input");
278 using namespace std::chrono_literals;
291 "xrpl::JobQueue::stop : all processes completed");
293 m_jobSet.empty(),
"xrpl::JobQueue::stop : all jobs completed");
295 nSuspend_ == 0,
"xrpl::JobQueue::stop : no coros suspended");
310 !
m_jobSet.empty(),
"xrpl::JobQueue::getNextJob : non-empty jobs");
315 JobType const type = iter->getType();
317 type !=
jtINVALID,
"xrpl::JobQueue::getNextJob : valid job type");
322 "xrpl::JobQueue::getNextJob : maximum jobs running");
329 "xrpl::JobQueue::getNextJob : positive data waiting");
337 iter !=
m_jobSet.
end(),
"xrpl::JobQueue::getNextJob : found next job");
346 type !=
jtINVALID,
"xrpl::JobQueue::finishJob : valid input job type");
351 if (data.deferred > 0)
355 "xrpl::JobQueue::finishJob : job limit");
385 ceil<microseconds>(start_time - job.
queue_time());
394 if (x_time >= 10ms || q_time >= 10ms)
422 j.
type() !=
jtINVALID,
"xrpl::JobQueue::getJobLimit : valid job type");
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
A reference to a handler for performing polled collection.
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
void processTask(int instance) override
Perform a task.
JobTypeData & getJobTypeData(JobType type)
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
JobTypeData m_invalidJobData
beast::insight::Gauge job_count
void rendezvous()
Block until no jobs running.
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
std::atomic_bool stopping_
int getJobCount(JobType t) const
Jobs waiting at this priority.
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
std::atomic_bool stopped_
std::condition_variable cv_
int getJobLimit(JobType type)
beast::insight::Hook hook
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
beast::insight::Collector::ptr m_collector
void finishJob(JobType type)
Json::Value getJson(int c=0)
void getNextJob(Job &job)
Holds all the 'static' information about a job, which does not change.
static JobTypes const & instance()
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Manages partitions for logging.
void addTask()
Add a task to be performed.
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
virtual void jobQueue(JobType const type)=0
Log queued job.
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
T forward_as_tuple(T... args)
@ arrayValue
array value (ordered list)
@ objectValue
object value (collection of name/value pairs).
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
T get(Section const §ion, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
beast::insight::Event dequeue
beast::insight::Event execute
std::chrono::milliseconds latencyPeak
std::chrono::milliseconds latencyAvg