rippled
Loading...
Searching...
No Matches
Workers.h
1#pragma once
2
3#include <xrpl/beast/core/LockFreeStack.h>
4#include <xrpl/core/detail/semaphore.h>
5
6#include <atomic>
7#include <condition_variable>
8#include <mutex>
9#include <string>
10#include <thread>
11
12namespace xrpl {
13
14namespace perf {
15class PerfLog;
16}
17
61{
62public:
64 struct Callback
65 {
66 virtual ~Callback() = default;
67 Callback() = default;
68 Callback(Callback const&) = delete;
70 operator=(Callback const&) = delete;
71
82 virtual void
83 processTask(int instance) = 0;
84 };
85
93 explicit Workers(
94 Callback& callback,
95 perf::PerfLog* perfLog,
96 std::string const& threadNames = "Worker",
97 int numberOfThreads = static_cast<int>(std::thread::hardware_concurrency()));
98
99 ~Workers();
100
109 int
110 getNumberOfThreads() const noexcept;
111
115 void
116 setNumberOfThreads(int numberOfThreads);
117
126 void
127 stop();
128
137 void
138 addTask();
139
144 int
145 numberOfCurrentlyRunningTasks() const noexcept;
146
147 //--------------------------------------------------------------------------
148
149private:
151 {
152 explicit PausedTag() = default;
153 };
154
155 /* A Worker executes tasks on its provided thread.
156
157 These are the states:
158
159 Active: Running the task processing loop.
160 Idle: Active, but blocked on waiting for a task.
161 Paused: Blocked waiting to exit or become active.
162 */
163 class Worker : public beast::LockFreeStack<Worker>::Node, public beast::LockFreeStack<Worker, PausedTag>::Node
164 {
165 public:
166 Worker(Workers& workers, std::string const& threadName, int const instance);
167
168 ~Worker();
169
170 void
171 notify();
172
173 private:
174 void
175 run();
176
177 private:
180 int const instance_;
181
185 int wakeCount_; // how many times to un-pause
187 };
188
189private:
190 static void
192
193private:
196 std::string m_threadNames; // The name to give each thread
197 std::condition_variable m_cv; // signaled when all threads paused
200 semaphore m_semaphore; // each pending task is 1 resource
201 int m_numberOfThreads; // how many we want active now
202 std::atomic<int> m_activeCount; // to know when all are paused
203 std::atomic<int> m_pauseCount; // how many threads need to pause now
204 std::atomic<int> m_runningTaskCount; // how many calls to processTask() active
205 beast::LockFreeStack<Worker> m_everyone; // holds all created workers
207};
208
209} // namespace xrpl
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
std::string const threadName_
Definition Workers.h:179
std::thread thread_
Definition Workers.h:182
std::mutex mutex_
Definition Workers.h:183
int const instance_
Definition Workers.h:180
Workers & m_workers
Definition Workers.h:178
std::condition_variable wakeup_
Definition Workers.h:184
Workers is effectively a thread pool.
Definition Workers.h:61
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition Workers.cpp:40
void stop()
Pause all threads and wait until they are paused.
Definition Workers.cpp:91
std::atomic< int > m_activeCount
Definition Workers.h:202
std::atomic< int > m_runningTaskCount
Definition Workers.h:204
std::mutex m_mut
Definition Workers.h:198
std::condition_variable m_cv
Definition Workers.h:197
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition Workers.cpp:109
bool m_allPaused
Definition Workers.h:199
void addTask()
Add a task to be performed.
Definition Workers.cpp:103
beast::LockFreeStack< Worker, PausedTag > m_paused
Definition Workers.h:206
std::atomic< int > m_pauseCount
Definition Workers.h:203
perf::PerfLog * perfLog_
Definition Workers.h:195
semaphore m_semaphore
Definition Workers.h:200
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition Workers.cpp:115
Callback & m_callback
Definition Workers.h:194
int m_numberOfThreads
Definition Workers.h:201
std::string m_threadNames
Definition Workers.h:196
beast::LockFreeStack< Worker > m_everyone
Definition Workers.h:205
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:30
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:31
T hardware_concurrency(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
Called to perform tasks as needed.
Definition Workers.h:65
Callback(Callback const &)=delete
virtual void processTask(int instance)=0
Perform a task.
Callback & operator=(Callback const &)=delete
virtual ~Callback()=default