rippled
Loading...
Searching...
No Matches
Workers.h
1#pragma once
2
3#include <xrpl/beast/core/LockFreeStack.h>
4#include <xrpl/core/detail/semaphore.h>
5
6#include <atomic>
7#include <condition_variable>
8#include <mutex>
9#include <string>
10#include <thread>
11
12namespace xrpl {
13
14namespace perf {
15class PerfLog;
16} // namespace perf
17
61{
62public:
64 struct Callback
65 {
66 virtual ~Callback() = default;
67 Callback() = default;
68 Callback(Callback const&) = delete;
70 operator=(Callback const&) = delete;
71
82 virtual void
83 processTask(int instance) = 0;
84 };
85
93 explicit Workers(
94 Callback& callback,
95 perf::PerfLog* perfLog,
96 std::string const& threadNames = "Worker",
97 int numberOfThreads = static_cast<int>(std::thread::hardware_concurrency()));
98
99 ~Workers();
100
109 int
110 getNumberOfThreads() const noexcept;
111
115 void
116 setNumberOfThreads(int numberOfThreads);
117
126 void
127 stop();
128
137 void
138 addTask();
139
144 int
145 numberOfCurrentlyRunningTasks() const noexcept;
146
147 //--------------------------------------------------------------------------
148
149private:
151 {
152 explicit PausedTag() = default;
153 };
154
155 /* A Worker executes tasks on its provided thread.
156
157 These are the states:
158
159 Active: Running the task processing loop.
160 Idle: Active, but blocked on waiting for a task.
161 Paused: Blocked waiting to exit or become active.
162 */
163 class Worker : public beast::LockFreeStack<Worker>::Node,
164 public beast::LockFreeStack<Worker, PausedTag>::Node
165 {
166 public:
167 Worker(Workers& workers, std::string const& threadName, int const instance);
168
169 ~Worker();
170
171 void
172 notify();
173
174 private:
175 void
176 run();
177
178 private:
181 int const instance_;
182
186 int wakeCount_{0}; // how many times to un-pause
187 bool shouldExit_{false};
188 };
189
190private:
191 static void
193
194private:
197 std::string m_threadNames; // The name to give each thread
198 std::condition_variable m_cv; // signaled when all threads paused
200 bool m_allPaused{true};
201 semaphore m_semaphore; // each pending task is 1 resource
202 int m_numberOfThreads{0}; // how many we want active now
203 std::atomic<int> m_activeCount; // to know when all are paused
204 std::atomic<int> m_pauseCount; // how many threads need to pause now
205 std::atomic<int> m_runningTaskCount; // how many calls to processTask() active
206 beast::LockFreeStack<Worker> m_everyone; // holds all created workers
208};
209
210} // namespace xrpl
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
std::string const threadName_
Definition Workers.h:180
std::thread thread_
Definition Workers.h:183
std::mutex mutex_
Definition Workers.h:184
int const instance_
Definition Workers.h:181
Workers & m_workers
Definition Workers.h:179
std::condition_variable wakeup_
Definition Workers.h:185
Workers is effectively a thread pool.
Definition Workers.h:61
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition Workers.cpp:41
void stop()
Pause all threads and wait until they are paused.
Definition Workers.cpp:92
std::atomic< int > m_activeCount
Definition Workers.h:203
std::atomic< int > m_runningTaskCount
Definition Workers.h:205
std::mutex m_mut
Definition Workers.h:199
std::condition_variable m_cv
Definition Workers.h:198
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition Workers.cpp:112
bool m_allPaused
Definition Workers.h:200
void addTask()
Add a task to be performed.
Definition Workers.cpp:106
beast::LockFreeStack< Worker, PausedTag > m_paused
Definition Workers.h:207
std::atomic< int > m_pauseCount
Definition Workers.h:204
perf::PerfLog * perfLog_
Definition Workers.h:196
semaphore m_semaphore
Definition Workers.h:201
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition Workers.cpp:118
Callback & m_callback
Definition Workers.h:195
int m_numberOfThreads
Definition Workers.h:202
std::string m_threadNames
Definition Workers.h:197
beast::LockFreeStack< Worker > m_everyone
Definition Workers.h:206
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:31
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:31
T hardware_concurrency(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
Called to perform tasks as needed.
Definition Workers.h:65
Callback(Callback const &)=delete
virtual void processTask(int instance)=0
Perform a task.
Callback & operator=(Callback const &)=delete
virtual ~Callback()=default