rippled
Loading...
Searching...
No Matches
JobQueue.h
1#pragma once
2
3#include <xrpl/basics/LocalValue.h>
4#include <xrpl/core/ClosureCounter.h>
5#include <xrpl/core/JobTypeData.h>
6#include <xrpl/core/JobTypes.h>
7#include <xrpl/core/detail/Workers.h>
8#include <xrpl/json/json_value.h>
9
10#include <boost/context/protected_fixedsize_stack.hpp>
11#include <boost/coroutine2/all.hpp>
12
13#include <set>
14
15namespace xrpl {
16
17namespace perf {
18class PerfLog;
19} // namespace perf
20
21class Logs;
23{
24 explicit Coro_create_t() = default;
25};
26
38{
39public:
42 {
43 private:
48 bool running_{false};
52 boost::coroutines2::coroutine<void>::pull_type coro_;
53 boost::coroutines2::coroutine<void>::push_type* yield_;
54#ifndef NDEBUG
55 bool finished_ = false;
56#endif
57
58 public:
59 // Private: Used in the implementation
60 template <class F>
62
63 // Not copy-constructible or assignable
64 Coro(Coro const&) = delete;
65 Coro&
66 operator=(Coro const&) = delete;
67
69
79 void
80 yield() const;
81
95 bool
97
107 void
109
111 bool
112 runnable() const;
113
115 void
117
119 void
121 };
122
123 using JobFunction = std::function<void()>;
124
125 JobQueue(
126 int threadCount,
127 beast::insight::Collector::ptr const& collector,
128 beast::Journal journal,
129 Logs& logs,
130 perf::PerfLog& perfLog);
131 ~JobQueue();
132
142 template <
143 typename JobHandler,
144 typename =
146 bool
147 addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
148 {
149 if (auto optionalCountedJob = jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
150 {
151 return addRefCountedJob(type, name, std::move(*optionalCountedJob));
152 }
153 return false;
154 }
155
165 template <class F>
167 postCoro(JobType t, std::string const& name, F&& f);
168
171 int
172 getJobCount(JobType t) const;
173
176 int
177 getJobCountTotal(JobType t) const;
178
181 int
182 getJobCountGE(JobType t) const;
183
187 makeLoadEvent(JobType t, std::string const& name);
188
191 void
192 addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed);
193
194 // Cannot be const because LoadMonitor has no const methods.
195 bool
196 isOverloaded();
197
198 // Cannot be const because LoadMonitor has no const methods.
200 getJson(int c = 0);
201
203 void
204 rendezvous();
205
206 void
207 stop();
208
209 bool
211 {
212 return stopping_;
213 }
214
215 // We may be able to move away from this, but we can keep it during the
216 // transition.
217 bool
218 isStopped() const;
219
220private:
221 friend class Coro;
222
224
234
235 // The number of jobs currently in processTask()
237
238 // The number of suspended coroutines
239 int nSuspend_ = 0;
240
242
243 // Statistics tracking
248
250
251 void
252 collect();
255
256 // Adds a reference counted job to the JobQueue.
257 //
258 // param type The type of job.
259 // param name Name of the job.
260 // param func std::function with signature void(). Called when the
261 // job is executed.
262 //
263 // return true if func added to queue.
264 bool
265 addRefCountedJob(JobType type, std::string const& name, JobFunction const& func);
266
267 // Returns the next Job we should run now.
268 //
269 // RunnableJob:
270 // A Job in the JobSet whose slots count for its type is greater than zero.
271 //
272 // Pre-conditions:
273 // m_jobSet must not be empty.
274 // m_jobSet holds at least one RunnableJob
275 //
276 // Post-conditions:
277 // job is a valid Job object.
278 // job is removed from m_jobSet.
279 // Waiting job count of its type is decremented
280 // Running job count of its type is incremented
281 //
282 // Invariants:
283 // The calling thread owns the JobLock
284 void
285 getNextJob(Job& job);
286
287 // Indicates that a running Job has completed its task.
288 //
289 // Pre-conditions:
290 // Job must not exist in m_jobSet.
291 // The JobType must not be invalid.
292 //
293 // Post-conditions:
294 // The running count of that JobType is decremented
295 // A new task is signaled if there are more waiting Jobs than the limit, if
296 // any.
297 //
298 // Invariants:
299 // <none>
300 void
301 finishJob(JobType type);
302
303 // Runs the next appropriate waiting Job.
304 //
305 // Pre-conditions:
306 // A RunnableJob must exist in the JobSet
307 //
308 // Post-conditions:
309 // The chosen RunnableJob will have Job::doJob() called.
310 //
311 // Invariants:
312 // <none>
313 void
314 processTask(int instance) override;
315
316 // Returns the limit of running jobs for the given job type.
317 // For jobs with no limit, we return the largest int. Hopefully that
318 // will be enough.
319 static int
320 getJobLimit(JobType type);
321};
322
323/*
324 An RPC command is received and is handled via ServerHandler(HTTP) or
325 Handler(websocket), depending on the connection type. The handler then calls
326 the JobQueue::postCoro() method to create a coroutine and run it at a later
327 point. This frees up the handler thread and allows it to continue handling
328 other requests while the RPC command completes its work asynchronously.
329
330 postCoro() creates a Coro object. When the Coro ctor is called, and its
331 coro_ member is initialized (a boost::coroutines2 pull_type), execution
332 automatically passes to the coroutine, which we don't want at this point,
333 since we are still in the handler thread context. It's important to note
334 here that construction of a boost pull_type automatically passes execution to
335 the coroutine. A pull_type object automatically generates a push_type that is
336 passed as a parameter (do_yield) in the signature of the function the
337 pull_type was created with. This function is immediately called during coro_
338 construction and within it, Coro::yield_ is assigned the push_type
339 parameter (do_yield) address and called (yield()) so we can return execution
340 back to the caller's stack.
341
342 postCoro() then calls Coro::post(), which schedules a job on the job
343 queue to continue execution of the coroutine in a JobQueue worker thread at
344 some later time. When the job runs, we lock on the Coro::mutex_ and call
345 coro_, which continues where we left off. Since the last thing we did
346 in coro_ was call yield(), the next thing we do is call the
347 function parameter f that was passed into the Coro ctor. It is within this
348 function body that the caller specifies what they would like to do while
349 running in the coroutine; that code may suspend and later resume execution.
350 A task that relies on other events to complete, such as path finding, calls
351 Coro::yield() to suspend its execution while waiting on those events to
352 complete, and continues when signaled via the Coro::post() method.
353
354 There is a potential race condition here: once f is running, post() can get
355 called before yield() is. Technically the problem only occurs
356 if the job that post() scheduled is executed before yield() is called.
357 If the post() job were to be executed before yield(), undefined behavior
358 would occur. The lock ensures that coro_ is not called again until we exit
359 the coroutine, at which point a scheduled resume() job waiting on the lock
360 would gain entry. resume() checks whether the coroutine has already completed
361 (coro_ converts to false) and, if so, skips invoking operator(), since
362 calling operator() on a completed boost::coroutines2 pull_type is undefined
363 behavior.
364
365 The race condition occurs as follows:
366
367 1- The coroutine is running.
368 2- The coroutine is about to suspend, but before it can do so, it must
369 arrange for some event to wake it up.
370 3- The coroutine arranges for some event to wake it up.
371 4- Before the coroutine can suspend, that event occurs and the
372 resumption of the coroutine is scheduled on the job queue.
373 5- Again, before the coroutine can suspend, the resumption of the coroutine is dispatched.
374 6- Again, before the coroutine can suspend, the resumption code runs the
375 coroutine.
376 The coroutine is now running in two threads.
377
378 The lock prevents this from happening, as step 6 will block until the
379 lock is released, which only happens after the coroutine suspends (yields) or completes.
380*/
381
382} // namespace xrpl
383
384#include <xrpl/core/Coro.ipp>
385
386namespace xrpl {
387
388template <class F>
391{
392 /* First param is a detail type to make construction private.
393 Last param is the function the coroutine runs. Signature of
394 void(std::shared_ptr<Coro>).
395 */
396 auto coro = std::make_shared<Coro>(Coro_create_t{}, *this, t, name, std::forward<F>(f));
397 if (!coro->post())
398 {
399 // The Coro was not successfully posted. Call expectEarlyExit() so its destructor
400 // can run without asserting that the coroutine finished. Then destroy it.
401 coro->expectEarlyExit();
402 coro.reset();
403 }
404 return coro;
405}
406
407} // namespace xrpl
Represents a JSON value.
Definition json_value.h:130
A generic endpoint for log messages.
Definition Journal.h:40
A metric for measuring an integral value.
Definition Gauge.h:20
A reference to a handler for performing polled collection.
Definition Hook.h:12
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
Coroutines must run to completion.
Definition JobQueue.h:42
boost::coroutines2::coroutine< void >::pull_type coro_
Definition JobQueue.h:52
Coro(Coro const &)=delete
Coro(Coro_create_t, JobQueue &, JobType, std::string const &, F &&)
std::mutex mutex_
Definition JobQueue.h:49
boost::coroutines2::coroutine< void >::push_type * yield_
Definition JobQueue.h:53
bool post()
Schedule coroutine execution.
bool runnable() const
Returns true if the Coro is still runnable (has not returned).
void yield() const
Suspend coroutine execution.
std::mutex mutex_run_
Definition JobQueue.h:50
std::string name_
Definition JobQueue.h:47
void expectEarlyExit()
Once called, the Coro allows early exit without an assert.
std::condition_variable cv_
Definition JobQueue.h:51
void join()
Waits until coroutine returns from the user function.
Coro & operator=(Coro const &)=delete
void resume()
Resume coroutine execution.
detail::LocalValues lvs_
Definition JobQueue.h:44
A pool of threads to perform work.
Definition JobQueue.h:38
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:125
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:329
beast::Journal m_journal
Definition JobQueue.h:225
std::mutex m_mutex
Definition JobQueue.h:226
JobCounter jobCounter_
Definition JobQueue.h:229
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:237
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition JobQueue.h:390
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:115
JobTypeData m_invalidJobData
Definition JobQueue.h:233
beast::insight::Gauge job_count
Definition JobQueue.h:246
bool isStopped() const
Definition JobQueue.cpp:272
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:230
Workers m_workers
Definition JobQueue.h:241
std::atomic_bool stopping_
Definition JobQueue.h:230
JobDataMap m_jobData
Definition JobQueue.h:232
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:147
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:105
std::set< Job > m_jobSet
Definition JobQueue.h:228
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:58
bool isOverloaded()
Definition JobQueue.cpp:165
std::atomic_bool stopped_
Definition JobQueue.h:231
bool isStopping() const
Definition JobQueue.h:210
std::condition_variable cv_
Definition JobQueue.h:249
static int getJobLimit(JobType type)
Definition JobQueue.cpp:380
std::uint64_t m_lastJob
Definition JobQueue.h:227
beast::insight::Hook hook
Definition JobQueue.h:247
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:154
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:142
beast::insight::Collector::ptr m_collector
Definition JobQueue.h:245
perf::PerfLog & perfLog_
Definition JobQueue.h:244
void finishJob(JobType type)
Definition JobQueue.cpp:308
Json::Value getJson(int c=0)
Definition JobQueue.cpp:173
void getNextJob(Job &job)
Definition JobQueue.cpp:278
Manages partitions for logging.
Definition Log.h:32
Workers is effectively a thread pool.
Definition Workers.h:61
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:31
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
JobType
Definition Job.h:14
T reset(T... args)
Coro_create_t()=default
Called to perform tasks as needed.
Definition Workers.h:65