rippled
Loading...
Searching...
No Matches
Workers.cpp
1#include <xrpl/beast/core/CurrentThreadName.h>
2#include <xrpl/beast/utility/instrumentation.h>
3#include <xrpl/core/PerfLog.h>
4#include <xrpl/core/detail/Workers.h>
5
6namespace xrpl {
7
9 Callback& callback,
10 perf::PerfLog* perfLog,
11 std::string const& threadNames,
12 int numberOfThreads)
13 : m_callback(callback)
14 , perfLog_(perfLog)
15 , m_threadNames(threadNames)
 // No worker is active at construction (m_activeCount is 0 below), so the
 // pool starts out in the "all paused" state.
16 , m_allPaused(true)
17 , m_semaphore(0)
 // Start at zero; the setNumberOfThreads() call in the body then grows the
 // pool to the requested size.
18 , m_numberOfThreads(0)
19 , m_activeCount(0)
20 , m_pauseCount(0)
21 , m_runningTaskCount(0)
22{
 // Returns immediately when numberOfThreads is 0, because that already
 // equals m_numberOfThreads.
23 setNumberOfThreads(numberOfThreads);
24}
25
32
// Returns m_numberOfThreads, i.e. the value most recently stored by
// setNumberOfThreads().
33int
35{
36 return m_numberOfThreads;
37}
38
39// VFALCO NOTE if this function is called quickly to reduce then
40// increase the number of threads, it could result in
41// more paused threads being created than expected.
42//
// Changes the desired number of active worker threads.
//
// numberOfThreads: the new target count. When it equals the current
// m_numberOfThreads the call returns immediately without touching anything.
//
// If a PerfLog was supplied, it is told about the new size first. Growing the
// pool wakes paused workers where available and allocates new ones otherwise.
// m_numberOfThreads is updated last, after the grow/shrink loop has run.
43void
44Workers::setNumberOfThreads(int numberOfThreads)
45{
 // Function-local static: one counter shared by every Workers object, so the
 // instance numbers handed to new workers keep increasing across all pools.
 // NOTE(review): this is a plain int with no synchronization visible here;
 // confirm that callers never run this function concurrently.
46 static int instance{0};
47 if (m_numberOfThreads == numberOfThreads)
48 return;
49
50 if (perfLog_)
51 perfLog_->resizeJobs(numberOfThreads);
52
53 if (numberOfThreads > m_numberOfThreads)
54 {
55 // Increasing the number of working threads
56 int const amount = numberOfThreads - m_numberOfThreads;
57
58 for (int i = 0; i < amount; ++i)
59 {
60 // See if we can reuse a paused worker
61 Worker* worker = m_paused.pop_front();
62
63 if (worker != nullptr)
64 {
65 // If we got here then the worker thread is at [1]
66 // This will unblock their call to wait()
67 //
68 worker->notify();
69 }
70 else
71 {
 // No paused worker to reuse: allocate a new one and record it in
 // m_everyone. The worker is not deleted anywhere in this function.
72 worker = new Worker(*this, m_threadNames, instance++);
73 m_everyone.push_front(worker);
74 }
75 }
76 }
77 else
78 {
79 // Decreasing the number of working threads
80 int const amount = m_numberOfThreads - numberOfThreads;
81
82 for (int i = 0; i < amount; ++i)
83 {
 // NOTE(review): this listing shows no statements in the loop body
 // (listing lines 84 and 87 are absent). The worker loop treats a pause
 // request as an m_pauseCount increment that it picks up after waking
 // from m_semaphore, so presumably both happen here; confirm against
 // the real file.
85
86 // Pausing a thread counts as one "internal task"
88 }
89 }
90
91 m_numberOfThreads = numberOfThreads;
92}
93
// NOTE(review): the signature line, the statements before the wait, and the
// assert condition are absent from this listing; the comments below cover
// only the lines that are visible.
94void
96{
98
 // Block until m_allPaused is true. The worker loop sets that flag and
 // notifies m_cv when the last active worker goes into the paused state.
100 m_cv.wait(lk, [this] { return m_allPaused; });
101 lk.unlock();
102
103 XRPL_ASSERT(
105 "xrpl::Workers::stop : zero running tasks");
106}
107
108void
113
// Returns a snapshot of m_runningTaskCount, the atomic counter that the worker
// loop increments just before Callback::processTask and decrements just after.
114int
116{
117 return m_runningTaskCount.load();
118}
119
// Drains `stack`: pops workers one at a time and deletes each, returning once
// pop_front() yields nullptr (the stack is empty).
120void
122{
123 for (;;)
124 {
125 Worker* const worker = stack.pop_front();
126
127 if (worker != nullptr)
128 {
129 // This call blocks until the thread orderly exits
130 delete worker;
131 }
132 else
133 {
134 break;
135 }
136 }
137}
138
139//------------------------------------------------------------------------------
140
142 Workers& workers,
143 std::string const& threadName,
144 int const instance)
145 : m_workers{workers}
146 , threadName_{threadName}
147 , instance_{instance}
 // Start with no pending wake-ups and no exit request.
