1#ifndef XRPL_CORE_JOBQUEUE_H_INCLUDED
2#define XRPL_CORE_JOBQUEUE_H_INCLUDED
4#include <xrpl/basics/LocalValue.h>
5#include <xrpl/core/ClosureCounter.h>
6#include <xrpl/core/JobTypeData.h>
7#include <xrpl/core/JobTypes.h>
8#include <xrpl/core/detail/Workers.h>
9#include <xrpl/json/json_value.h>
11#include <boost/coroutine/all.hpp>
52 boost::coroutines::asymmetric_coroutine<void>::pull_type
coro_;
53 boost::coroutines::asymmetric_coroutine<void>::push_type*
yield_;
150 if (
auto optionalCountedJob =
387#include <xrpl/core/Coro.ipp>
405 coro->expectEarlyExit();
A generic endpoint for log messages.
A metric for measuring an integral value.
A reference to a handler for performing polled collection.
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
Coroutines must run to completion.
Coro(Coro const &)=delete
Coro(Coro_create_t, JobQueue &, JobType, std::string const &, F &&)
bool post()
Schedule coroutine execution.
bool runnable() const
Returns true if the Coro is still runnable (has not returned).
void yield() const
Suspend coroutine execution.
void expectEarlyExit()
Once called, the Coro allows early exit without an assert.
std::condition_variable cv_
boost::coroutines::asymmetric_coroutine< void >::push_type * yield_
boost::coroutines::asymmetric_coroutine< void >::pull_type coro_
void join()
Waits until coroutine returns from the user function.
Coro & operator=(Coro const &)=delete
void resume()
Resume coroutine execution.
A pool of threads to perform work.
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
void processTask(int instance) override
Perform a task.
JobTypeData & getJobTypeData(JobType type)
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
JobTypeData m_invalidJobData
beast::insight::Gauge job_count
void rendezvous()
Block until no jobs running.
std::atomic_bool stopping_
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
int getJobCount(JobType t) const
Jobs waiting at this priority.
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
std::atomic_bool stopped_
std::condition_variable cv_
int getJobLimit(JobType type)
beast::insight::Hook hook
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
beast::insight::Collector::ptr m_collector
void finishJob(JobType type)
Json::Value getJson(int c=0)
void getNextJob(Job &job)
Manages partitions for logging.
Workers is effectively a thread pool.
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Called to perform tasks as needed.