1#include <xrpl/beast/core/CurrentThreadName.h>
2#include <xrpl/beast/utility/instrumentation.h>
3#include <xrpl/core/PerfLog.h>
4#include <xrpl/core/detail/Workers.h>
13 : m_callback(callback)
15 , m_threadNames(threadNames)
18 , m_numberOfThreads(0)
21 , m_runningTaskCount(0)
46 static int instance{0};
58 for (
int i = 0; i < amount; ++i)
63 if (worker !=
nullptr)
82 for (
int i = 0; i < amount; ++i)
105 "xrpl::Workers::stop : zero running tasks");
127 if (worker !=
nullptr)
146 , threadName_{threadName}
147 , instance_{instance}
162 wakeup_.notify_one();
171 wakeup_.notify_one();
177 bool shouldExit =
true;
183 if (++m_workers.m_activeCount == 1)
186 m_workers.m_allPaused =
false;
196 m_workers.m_semaphore.wait();
201 int pauseCount = m_workers.m_pauseCount.load();
206 pauseCount = --m_workers.m_pauseCount;
216 ++m_workers.m_pauseCount;
223 ++m_workers.m_runningTaskCount;
224 m_workers.m_callback.processTask(instance_);
225 --m_workers.m_runningTaskCount;
232 m_workers.m_paused.push_front(
this);
237 if (--m_workers.m_activeCount == 0)
240 m_workers.m_allPaused =
true;
241 m_workers.m_cv.notify_all();
255 wakeup_.wait(lock, [
this] {
return this->wakeCount_ > 0; });
257 shouldExit = shouldExit_;
260 }
while (!shouldExit);
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
Element * pop_front()
Pop an element off the stack.
Worker(Workers &workers, std::string const &threadName, int const instance)
Workers is effectively a thread pool.
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Workers(Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
void stop()
Pause all threads and wait until they are paused.
std::atomic< int > m_runningTaskCount
std::condition_variable m_cv
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
void addTask()
Add a task to be performed.
beast::LockFreeStack< Worker, PausedTag > m_paused
std::atomic< int > m_pauseCount
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
std::string m_threadNames
beast::LockFreeStack< Worker > m_everyone
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
void notify()
Increment the count and unblock one waiting thread.
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
virtual void resizeJobs(int const resize)=0
Ensure enough room to store each currently executing job.
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Called to perform tasks as needed.