// Workers.cpp — worker thread pool implementation (rippled / XRPL).
1#include <xrpl/beast/core/CurrentThreadName.h>
2#include <xrpl/core/PerfLog.h>
3#include <xrpl/core/detail/Workers.h>
4
5namespace xrpl {
6
8 Callback& callback,
9 perf::PerfLog* perfLog,
10 std::string const& threadNames,
11 int numberOfThreads)
12 : m_callback(callback)
13 , perfLog_(perfLog)
14 , m_threadNames(threadNames)
15 , m_semaphore(0)
16 , m_activeCount(0)
17 , m_pauseCount(0)
18 , m_runningTaskCount(0)
19{
20 setNumberOfThreads(numberOfThreads);
21}
22
29
30int
32{
33 return m_numberOfThreads;
34}
35
36// VFALCO NOTE if this function is called quickly to reduce then
37// increase the number of threads, it could result in
38// more paused threads being created than expected.
39//
40void
41Workers::setNumberOfThreads(int numberOfThreads)
42{
43 static int instance{0};
44 if (m_numberOfThreads == numberOfThreads)
45 return;
46
47 if (perfLog_ != nullptr)
48 perfLog_->resizeJobs(numberOfThreads);
49
50 if (numberOfThreads > m_numberOfThreads)
51 {
52 // Increasing the number of working threads
53 int const amount = numberOfThreads - m_numberOfThreads;
54
55 for (int i = 0; i < amount; ++i)
56 {
57 // See if we can reuse a paused worker
58 Worker* worker = m_paused.pop_front();
59
60 if (worker != nullptr)
61 {
62 // If we got here then the worker thread is at [1]
63 // This will unblock their call to wait()
64 //
65 worker->notify();
66 }
67 else
68 {
69 worker = new Worker(*this, m_threadNames, instance++);
70 m_everyone.push_front(worker);
71 }
72 }
73 }
74 else
75 {
76 // Decreasing the number of working threads
77 int const amount = m_numberOfThreads - numberOfThreads;
78
79 for (int i = 0; i < amount; ++i)
80 {
82
83 // Pausing a thread counts as one "internal task"
85 }
86 }
87
88 m_numberOfThreads = numberOfThreads;
89}
90
91void
93{
95
96 // Wait until all workers have paused AND no tasks are actively running.
97 // Both conditions are needed because m_allPaused (mutex-protected) and
98 // m_runningTaskCount (atomic) are not synchronized under the same lock,
99 // so m_allPaused can momentarily be true while a task is still finishing.
101 m_cv.wait(lk, [this] { return m_allPaused && numberOfCurrentlyRunningTasks() == 0; });
102 lk.unlock();
103}
104
105void
110
111int
113{
114 return m_runningTaskCount.load();
115}
116
117void
119{
120 for (;;)
121 {
122 Worker const* const worker = stack.pop_front();
123
124 if (worker != nullptr)
125 {
126 // This call blocks until the thread orderly exits
127 delete worker;
128 }
129 else
130 {
131 break;
132 }
133 }
134}
135
136//------------------------------------------------------------------------------
137
138Workers::Worker::Worker(Workers& workers, std::string const& threadName, int const instance)
139 : m_workers{workers}, threadName_{threadName}, instance_{instance}
140
141{
143}
144
146{
147 {
148 std::lock_guard const lock{mutex_};
149 ++wakeCount_;
150 shouldExit_ = true;
151 }
152
153 wakeup_.notify_one();
154 thread_.join();
155}
156
157void
159{
160 std::lock_guard const lock{mutex_};
161 ++wakeCount_;
162 wakeup_.notify_one();
163}
164
165void
167{
168 bool shouldExit = true;
169 do
170 {
171 // Increment the count of active workers, and if
172 // we are the first one then reset the "all paused" event
173 //
174 if (++m_workers.m_activeCount == 1)
175 {
176 std::lock_guard const lk{m_workers.m_mut};
177 m_workers.m_allPaused = false;
178 }
179
180 for (;;)
181 {
182 // Put the name back in case the callback changed it
183 beast::setCurrentThreadName(threadName_);
184
185 // Acquire a task or "internal task."
186 //
187 m_workers.m_semaphore.wait();
188
189 // See if there's a pause request. This
190 // counts as an "internal task."
191 //
192 int pauseCount = m_workers.m_pauseCount.load();
193
194 if (pauseCount > 0)
195 {
196 // Try to decrement
197 pauseCount = --m_workers.m_pauseCount;
198
199 if (pauseCount >= 0)
200 {
201 // We got paused
202 break;
203 }
204
205 // Undo our decrement
206 ++m_workers.m_pauseCount;
207 }
208
209 // We couldn't pause so we must have gotten
210 // unblocked in order to process a task.
211 //
212 ++m_workers.m_runningTaskCount;
213 m_workers.m_callback.processTask(instance_);
214
215 // When the running task count drops to zero, wake stop() which
216 // may be waiting for both m_allPaused and zero running tasks.
217 // Locking m_mut before notify_all() prevents a lost wakeup:
218 // it serializes against the predicate check inside stop()'s
219 // cv.wait(), ensuring the notification is not missed between
220 // the predicate evaluation and the actual sleep.
221 if (--m_workers.m_runningTaskCount == 0)
222 {
223 std::lock_guard const lk{m_workers.m_mut};
224 m_workers.m_cv.notify_all();
225 }
226 }
227
228 // Any worker that goes into the paused list must
229 // guarantee that it will eventually block on its
230 // event object.
231 //
232 m_workers.m_paused.push_front(this);
233
234 // Decrement the count of active workers, and if we
235 // are the last one then signal the "all paused" event.
236 //
237 if (--m_workers.m_activeCount == 0)
238 {
239 std::lock_guard const lk{m_workers.m_mut};
240 m_workers.m_allPaused = true;
241 m_workers.m_cv.notify_all();
242 }
243
244 // Set inactive thread name.
245 beast::setCurrentThreadName("(" + threadName_ + ")");
246
247 // [1] We will be here when the paused list is popped
248 //
249 // We block on our condition_variable, wakeup_, a requirement of being
250 // put into the paused list.
251 //
252 // wakeup_ will get signaled by either Worker::notify() or ~Worker.
253 {
254 std::unique_lock<std::mutex> lock{mutex_};
255 wakeup_.wait(lock, [this] { return this->wakeCount_ > 0; });
256
257 shouldExit = shouldExit_;
258 --wakeCount_;
259 }
260 } while (!shouldExit);
261}
262
263} // namespace xrpl
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
Element * pop_front()
Pop an element off the stack.
std::thread thread_
Definition Workers.h:183
Worker(Workers &workers, std::string const &threadName, int const instance)
Definition Workers.cpp:138
Workers is effectively a thread pool.
Definition Workers.h:61
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition Workers.cpp:41
Workers(Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
Definition Workers.cpp:7
void stop()
Pause all threads and wait until they are paused.
Definition Workers.cpp:92
std::atomic< int > m_runningTaskCount
Definition Workers.h:205
std::mutex m_mut
Definition Workers.h:199
std::condition_variable m_cv
Definition Workers.h:198
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition Workers.cpp:112
bool m_allPaused
Definition Workers.h:200
void addTask()
Add a task to be performed.
Definition Workers.cpp:106
beast::LockFreeStack< Worker, PausedTag > m_paused
Definition Workers.h:207
std::atomic< int > m_pauseCount
Definition Workers.h:204
perf::PerfLog * perfLog_
Definition Workers.h:196
semaphore m_semaphore
Definition Workers.h:201
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition Workers.cpp:118
int m_numberOfThreads
Definition Workers.h:202
std::string m_threadNames
Definition Workers.h:197
beast::LockFreeStack< Worker > m_everyone
Definition Workers.h:206
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:31
void notify()
Increment the count and unblock one waiting thread.
Definition semaphore.h:56
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:31
virtual void resizeJobs(int const resize)=0
Ensure enough room to store each currently executing job.
T join(T... args)
T load(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
Called to perform tasks as needed.
Definition Workers.h:65