rippled
Loading...
Searching...
No Matches
JobQueue.h
1#ifndef XRPL_CORE_JOBQUEUE_H_INCLUDED
2#define XRPL_CORE_JOBQUEUE_H_INCLUDED
3
4#include <xrpl/basics/LocalValue.h>
5#include <xrpl/core/ClosureCounter.h>
6#include <xrpl/core/JobTypeData.h>
7#include <xrpl/core/JobTypes.h>
8#include <xrpl/core/detail/Workers.h>
9#include <xrpl/json/json_value.h>
10
11#include <boost/coroutine/all.hpp>
12
13#include <set>
14
15namespace xrpl {
16
17namespace perf {
18class PerfLog;
19}
20
21class Logs;
23{
24 explicit Coro_create_t() = default;
25};
26
38{
39public:
42 {
43 private:
52 boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
53 boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
54#ifndef NDEBUG
55 bool finished_ = false;
56#endif
57
58 public:
59 // Private: Used in the implementation
60 template <class F>
62
63 // Not copy-constructible or assignable
64 Coro(Coro const&) = delete;
65 Coro&
66 operator=(Coro const&) = delete;
67
69
79 void
80 yield() const;
81
95 bool
97
107 void
109
111 bool
112 runnable() const;
113
115 void
117
119 void
121 };
122
123 using JobFunction = std::function<void()>;
124
125 JobQueue(
126 int threadCount,
127 beast::insight::Collector::ptr const& collector,
128 beast::Journal journal,
129 Logs& logs,
130 perf::PerfLog& perfLog);
131 ~JobQueue();
132
142 template <
143 typename JobHandler,
145 decltype(std::declval<JobHandler&&>()()),
146 void>::value>>
147 bool
148 addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
149 {
150 if (auto optionalCountedJob =
152 {
153 return addRefCountedJob(type, name, std::move(*optionalCountedJob));
154 }
155 return false;
156 }
157
167 template <class F>
169 postCoro(JobType t, std::string const& name, F&& f);
170
173 int
174 getJobCount(JobType t) const;
175
178 int
179 getJobCountTotal(JobType t) const;
180
183 int
184 getJobCountGE(JobType t) const;
185
189 makeLoadEvent(JobType t, std::string const& name);
190
193 void
194 addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed);
195
196 // Cannot be const because LoadMonitor has no const methods.
197 bool
198 isOverloaded();
199
200 // Cannot be const because LoadMonitor has no const methods.
202 getJson(int c = 0);
203
205 void
206 rendezvous();
207
208 void
209 stop();
210
211 bool
213 {
214 return stopping_;
215 }
216
217 // We may be able to move away from this, but we can keep it during the
218 // transition.
219 bool
220 isStopped() const;
221
222private:
223 friend class Coro;
224
226
236
237 // The number of jobs currently in processTask()
239
240 // The number of suspended coroutines
241 int nSuspend_ = 0;
242
244
245 // Statistics tracking
250
252
253 void
254 collect();
257
258 // Adds a reference counted job to the JobQueue.
259 //
260 // param type The type of job.
261 // param name Name of the job.
262 // param func std::function with signature void(). Called when the
263 // job is executed.
264 //
265 // return true if func added to queue.
266 bool
268 JobType type,
269 std::string const& name,
270 JobFunction const& func);
271
272 // Returns the next Job we should run now.
273 //
274 // RunnableJob:
275 // A Job in the JobSet whose slots count for its type is greater than zero.
276 //
277 // Pre-conditions:
278 // m_jobSet must not be empty.
279 // m_jobSet holds at least one RunnableJob
280 //
281 // Post-conditions:
282 // job is a valid Job object.
283 // job is removed from m_jobSet.
284 // Waiting job count of its type is decremented
285 // Running job count of its type is incremented
286 //
287 // Invariants:
288 // The calling thread owns the JobLock
289 void
290 getNextJob(Job& job);
291
292 // Indicates that a running Job has completed its task.
293 //
294 // Pre-conditions:
295 // Job must not exist in m_jobSet.
296 // The JobType must not be invalid.
297 //
298 // Post-conditions:
299 // The running count of that JobType is decremented
300 // A new task is signaled if there are more waiting Jobs than the limit, if
301 // any.
302 //
303 // Invariants:
304 // <none>
305 void
306 finishJob(JobType type);
307
308 // Runs the next appropriate waiting Job.
309 //
310 // Pre-conditions:
311 // A RunnableJob must exist in the JobSet
312 //
313 // Post-conditions:
314 // The chosen RunnableJob will have Job::doJob() called.
315 //
316 // Invariants:
317 // <none>
318 void
319 processTask(int instance) override;
320
321 // Returns the limit of running jobs for the given job type.
322 // For jobs with no limit, we return the largest int. Hopefully that
323 // will be enough.
324 int
325 getJobLimit(JobType type);
326};
327
328/*
329 An RPC command is received and is handled via ServerHandler(HTTP) or
330 Handler(websocket), depending on the connection type. The handler then calls
331 the JobQueue::postCoro() method to create a coroutine and run it at a later
332 point. This frees up the handler thread and allows it to continue handling
333 other requests while the RPC command completes its work asynchronously.
334
335 postCoro() creates a Coro object. When the Coro ctor is called, and its
336 coro_ member is initialized (a boost::coroutines::pull_type), execution
337 automatically passes to the coroutine, which we don't want at this point,
338 since we are still in the handler thread context. It's important to note
339 here that construction of a boost pull_type automatically passes execution to
340 the coroutine. A pull_type object automatically generates a push_type that is
341 passed as a parameter (do_yield) in the signature of the function the
342 pull_type was created with. This function is immediately called during coro_
343 construction and within it, Coro::yield_ is assigned the push_type
344 parameter (do_yield) address and called (yield()) so we can return execution
345 back to the caller's stack.
346
347 postCoro() then calls Coro::post(), which schedules a job on the job
348 queue to continue execution of the coroutine in a JobQueue worker thread at
349 some later time. When the job runs, we lock on the Coro::mutex_ and call
350 coro_, which continues where we left off. Since the last thing we did
351 in coro_ was call yield(), the next thing we continue with is calling the
352 function param f that was passed into the Coro ctor. It is within this
353 function body that the caller specifies what they would like to do while
354 running in the coroutine, and it allows them to suspend and resume execution.
355 A task that relies on other events to complete, such as path finding, calls
356 Coro::yield() to suspend its execution while waiting on those events to
357 complete and continue when signaled via the Coro::post() method.
358
359 There is a potential race condition here, where post() can get
360 called before yield() after f is called. Technically, the problem only occurs
361 if the job that post() scheduled is executed before yield() is called.
362 If the post() job were to be executed before yield(), undefined behavior
363 would occur. The lock ensures that coro_ is not called again until we exit
364 the coroutine, at which point a scheduled resume() job waiting on the lock
365 would gain entry, harmlessly call coro_, and immediately return as we have
366 already completed the coroutine.
367
368 The race condition occurs as follows:
369
370 1- The coroutine is running.
371 2- The coroutine is about to suspend, but before it can do so, it must
372 arrange for some event to wake it up.
373 3- The coroutine arranges for some event to wake it up.
374 4- Before the coroutine can suspend, that event occurs and the
375 resumption of the coroutine is scheduled on the job queue.
376 5- Again, before the coroutine can suspend, the resumption of the coroutine is dispatched.
377 6- Again, before the coroutine can suspend, the resumption code runs the
378 coroutine.
379 The coroutine is now running in two threads.
380
381 The lock prevents this from happening, as step 6 will block until the
382 lock is released, which only happens after the running coroutine suspends (yields) or completes.
383*/
384
385} // namespace xrpl
386
387#include <xrpl/core/Coro.ipp>
388
389namespace xrpl {
390
391template <class F>
394{
395 /* First param is a detail type to make construction private.
396 Last param is the function the coroutine runs. Signature of
397 void(std::shared_ptr<Coro>).
398 */
399 auto coro = std::make_shared<Coro>(
400 Coro_create_t{}, *this, t, name, std::forward<F>(f));
401 if (!coro->post())
402 {
403 // The Coro was not successfully posted. Disable it so its destructor
404 // can run with no negative side effects. Then destroy it.
405 coro->expectEarlyExit();
406 coro.reset();
407 }
408 return coro;
409}
410
411} // namespace xrpl
412
413#endif
Represents a JSON value.
Definition json_value.h:131
A generic endpoint for log messages.
Definition Journal.h:41
A metric for measuring an integral value.
Definition Gauge.h:21
A reference to a handler for performing polled collection.
Definition Hook.h:13
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
Coroutines must run to completion.
Definition JobQueue.h:42
Coro(Coro const &)=delete
Coro(Coro_create_t, JobQueue &, JobType, std::string const &, F &&)
std::mutex mutex_
Definition JobQueue.h:49
bool post()
Schedule coroutine execution.
bool runnable() const
Returns true if the Coro is still runnable (has not returned).
void yield() const
Suspend coroutine execution.
std::mutex mutex_run_
Definition JobQueue.h:50
std::string name_
Definition JobQueue.h:47
void expectEarlyExit()
Once called, the Coro allows early exit without an assert.
std::condition_variable cv_
Definition JobQueue.h:51
boost::coroutines::asymmetric_coroutine< void >::push_type * yield_
Definition JobQueue.h:53
boost::coroutines::asymmetric_coroutine< void >::pull_type coro_
Definition JobQueue.h:52
void join()
Waits until coroutine returns from the user function.
Coro & operator=(Coro const &)=delete
void resume()
Resume coroutine execution.
detail::LocalValues lvs_
Definition JobQueue.h:44
A pool of threads to perform work.
Definition JobQueue.h:38
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:141
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:365
beast::Journal m_journal
Definition JobQueue.h:227
std::mutex m_mutex
Definition JobQueue.h:228
JobCounter jobCounter_
Definition JobQueue.h:231
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:259
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition JobQueue.h:393
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:131
JobTypeData m_invalidJobData
Definition JobQueue.h:235
beast::insight::Gauge job_count
Definition JobQueue.h:248
bool isStopped() const
Definition JobQueue.cpp:301
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:252
Workers m_workers
Definition JobQueue.h:243
std::atomic_bool stopping_
Definition JobQueue.h:232
JobDataMap m_jobData
Definition JobQueue.h:234
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:148
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:121
std::set< Job > m_jobSet
Definition JobQueue.h:230
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:61
bool isOverloaded()
Definition JobQueue.cpp:185
std::atomic_bool stopped_
Definition JobQueue.h:233
bool isStopping() const
Definition JobQueue.h:212
std::condition_variable cv_
Definition JobQueue.h:251
int getJobLimit(JobType type)
Definition JobQueue.cpp:418
std::uint64_t m_lastJob
Definition JobQueue.h:229
beast::insight::Hook hook
Definition JobQueue.h:249
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:172
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:158
beast::insight::Collector::ptr m_collector
Definition JobQueue.h:247
perf::PerfLog & perfLog_
Definition JobQueue.h:246
void finishJob(JobType type)
Definition JobQueue.cpp:343
Json::Value getJson(int c=0)
Definition JobQueue.cpp:193
void getNextJob(Job &job)
Definition JobQueue.cpp:307
Manages partitions for logging.
Definition Log.h:33
Workers is effectively a thread pool.
Definition Workers.h:62
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:32
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
JobType
Definition Job.h:15
T reset(T... args)
Coro_create_t()=default
Called to perform tasks as needed.
Definition Workers.h:66