148 , wakeCount_{0}
149 , shouldExit_{false}
150{
 // NOTE(review): the constructor body line (listing line 151) is absent from
 // this listing; presumably it starts thread_. Confirm against the real file.
152}
153
155{
 // Post one wake-up and set the exit flag under mutex_, the same mutex the
 // worker loop holds when it checks wakeCount_ and reads shouldExit_.
156 {
157 std::lock_guard lock{mutex_};
158 ++wakeCount_;
159 shouldExit_ = true;
160 }
161
 // Signal wakeup_ after releasing the lock, then wait for the thread to
 // finish.
162 wakeup_.notify_one();
163 thread_.join();
164}
165
// Posts one wake-up to this worker: increments wakeCount_ under mutex_ and
// signals wakeup_. The worker's wait predicate tests wakeCount_ rather than
// relying on the signal alone, so a wake-up posted before the thread starts
// waiting is not lost.
166void
168{
169 std::lock_guard lock{mutex_};
170 ++wakeCount_;
171 wakeup_.notify_one();
172}
173
// NOTE(review): the line naming this function is absent from this listing;
// from its body it looks like the worker thread's main loop. Confirm.
//
// Each pass of the outer do/while is one active phase followed by one paused
// phase:
//  - active: repeatedly wait on the shared semaphore and either claim a
//    pending pause request (leaving the inner loop) or run one task through
//    Callback::processTask;
//  - paused: push this worker onto m_paused, update the "all paused" state,
//    and block on wakeup_ until wakeCount_ is positive.
// The loop ends when shouldExit_ was set by the time the worker wakes up.
174void
176{
177 bool shouldExit = true;
178 do
179 {
180 // Increment the count of active workers, and if
181 // we are the first one then reset the "all paused" event
182 //
183 if (++m_workers.m_activeCount == 1)
184 {
185 std::lock_guard lk{m_workers.m_mut};
186 m_workers.m_allPaused = false;
187 }
188
189 for (;;)
190 {
191 // Put the name back in case the callback changed it
192 beast::setCurrentThreadName(threadName_);
193
194 // Acquire a task or "internal task."
195 //
196 m_workers.m_semaphore.wait();
197
198 // See if there's a pause request. This
199 // counts as an "internal task."
200 //
201 int pauseCount = m_workers.m_pauseCount.load();
202
203 if (pauseCount > 0)
204 {
205 // Try to decrement
206 pauseCount = --m_workers.m_pauseCount;
207
 // The load above is only a hint. A non-negative result means this
 // worker claimed one pending pause request. A negative result means
 // other workers claimed them between the load and the decrement.
208 if (pauseCount >= 0)
209 {
210 // We got paused
211 break;
212 }
213 else
214 {
215 // Undo our decrement
216 ++m_workers.m_pauseCount;
217 }
218 }
219
220 // We couldn't pause so we must have gotten
221 // unblocked in order to process a task.
222 //
223 ++m_workers.m_runningTaskCount;
224 m_workers.m_callback.processTask(instance_);
225 --m_workers.m_runningTaskCount;
226 }
227
228 // Any worker that goes into the paused list must
229 // guarantee that it will eventually block on its
230 // event object.
231 //
232 m_workers.m_paused.push_front(this);
233
234 // Decrement the count of active workers, and if we
235 // are the last one then signal the "all paused" event.
236 //
237 if (--m_workers.m_activeCount == 0)
238 {
239 std::lock_guard lk{m_workers.m_mut};
240 m_workers.m_allPaused = true;
241 m_workers.m_cv.notify_all();
242 }
243
244 // Set inactive thread name.
245 beast::setCurrentThreadName("(" + threadName_ + ")");
246
247 // [1] We will be here when the paused list is popped
248 //
249 // We block on our condition_variable, wakeup_, a requirement of being
250 // put into the paused list.
251 //
252 // wakeup_ will get signaled by either Worker::notify() or ~Worker.
253 {
254 std::unique_lock<std::mutex> lock{mutex_};
255 wakeup_.wait(lock, [this] { return this->wakeCount_ > 0; });
256
 // Still holding mutex_: read the exit flag and consume one wake-up.
257 shouldExit = shouldExit_;
258 --wakeCount_;
259 }
260 } while (!shouldExit);
261}
262
263} // namespace xrpl
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
Element * pop_front()
Pop an element off the stack.
std::thread thread_
Definition Workers.h:188
Worker(Workers &workers, std::string const &threadName, int const instance)
Definition Workers.cpp:141
Workers is effectively a thread pool.
Definition Workers.h:62
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition Workers.cpp:44
Workers(Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
Definition Workers.cpp:8
void stop()
Pause all threads and wait until they are paused.
Definition Workers.cpp:95
std::atomic< int > m_runningTaskCount
Definition Workers.h:211
std::mutex m_mut
Definition Workers.h:204
std::condition_variable m_cv
Definition Workers.h:203
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition Workers.cpp:115
bool m_allPaused
Definition Workers.h:205
void addTask()
Add a task to be performed.
Definition Workers.cpp:109
beast::LockFreeStack< Worker, PausedTag > m_paused
Definition Workers.h:214
std::atomic< int > m_pauseCount
Definition Workers.h:209
perf::PerfLog * perfLog_
Definition Workers.h:201
semaphore m_semaphore
Definition Workers.h:206
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition Workers.cpp:121
int m_numberOfThreads
Definition Workers.h:207
std::string m_threadNames
Definition Workers.h:202
beast::LockFreeStack< Worker > m_everyone
Definition Workers.h:212
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:34
void notify()
Increment the count and unblock one waiting thread.
Definition semaphore.h:57
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log.
Definition PerfLog.h:32
virtual void resizeJobs(int const resize)=0
Ensure enough room to store each currently executing job.
T join(T... args)
T load(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
Called to perform tasks as needed.
Definition Workers.h:66