rippled
Loading...
Searching...
No Matches
JobQueue.h
1#ifndef XRPL_CORE_JOBQUEUE_H_INCLUDED
2#define XRPL_CORE_JOBQUEUE_H_INCLUDED
3
4#include <xrpld/core/ClosureCounter.h>
5#include <xrpld/core/JobTypeData.h>
6#include <xrpld/core/JobTypes.h>
7#include <xrpld/core/detail/Workers.h>
8
9#include <xrpl/basics/LocalValue.h>
10#include <xrpl/json/json_value.h>
11
12#include <boost/coroutine/all.hpp>
13
14#include <set>
15
16namespace ripple {
17
18namespace perf {
19class PerfLog;
20}
21
22class Logs;
24{
25 explicit Coro_create_t() = default;
26};
27
39{
40public:
43 {
44 private:
53 boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
54 boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
55#ifndef NDEBUG
56 bool finished_ = false;
57#endif
58
59 public:
60 // Private: Used in the implementation
61 template <class F>
63
64 // Not copy-constructible or assignable
65 Coro(Coro const&) = delete;
66 Coro&
67 operator=(Coro const&) = delete;
68
70
80 void
81 yield() const;
82
96 bool
98
108 void
110
112 bool
113 runnable() const;
114
116 void
118
120 void
122 };
123
124 using JobFunction = std::function<void()>;
125
126 JobQueue(
127 int threadCount,
128 beast::insight::Collector::ptr const& collector,
129 beast::Journal journal,
130 Logs& logs,
131 perf::PerfLog& perfLog);
132 ~JobQueue();
133
143 template <
144 typename JobHandler,
146 decltype(std::declval<JobHandler&&>()()),
147 void>::value>>
148 bool
149 addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
150 {
151 if (auto optionalCountedJob =
153 {
154 return addRefCountedJob(type, name, std::move(*optionalCountedJob));
155 }
156 return false;
157 }
158
168 template <class F>
170 postCoro(JobType t, std::string const& name, F&& f);
171
174 int
175 getJobCount(JobType t) const;
176
179 int
180 getJobCountTotal(JobType t) const;
181
184 int
185 getJobCountGE(JobType t) const;
186
190 makeLoadEvent(JobType t, std::string const& name);
191
194 void
195 addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed);
196
197 // Cannot be const because LoadMonitor has no const methods.
198 bool
199 isOverloaded();
200
201 // Cannot be const because LoadMonitor has no const methods.
203 getJson(int c = 0);
204
206 void
207 rendezvous();
208
209 void
210 stop();
211
212 bool
214 {
215 return stopping_;
216 }
217
218 // We may be able to move away from this, but we can keep it during the
219 // transition.
220 bool
221 isStopped() const;
222
223private:
224 friend class Coro;
225
227
237
238 // The number of jobs currently in processTask()
240
241 // The number of suspended coroutines
242 int nSuspend_ = 0;
243
245
246 // Statistics tracking
251
253
254 void
255 collect();
258
259 // Adds a reference counted job to the JobQueue.
260 //
261 // param type The type of job.
262 // param name Name of the job.
263 // param func std::function with signature void(). Called when the
264 // job is executed.
265 //
266 // return true if func added to queue.
267 bool
269 JobType type,
270 std::string const& name,
271 JobFunction const& func);
272
273 // Returns the next Job we should run now.
274 //
275 // RunnableJob:
276 // A Job in the JobSet whose slots count for its type is greater than zero.
277 //
278 // Pre-conditions:
279 // m_jobSet must not be empty.
280 // m_jobSet holds at least one RunnableJob
281 //
282 // Post-conditions:
283 // job is a valid Job object.
284 // job is removed from m_jobSet.
285 // Waiting job count of its type is decremented
286 // Running job count of its type is incremented
287 //
288 // Invariants:
289 // The calling thread owns the JobLock
290 void
291 getNextJob(Job& job);
292
293 // Indicates that a running Job has completed its task.
294 //
295 // Pre-conditions:
296 // Job must not exist in m_jobSet.
297 // The JobType must not be invalid.
298 //
299 // Post-conditions:
300 // The running count of that JobType is decremented
301 // A new task is signaled if there are more waiting Jobs than the limit, if
302 // any.
303 //
304 // Invariants:
305 // <none>
306 void
307 finishJob(JobType type);
308
309 // Runs the next appropriate waiting Job.
310 //
311 // Pre-conditions:
312 // A RunnableJob must exist in the JobSet
313 //
314 // Post-conditions:
315 // The chosen RunnableJob will have Job::doJob() called.
316 //
317 // Invariants:
318 // <none>
319 void
320 processTask(int instance) override;
321
322 // Returns the limit of running jobs for the given job type.
323 // For jobs with no limit, we return the largest int. Hopefully that
324 // will be enough.
325 int
326 getJobLimit(JobType type);
327};
328
329/*
330 An RPC command is received and is handled via ServerHandler(HTTP) or
331 Handler(websocket), depending on the connection type. The handler then calls
332 the JobQueue::postCoro() method to create a coroutine and run it at a later
333 point. This frees up the handler thread and allows it to continue handling
334 other requests while the RPC command completes its work asynchronously.
335
336 postCoro() creates a Coro object. When the Coro ctor is called, and its
337 coro_ member is initialized (a boost::coroutines::pull_type), execution
338 automatically passes to the coroutine, which we don't want at this point,
339 since we are still in the handler thread context. It's important to note
340 here that construction of a boost pull_type automatically passes execution to
341 the coroutine. A pull_type object automatically generates a push_type that is
342 passed as a parameter (do_yield) in the signature of the function the
343 pull_type was created with. This function is immediately called during coro_
344 construction and within it, Coro::yield_ is assigned the push_type
345 parameter (do_yield) address and called (yield()) so we can return execution
346 back to the caller's stack.
347
348 postCoro() then calls Coro::post(), which schedules a job on the job
349 queue to continue execution of the coroutine in a JobQueue worker thread at
350 some later time. When the job runs, we lock on the Coro::mutex_ and call
351 coro_ which continues where we had left off. Since the last thing we did
352 in coro_ was call yield(), the next thing we continue with is calling the
353 function param f, that was passed into Coro ctor. It is within this
354 function body that the caller specifies what they would like to do while
355 running in the coroutine, and where they can suspend and resume execution.
356 A task that relies on other events to complete, such as path finding, calls
357 Coro::yield() to suspend its execution while waiting on those events to
358 complete and continue when signaled via the Coro::post() method.
359
360 There is a potential race condition that exists here where post() can get
361 called before yield() after f is called. Technically the problem only occurs
362 if the job that post() scheduled is executed before yield() is called.
363 If the post() job were to be executed before yield(), undefined behavior
364 would occur. The lock ensures that coro_ is not called again until we exit
365 the coroutine, at which point a scheduled resume() job waiting on the lock
366 would gain entry, harmlessly call coro_ and immediately return as we have
367 already completed the coroutine.
368
369 The race condition occurs as follows:
370
371 1- The coroutine is running.
372 2- The coroutine is about to suspend, but before it can do so, it must
373 arrange for some event to wake it up.
374 3- The coroutine arranges for some event to wake it up.
375 4- Before the coroutine can suspend, that event occurs and the
376 resumption of the coroutine is scheduled on the job queue.
377 5- Again, before the coroutine can suspend, the resumption of the coroutine is dispatched.
378 6- Again, before the coroutine can suspend, the resumption code runs the
379 coroutine.
380 The coroutine is now running in two threads.
381
382 The lock prevents this from happening as step 6 will block until the
383 lock is released which only happens after the coroutine completes.
384*/
385
386} // namespace ripple
387
388#include <xrpld/core/Coro.ipp>
389
390namespace ripple {
391
392template <class F>
395{
396 /* First param is a detail type to make construction private.
397 Last param is the function the coroutine runs. Signature of
398 void(std::shared_ptr<Coro>).
399 */
400 auto coro = std::make_shared<Coro>(
401 Coro_create_t{}, *this, t, name, std::forward<F>(f));
402 if (!coro->post())
403 {
404 // The Coro was not successfully posted. Disable it so its destructor
405 // can run with no negative side effects. Then destroy it.
406 coro->expectEarlyExit();
407 coro.reset();
408 }
409 return coro;
410}
411
412} // namespace ripple
413
414#endif
Represents a JSON value.
Definition json_value.h:130
A generic endpoint for log messages.
Definition Journal.h:41
A metric for measuring an integral value.
Definition Gauge.h:21
A reference to a handler for performing polled collection.
Definition Hook.h:13
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
Coroutines must run to completion.
Definition JobQueue.h:43
void join()
Waits until coroutine returns from the user function.
std::string name_
Definition JobQueue.h:48
std::mutex mutex_run_
Definition JobQueue.h:51
void resume()
Resume coroutine execution.
std::condition_variable cv_
Definition JobQueue.h:52
void expectEarlyExit()
Once called, the Coro allows early exit without an assert.
bool runnable() const
Returns true if the Coro is still runnable (has not returned).
bool post()
Schedule coroutine execution.
detail::LocalValues lvs_
Definition JobQueue.h:45
Coro & operator=(Coro const &)=delete
Coro(Coro_create_t, JobQueue &, JobType, std::string const &, F &&)
boost::coroutines::asymmetric_coroutine< void >::push_type * yield_
Definition JobQueue.h:54
void yield() const
Suspend coroutine execution.
boost::coroutines::asymmetric_coroutine< void >::pull_type coro_
Definition JobQueue.h:53
Coro(Coro const &)=delete
A pool of threads to perform work.
Definition JobQueue.h:39
JobTypeData m_invalidJobData
Definition JobQueue.h:236
int getJobLimit(JobType type)
Definition JobQueue.cpp:422
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition JobQueue.h:394
std::atomic_bool stopped_
Definition JobQueue.h:234
std::uint64_t m_lastJob
Definition JobQueue.h:230
JobDataMap m_jobData
Definition JobQueue.h:235
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:254
JobCounter jobCounter_
Definition JobQueue.h:232
bool isStopping() const
Definition JobQueue.h:213
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:133
bool isStopped() const
Definition JobQueue.cpp:303
perf::PerfLog & perfLog_
Definition JobQueue.h:247
Workers m_workers
Definition JobQueue.h:244
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:261
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:143
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:174
Json::Value getJson(int c=0)
Definition JobQueue.cpp:195
beast::insight::Collector::ptr m_collector
Definition JobQueue.h:248
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:63
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:160
std::set< Job > m_jobSet
Definition JobQueue.h:231
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:123
beast::insight::Hook hook
Definition JobQueue.h:250
beast::Journal m_journal
Definition JobQueue.h:228
std::mutex m_mutex
Definition JobQueue.h:229
beast::insight::Gauge job_count
Definition JobQueue.h:249
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:369
std::atomic_bool stopping_
Definition JobQueue.h:233
void finishJob(JobType type)
Definition JobQueue.cpp:346
void getNextJob(Job &job)
Definition JobQueue.cpp:309
std::condition_variable cv_
Definition JobQueue.h:252
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:149
Manages partitions for logging.
Definition Log.h:33
Workers is effectively a thread pool.
Definition Workers.h:63
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:33
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
JobType
Definition Job.h:16
T reset(T... args)
Called to perform tasks as needed.
Definition Workers.h:67