rippled
Loading...
Searching...
No Matches
Workers.h
1#ifndef XRPL_CORE_WORKERS_H_INCLUDED
2#define XRPL_CORE_WORKERS_H_INCLUDED
3
4#include <xrpl/beast/core/LockFreeStack.h>
5#include <xrpl/core/detail/semaphore.h>
6
7#include <atomic>
8#include <condition_variable>
9#include <mutex>
10#include <string>
11#include <thread>
12
13namespace xrpl {
14
15namespace perf {
16class PerfLog;
17}
18
62{
63public:
65 struct Callback
66 {
67 virtual ~Callback() = default;
68 Callback() = default;
69 Callback(Callback const&) = delete;
71 operator=(Callback const&) = delete;
72
83 virtual void
84 processTask(int instance) = 0;
85 };
86
94 explicit Workers(
95 Callback& callback,
96 perf::PerfLog* perfLog,
97 std::string const& threadNames = "Worker",
98 int numberOfThreads =
99 static_cast<int>(std::thread::hardware_concurrency()));
100
101 ~Workers();
102
111 int
112 getNumberOfThreads() const noexcept;
113
117 void
118 setNumberOfThreads(int numberOfThreads);
119
128 void
129 stop();
130
139 void
140 addTask();
141
146 int
147 numberOfCurrentlyRunningTasks() const noexcept;
148
149 //--------------------------------------------------------------------------
150
151private:
153 {
154 explicit PausedTag() = default;
155 };
156
157 /* A Worker executes tasks on its provided thread.
158
159 These are the states:
160
161 Active: Running the task processing loop.
162 Idle: Active, but blocked waiting for a task.
163 Paused: Blocked waiting to exit or become active.
164 */
165 class Worker : public beast::LockFreeStack<Worker>::Node,
166 public beast::LockFreeStack<Worker, PausedTag>::Node
167 {
168 public:
169 Worker(
170 Workers& workers,
171 std::string const& threadName,
172 int const instance);
173
174 ~Worker();
175
176 void
177 notify();
178
179 private:
180 void
181 run();
182
183 private:
186 int const instance_;
187
191 int wakeCount_; // how many times to un-pause
193 };
194
195private:
196 static void
198
199private:
202 std::string m_threadNames; // The name to give each thread
203 std::condition_variable m_cv; // signaled when all threads paused
206 semaphore m_semaphore; // each pending task is 1 resource
207 int m_numberOfThreads; // how many we want active now
208 std::atomic<int> m_activeCount; // to know when all are paused
209 std::atomic<int> m_pauseCount; // how many threads need to pause now
211 m_runningTaskCount; // how many calls to processTask() active
212 beast::LockFreeStack<Worker> m_everyone; // holds all created workers
214 m_paused; // holds just paused workers
215};
216
217} // namespace xrpl
218
219#endif
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
std::string const threadName_
Definition Workers.h:185
std::thread thread_
Definition Workers.h:188
std::mutex mutex_
Definition Workers.h:189
int const instance_
Definition Workers.h:186
Workers & m_workers
Definition Workers.h:184
std::condition_variable wakeup_
Definition Workers.h:190
Workers is effectively a thread pool.
Definition Workers.h:62
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition Workers.cpp:44
void stop()
Pause all threads and wait until they are paused.
Definition Workers.cpp:95
std::atomic< int > m_activeCount
Definition Workers.h:208
std::atomic< int > m_runningTaskCount
Definition Workers.h:211
std::mutex m_mut
Definition Workers.h:204
std::condition_variable m_cv
Definition Workers.h:203
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition Workers.cpp:115
bool m_allPaused
Definition Workers.h:205
void addTask()
Add a task to be performed.
Definition Workers.cpp:109
beast::LockFreeStack< Worker, PausedTag > m_paused
Definition Workers.h:214
std::atomic< int > m_pauseCount
Definition Workers.h:209
perf::PerfLog * perfLog_
Definition Workers.h:201
semaphore m_semaphore
Definition Workers.h:206
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition Workers.cpp:121
Callback & m_callback
Definition Workers.h:200
int m_numberOfThreads
Definition Workers.h:207
std::string m_threadNames
Definition Workers.h:202
beast::LockFreeStack< Worker > m_everyone
Definition Workers.h:212
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:34
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log.
Definition PerfLog.h:32
T hardware_concurrency(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
Called to perform tasks as needed.
Definition Workers.h:66
Callback(Callback const &)=delete
virtual void processTask(int instance)=0
Perform a task.
Callback & operator=(Callback const &)=delete
virtual ~Callback()=default