Workers.cpp
#include <xrpld/core/detail/Workers.h>
#include <xrpld/perflog/PerfLog.h>

#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/utility/instrumentation.h>

namespace ripple {

Workers::Workers(
    Callback& callback,
    perf::PerfLog* perfLog,
    std::string const& threadNames,
    int numberOfThreads)
    : m_callback(callback)
    , perfLog_(perfLog)
    , m_threadNames(threadNames)
    , m_allPaused(true)
    , m_semaphore(0)
    , m_numberOfThreads(0)
    , m_activeCount(0)
    , m_pauseCount(0)
    , m_runningTaskCount(0)
{
    setNumberOfThreads(numberOfThreads);
}

Workers::~Workers()
{
    stop();

    deleteWorkers(m_everyone);
}

int
Workers::getNumberOfThreads() const noexcept
{
    return m_numberOfThreads;
}

// VFALCO NOTE if this function is called quickly to reduce then
//             increase the number of threads, it could result in
//             more paused threads being created than expected.
//
void
Workers::setNumberOfThreads(int numberOfThreads)
{
    static int instance{0};
    if (m_numberOfThreads == numberOfThreads)
        return;

    if (perfLog_)
        perfLog_->resizeJobs(numberOfThreads);

    if (numberOfThreads > m_numberOfThreads)
    {
        // Increasing the number of working threads
        int const amount = numberOfThreads - m_numberOfThreads;

        for (int i = 0; i < amount; ++i)
        {
            // See if we can reuse a paused worker
            Worker* worker = m_paused.pop_front();

            if (worker != nullptr)
            {
                // If we got here then the worker thread is at [1]
                // This will unblock their call to wait()
                //
                worker->notify();
            }
            else
            {
                worker = new Worker(*this, m_threadNames, instance++);
                m_everyone.push_front(worker);
            }
        }
    }
    else
    {
        // Decreasing the number of working threads
        int const amount = m_numberOfThreads - numberOfThreads;

        for (int i = 0; i < amount; ++i)
        {
            ++m_pauseCount;

            // Pausing a thread counts as one "internal task"
            m_semaphore.notify();
        }
    }

    m_numberOfThreads = numberOfThreads;
}

void
Workers::stop()
{
    setNumberOfThreads(0);

    std::unique_lock<std::mutex> lk{m_mut};
    m_cv.wait(lk, [this] { return m_allPaused; });
    lk.unlock();

    XRPL_ASSERT(
        numberOfCurrentlyRunningTasks() == 0,
        "ripple::Workers::stop : zero running tasks");
}

void
Workers::addTask()
{
    m_semaphore.notify();
}

int
Workers::numberOfCurrentlyRunningTasks() const noexcept
{
    return m_runningTaskCount.load();
}

void
Workers::deleteWorkers(beast::LockFreeStack<Worker>& stack)
{
    for (;;)
    {
        Worker* const worker = stack.pop_front();

        if (worker != nullptr)
        {
            // This call blocks until the thread orderly exits
            delete worker;
        }
        else
        {
            break;
        }
    }
}

//------------------------------------------------------------------------------

Workers::Worker::Worker(
    Workers& workers,
    std::string const& threadName,
    int const instance)
    : m_workers{workers}
    , threadName_{threadName}
    , instance_{instance}
    , wakeCount_{0}
    , shouldExit_{false}
{
    thread_ = std::thread{&Workers::Worker::run, this};
}

Workers::Worker::~Worker()
{
    {
        std::lock_guard lock{mutex_};
        ++wakeCount_;
        shouldExit_ = true;
    }

    wakeup_.notify_one();
    thread_.join();
}

void
Workers::Worker::notify()
{
    std::lock_guard lock{mutex_};
    ++wakeCount_;
    wakeup_.notify_one();
}

void
Workers::Worker::run()
{
    bool shouldExit = true;
    do
    {
        // Increment the count of active workers, and if
        // we are the first one then reset the "all paused" event
        //
        if (++m_workers.m_activeCount == 1)
        {
            std::lock_guard lk{m_workers.m_mut};
            m_workers.m_allPaused = false;
        }

        for (;;)
        {
            // Put the name back in case the callback changed it
            beast::setCurrentThreadName(threadName_);

            // Acquire a task or "internal task."
            //
            m_workers.m_semaphore.wait();

            // See if there's a pause request. This
            // counts as an "internal task."
            //
            int pauseCount = m_workers.m_pauseCount.load();

            if (pauseCount > 0)
            {
                // Try to decrement
                pauseCount = --m_workers.m_pauseCount;

                if (pauseCount >= 0)
                {
                    // We got paused
                    break;
                }
                else
                {
                    // Undo our decrement
                    ++m_workers.m_pauseCount;
                }
            }

            // We couldn't pause so we must have gotten
            // unblocked in order to process a task.
            //
            ++m_workers.m_runningTaskCount;
            m_workers.m_callback.processTask(instance_);
            --m_workers.m_runningTaskCount;
        }

        // Any worker that goes into the paused list must
        // guarantee that it will eventually block on its
        // event object.
        //
        m_workers.m_paused.push_front(this);

        // Decrement the count of active workers, and if we
        // are the last one then signal the "all paused" event.
        //
        if (--m_workers.m_activeCount == 0)
        {
            std::lock_guard lk{m_workers.m_mut};
            m_workers.m_allPaused = true;
            m_workers.m_cv.notify_all();
        }

        // Set inactive thread name.
        beast::setCurrentThreadName("(" + threadName_ + ")");

        // [1] We will be here when the paused list is popped
        //
        // We block on our condition_variable, wakeup_, a requirement of being
        // put into the paused list.
        //
        // wakeup_ will get signaled by either Worker::notify() or ~Worker.
        {
            std::unique_lock<std::mutex> lock{mutex_};
            wakeup_.wait(lock, [this] { return this->wakeCount_ > 0; });

            shouldExit = shouldExit_;
            --wakeCount_;
        }
    } while (!shouldExit);
}

}  // namespace ripple
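
For orientation, here is a minimal sketch of how a caller might drive this pool. It is not part of Workers.cpp: the CountingCallback type, the "example" thread name, and the task count are made up for illustration, and the sketch assumes that processTask(int) is the only pure-virtual member of Workers::Callback, as the call in Worker::run() above suggests, and that a null perf::PerfLog* is acceptable because setNumberOfThreads() guards it.

#include <xrpld/core/detail/Workers.h>

#include <atomic>
#include <iostream>

// Hypothetical callback that simply counts the tasks handed to the pool.
// Workers invokes processTask() once for each semaphore wakeup that is not
// consumed by a pause request; the int argument identifies the worker thread.
struct CountingCallback : ripple::Workers::Callback
{
    std::atomic<int> processed{0};

    void
    processTask(int /*instance*/) override
    {
        ++processed;
    }
};

int
main()
{
    CountingCallback callback;

    // Four worker threads, no PerfLog, threads named "example".
    ripple::Workers workers{callback, nullptr, "example", 4};

    for (int i = 0; i < 100; ++i)
        workers.addTask();  // each call bumps the semaphore and wakes a worker

    // Pause every thread and wait until they are all paused; a task that is
    // already executing finishes first, but queued tasks may remain unserved.
    workers.stop();

    std::cout << callback.processed << " tasks processed before stop\n";
    return 0;
}

Note that stop() pauses the threads rather than draining the semaphore, so tasks that were queued but never picked up are not guaranteed to run before it returns.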