1#include <xrpl/beast/core/CurrentThreadName.h>
2#include <xrpl/core/PerfLog.h>
3#include <xrpl/core/detail/Workers.h>
12 : m_callback(callback)
14 , m_threadNames(threadNames)
18 , m_runningTaskCount(0)
43 static int instance{0};
55 for (
int i = 0; i < amount; ++i)
60 if (worker !=
nullptr)
79 for (
int i = 0; i < amount; ++i)
124 if (worker !=
nullptr)
139 : m_workers{workers}, threadName_{threadName}, instance_{instance}
153 wakeup_.notify_one();
162 wakeup_.notify_one();
168 bool shouldExit =
true;
174 if (++m_workers.m_activeCount == 1)
177 m_workers.m_allPaused =
false;
187 m_workers.m_semaphore.wait();
192 int pauseCount = m_workers.m_pauseCount.load();
197 pauseCount = --m_workers.m_pauseCount;
206 ++m_workers.m_pauseCount;
212 ++m_workers.m_runningTaskCount;
213 m_workers.m_callback.processTask(instance_);
221 if (--m_workers.m_runningTaskCount == 0)
224 m_workers.m_cv.notify_all();
232 m_workers.m_paused.push_front(
this);
237 if (--m_workers.m_activeCount == 0)
240 m_workers.m_allPaused =
true;
241 m_workers.m_cv.notify_all();
255 wakeup_.wait(lock, [
this] {
return this->wakeCount_ > 0; });
257 shouldExit = shouldExit_;
260 }
while (!shouldExit);
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
Element * pop_front()
Pop an element off the stack.
Worker(Workers &workers, std::string const &threadName, int const instance)
Workers is effectively a thread pool.
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Workers(Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
void stop()
Pause all threads and wait until they are paused.
std::atomic< int > m_runningTaskCount
std::condition_variable m_cv
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
void addTask()
Add a task to be performed.
beast::LockFreeStack< Worker, PausedTag > m_paused
std::atomic< int > m_pauseCount
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
std::string m_threadNames
beast::LockFreeStack< Worker > m_everyone
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
void notify()
Increment the count and unblock one waiting thread.
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
virtual void resizeJobs(int const resize)=0
Ensure enough room to store each currently executing job.
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Called to perform tasks as needed.