rippled
Loading...
Searching...
No Matches
Workers.h
1#ifndef XRPL_CORE_WORKERS_H_INCLUDED
2#define XRPL_CORE_WORKERS_H_INCLUDED
3
4#include <xrpld/core/detail/semaphore.h>
5
6#include <xrpl/beast/core/LockFreeStack.h>
7
8#include <atomic>
9#include <condition_variable>
10#include <mutex>
11#include <string>
12#include <thread>
13
14namespace ripple {
15
16namespace perf {
17class PerfLog;
18}
19
62class Workers
63{
64public:
66 struct Callback
67 {
68 virtual ~Callback() = default;
69 Callback() = default;
70 Callback(Callback const&) = delete;
71 Callback&
72 operator=(Callback const&) = delete;
73
84 virtual void
85 processTask(int instance) = 0;
86 };
87
95 explicit Workers(
96 Callback& callback,
97 perf::PerfLog* perfLog,
98 std::string const& threadNames = "Worker",
99 int numberOfThreads =
100 static_cast<int>(std::thread::hardware_concurrency()));
101
102 ~Workers();
103
112 int
113 getNumberOfThreads() const noexcept;
114
118 void
119 setNumberOfThreads(int numberOfThreads);
120
129 void
130 stop();
131
140 void
141 addTask();
142
147 int
148 numberOfCurrentlyRunningTasks() const noexcept;
149
150 //--------------------------------------------------------------------------
151
152private:
153 struct PausedTag
154 {
155 explicit PausedTag() = default;
156 };
157
158 /* A Worker executes tasks on its provided thread.
159
160 These are the states:
161
162 Active: Running the task processing loop.
163 Idle: Active, but blocked on waiting for a task.
164 Paused: Blocked waiting to exit or become active.
165 */
166 class Worker : public beast::LockFreeStack<Worker>::Node,
167 public beast::LockFreeStack<Worker, PausedTag>::Node
168 {
169 public:
170 Worker(
171 Workers& workers,
172 std::string const& threadName,
173 int const instance);
174
175 ~Worker();
176
177 void
178 notify();
179
180 private:
181 void
182 run();
183
184 private:
186 std::string const threadName_;
187 int const instance_;
188
189 std::thread thread_;
191 std::condition_variable wakeup_;
192 int wakeCount_; // how many times to un-pause
194 };
195
196private:
197 static void
198 deleteWorkers(beast::LockFreeStack<Worker>& stack);
199
200private:
201 Callback& m_callback;
202 perf::PerfLog* perfLog_;
203 std::string m_threadNames; // The name to give each thread
204 std::condition_variable m_cv; // signaled when all threads paused
205 std::mutex m_mut;
207 semaphore m_semaphore; // each pending task is 1 resource
208 int m_numberOfThreads; // how many we want active now
209 std::atomic<int> m_activeCount; // to know when all are paused
210 std::atomic<int> m_pauseCount; // how many threads need to pause now
211 std::atomic<int>
212 m_runningTaskCount; // how many calls to processTask() active
213 beast::LockFreeStack<Worker> m_everyone; // holds all created workers
214 beast::LockFreeStack<Worker, PausedTag>
215 m_paused; // holds just paused workers
216};
217
218} // namespace ripple
219
220#endif
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
std::string const threadName_
Definition Workers.h:186
std::thread thread_
Definition Workers.h:189
std::condition_variable wakeup_
Definition Workers.h:191
Workers is effectively a thread pool.
Definition Workers.h:63
std::mutex m_mut
Definition Workers.h:205
std::condition_variable m_cv
Definition Workers.h:204
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:35
Callback & m_callback
Definition Workers.h:201
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition Workers.cpp:122
beast::LockFreeStack< Worker, PausedTag > m_paused
Definition Workers.h:215
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition Workers.cpp:116
void addTask()
Add a task to be performed.
Definition Workers.cpp:110
int m_numberOfThreads
Definition Workers.h:208
std::string m_threadNames
Definition Workers.h:203
beast::LockFreeStack< Worker > m_everyone
Definition Workers.h:213
std::atomic< int > m_activeCount
Definition Workers.h:209
std::atomic< int > m_pauseCount
Definition Workers.h:210
void stop()
Pause all threads and wait until they are paused.
Definition Workers.cpp:96
perf::PerfLog * perfLog_
Definition Workers.h:202
std::atomic< int > m_runningTaskCount
Definition Workers.h:212
semaphore m_semaphore
Definition Workers.h:207
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition Workers.cpp:45
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log.
Definition PerfLog.h:33
T hardware_concurrency(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
Called to perform tasks as needed.
Definition Workers.h:67
virtual ~Callback()=default
Callback & operator=(Callback const &)=delete
virtual void processTask(int instance)=0
Perform a task.
Callback(Callback const &)=delete