xrpld
Loading...
Searching...
No Matches
JobQueue.h
1#pragma once
2
3#include <xrpl/basics/LocalValue.h>
4#include <xrpl/core/ClosureCounter.h>
5#include <xrpl/core/JobTypeData.h>
6#include <xrpl/core/JobTypes.h>
7#include <xrpl/core/detail/Workers.h>
8#include <xrpl/json/json_value.h>
9
10// Include only the specific Boost.Coroutine2 headers actually used here.
11// Avoid `boost/coroutine2/all.hpp` because it transitively pulls in
12// `boost/context/pooled_fixedsize_stack.hpp`, whose `.malloc()` / `.free()`
13// member calls on `boost::pool` collide with MSVC's `_CRTDBG_MAP_ALLOC` macros
14// in Debug builds (see cmake/XrplCompiler.cmake).
15#include <boost/context/protected_fixedsize_stack.hpp>
16#include <boost/coroutine2/coroutine.hpp>
17
18#include <set>
19
20namespace xrpl {
21
22namespace perf {
23class PerfLog;
24} // namespace perf
25
26class Logs;
28{
29 explicit CoroCreateT() = default;
30};
31
43{
44public:
47 {
48 private:
53 bool running_{false};
57 boost::coroutines2::coroutine<void>::push_type* yield_{};
58 boost::coroutines2::coroutine<void>::pull_type coro_;
59#ifndef NDEBUG
60 bool finished_ = false;
61#endif
62
63 public:
64 template <class F>
66
67 // Not copy-constructible or assignable
68 Coro(Coro const&) = delete;
69 Coro&
70 operator=(Coro const&) = delete;
71
73
83 void
84 yield() const;
85
99 bool
101
111 void
113
115 bool
116 runnable() const;
117
119 void
121
123 void
125 };
126
127 using JobFunction = std::function<void()>;
128
129 JobQueue(
130 int threadCount,
131 beast::insight::Collector::ptr const& collector,
132 beast::Journal journal,
133 Logs& logs,
134 perf::PerfLog& perfLog);
135 ~JobQueue() override;
136
146 template <
147 typename JobHandler,
149 bool
150 addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
151 {
152 if (auto optionalCountedJob = jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
153 {
154 return addRefCountedJob(type, name, std::move(*optionalCountedJob));
155 }
156 return false;
157 }
158
168 template <class F>
170 postCoro(JobType t, std::string const& name, F&& f);
171
174 int
175 getJobCount(JobType t) const;
176
179 int
180 getJobCountTotal(JobType t) const;
181
184 int
185 getJobCountGE(JobType t) const;
186
190 makeLoadEvent(JobType t, std::string const& name);
191
194 void
195 addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed);
196
197 // Cannot be const because LoadMonitor has no const methods.
198 bool
199 isOverloaded();
200
201 // Cannot be const because LoadMonitor has no const methods.
203 getJson(int c = 0);
204
206 void
207 rendezvous();
208
209 void
210 stop();
211
212 bool
214 {
215 return stopping_;
216 }
217
218 // We may be able to move away from this, but we can keep it during the
219 // transition.
220 bool
221 isStopped() const;
222
223private:
224 friend class Coro;
225
227
237
238 // The number of jobs currently in processTask()
240
241 // The number of suspended coroutines
242 int nSuspend_ = 0;
243
245
246 // Statistics tracking
251
253
254 void
255 collect();
258
259 // Adds a reference counted job to the JobQueue.
260 //
261 // param type The type of job.
262 // param name Name of the job.
263 // param func std::function with signature void(). Called when the
264 // job is executed.
265 //
266 // return true if func added to queue.
267 bool
268 addRefCountedJob(JobType type, std::string const& name, JobFunction const& func);
269
270 // Returns the next Job we should run now.
271 //
272 // RunnableJob:
273 // A Job in the JobSet whose slots count for its type is greater than zero.
274 //
275 // Pre-conditions:
276 // jobSet_ must not be empty.
277 // jobSet_ holds at least one RunnableJob
278 //
279 // Post-conditions:
280 // job is a valid Job object.
281 // job is removed from jobSet_.
282 // Waiting job count of its type is decremented
283 // Running job count of its type is incremented
284 //
285 // Invariants:
286 // The calling thread owns the JobLock
287 void
288 getNextJob(Job& job);
289
290 // Indicates that a running Job has completed its task.
291 //
292 // Pre-conditions:
293 // Job must not exist in jobSet_.
294 // The JobType must not be invalid.
295 //
296 // Post-conditions:
297 // The running count of that JobType is decremented
298 // A new task is signaled if there are more waiting Jobs than the limit, if
299 // any.
300 //
301 // Invariants:
302 // <none>
303 void
304 finishJob(JobType type);
305
306 // Runs the next appropriate waiting Job.
307 //
308 // Pre-conditions:
309 // A RunnableJob must exist in the JobSet
310 //
311 // Post-conditions:
312 // The chosen RunnableJob will have Job::doJob() called.
313 //
314 // Invariants:
315 // <none>
316 void
317 processTask(int instance) override;
318
319 // Returns the limit of running jobs for the given job type.
320 // For jobs with no limit, we return the largest int. Hopefully that
321 // will be enough.
322 static int
323 getJobLimit(JobType type);
324};
325
326/*
327 An RPC command is received and is handled via ServerHandler(HTTP) or
328 Handler(websocket), depending on the connection type. The handler then calls
329 the JobQueue::postCoro() method to create a coroutine and run it at a later
330 point. This frees up the handler thread and allows it to continue handling
331 other requests while the RPC command completes its work asynchronously.
332
333 postCoro() creates a Coro object. When the Coro ctor is called, and its
334 coro_ member is initialized (a boost::coroutines2 pull_type), execution
335 automatically passes to the coroutine, which we don't want at this point,
336 since we are still in the handler thread context. It's important to note
337 here that construction of a boost pull_type automatically passes execution to
338 the coroutine. A pull_type object automatically generates a push_type that is
339 passed as a parameter (do_yield) in the signature of the function the
340 pull_type was created with. This function is immediately called during coro_
341 construction and within it, Coro::yield_ is assigned the push_type
342 parameter (do_yield) address and called (yield()) so we can return execution
343 back to the caller's stack.
344
345 postCoro() then calls Coro::post(), which schedules a job on the job
346 queue to continue execution of the coroutine in a JobQueue worker thread at
347 some later time. When the job runs, we lock on the Coro::mutex_ and call
348 coro_ which continues where we had left off. Since the last thing we did
349 in coro_ was call yield(), the next thing we continue with is calling the
350 function param f, that was passed into Coro ctor. It is within this
351 function body that the caller specifies what they would like to do while
352 running in the coroutine, and from which they can suspend and resume execution.
353 A task that relies on other events to complete, such as path finding, calls
354 Coro::yield() to suspend its execution while waiting on those events to
355 complete and continue when signaled via the Coro::post() method.
356
357 There is a potential race condition that exists here where post() can get
358 called before yield() after f is called. Technically the problem only occurs
359 if the job that post() scheduled is executed before yield() is called.
360 If the post() job were to be executed before yield(), undefined behavior
361 would occur. The lock ensures that coro_ is not called again until we exit
362 the coroutine, at which point a scheduled resume() job waiting on the lock
363 would gain entry. resume() checks if the coroutine has already completed
364 (coro_ converts to false) and, if so, skips invoking operator() since
365 calling operator() on a completed boost::coroutine2 pull_type is undefined
366 behavior.
367
368 The race condition occurs as follows:
369
370 1- The coroutine is running.
371 2- The coroutine is about to suspend, but before it can do so, it must
372 arrange for some event to wake it up.
373 3- The coroutine arranges for some event to wake it up.
374 4- Before the coroutine can suspend, that event occurs and the
375    resumption of the coroutine is scheduled on the job queue.
376 5- Again, before the coroutine can suspend, the resumption of the
377    coroutine is dispatched.
378 6- Again, before the coroutine can suspend, the resumption code runs the coroutine.
379 The coroutine is now running in two threads.
380
381 The lock prevents this from happening as step 6 will block until the
382 lock is released, which only happens once the coroutine yields or completes.
383*/
384
385} // namespace xrpl
386
387#include <xrpl/core/Coro.ipp>
388
389namespace xrpl {
390
391template <class F>
394{
395 /* First param is a detail type to make construction private.
396 Last param is the function the coroutine runs. Signature of
397 void(std::shared_ptr<Coro>).
398 */
399 auto coro = std::make_shared<Coro>(CoroCreateT{}, *this, t, name, std::forward<F>(f));
400 if (!coro->post())
401 {
402 // The Coro was not successfully posted. Disable it so it's destructor
403 // can run with no negative side effects. Then destroy it.
404 coro->expectEarlyExit();
405 coro.reset();
406 }
407 return coro;
408}
409
410} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:38
std::shared_ptr< Collector > ptr
Definition Collector.h:26
A metric for measuring an integral value.
Definition Gauge.h:20
A reference to a handler for performing polled collection.
Definition Hook.h:12
Represents a JSON value.
Definition json_value.h:130
Coroutines must run to completion.
Definition JobQueue.h:47
boost::coroutines2::coroutine< void >::pull_type coro_
Definition JobQueue.h:58
Coro(Coro const &)=delete
std::mutex mutex_
Definition JobQueue.h:54
boost::coroutines2::coroutine< void >::push_type * yield_
Definition JobQueue.h:57
bool post()
Schedule coroutine execution.
bool runnable() const
Returns true if the Coro is still runnable (has not returned).
void yield() const
Suspend coroutine execution.
std::string name_
Definition JobQueue.h:52
void expectEarlyExit()
Once called, the Coro allows early exit without an assert.
std::condition_variable cv_
Definition JobQueue.h:56
Coro(CoroCreateT, JobQueue &, JobType, std::string, F &&)
void join()
Waits until coroutine returns from the user function.
Coro & operator=(Coro const &)=delete
void resume()
Resume coroutine execution.
detail::LocalValues lvs_
Definition JobQueue.h:49
std::mutex mutexRun_
Definition JobQueue.h:55
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:140
json::Value getJson(int c=0)
Definition JobQueue.cpp:186
std::function< void()> JobFunction
Definition JobQueue.h:127
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:342
JobCounter jobCounter_
Definition JobQueue.h:232
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:250
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition JobQueue.h:393
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:130
bool isStopped() const
Definition JobQueue.cpp:285
~JobQueue() override
Definition JobQueue.cpp:59
Workers workers_
Definition JobQueue.h:244
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:243
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition JobQueue.cpp:24
std::atomic_bool stopping_
Definition JobQueue.h:233
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:150
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:120
beast::insight::Collector::ptr collector_
Definition JobQueue.h:248
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:73
bool isOverloaded()
Definition JobQueue.cpp:180
beast::Journal journal_
Definition JobQueue.h:228
JobDataMap jobData_
Definition JobQueue.h:235
std::atomic_bool stopped_
Definition JobQueue.h:234
bool isStopping() const
Definition JobQueue.h:213
beast::insight::Gauge jobCount_
Definition JobQueue.h:249
std::condition_variable cv_
Definition JobQueue.h:252
static int getJobLimit(JobType type)
Definition JobQueue.cpp:393
JobTypeData invalidJobData_
Definition JobQueue.h:236
std::mutex mutex_
Definition JobQueue.h:229
std::map< JobType, JobTypeData > JobDataMap
Definition JobQueue.h:226
beast::insight::Hook hook_
Definition JobQueue.h:250
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:169
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:157
std::uint64_t lastJob_
Definition JobQueue.h:230
perf::PerfLog & perfLog_
Definition JobQueue.h:247
void finishJob(JobType type)
Definition JobQueue.cpp:321
void getNextJob(Job &job)
Definition JobQueue.cpp:291
std::set< Job > jobSet_
Definition JobQueue.h:231
Manages partitions for logging.
Definition Log.h:20
Workers is effectively a thread pool.
Definition Workers.h:61
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:31
T forward(T... args)
T make_shared(T... args)
Dummy class for unit tests.
Definition Workers.h:14
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
JobType
Definition Job.h:16
ClosureCounter< void > JobCounter
Definition Job.h:133
CoroCreateT()=default
Called to perform tasks as needed.
Definition Workers.h:65