xrpld
Loading...
Searching...
No Matches
PerfLogImp.cpp
1#include <xrpld/perflog/detail/PerfLogImp.h>
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/chrono.h>
5#include <xrpl/beast/core/CurrentThreadName.h>
6#include <xrpl/beast/utility/Journal.h>
7#include <xrpl/beast/utility/instrumentation.h>
8#include <xrpl/config/BasicConfig.h>
9#include <xrpl/config/Constants.h>
10#include <xrpl/core/Job.h>
11#include <xrpl/core/JobTypes.h>
12#include <xrpl/core/PerfLog.h>
13#include <xrpl/json/json_value.h>
14#include <xrpl/json/json_writer.h>
15#include <xrpl/protocol/jss.h>
16
17#include <boost/filesystem/operations.hpp>
18#include <boost/system/detail/error_code.hpp>
19
20#include <chrono>
21#include <cstdint>
22#include <functional>
23#include <ios>
24#include <memory>
25#include <mutex>
26#include <ostream>
27#include <set>
28#include <string>
29#include <unordered_map>
30#include <utility>
31#include <vector>
32
33namespace xrpl::perf {
34
36{
37 {
38 // populateRpc
39 rpc.reserve(labels.size());
40 for (std::string const label : labels)
41 {
42 auto const inserted = rpc.emplace(label, Rpc()).second;
43 if (!inserted)
44 {
45 // Ensure that no other function populates this entry.
46 // LCOV_EXCL_START
47 UNREACHABLE(
48 "xrpl::perf::PerfLogImp::Counters::Counters : failed to "
49 "insert label");
50 // LCOV_EXCL_STOP
51 }
52 }
53 }
54 {
55 // populateJq
56 jq.reserve(jobTypes.size());
57 for (auto const& [jobType, _] : jobTypes)
58 {
59 auto const inserted = jq.emplace(jobType, Jq()).second;
60 if (!inserted)
61 {
62 // Ensure that no other function populates this entry.
63 // LCOV_EXCL_START
64 UNREACHABLE(
65 "xrpl::perf::PerfLogImp::Counters::Counters : failed to "
66 "insert job type");
67 // LCOV_EXCL_STOP
68 }
69 }
70 }
71}
72
75{
77 // totalRpc represents all rpc methods. All that started, finished, etc.
78 Rpc totalRpc;
79 for (auto const& proc : rpc)
80 {
81 Rpc value;
82 {
83 std::scoped_lock const lock(proc.second.mutex);
84 if ((proc.second.value.started == 0u) && (proc.second.value.finished == 0u) &&
85 (proc.second.value.errored == 0u))
86 {
87 continue;
88 }
89 value = proc.second.value;
90 }
91
93 p[jss::started] = std::to_string(value.started);
94 totalRpc.started += value.started;
95 p[jss::finished] = std::to_string(value.finished);
96 totalRpc.finished += value.finished;
97 p[jss::errored] = std::to_string(value.errored);
98 totalRpc.errored += value.errored;
99 p[jss::duration_us] = std::to_string(value.duration.count());
100 totalRpc.duration += value.duration;
101 rpcobj[proc.first] = p;
102 }
103
104 if (totalRpc.started != 0u)
105 {
107 totalRpcJson[jss::started] = std::to_string(totalRpc.started);
108 totalRpcJson[jss::finished] = std::to_string(totalRpc.finished);
109 totalRpcJson[jss::errored] = std::to_string(totalRpc.errored);
110 totalRpcJson[jss::duration_us] = std::to_string(totalRpc.duration.count());
111 rpcobj[jss::total] = totalRpcJson;
112 }
113
115 // totalJq represents all jobs. All enqueued, started, finished, etc.
116 Jq totalJq;
117 for (auto const& proc : jq)
118 {
119 Jq value;
120 {
121 std::scoped_lock const lock(proc.second.mutex);
122 if ((proc.second.value.queued == 0u) && (proc.second.value.started == 0u) &&
123 (proc.second.value.finished == 0u))
124 {
125 continue;
126 }
127 value = proc.second.value;
128 }
129
131 j[jss::queued] = std::to_string(value.queued);
132 totalJq.queued += value.queued;
133 j[jss::started] = std::to_string(value.started);
134 totalJq.started += value.started;
135 j[jss::finished] = std::to_string(value.finished);
136 totalJq.finished += value.finished;
137 j[jss::queued_duration_us] = std::to_string(value.queuedDuration.count());
138 totalJq.queuedDuration += value.queuedDuration;
139 j[jss::running_duration_us] = std::to_string(value.runningDuration.count());
140 totalJq.runningDuration += value.runningDuration;
141 jobQueueObj[JobTypes::name(proc.first)] = j;
142 }
143
144 if (totalJq.queued != 0u)
145 {
147 totalJqJson[jss::queued] = std::to_string(totalJq.queued);
148 totalJqJson[jss::started] = std::to_string(totalJq.started);
149 totalJqJson[jss::finished] = std::to_string(totalJq.finished);
150 totalJqJson[jss::queued_duration_us] = std::to_string(totalJq.queuedDuration.count());
151 totalJqJson[jss::running_duration_us] = std::to_string(totalJq.runningDuration.count());
152 jobQueueObj[jss::total] = totalJqJson;
153 }
154
156 // Be kind to reporting tools and let them expect rpc and jq objects
157 // even if empty.
158 counters[jss::rpc] = rpcobj;
159 counters[jss::job_queue] = jobQueueObj;
160 return counters;
161}
162
165{
166 auto const present = steady_clock::now();
167
169 auto const jobs = [this] {
170 std::scoped_lock const lock(jobsMutex);
171 return this->jobs;
172 }();
173
174 for (auto const& j : jobs)
175 {
176 if (j.first == JtInvalid)
177 continue;
179 jobj[jss::job] = JobTypes::name(j.first);
180 jobj[jss::duration_us] =
181 std::to_string(std::chrono::duration_cast<microseconds>(present - j.second).count());
182 jobsArray.append(jobj);
183 }
184
187 {
188 std::scoped_lock const lock(methodsMutex);
189 methods.reserve(this->methods.size());
190 for (auto const& m : this->methods)
191 methods.push_back(m.second);
192 }
193 for (auto m : methods)
194 {
196 methodobj[jss::method] = m.first;
197 methodobj[jss::duration_us] =
198 std::to_string(std::chrono::duration_cast<microseconds>(present - m.second).count());
199 methodsArray.append(methodobj);
200 }
201
203 current[jss::jobs] = jobsArray;
204 current[jss::methods] = methodsArray;
205 return current;
206}
207
208//-----------------------------------------------------------------------------
209
210void
212{
213 if (setup_.perfLog.empty())
214 return;
215
216 if (logFile_.is_open())
217 logFile_.close();
218
219 auto logDir = setup_.perfLog.parent_path();
220 if (!boost::filesystem::is_directory(logDir))
221 {
222 boost::system::error_code ec;
223 boost::filesystem::create_directories(logDir, ec);
224 if (ec)
225 {
226 JLOG(j_.fatal()) << "Unable to create performance log "
227 "directory "
228 << logDir << ": " << ec.message();
229 signalStop_();
230 return;
231 }
232 }
233
234 logFile_.open(setup_.perfLog.c_str(), std::ios::out | std::ios::app);
235
236 if (!logFile_)
237 {
238 JLOG(j_.fatal()) << "Unable to open performance log " << setup_.perfLog << ".";
239 signalStop_();
240 }
241}
242
243void
245{
248
249 while (true)
250 {
251 {
253 if (cond_.wait_until(lock, lastLog_ + setup_.logInterval, [&] { return stop_; }))
254 {
255 return;
256 }
257 if (rotate_)
258 {
259 openLog();
260 rotate_ = false;
261 }
262 }
263 report();
264 }
265}
266
267void
269{
270 if (!logFile_)
271 {
272 // If logFile_ is not writable do no further work.
273 return;
274 }
275
276 auto const present = system_clock::now();
277 if (present < lastLog_ + setup_.logInterval)
278 return;
279 lastLog_ = present;
280
282 report[jss::time] = to_string(std::chrono::floor<microseconds>(present));
283 {
284 std::scoped_lock const lock{counters_.jobsMutex};
285 report[jss::workers] = static_cast<unsigned int>(counters_.jobs.size());
286 }
287 report[jss::hostid] = hostname_;
288 report[jss::counters] = counters_.countersJson();
289 report[jss::nodestore] = json::ValueType::Object;
290 app_.getNodeStore().getCountsJson(report[jss::nodestore]);
291 report[jss::current_activities] = counters_.currentJson();
292 app_.getOPs().stateAccounting(report);
293
294 logFile_ << json::Compact{std::move(report)} << std::endl;
295}
296
298 Setup setup,
299 Application& app,
300 beast::Journal journal,
301 std::function<void()>&& signalStop)
302 : setup_(std::move(setup)), app_(app), j_(journal), signalStop_(std::move(signalStop))
303{
304 openLog();
305}
306
308{
309 stop();
310}
311
312void
313PerfLogImp::rpcStart(std::string const& method, std::uint64_t const requestId)
314{
315 auto counter = counters_.rpc.find(method);
316 if (counter == counters_.rpc.end())
317 {
318 // LCOV_EXCL_START
319 UNREACHABLE("xrpl::perf::PerfLogImp::rpcStart : valid method input");
320 return;
321 // LCOV_EXCL_STOP
322 }
323
324 {
325 std::scoped_lock const lock(counter->second.mutex);
326 ++counter->second.value.started;
327 }
328 std::scoped_lock const lock(counters_.methodsMutex);
329 counters_.methods[requestId] = {counter->first.c_str(), steady_clock::now()};
330}
331
332void
333PerfLogImp::rpcEnd(std::string const& method, std::uint64_t const requestId, bool finish)
334{
335 auto counter = counters_.rpc.find(method);
336 if (counter == counters_.rpc.end())
337 {
338 // LCOV_EXCL_START
339 UNREACHABLE("xrpl::perf::PerfLogImp::rpcEnd : valid method input");
340 return;
341 // LCOV_EXCL_STOP
342 }
343 steady_time_point startTime;
344 {
345 std::scoped_lock const lock(counters_.methodsMutex);
346 auto const e = counters_.methods.find(requestId);
347 if (e != counters_.methods.end())
348 {
349 startTime = e->second.second;
350 counters_.methods.erase(e);
351 }
352 else
353 {
354 // LCOV_EXCL_START
355 UNREACHABLE("xrpl::perf::PerfLogImp::rpcEnd : valid requestId input");
356 // LCOV_EXCL_STOP
357 }
358 }
359 std::scoped_lock const lock(counter->second.mutex);
360 if (finish)
361 {
362 ++counter->second.value.finished;
363 }
364 else
365 {
366 ++counter->second.value.errored;
367 }
368 counter->second.value.duration +=
370}
371
372void
374{
375 auto counter = counters_.jq.find(type);
376 if (counter == counters_.jq.end())
377 {
378 // LCOV_EXCL_START
379 UNREACHABLE("xrpl::perf::PerfLogImp::jobQueue : valid job type input");
380 return;
381 // LCOV_EXCL_STOP
382 }
383 std::scoped_lock const lock(counter->second.mutex);
384 ++counter->second.value.queued;
385}
386
387void
389 JobType const type,
390 microseconds dur,
391 steady_time_point startTime,
392 int instance)
393{
394 auto counter = counters_.jq.find(type);
395 if (counter == counters_.jq.end())
396 {
397 // LCOV_EXCL_START
398 UNREACHABLE("xrpl::perf::PerfLogImp::jobStart : valid job type input");
399 return;
400 // LCOV_EXCL_STOP
401 }
402
403 {
404 std::scoped_lock const lock(counter->second.mutex);
405 ++counter->second.value.started;
406 counter->second.value.queuedDuration += dur;
407 }
408 std::scoped_lock const lock(counters_.jobsMutex);
409 if (instance >= 0 && instance < counters_.jobs.size())
410 counters_.jobs[instance] = {type, startTime};
411}
412
413void
414PerfLogImp::jobFinish(JobType const type, microseconds dur, int instance)
415{
416 auto counter = counters_.jq.find(type);
417 if (counter == counters_.jq.end())
418 {
419 // LCOV_EXCL_START
420 UNREACHABLE("xrpl::perf::PerfLogImp::jobFinish : valid job type input");
421 return;
422 // LCOV_EXCL_STOP
423 }
424
425 {
426 std::scoped_lock const lock(counter->second.mutex);
427 ++counter->second.value.finished;
428 counter->second.value.runningDuration += dur;
429 }
430 std::scoped_lock const lock(counters_.jobsMutex);
431 if (instance >= 0 && instance < counters_.jobs.size())
432 counters_.jobs[instance] = {JtInvalid, steady_time_point()};
433}
434
435void
436PerfLogImp::resizeJobs(int const resize)
437{
438 std::scoped_lock const lock(counters_.jobsMutex);
439 if (resize > counters_.jobs.size())
440 counters_.jobs.resize(resize, {JtInvalid, steady_time_point()});
441}
442
443void
445{
446 if (setup_.perfLog.empty())
447 return;
448
449 std::scoped_lock const lock(mutex_);
450 rotate_ = true;
451 cond_.notify_one();
452}
453
454void
456{
457 if (!setup_.perfLog.empty())
459}
460
461void
463{
464 if (thread_.joinable())
465 {
466 {
467 std::scoped_lock const lock(mutex_);
468 stop_ = true;
469 cond_.notify_one();
470 }
471 thread_.join();
472 }
473}
474
475//-----------------------------------------------------------------------------
476
478setupPerfLog(Section const& section, boost::filesystem::path const& configDir)
479{
480 PerfLog::Setup setup;
481 std::string perfLog;
482 set(perfLog, "perf_log", section);
483 if (!perfLog.empty())
484 {
485 setup.perfLog = boost::filesystem::path(perfLog);
486 if (setup.perfLog.is_relative())
487 {
488 setup.perfLog = boost::filesystem::absolute(setup.perfLog, configDir);
489 }
490 }
491
492 std::uint64_t logInterval = 0;
493 if (getIfExists(section, Keys::kLogInterval, logInterval))
494 setup.logInterval = std::chrono::seconds(logInterval);
495 return setup;
496}
497
500 PerfLog::Setup const& setup,
501 Application& app,
502 beast::Journal journal,
503 std::function<void()>&& signalStop)
504{
505 return std::make_unique<PerfLogImp>(setup, app, journal, std::move(signalStop));
506}
507
508} // namespace xrpl::perf
A generic endpoint for log messages.
Definition Journal.h:38
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
Map::size_type size() const
Definition JobTypes.h:133
static std::string const & name(JobType jt)
Definition JobTypes.h:109
Holds a collection of configuration values.
Definition BasicConfig.h:24
std::condition_variable cond_
Definition PerfLogImp.h:108
void jobFinish(JobType const type, microseconds dur, int instance) override
Log job finishing.
void start() override
PerfLogImp(Setup setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
std::ofstream logFile_
Definition PerfLogImp.h:105
void rpcStart(std::string const &method, std::uint64_t const requestId) override
Log start of RPC call.
void jobQueue(JobType const type) override
Log queued job.
void resizeJobs(int const resize) override
Ensure enough room to store each currently executing job.
system_time_point lastLog_
Definition PerfLogImp.h:109
beast::Journal const j_
Definition PerfLogImp.h:102
void rpcEnd(std::string const &method, std::uint64_t const requestId, bool finish)
void stop() override
void rotate() override
Rotate perf log file.
std::function< void()> const signalStop_
Definition PerfLogImp.h:103
void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance) override
Log job executing.
std::string const hostname_
Definition PerfLogImp.h:110
std::chrono::microseconds microseconds
Definition PerfLog.h:39
std::chrono::time_point< steady_clock > steady_time_point
Definition PerfLog.h:35
T duration_cast(T... args)
T empty(T... args)
T endl(T... args)
T make_unique(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
@ Array
array value (ordered list)
Definition json_value.h:25
@ Object
object value (collection of name/value pairs).
Definition json_value.h:26
STL namespace.
Dummy class for unit tests.
Definition Workers.h:14
std::unique_ptr< PerfLog > makePerfLog(PerfLog::Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
PerfLog::Setup setupPerfLog(Section const &section, boost::filesystem::path const &configDir)
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
bool getIfExists(Section const &section, std::string const &name, T &v)
std::string to_string(BaseUInt< Bits, Tag > const &a)
Definition base_uint.h:633
JobType
Definition Job.h:16
@ JtInvalid
Definition Job.h:18
T size(T... args)
static constexpr auto kLogInterval
Definition Constants.h:118
Job Queue task performance counters.
Definition PerfLogImp.h:73
RPC performance counters.
Definition PerfLogImp.h:59
std::vector< std::pair< JobType, steady_time_point > > jobs
Definition PerfLogImp.h:88
std::unordered_map< std::string, Locked< Rpc > > rpc
Definition PerfLogImp.h:86
json::Value countersJson() const
json::Value currentJson() const
Counters(std::set< char const * > const &labels, JobTypes const &jobTypes)
std::unordered_map< JobType, Locked< Jq > > jq
Definition PerfLogImp.h:87
std::unordered_map< std::uint64_t, MethodStart > methods
Definition PerfLogImp.h:90
Configuration from [perf] section of xrpld.cfg.
Definition PerfLog.h:45
boost::filesystem::path perfLog
Definition PerfLog.h:46
milliseconds logInterval
Definition PerfLog.h:48
T to_string(T... args)