xrpld
Loading...
Searching...
No Matches
xrpl::JobQueue Class Reference

A pool of threads to perform work. More...

#include <JobQueue.h>

Inheritance diagram for xrpl::JobQueue:
Collaboration diagram for xrpl::JobQueue:

Classes

class  Coro
 Coroutines must run to completion. More...

Public Types

using JobFunction = std::function<void()>

Public Member Functions

 JobQueue (int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
 ~JobQueue () override
template<typename JobHandler>
bool addJob (JobType type, std::string const &name, JobHandler &&jobHandler)
 Adds a job to the JobQueue.
template<class F>
std::shared_ptr< Coro > postCoro (JobType t, std::string const &name, F &&f)
 Creates a coroutine and adds a job to the queue which will run it.
int getJobCount (JobType t) const
 Jobs waiting at this priority.
int getJobCountTotal (JobType t) const
 Jobs waiting plus running at this priority.
int getJobCountGE (JobType t) const
 All waiting jobs at or greater than this priority.
std::unique_ptr< LoadEvent > makeLoadEvent (JobType t, std::string const &name)
 Return a scoped LoadEvent.
void addLoadEvents (JobType t, int count, std::chrono::milliseconds elapsed)
 Add multiple load events.
bool isOverloaded ()
json::Value getJson (int c=0)
void rendezvous ()
 Block until no jobs running.
void stop ()
bool isStopping () const
bool isStopped () const

Private Types

using JobDataMap = std::map<JobType, JobTypeData>

Private Member Functions

void collect ()
JobTypeData & getJobTypeData (JobType type)
bool addRefCountedJob (JobType type, std::string const &name, JobFunction const &func)
void getNextJob (Job &job)
void finishJob (JobType type)
void processTask (int instance) override
 Perform a task.

Static Private Member Functions

static int getJobLimit (JobType type)

Private Attributes

beast::Journal journal_
std::mutex mutex_
std::uint64_t lastJob_ {0}
std::set< Job > jobSet_
JobCounter jobCounter_
std::atomic_bool stopping_ {false}
std::atomic_bool stopped_ {false}
JobDataMap jobData_
JobTypeData invalidJobData_
int processCount_ {0}
int nSuspend_ = 0
Workers workers_
perf::PerfLog & perfLog_
beast::insight::Collector::ptr collector_
beast::insight::Gauge jobCount_
beast::insight::Hook hook_
std::condition_variable cv_

Friends

class Coro

Detailed Description

A pool of threads to perform work.

A job posted will always run to completion.

Coroutines that are suspended must be resumed, and run to completion.

When the JobQueue stops, it waits for all jobs and coroutines to finish.

Definition at line 42 of file JobQueue.h.

Member Typedef Documentation

◆ JobFunction

Definition at line 127 of file JobQueue.h.

◆ JobDataMap

Definition at line 226 of file JobQueue.h.

Constructor & Destructor Documentation

◆ JobQueue()

xrpl::JobQueue::JobQueue ( int threadCount,
beast::insight::Collector::ptr const & collector,
beast::Journal journal,
Logs & logs,
perf::PerfLog & perfLog )

Definition at line 24 of file JobQueue.cpp.

◆ ~JobQueue()

xrpl::JobQueue::~JobQueue ( )
override

Definition at line 59 of file JobQueue.cpp.

Member Function Documentation

◆ addJob()

template<typename JobHandler>
bool xrpl::JobQueue::addJob ( JobType type,
std::string const & name,
JobHandler && jobHandler )

Adds a job to the JobQueue.

Parameters
type - The type of job.
name - Name of the job.
jobHandler - Lambda with signature void (Job&). Called when the job is executed.
Returns
true if jobHandler added to queue.

Definition at line 150 of file JobQueue.h.

◆ postCoro()

template<class F>
std::shared_ptr< JobQueue::Coro > xrpl::JobQueue::postCoro ( JobType t,
std::string const & name,
F && f )

Creates a coroutine and adds a job to the queue which will run it.

Parameters
t - The type of job.
name - Name of the job.
f - Has a signature of void(std::shared_ptr<Coro>). Called when the job executes.
Returns
shared_ptr to posted Coro. nullptr if post was not successful.

Definition at line 393 of file JobQueue.h.

◆ getJobCount()

int xrpl::JobQueue::getJobCount ( JobType t) const

Jobs waiting at this priority.

Definition at line 120 of file JobQueue.cpp.

◆ getJobCountTotal()

int xrpl::JobQueue::getJobCountTotal ( JobType t) const

Jobs waiting plus running at this priority.

Definition at line 130 of file JobQueue.cpp.

◆ getJobCountGE()

int xrpl::JobQueue::getJobCountGE ( JobType t) const

All waiting jobs at or greater than this priority.

Definition at line 140 of file JobQueue.cpp.

◆ makeLoadEvent()

std::unique_ptr< LoadEvent > xrpl::JobQueue::makeLoadEvent ( JobType t,
std::string const & name )

Return a scoped LoadEvent.

Definition at line 157 of file JobQueue.cpp.

◆ addLoadEvents()

void xrpl::JobQueue::addLoadEvents ( JobType t,
int count,
std::chrono::milliseconds elapsed )

Add multiple load events.

Definition at line 169 of file JobQueue.cpp.

◆ isOverloaded()

bool xrpl::JobQueue::isOverloaded ( )

Definition at line 180 of file JobQueue.cpp.

◆ getJson()

json::Value xrpl::JobQueue::getJson ( int c = 0)

Definition at line 186 of file JobQueue.cpp.

◆ rendezvous()

void xrpl::JobQueue::rendezvous ( )

Block until no jobs running.

Definition at line 243 of file JobQueue.cpp.

◆ stop()

void xrpl::JobQueue::stop ( )

Definition at line 264 of file JobQueue.cpp.

◆ isStopping()

bool xrpl::JobQueue::isStopping ( ) const

Definition at line 213 of file JobQueue.h.

◆ isStopped()

bool xrpl::JobQueue::isStopped ( ) const

Definition at line 285 of file JobQueue.cpp.

◆ collect()

void xrpl::JobQueue::collect ( )
private

Definition at line 66 of file JobQueue.cpp.

◆ getJobTypeData()

JobTypeData & xrpl::JobQueue::getJobTypeData ( JobType type)
private

Definition at line 250 of file JobQueue.cpp.

◆ addRefCountedJob()

bool xrpl::JobQueue::addRefCountedJob ( JobType type,
std::string const & name,
JobFunction const & func )
private

Definition at line 73 of file JobQueue.cpp.

◆ getNextJob()

void xrpl::JobQueue::getNextJob ( Job & job)
private

Definition at line 291 of file JobQueue.cpp.

◆ finishJob()

void xrpl::JobQueue::finishJob ( JobType type)
private

Definition at line 321 of file JobQueue.cpp.

◆ processTask()

void xrpl::JobQueue::processTask ( int instance)
override private virtual

Perform a task.

The call is made on a thread owned by Workers. It is important that you only process one task from inside your callback. Each call to addTask will result in exactly one call to processTask.

Parameters
instance - The worker thread instance.
See also
Workers::addTask

Implements xrpl::Workers::Callback.

Definition at line 342 of file JobQueue.cpp.

◆ getJobLimit()

int xrpl::JobQueue::getJobLimit ( JobType type)
static private

Definition at line 393 of file JobQueue.cpp.

◆ Coro

friend class Coro
friend

Definition at line 224 of file JobQueue.h.

Member Data Documentation

◆ journal_

beast::Journal xrpl::JobQueue::journal_
private

Definition at line 228 of file JobQueue.h.

◆ mutex_

std::mutex xrpl::JobQueue::mutex_
mutable private

Definition at line 229 of file JobQueue.h.

◆ lastJob_

std::uint64_t xrpl::JobQueue::lastJob_ {0}
private

Definition at line 230 of file JobQueue.h.

◆ jobSet_

std::set<Job> xrpl::JobQueue::jobSet_
private

Definition at line 231 of file JobQueue.h.

◆ jobCounter_

JobCounter xrpl::JobQueue::jobCounter_
private

Definition at line 232 of file JobQueue.h.

◆ stopping_

std::atomic_bool xrpl::JobQueue::stopping_ {false}
private

Definition at line 233 of file JobQueue.h.

◆ stopped_

std::atomic_bool xrpl::JobQueue::stopped_ {false}
private

Definition at line 234 of file JobQueue.h.

◆ jobData_

JobDataMap xrpl::JobQueue::jobData_
private

Definition at line 235 of file JobQueue.h.

◆ invalidJobData_

JobTypeData xrpl::JobQueue::invalidJobData_
private

Definition at line 236 of file JobQueue.h.

◆ processCount_

int xrpl::JobQueue::processCount_ {0}
private

Definition at line 239 of file JobQueue.h.

◆ nSuspend_

int xrpl::JobQueue::nSuspend_ = 0
private

Definition at line 242 of file JobQueue.h.

◆ workers_

Workers xrpl::JobQueue::workers_
private

Definition at line 244 of file JobQueue.h.

◆ perfLog_

perf::PerfLog& xrpl::JobQueue::perfLog_
private

Definition at line 247 of file JobQueue.h.

◆ collector_

beast::insight::Collector::ptr xrpl::JobQueue::collector_
private

Definition at line 248 of file JobQueue.h.

◆ jobCount_

beast::insight::Gauge xrpl::JobQueue::jobCount_
private

Definition at line 249 of file JobQueue.h.

◆ hook_

beast::insight::Hook xrpl::JobQueue::hook_
private

Definition at line 250 of file JobQueue.h.

◆ cv_

std::condition_variable xrpl::JobQueue::cv_
private

Definition at line 252 of file JobQueue.h.