rippled
Loading...
Searching...
No Matches
PerfLogImp.cpp
1#include <xrpld/perflog/detail/PerfLogImp.h>
2
3#include <xrpl/basics/BasicConfig.h>
4#include <xrpl/beast/core/CurrentThreadName.h>
5#include <xrpl/beast/utility/Journal.h>
6#include <xrpl/core/JobTypes.h>
7#include <xrpl/json/json_writer.h>
8
9#include <atomic>
10#include <cstdint>
11#include <cstdlib>
12#include <iterator>
13#include <mutex>
14#include <optional>
15#include <sstream>
16#include <stdexcept>
17#include <string>
18#include <unordered_map>
19#include <utility>
20
21namespace xrpl {
22namespace perf {
23
25 std::set<char const*> const& labels,
26 JobTypes const& jobTypes)
27{
28 {
29 // populateRpc
30 rpc_.reserve(labels.size());
31 for (std::string const label : labels)
32 {
33 auto const inserted = rpc_.emplace(label, Rpc()).second;
34 if (!inserted)
35 {
36 // Ensure that no other function populates this entry.
37 // LCOV_EXCL_START
38 UNREACHABLE(
39 "xrpl::perf::PerfLogImp::Counters::Counters : failed to "
40 "insert label");
41 // LCOV_EXCL_STOP
42 }
43 }
44 }
45 {
46 // populateJq
47 jq_.reserve(jobTypes.size());
48 for (auto const& [jobType, _] : jobTypes)
49 {
50 auto const inserted = jq_.emplace(jobType, Jq()).second;
51 if (!inserted)
52 {
53 // Ensure that no other function populates this entry.
54 // LCOV_EXCL_START
55 UNREACHABLE(
56 "xrpl::perf::PerfLogImp::Counters::Counters : failed to "
57 "insert job type");
58 // LCOV_EXCL_STOP
59 }
60 }
61 }
62}
63
66{
68 // totalRpc represents all rpc methods. All that started, finished, etc.
69 Rpc totalRpc;
70 for (auto const& proc : rpc_)
71 {
72 Rpc value;
73 {
74 std::lock_guard lock(proc.second.mutex);
75 if (!proc.second.value.started && !proc.second.value.finished &&
76 !proc.second.value.errored)
77 {
78 continue;
79 }
80 value = proc.second.value;
81 }
82
84 p[jss::started] = std::to_string(value.started);
85 totalRpc.started += value.started;
86 p[jss::finished] = std::to_string(value.finished);
87 totalRpc.finished += value.finished;
88 p[jss::errored] = std::to_string(value.errored);
89 totalRpc.errored += value.errored;
90 p[jss::duration_us] = std::to_string(value.duration.count());
91 totalRpc.duration += value.duration;
92 rpcobj[proc.first] = p;
93 }
94
95 if (totalRpc.started)
96 {
97 Json::Value totalRpcJson(Json::objectValue);
98 totalRpcJson[jss::started] = std::to_string(totalRpc.started);
99 totalRpcJson[jss::finished] = std::to_string(totalRpc.finished);
100 totalRpcJson[jss::errored] = std::to_string(totalRpc.errored);
101 totalRpcJson[jss::duration_us] =
102 std::to_string(totalRpc.duration.count());
103 rpcobj[jss::total] = totalRpcJson;
104 }
105
106 Json::Value jobQueueObj(Json::objectValue);
107 // totalJq represents all jobs. All enqueued, started, finished, etc.
108 Jq totalJq;
109 for (auto const& proc : jq_)
110 {
111 Jq value;
112 {
113 std::lock_guard lock(proc.second.mutex);
114 if (!proc.second.value.queued && !proc.second.value.started &&
115 !proc.second.value.finished)
116 {
117 continue;
118 }
119 value = proc.second.value;
120 }
121
123 j[jss::queued] = std::to_string(value.queued);
124 totalJq.queued += value.queued;
125 j[jss::started] = std::to_string(value.started);
126 totalJq.started += value.started;
127 j[jss::finished] = std::to_string(value.finished);
128 totalJq.finished += value.finished;
129 j[jss::queued_duration_us] =
131 totalJq.queuedDuration += value.queuedDuration;
132 j[jss::running_duration_us] =
134 totalJq.runningDuration += value.runningDuration;
135 jobQueueObj[JobTypes::name(proc.first)] = j;
136 }
137
138 if (totalJq.queued)
139 {
140 Json::Value totalJqJson(Json::objectValue);
141 totalJqJson[jss::queued] = std::to_string(totalJq.queued);
142 totalJqJson[jss::started] = std::to_string(totalJq.started);
143 totalJqJson[jss::finished] = std::to_string(totalJq.finished);
144 totalJqJson[jss::queued_duration_us] =
146 totalJqJson[jss::running_duration_us] =
148 jobQueueObj[jss::total] = totalJqJson;
149 }
150
152 // Be kind to reporting tools and let them expect rpc and jq objects
153 // even if empty.
154 counters[jss::rpc] = rpcobj;
155 counters[jss::job_queue] = jobQueueObj;
156 return counters;
157}
158
161{
162 auto const present = steady_clock::now();
163
164 Json::Value jobsArray(Json::arrayValue);
165 auto const jobs = [this] {
166 std::lock_guard lock(jobsMutex_);
167 return jobs_;
168 }();
169
170 for (auto const& j : jobs)
171 {
172 if (j.first == jtINVALID)
173 continue;
175 jobj[jss::job] = JobTypes::name(j.first);
176 jobj[jss::duration_us] = std::to_string(
177 std::chrono::duration_cast<microseconds>(present - j.second)
178 .count());
179 jobsArray.append(jobj);
180 }
181
182 Json::Value methodsArray(Json::arrayValue);
184 {
185 std::lock_guard lock(methodsMutex_);
186 methods.reserve(methods_.size());
187 for (auto const& m : methods_)
188 methods.push_back(m.second);
189 }
190 for (auto m : methods)
191 {
193 methodobj[jss::method] = m.first;
194 methodobj[jss::duration_us] = std::to_string(
195 std::chrono::duration_cast<microseconds>(present - m.second)
196 .count());
197 methodsArray.append(methodobj);
198 }
199
201 current[jss::jobs] = jobsArray;
202 current[jss::methods] = methodsArray;
203 return current;
204}
205
206//-----------------------------------------------------------------------------
207
208void
210{
211 if (setup_.perfLog.empty())
212 return;
213
214 if (logFile_.is_open())
215 logFile_.close();
216
217 auto logDir = setup_.perfLog.parent_path();
218 if (!boost::filesystem::is_directory(logDir))
219 {
220 boost::system::error_code ec;
221 boost::filesystem::create_directories(logDir, ec);
222 if (ec)
223 {
224 JLOG(j_.fatal()) << "Unable to create performance log "
225 "directory "
226 << logDir << ": " << ec.message();
227 signalStop_();
228 return;
229 }
230 }
231
232 logFile_.open(setup_.perfLog.c_str(), std::ios::out | std::ios::app);
233
234 if (!logFile_)
235 {
236 JLOG(j_.fatal()) << "Unable to open performance log " << setup_.perfLog
237 << ".";
238 signalStop_();
239 }
240}
241
242void
244{
247
248 while (true)
249 {
250 {
252 if (cond_.wait_until(
253 lock, lastLog_ + setup_.logInterval, [&] { return stop_; }))
254 {
255 return;
256 }
257 if (rotate_)
258 {
259 openLog();
260 rotate_ = false;
261 }
262 }
263 report();
264 }
265}
266
267void
269{
270 if (!logFile_)
271 // If logFile_ is not writable do no further work.
272 return;
273
274 auto const present = system_clock::now();
275 if (present < lastLog_ + setup_.logInterval)
276 return;
277 lastLog_ = present;
278
280 report[jss::time] = to_string(std::chrono::floor<microseconds>(present));
281 {
283 report[jss::workers] =
284 static_cast<unsigned int>(counters_.jobs_.size());
285 }
286 report[jss::hostid] = hostname_;
287 report[jss::counters] = counters_.countersJson();
288 report[jss::nodestore] = Json::objectValue;
289 app_.getNodeStore().getCountsJson(report[jss::nodestore]);
290 report[jss::current_activities] = counters_.currentJson();
292
293 logFile_ << Json::Compact{std::move(report)} << std::endl;
294}
295
297 Setup const& setup,
298 Application& app,
299 beast::Journal journal,
300 std::function<void()>&& signalStop)
301 : setup_(setup), app_(app), j_(journal), signalStop_(std::move(signalStop))
302{
303 openLog();
304}
305
307{
308 stop();
309}
310
311void
312PerfLogImp::rpcStart(std::string const& method, std::uint64_t const requestId)
313{
314 auto counter = counters_.rpc_.find(method);
315 if (counter == counters_.rpc_.end())
316 {
317 // LCOV_EXCL_START
318 UNREACHABLE("xrpl::perf::PerfLogImp::rpcStart : valid method input");
319 return;
320 // LCOV_EXCL_STOP
321 }
322
323 {
324 std::lock_guard lock(counter->second.mutex);
325 ++counter->second.value.started;
326 }
328 counters_.methods_[requestId] = {
329 counter->first.c_str(), steady_clock::now()};
330}
331
332void
334 std::string const& method,
335 std::uint64_t const requestId,
336 bool finish)
337{
338 auto counter = counters_.rpc_.find(method);
339 if (counter == counters_.rpc_.end())
340 {
341 // LCOV_EXCL_START
342 UNREACHABLE("xrpl::perf::PerfLogImp::rpcEnd : valid method input");
343 return;
344 // LCOV_EXCL_STOP
345 }
346 steady_time_point startTime;
347 {
349 auto const e = counters_.methods_.find(requestId);
350 if (e != counters_.methods_.end())
351 {
352 startTime = e->second.second;
353 counters_.methods_.erase(e);
354 }
355 else
356 {
357 // LCOV_EXCL_START
358 UNREACHABLE(
359 "xrpl::perf::PerfLogImp::rpcEnd : valid requestId input");
360 // LCOV_EXCL_STOP
361 }
362 }
363 std::lock_guard lock(counter->second.mutex);
364 if (finish)
365 ++counter->second.value.finished;
366 else
367 ++counter->second.value.errored;
368 counter->second.value.duration += std::chrono::duration_cast<microseconds>(
369 steady_clock::now() - startTime);
370}
371
372void
374{
375 auto counter = counters_.jq_.find(type);
376 if (counter == counters_.jq_.end())
377 {
378 // LCOV_EXCL_START
379 UNREACHABLE("xrpl::perf::PerfLogImp::jobQueue : valid job type input");
380 return;
381 // LCOV_EXCL_STOP
382 }
383 std::lock_guard lock(counter->second.mutex);
384 ++counter->second.value.queued;
385}
386
387void
389 JobType const type,
390 microseconds dur,
391 steady_time_point startTime,
392 int instance)
393{
394 auto counter = counters_.jq_.find(type);
395 if (counter == counters_.jq_.end())
396 {
397 // LCOV_EXCL_START
398 UNREACHABLE("xrpl::perf::PerfLogImp::jobStart : valid job type input");
399 return;
400 // LCOV_EXCL_STOP
401 }
402
403 {
404 std::lock_guard lock(counter->second.mutex);
405 ++counter->second.value.started;
406 counter->second.value.queuedDuration += dur;
407 }
409 if (instance >= 0 && instance < counters_.jobs_.size())
410 counters_.jobs_[instance] = {type, startTime};
411}
412
413void
414PerfLogImp::jobFinish(JobType const type, microseconds dur, int instance)
415{
416 auto counter = counters_.jq_.find(type);
417 if (counter == counters_.jq_.end())
418 {
419 // LCOV_EXCL_START
420 UNREACHABLE("xrpl::perf::PerfLogImp::jobFinish : valid job type input");
421 return;
422 // LCOV_EXCL_STOP
423 }
424
425 {
426 std::lock_guard lock(counter->second.mutex);
427 ++counter->second.value.finished;
428 counter->second.value.runningDuration += dur;
429 }
431 if (instance >= 0 && instance < counters_.jobs_.size())
433}
434
435void
436PerfLogImp::resizeJobs(int const resize)
437{
439 if (resize > counters_.jobs_.size())
441}
442
443void
445{
446 if (setup_.perfLog.empty())
447 return;
448
450 rotate_ = true;
452}
453
454void
456{
457 if (setup_.perfLog.size())
459}
460
461void
463{
464 if (thread_.joinable())
465 {
466 {
468 stop_ = true;
470 }
471 thread_.join();
472 }
473}
474
475//-----------------------------------------------------------------------------
476
478setup_PerfLog(Section const& section, boost::filesystem::path const& configDir)
479{
480 PerfLog::Setup setup;
481 std::string perfLog;
482 set(perfLog, "perf_log", section);
483 if (perfLog.size())
484 {
485 setup.perfLog = boost::filesystem::path(perfLog);
486 if (setup.perfLog.is_relative())
487 {
488 setup.perfLog =
489 boost::filesystem::absolute(setup.perfLog, configDir);
490 }
491 }
492
493 std::uint64_t logInterval;
494 if (get_if_exists(section, "log_interval", logInterval))
495 setup.logInterval = std::chrono::seconds(logInterval);
496 return setup;
497}
498
501 PerfLog::Setup const& setup,
502 Application& app,
503 beast::Journal journal,
504 std::function<void()>&& signalStop)
505{
507 setup, app, journal, std::move(signalStop));
508}
509
510} // namespace perf
511} // namespace xrpl
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:131
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Definition Journal.h:41
Stream fatal() const
Definition Journal.h:333
virtual NodeStore::Database & getNodeStore()=0
virtual NetworkOPs & getOPs()=0
Map::size_type size() const
Definition JobTypes.h:140
static std::string const & name(JobType jt)
Definition JobTypes.h:116
virtual void stateAccounting(Json::Value &obj)=0
void getCountsJson(Json::Value &obj)
Definition Database.cpp:248
Holds a collection of configuration values.
Definition BasicConfig.h:26
std::condition_variable cond_
Definition PerfLogImp.h:110
void jobFinish(JobType const type, microseconds dur, int instance) override
Log job finishing.
void start() override
std::ofstream logFile_
Definition PerfLogImp.h:107
void rpcStart(std::string const &method, std::uint64_t const requestId) override
Log start of RPC call.
void jobQueue(JobType const type) override
Log queued job.
void resizeJobs(int const resize) override
Ensure enough room to store each currently executing job.
system_time_point lastLog_
Definition PerfLogImp.h:111
beast::Journal const j_
Definition PerfLogImp.h:104
void rpcEnd(std::string const &method, std::uint64_t const requestId, bool finish)
void stop() override
void rotate() override
Rotate perf log file.
std::function< void()> const signalStop_
Definition PerfLogImp.h:105
PerfLogImp(Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance) override
Log job executing.
std::string const hostname_
Definition PerfLogImp.h:112
std::chrono::time_point< steady_clock > steady_time_point
Definition PerfLog.h:36
T close(T... args)
T endl(T... args)
T is_open(T... args)
T is_same_v
T join(T... args)
T joinable(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
std::unique_ptr< PerfLog > make_PerfLog(PerfLog::Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
PerfLog::Setup setup_PerfLog(Section const &section, boost::filesystem::path const &configDir)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
@ current
This was a new validation and was added.
JobType
Definition Job.h:15
@ jtINVALID
Definition Job.h:17
bool get_if_exists(Section const &section, std::string const &name, T &v)
T open(T... args)
T push_back(T... args)
T reserve(T... args)
T resize(T... args)
T size(T... args)
Job Queue task performance counters.
Definition PerfLogImp.h:75
RPC performance counters.
Definition PerfLogImp.h:61
std::unordered_map< JobType, Locked< Jq > > jq_
Definition PerfLogImp.h:89
std::vector< std::pair< JobType, steady_time_point > > jobs_
Definition PerfLogImp.h:90
Json::Value currentJson() const
std::unordered_map< std::uint64_t, MethodStart > methods_
Definition PerfLogImp.h:92
std::unordered_map< std::string, Locked< Rpc > > rpc_
Definition PerfLogImp.h:88
Json::Value countersJson() const
Counters(std::set< char const * > const &labels, JobTypes const &jobTypes)
Configuration from [perf] section of xrpld.cfg.
Definition PerfLog.h:46
boost::filesystem::path perfLog
Definition PerfLog.h:47
milliseconds logInterval
Definition PerfLog.h:49
T to_string(T... args)