1#include <xrpld/core/detail/Workers.h> 
    2#include <xrpld/perflog/PerfLog.h> 
    4#include <xrpl/beast/core/CurrentThreadName.h> 
    5#include <xrpl/beast/utility/instrumentation.h> 
   14    : m_callback(callback)
 
   16    , m_threadNames(threadNames)
 
   19    , m_numberOfThreads(0)
 
   22    , m_runningTaskCount(0)
 
 
   47    static int instance{0};
 
   59        for (
int i = 0; i < amount; ++i)
 
   64            if (worker != 
nullptr)
 
   83        for (
int i = 0; i < amount; ++i)
 
 
  106        "ripple::Workers::stop : zero running tasks");
 
 
  128        if (worker != 
nullptr)
 
 
  147    , threadName_{threadName}
 
  148    , instance_{instance}
 
 
  163    wakeup_.notify_one();
 
 
  172    wakeup_.notify_one();
 
 
  178    bool shouldExit = 
true;
 
  184        if (++m_workers.m_activeCount == 1)
 
  187            m_workers.m_allPaused = 
false;
 
  197            m_workers.m_semaphore.wait();
 
  202            int pauseCount = m_workers.m_pauseCount.load();
 
  207                pauseCount = --m_workers.m_pauseCount;
 
  217                    ++m_workers.m_pauseCount;
 
  224            ++m_workers.m_runningTaskCount;
 
  225            m_workers.m_callback.processTask(instance_);
 
  226            --m_workers.m_runningTaskCount;
 
  233        m_workers.m_paused.push_front(
this);
 
  238        if (--m_workers.m_activeCount == 0)
 
  241            m_workers.m_allPaused = 
true;
 
  242            m_workers.m_cv.notify_all();
 
  256            wakeup_.wait(lock, [
this] { 
return this->wakeCount_ > 0; });
 
  258            shouldExit = shouldExit_;
 
  261    } 
while (!shouldExit);
 
 
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
 
Element * pop_front()
Pop an element off the stack.
 
Worker(Workers &workers, std::string const &threadName, int const instance)
 
Workers is effectively a thread pool.
 
Workers(Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
 
std::condition_variable m_cv
 
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
 
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
 
beast::LockFreeStack< Worker, PausedTag > m_paused
 
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
 
void addTask()
Add a task to be performed.
 
std::string m_threadNames
 
beast::LockFreeStack< Worker > m_everyone
 
std::atomic< int > m_pauseCount
 
void stop()
Pause all threads and wait until they are paused.
 
std::atomic< int > m_runningTaskCount
 
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
 
void notify()
Increment the count and unblock one waiting thread.
 
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log.
 
virtual void resizeJobs(int const resize)=0
Ensure enough room to store each currently executing job.
 
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
Called to perform tasks as needed.