xrpld
Loading...
Searching...
No Matches
Workers.h
1#pragma once
2
3#include <xrpl/beast/core/LockFreeStack.h>
4#include <xrpl/core/detail/semaphore.h>
5
6#include <atomic>
7#include <condition_variable>
8#include <mutex>
9#include <string>
10#include <thread>
11
12namespace xrpl {
13
14namespace perf {
15class PerfLog;
16} // namespace perf
17
60class Workers
61{
62public:
64 struct Callback
65 {
66 virtual ~Callback() = default;
67 Callback() = default;
68 Callback(Callback const&) = delete;
69 Callback&
70 operator=(Callback const&) = delete;
71
82 virtual void
83 processTask(int instance) = 0;
84 };
85
93 explicit Workers(
94 Callback& callback,
95 perf::PerfLog* perfLog,
96 std::string threadNames = "Worker",
97 int numberOfThreads = static_cast<int>(std::thread::hardware_concurrency()));
98
99 ~Workers();
100
109 [[nodiscard]] int
110 getNumberOfThreads() const noexcept;
111
115 void
116 setNumberOfThreads(int numberOfThreads);
117
126 void
127 stop();
128
137 void
138 addTask();
139
144 [[nodiscard]] int
145 numberOfCurrentlyRunningTasks() const noexcept;
146
147 //--------------------------------------------------------------------------
148
149private:
150 struct PausedTag
151 {
152 explicit PausedTag() = default;
153 };
154
155 /* A Worker executes tasks on its provided thread.
156
157 These are the states:
158
159 Active: Running the task processing loop.
160 Idle: Active, but blocked on waiting for a task.
161 Paused: Blocked waiting to exit or become active.
162 */
163 class Worker : public beast::LockFreeStack<Worker>::Node,
164 public beast::LockFreeStack<Worker, PausedTag>::Node
165 {
166 public:
167 Worker(Workers& workers, std::string threadName, int const instance);
168
169 ~Worker();
170
171 void
172 notify();
173
174 private:
175 void
176 run();
177
178 private:
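180 std::string const threadName_;
181 int const instance_;
182
183 std::thread thread_;
184 std::mutex mutex_;
185 std::condition_variable wakeup_;
186 int wakeCount_{0}; // how many times to un-pause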
187 bool shouldExit_{false};
188 };
189
190private:
191 static void
192 deleteWorkers(beast::LockFreeStack<Worker>& stack);
193
194private:
195 Callback& callback_;
196 perf::PerfLog* perfLog_;
197 std::string threadNames_; // The name to give each thread
198 std::condition_variable cv_; // signaled when all threads paused
199 std::mutex mut_;
200 bool allPaused_{true};
201 semaphore semaphore_; // each pending task is 1 resource
202 int numberOfThreads_{0}; // how many we want active now
203 std::atomic<int> activeCount_; // to know when all are paused
204 std::atomic<int> pauseCount_; // how many threads need to pause now
205 std::atomic<int> runningTaskCount_; // how many calls to processTask() active
206 beast::LockFreeStack<Worker> everyone_; // holds all created workers
207 beast::LockFreeStack<Worker, PausedTag> paused_;
208};
209
210} // namespace xrpl
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
std::string const threadName_
Definition Workers.h:180
std::thread thread_
Definition Workers.h:183
std::mutex mutex_
Definition Workers.h:184
int const instance_
Definition Workers.h:181
Worker(Workers &workers, std::string threadName, int const instance)
Definition Workers.cpp:144
std::condition_variable wakeup_
Definition Workers.h:185
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition Workers.cpp:47
void stop()
Pause all threads and wait until they are paused.
Definition Workers.cpp:98
int numberOfThreads_
Definition Workers.h:202
std::atomic< int > activeCount_
Definition Workers.h:203
std::mutex mut_
Definition Workers.h:199
beast::LockFreeStack< Worker, PausedTag > paused_
Definition Workers.h:207
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition Workers.cpp:118
std::atomic< int > pauseCount_
Definition Workers.h:204
Callback & callback_
Definition Workers.h:195
void addTask()
Add a task to be performed.
Definition Workers.cpp:112
Workers(Callback &callback, perf::PerfLog *perfLog, std::string threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
Definition Workers.cpp:13
perf::PerfLog * perfLog_
Definition Workers.h:196
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition Workers.cpp:124
std::string threadNames_
Definition Workers.h:197
semaphore semaphore_
Definition Workers.h:201
bool allPaused_
Definition Workers.h:200
std::condition_variable cv_
Definition Workers.h:198
std::atomic< int > runningTaskCount_
Definition Workers.h:205
beast::LockFreeStack< Worker > everyone_
Definition Workers.h:206
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:37
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:31
T hardware_concurrency(T... args)
Dummy class for unit tests.
Definition Workers.h:14
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
BasicSemaphore< std::mutex, std::condition_variable > semaphore
Definition semaphore.h:87
Called to perform tasks as needed.
Definition Workers.h:65
Callback(Callback const &)=delete
virtual void processTask(int instance)=0
Perform a task.
Callback & operator=(Callback const &)=delete
virtual ~Callback()=default