xrpld
Loading...
Searching...
No Matches
xrpl::Workers Class Reference

Workers is effectively a thread pool. More...

#include <Workers.h>

Collaboration diagram for xrpl::Workers:

Classes

struct  Callback
 Called to perform tasks as needed. More...
struct  PausedTag
class  Worker

Public Member Functions

 Workers (Callback &callback, perf::PerfLog *perfLog, std::string threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
 Create the object.
 ~Workers ()
int getNumberOfThreads () const noexcept
 Retrieve the desired number of threads.
void setNumberOfThreads (int numberOfThreads)
 Set the desired number of threads.
void stop ()
 Pause all threads and wait until they are paused.
void addTask ()
 Add a task to be performed.
int numberOfCurrentlyRunningTasks () const noexcept
 Get the number of currently executing calls of Callback::processTask.

Static Private Member Functions

static void deleteWorkers (beast::LockFreeStack< Worker > &stack)

Private Attributes

Callbackcallback_
perf::PerfLogperfLog_
std::string threadNames_
std::condition_variable cv_
std::mutex mut_
bool allPaused_ {true}
semaphore semaphore_
int numberOfThreads_ {0}
std::atomic< int > activeCount_
std::atomic< int > pauseCount_
std::atomic< int > runningTaskCount_
beast::LockFreeStack< Workereveryone_
beast::LockFreeStack< Worker, PausedTagpaused_

Detailed Description

Workers is effectively a thread pool.

The constructor takes a "callback" that has a void processTask(int instance) method, and a number of workers. It creates that many Workers and then waits for calls to Workers::addTask(). It holds a semaphore that counts the number of pending "tasks", and a condition variable for the event when the last worker pauses itself.

A "task" is just a call to the callback's processTask method. "Adding a task" means calling that method now, or remembering to call it in the future. This is implemented with a semaphore. If there are any workers waiting when a task is added, then one will be woken to claim the task. If not, then the next worker to wait on the semaphore will claim the task.

Creating a Worker creates a thread that calls Worker::run(). When that thread enters Worker::run, it increments the count of active workers in the parent Workers object and then tries to claim a task, which blocks if there are none pending. It will be unblocked whenever the semaphore is notified (i.e. when the number of pending tasks is incremented). That only happens in two circumstances: (1) when Workers::addTask is called and (2) when Workers wants to pause some workers ("pause one worker" is considered one task), which happens when someone wants to stop the workers or shrink the threadpool. No worker threads are ever destroyed until Workers is destroyed; it merely pauses workers until then.

When a waiting worker is woken, it checks whether Workers is trying to pause workers. If so, it changes its status from active to paused and blocks on its own condition variable. If not, then it calls processTask on the "callback" held by Workers.

When a paused worker is woken, it checks whether it should exit. The signal to exit is only set in the destructor of Worker, which unblocks the paused thread and waits for it to exit. A Worker::run thread checks whether it needs to exit only when it is woken from a pause (not when it is woken from waiting). This is why the destructor for Workers pauses all the workers before destroying them.

Definition at line 60 of file Workers.h.

Constructor & Destructor Documentation

◆ Workers()

xrpl::Workers::Workers ( Callback & callback,
perf::PerfLog * perfLog,
std::string threadNames = "Worker",
int numberOfThreads = static_cast<int>(std::thread::hardware_concurrency()) )
explicit

Create the object.

A number of initial threads may be optionally specified. The default is to create one thread per CPU.

Parameters
threadNamesThe name given to each created worker thread.

Definition at line 13 of file Workers.cpp.

◆ ~Workers()

xrpl::Workers::~Workers ( )

Definition at line 29 of file Workers.cpp.

Member Function Documentation

◆ getNumberOfThreads()

int xrpl::Workers::getNumberOfThreads ( ) const
nodiscardnoexcept

Retrieve the desired number of threads.

This just returns the number of active threads that were requested. If there was a recent call to setNumberOfThreads, the actual number of active threads may be temporarily different from what was last requested.

Note
This function is not thread-safe.

Definition at line 37 of file Workers.cpp.

◆ setNumberOfThreads()

void xrpl::Workers::setNumberOfThreads ( int numberOfThreads)

Set the desired number of threads.

Note
This function is not thread-safe.

Definition at line 47 of file Workers.cpp.

◆ stop()

void xrpl::Workers::stop ( )

Pause all threads and wait until they are paused.

If a thread is processing a task it will pause as soon as the task completes. There may still be tasks signaled even after all threads have paused.

Note
This function is not thread-safe.

Definition at line 98 of file Workers.cpp.

◆ addTask()

void xrpl::Workers::addTask ( )

Add a task to be performed.

Every call to addTask will eventually result in a call to Callback::processTask unless the Workers object is destroyed or the number of threads is never set above zero.

Note
This function is thread-safe.

Definition at line 112 of file Workers.cpp.

◆ numberOfCurrentlyRunningTasks()

int xrpl::Workers::numberOfCurrentlyRunningTasks ( ) const
nodiscardnoexcept

Get the number of currently executing calls of Callback::processTask.

While this function is thread-safe, the value may not stay accurate for very long. It's mainly for diagnostic purposes.

Definition at line 118 of file Workers.cpp.

◆ deleteWorkers()

void xrpl::Workers::deleteWorkers ( beast::LockFreeStack< Worker > & stack)
staticprivate

Definition at line 124 of file Workers.cpp.

Member Data Documentation

◆ callback_

Callback& xrpl::Workers::callback_
private

Definition at line 195 of file Workers.h.

◆ perfLog_

perf::PerfLog* xrpl::Workers::perfLog_
private

Definition at line 196 of file Workers.h.

◆ threadNames_

std::string xrpl::Workers::threadNames_
private

Definition at line 197 of file Workers.h.

◆ cv_

std::condition_variable xrpl::Workers::cv_
private

Definition at line 198 of file Workers.h.

◆ mut_

std::mutex xrpl::Workers::mut_
private

Definition at line 199 of file Workers.h.

◆ allPaused_

bool xrpl::Workers::allPaused_ {true}
private

Definition at line 200 of file Workers.h.

◆ semaphore_

semaphore xrpl::Workers::semaphore_
private

Definition at line 201 of file Workers.h.

◆ numberOfThreads_

int xrpl::Workers::numberOfThreads_ {0}
private

Definition at line 202 of file Workers.h.

◆ activeCount_

std::atomic<int> xrpl::Workers::activeCount_
private

Definition at line 203 of file Workers.h.

◆ pauseCount_

std::atomic<int> xrpl::Workers::pauseCount_
private

Definition at line 204 of file Workers.h.

◆ runningTaskCount_

std::atomic<int> xrpl::Workers::runningTaskCount_
private

Definition at line 205 of file Workers.h.

◆ everyone_

beast::LockFreeStack<Worker> xrpl::Workers::everyone_
private

Definition at line 206 of file Workers.h.

◆ paused_

beast::LockFreeStack<Worker, PausedTag> xrpl::Workers::paused_
private

Definition at line 207 of file Workers.h.