rippled
Loading...
Searching...
No Matches
Classes | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends | List of all members
ripple::JobQueue Class Reference

A pool of threads to perform work. More...

#include <JobQueue.h>

Inheritance diagram for ripple::JobQueue:
Inheritance graph
[legend]
Collaboration diagram for ripple::JobQueue:
Collaboration graph
[legend]

Classes

class  Coro
 Coroutines must run to completion. More...
 

Public Types

using JobFunction = std::function< void()>
 

Public Member Functions

 JobQueue (int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
 
 ~JobQueue ()
 
template<typename JobHandler >
bool addJob (JobType type, std::string const &name, JobHandler &&jobHandler)
 Adds a job to the JobQueue.
 
template<class F >
std::shared_ptr< Coro > postCoro (JobType t, std::string const &name, F &&f)
 Creates a coroutine and adds a job to the queue which will run it.
 
int getJobCount (JobType t) const
 Jobs waiting at this priority.
 
int getJobCountTotal (JobType t) const
 Jobs waiting plus running at this priority.
 
int getJobCountGE (JobType t) const
 All waiting jobs at or greater than this priority.
 
std::unique_ptr< LoadEvent > makeLoadEvent (JobType t, std::string const &name)
 Return a scoped LoadEvent.
 
void addLoadEvents (JobType t, int count, std::chrono::milliseconds elapsed)
 Add multiple load events.
 
bool isOverloaded ()
 
Json::Value getJson (int c=0)
 
void rendezvous ()
 Block until no jobs running.
 
void stop ()
 
bool isStopping () const
 
bool isStopped () const
 

Private Types

using JobDataMap = std::map< JobType, JobTypeData >
 

Private Member Functions

void collect ()
 
JobTypeData & getJobTypeData (JobType type)
 
bool addRefCountedJob (JobType type, std::string const &name, JobFunction const &func)
 
void getNextJob (Job &job)
 
void finishJob (JobType type)
 
void processTask (int instance) override
 Perform a task.
 
int getJobLimit (JobType type)
 

Private Attributes

beast::Journal m_journal
 
std::mutex m_mutex
 
std::uint64_t m_lastJob
 
std::set< Job > m_jobSet
 
JobCounter jobCounter_
 
std::atomic_bool stopping_ {false}
 
std::atomic_bool stopped_ {false}
 
JobDataMap m_jobData
 
JobTypeData m_invalidJobData
 
int m_processCount
 
int nSuspend_ = 0
 
Workers m_workers
 
perf::PerfLog & perfLog_
 
beast::insight::Collector::ptr m_collector
 
beast::insight::Gauge job_count
 
beast::insight::Hook hook
 
std::condition_variable cv_
 

Friends

class Coro
 

Detailed Description

A pool of threads to perform work.

A job posted will always run to completion.

Coroutines that are suspended must be resumed, and run to completion.

When the JobQueue stops, it waits for all jobs and coroutines to finish.

Definition at line 57 of file JobQueue.h.

Member Typedef Documentation

◆ JobFunction

Definition at line 143 of file JobQueue.h.

◆ JobDataMap

Definition at line 245 of file JobQueue.h.

Constructor & Destructor Documentation

◆ JobQueue()

ripple::JobQueue::JobQueue ( int  threadCount,
beast::insight::Collector::ptr const &  collector,
beast::Journal  journal,
Logs &  logs,
perf::PerfLog &  perfLog 
)

Definition at line 29 of file JobQueue.cpp.

◆ ~JobQueue()

ripple::JobQueue::~JobQueue ( )

Definition at line 68 of file JobQueue.cpp.

Member Function Documentation

◆ addJob()

template<typename JobHandler >
bool ripple::JobQueue::addJob ( JobType  type,
std::string const &  name,
JobHandler &&  jobHandler 
)

Adds a job to the JobQueue.

Parameters
type: The type of job.
name: Name of the job.
jobHandler: Lambda with signature void (Job&). Called when the job is executed.
Returns
true if jobHandler added to queue.

Definition at line 168 of file JobQueue.h.

◆ postCoro()

template<class F >
std::shared_ptr< JobQueue::Coro > ripple::JobQueue::postCoro ( JobType  t,
std::string const &  name,
F &&  f 
)

Creates a coroutine and adds a job to the queue which will run it.

Parameters
t: The type of job.
name: Name of the job.
f: Has a signature of void(std::shared_ptr<Coro>). Called when the job executes.
Returns
shared_ptr to posted Coro. nullptr if post was not successful.

Definition at line 413 of file JobQueue.h.

◆ getJobCount()

int ripple::JobQueue::getJobCount ( JobType  t) const

Jobs waiting at this priority.

Definition at line 142 of file JobQueue.cpp.

◆ getJobCountTotal()

int ripple::JobQueue::getJobCountTotal ( JobType  t) const

Jobs waiting plus running at this priority.

Definition at line 152 of file JobQueue.cpp.

◆ getJobCountGE()

int ripple::JobQueue::getJobCountGE ( JobType  t) const

All waiting jobs at or greater than this priority.

Definition at line 162 of file JobQueue.cpp.

◆ makeLoadEvent()

std::unique_ptr< LoadEvent > ripple::JobQueue::makeLoadEvent ( JobType  t,
std::string const &  name 
)

Return a scoped LoadEvent.

Definition at line 179 of file JobQueue.cpp.

◆ addLoadEvents()

void ripple::JobQueue::addLoadEvents ( JobType  t,
int  count,
std::chrono::milliseconds  elapsed 
)

Add multiple load events.

Definition at line 193 of file JobQueue.cpp.

◆ isOverloaded()

bool ripple::JobQueue::isOverloaded ( )

Definition at line 206 of file JobQueue.cpp.

◆ getJson()

Json::Value ripple::JobQueue::getJson ( int  c = 0)

Definition at line 214 of file JobQueue.cpp.

◆ rendezvous()

void ripple::JobQueue::rendezvous ( )

Block until no jobs running.

Definition at line 273 of file JobQueue.cpp.

◆ stop()

void ripple::JobQueue::stop ( )

Definition at line 296 of file JobQueue.cpp.

◆ isStopping()

bool ripple::JobQueue::isStopping ( ) const

Definition at line 232 of file JobQueue.h.

◆ isStopped()

bool ripple::JobQueue::isStopped ( ) const

Definition at line 322 of file JobQueue.cpp.

◆ collect()

void ripple::JobQueue::collect ( )
private

Definition at line 75 of file JobQueue.cpp.

◆ getJobTypeData()

JobTypeData & ripple::JobQueue::getJobTypeData ( JobType  type)
private

Definition at line 280 of file JobQueue.cpp.

◆ addRefCountedJob()

bool ripple::JobQueue::addRefCountedJob ( JobType  type,
std::string const &  name,
JobFunction const &  func 
)
private

Definition at line 82 of file JobQueue.cpp.

◆ getNextJob()

void ripple::JobQueue::getNextJob ( Job &  job)
private

Definition at line 328 of file JobQueue.cpp.

◆ finishJob()

void ripple::JobQueue::finishJob ( JobType  type)
private

Definition at line 365 of file JobQueue.cpp.

◆ processTask()

void ripple::JobQueue::processTask ( int  instance)
override private virtual

Perform a task.

The call is made on a thread owned by Workers. It is important that you only process one task from inside your callback. Each call to addTask will result in exactly one call to processTask.

Parameters
instance: The worker thread instance.
See also
Workers::addTask

Implements ripple::Workers::Callback.

Definition at line 388 of file JobQueue.cpp.

◆ getJobLimit()

int ripple::JobQueue::getJobLimit ( JobType  type)
private

Definition at line 441 of file JobQueue.cpp.

Friends And Related Symbol Documentation

◆ Coro

friend class Coro
friend

Definition at line 243 of file JobQueue.h.

Member Data Documentation

◆ m_journal

beast::Journal ripple::JobQueue::m_journal
private

Definition at line 247 of file JobQueue.h.

◆ m_mutex

std::mutex ripple::JobQueue::m_mutex
mutable private

Definition at line 248 of file JobQueue.h.

◆ m_lastJob

std::uint64_t ripple::JobQueue::m_lastJob
private

Definition at line 249 of file JobQueue.h.

◆ m_jobSet

std::set<Job> ripple::JobQueue::m_jobSet
private

Definition at line 250 of file JobQueue.h.

◆ jobCounter_

JobCounter ripple::JobQueue::jobCounter_
private

Definition at line 251 of file JobQueue.h.

◆ stopping_

std::atomic_bool ripple::JobQueue::stopping_ {false}
private

Definition at line 252 of file JobQueue.h.

◆ stopped_

std::atomic_bool ripple::JobQueue::stopped_ {false}
private

Definition at line 253 of file JobQueue.h.

◆ m_jobData

JobDataMap ripple::JobQueue::m_jobData
private

Definition at line 254 of file JobQueue.h.

◆ m_invalidJobData

JobTypeData ripple::JobQueue::m_invalidJobData
private

Definition at line 255 of file JobQueue.h.

◆ m_processCount

int ripple::JobQueue::m_processCount
private

Definition at line 258 of file JobQueue.h.

◆ nSuspend_

int ripple::JobQueue::nSuspend_ = 0
private

Definition at line 261 of file JobQueue.h.

◆ m_workers

Workers ripple::JobQueue::m_workers
private

Definition at line 263 of file JobQueue.h.

◆ perfLog_

perf::PerfLog& ripple::JobQueue::perfLog_
private

Definition at line 266 of file JobQueue.h.

◆ m_collector

beast::insight::Collector::ptr ripple::JobQueue::m_collector
private

Definition at line 267 of file JobQueue.h.

◆ job_count

beast::insight::Gauge ripple::JobQueue::job_count
private

Definition at line 268 of file JobQueue.h.

◆ hook

beast::insight::Hook ripple::JobQueue::hook
private

Definition at line 269 of file JobQueue.h.

◆ cv_

std::condition_variable ripple::JobQueue::cv_
private

Definition at line 271 of file JobQueue.h.