xrpld
Loading...
Searching...
No Matches
Workers.cpp
1#include <xrpl/core/detail/Workers.h>
2
3#include <xrpl/beast/core/CurrentThreadName.h>
4#include <xrpl/beast/core/LockFreeStack.h>
5#include <xrpl/core/PerfLog.h>
6
7#include <mutex>
8#include <string>
9#include <utility>
10
11namespace xrpl {
12
// Constructor for Workers. The cross-reference index at the end of this
// listing gives the declaration as
//   Workers(Callback&, perf::PerfLog*, std::string threadNames, int numberOfThreads)
// defined at Workers.cpp:13.
//
// NOTE(review): listing line 13 (the declarator) and listing line 24 (one more
// line between the pauseCount_ initializer and the opening brace) are absent
// from this extract. Restore both from the real file before building.
14 Callback& callback,
15 perf::PerfLog* perfLog,
16 std::string threadNames,
17 int numberOfThreads)
18 : callback_(callback)
19 , perfLog_(perfLog)
// threadNames is taken by value and moved into the member.
20 , threadNames_(std::move(threadNames))
21 , semaphore_(0)
22 , activeCount_(0)
23 , pauseCount_(0)
25{
// Bring the pool to the requested size. setNumberOfThreads() allocates a new
// Worker whenever no paused worker is available to reuse.
26 setNumberOfThreads(numberOfThreads);
27}
28
35
36int
38{
39 return numberOfThreads_;
40}
41
// Set the desired number of threads.
//
// A call with the current value returns immediately. When growing, workers are
// first taken from the paused_ stack and woken; a new Worker is allocated only
// when that stack is empty. When shrinking, one semaphore notification is
// posted per thread to be retired.
//
// NOTE(review): listing line 87 (the first statement of the shrink loop) is
// absent from this extract. The worker loop only pauses when pauseCount_ > 0,
// so that line presumably increments pauseCount_ — confirm against the real
// file.
//
42// VFALCO NOTE if this function is called quickly to reduce then
43// increase the number of threads, it could result in
44// more paused threads being created than expected.
45//
46void
47Workers::setNumberOfThreads(int numberOfThreads)
48{
// Function-local static, so the counter is shared by every Workers object.
// It is handed to each newly allocated Worker as its `instance` argument.
// NOTE(review): plain int, not atomic — presumably callers serialize calls to
// this function; confirm.
49 static int kInstance{0};
50 if (numberOfThreads_ == numberOfThreads)
51 return;
52
// perfLog_ is optional; it is told the new size only when present.
53 if (perfLog_ != nullptr)
54 perfLog_->resizeJobs(numberOfThreads);
55
56 if (numberOfThreads > numberOfThreads_)
57 {
58 // Increasing the number of working threads
59 int const amount = numberOfThreads - numberOfThreads_;
60
61 for (int i = 0; i < amount; ++i)
62 {
63 // See if we can reuse a paused worker
64 Worker* worker = paused_.popFront();
65
66 if (worker != nullptr)
67 {
68 // If we got here then the worker thread is at [1]
69 // This will unblock their call to wait()
70 //
71 worker->notify();
72 }
73 else
74 {
75 worker = new Worker(*this, threadNames_, kInstance++);
// Every allocated Worker is recorded in everyone_, presumably so it can be
// freed by deleteWorkers() at teardown — confirm.
76 everyone_.pushFront(worker);
77 }
78 }
79 }
80 else
81 {
82 // Decreasing the number of working threads
83 int const amount = numberOfThreads_ - numberOfThreads;
84
85 for (int i = 0; i < amount; ++i)
86 {
88
89 // Pausing a thread counts as one "internal task"
90 semaphore_.notify();
91 }
92 }
93
94 numberOfThreads_ = numberOfThreads;
95}
96
// Pause all threads and wait until they are paused.
//
// NOTE(review): listing lines 98 (the declarator; the index at the end of this
// listing gives `void stop()`), 100 and 106 are absent from this extract.
// Line 106 must declare `lk`, presumably a std::unique_lock on mut_, because
// it is handed to cv_.wait() below. Line 100 presumably asks every worker to
// pause. Confirm both against the real file.
97void
99{
101
102 // Wait until all workers have paused AND no tasks are actively running.
103 // Both conditions are needed because allPaused_ (mutex-protected) and
104 // runningTaskCount_ (atomic) are not synchronized under the same lock,
105 // so allPaused_ can momentarily be true while a task is still finishing.
107 cv_.wait(lk, [this] { return allPaused_ && numberOfCurrentlyRunningTasks() == 0; });
108 lk.unlock();
109}
110
111void
113{
114 semaphore_.notify();
115}
116
117int
119{
120 return runningTaskCount_.load();
121}
122
123void
125{
126 for (;;)
127 {
128 Worker const* const worker = stack.popFront();
129
130 if (worker != nullptr)
131 {
132 // This call blocks until the thread orderly exits
133 delete worker;
134 }
135 else
136 {
137 break;
138 }
139 }
140}
141
142//------------------------------------------------------------------------------
143
// Construct a worker bound to its owning Workers object.
//
//   workers     Owning pool, kept by reference in workers_.
//   threadName  Taken by value and moved into threadName_.
//   instance    Stored in instance_; the worker loop later passes it to
//               Callback::processTask().
//
// NOTE(review): listing line 148 (the only statement of the body) is absent
// from this extract. It presumably starts thread_, which the destructor
// joins — confirm against the real file.
144Workers::Worker::Worker(Workers& workers, std::string threadName, int const instance)
145 : workers_{workers}, threadName_{std::move(threadName)}, instance_{instance}
146
147{
149}
150
// Worker teardown: flag the thread to exit, wake it, then join it.
//
// NOTE(review): listing line 151 (the declarator) is absent from this extract.
// The comment near the end of the worker loop names ~Worker as one of the two
// places that signal wakeup_, so this is presumably Workers::Worker::~Worker()
// — confirm.
152{
153 {
// mutex_ is held only while updating the state that the waiting thread's
// predicate (wakeCount_ > 0) and exit check (shouldExit_) read.
154 std::scoped_lock const lock{mutex_};
155 ++wakeCount_;
156 shouldExit_ = true;
157 }
158
// Signal after the lock is released, then block until the thread finishes.
159 wakeup_.notify_one();
160 thread_.join();
161}
162
// Wake this worker from its paused wait.
//
// Increments wakeCount_ under mutex_, which makes the wait predicate
// (wakeCount_ > 0) true, and signals wakeup_ while still holding the lock.
//
// NOTE(review): listing line 164 (the declarator) is absent from this extract.
// setNumberOfThreads() calls this as worker->notify(), so it is presumably
// Workers::Worker::notify() — confirm.
163void
165{
166 std::scoped_lock const lock{mutex_};
167 ++wakeCount_;
168 wakeup_.notify_one();
169}
170
// Thread body of a worker.
//
// Outer do/while: one pass per active period of this worker.
// Inner for(;;): waits on the shared semaphore, then either consumes a pending
// pause request (and breaks out) or runs one task via Callback::processTask().
// After pausing, the worker pushes itself onto paused_ and blocks on wakeup_
// until notify() or the destructor raises wakeCount_. It leaves the outer loop
// once shouldExit_ has been observed as true.
//
// NOTE(review): listing lines 172 (the declarator), 189, 251 and 260 are
// absent from this extract. Lines 189 and 251 sit directly under the comments
// about setting the thread name. Line 260 must declare `lock`, presumably a
// std::unique_lock on mutex_, because it is handed to wakeup_.wait(). Confirm
// all of them against the real file.
171void
173{
174 bool shouldExit = true;
175 do
176 {
177 // Increment the count of active workers, and if
178 // we are the first one then reset the "all paused" event
179 //
180 if (++workers_.activeCount_ == 1)
181 {
182 std::scoped_lock const lk{workers_.mut_};
183 workers_.allPaused_ = false;
184 }
185
186 for (;;)
187 {
188 // Put the name back in case the callback changed it
190
191 // Acquire a task or "internal task."
192 //
193 workers_.semaphore_.wait();
194
195 // See if there's a pause request. This
196 // counts as an "internal task."
197 //
198 int pauseCount = workers_.pauseCount_.load();
199
200 if (pauseCount > 0)
201 {
// Several workers can see pauseCount > 0 at once. The atomic decrement
// decides who claims the request: a non-negative result means this worker
// won, and a negative result means another worker got there first.
202 // Try to decrement
203 pauseCount = --workers_.pauseCount_;
204
205 if (pauseCount >= 0)
206 {
207 // We got paused
208 break;
209 }
210
211 // Undo our decrement
212 ++workers_.pauseCount_;
213 }
214
215 // We couldn't pause so we must have gotten
216 // unblocked in order to process a task.
217 //
// runningTaskCount_ brackets the callback so stop() can tell when no task is
// still executing.
218 ++workers_.runningTaskCount_;
219 workers_.callback_.processTask(instance_);
220
221 // When the running task count drops to zero, wake stop() which
222 // may be waiting for both allPaused_ and zero running tasks.
223 // Locking mut_ before notify_all() prevents a lost wakeup:
224 // it serializes against the predicate check inside stop()'s
225 // cv_.wait(), ensuring the notification is not missed between
226 // the predicate evaluation and the actual sleep.
227 if (--workers_.runningTaskCount_ == 0)
228 {
229 std::scoped_lock const lk{workers_.mut_};
230 workers_.cv_.notify_all();
231 }
232 }
233
234 // Any worker that goes into the paused list must
235 // guarantee that it will eventually block on its
236 // event object.
237 //
238 workers_.paused_.pushFront(this);
239
240 // Decrement the count of active workers, and if we
241 // are the last one then signal the "all paused" event.
242 //
243 if (--workers_.activeCount_ == 0)
244 {
245 std::scoped_lock const lk{workers_.mut_};
246 workers_.allPaused_ = true;
247 workers_.cv_.notify_all();
248 }
249
250 // Set inactive thread name.
252
253 // [1] We will be here when the paused list is popped
254 //
255 // We block on our condition_variable, wakeup_, a requirement of being
256 // put into the paused list.
257 //
258 // wakeup_ will get signaled by either Worker::notify() or ~Worker.
259 {
// The predicate guards against spurious wakeups. Each wake request is counted
// in wakeCount_ and consumed by the decrement below.
261 wakeup_.wait(lock, [this] { return this->wakeCount_ > 0; });
262
// shouldExit_ is copied into the local here, so the loop condition below does
// not read the member again.
263 shouldExit = shouldExit_;
264 --wakeCount_;
265 }
266 } while (!shouldExit);
267}
268
269} // namespace xrpl
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
Element * popFront()
Pop an element off the stack.
std::string const threadName_
Definition Workers.h:180
std::thread thread_
Definition Workers.h:183
std::mutex mutex_
Definition Workers.h:184
int const instance_
Definition Workers.h:181
Worker(Workers &workers, std::string threadName, int const instance)
Definition Workers.cpp:144
std::condition_variable wakeup_
Definition Workers.h:185
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition Workers.cpp:47
void stop()
Pause all threads and wait until they are paused.
Definition Workers.cpp:98
int numberOfThreads_
Definition Workers.h:202
std::atomic< int > activeCount_
Definition Workers.h:203
std::mutex mut_
Definition Workers.h:199
beast::LockFreeStack< Worker, PausedTag > paused_
Definition Workers.h:207
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition Workers.cpp:118
std::atomic< int > pauseCount_
Definition Workers.h:204
Callback & callback_
Definition Workers.h:195
void addTask()
Add a task to be performed.
Definition Workers.cpp:112
Workers(Callback &callback, perf::PerfLog *perfLog, std::string threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
Definition Workers.cpp:13
perf::PerfLog * perfLog_
Definition Workers.h:196
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition Workers.cpp:124
std::string threadNames_
Definition Workers.h:197
semaphore semaphore_
Definition Workers.h:201
bool allPaused_
Definition Workers.h:200
std::condition_variable cv_
Definition Workers.h:198
std::atomic< int > runningTaskCount_
Definition Workers.h:205
beast::LockFreeStack< Worker > everyone_
Definition Workers.h:206
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:37
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:31
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
Called to perform tasks as needed.
Definition Workers.h:65
T unlock(T... args)