rippled
Loading...
Searching...
No Matches
PerfLogImp.cpp
1#include <xrpld/core/JobTypes.h>
2#include <xrpld/perflog/detail/PerfLogImp.h>
3
4#include <xrpl/basics/BasicConfig.h>
5#include <xrpl/beast/core/CurrentThreadName.h>
6#include <xrpl/beast/utility/Journal.h>
7#include <xrpl/json/json_writer.h>
8
9#include <atomic>
10#include <cstdint>
11#include <cstdlib>
12#include <iterator>
13#include <mutex>
14#include <optional>
15#include <sstream>
16#include <stdexcept>
17#include <string>
18#include <unordered_map>
19#include <utility>
20
21namespace ripple {
22namespace perf {
23
// Constructor of PerfLogImp::Counters (named in the diagnostic strings below).
// The line that opens the signature is absent from this listing, which jumps
// from 23 to 25.
//
// Pre-populates rpc_ with one default-constructed Rpc entry per label and
// jq_ with one default-constructed Jq entry per job type.
//
// labels   - RPC method names, held as char const* pointers.
// jobTypes - collection iterated as (jobType, value) pairs; only the key is
//            used here.
25 std::set<char const*> const& labels,
26 JobTypes const& jobTypes)
27{
28 {
29 // populateRpc
30 rpc_.reserve(labels.size());
// Each char const* element is converted to a std::string for the loop body.
31 for (std::string const label : labels)
32 {
// emplace(...).second is false when the key was already present.
33 auto const inserted = rpc_.emplace(label, Rpc()).second;
34 if (!inserted)
35 {
36 // Ensure that no other function populates this entry.
// NOTE(review): labels is a set of pointers while the key inserted here
// is a std::string, so two distinct pointers to equal text would land in
// this branch -- confirm callers pass labels that are unique by content.
37 // LCOV_EXCL_START
38 UNREACHABLE(
39 "ripple::perf::PerfLogImp::Counters::Counters : failed to "
40 "insert label");
41 // LCOV_EXCL_STOP
42 }
43 }
44 }
45 {
46 // populateJq
47 jq_.reserve(jobTypes.size());
// The mapped value of each jobTypes element is deliberately ignored (`_`).
48 for (auto const& [jobType, _] : jobTypes)
49 {
50 auto const inserted = jq_.emplace(jobType, Jq()).second;
51 if (!inserted)
52 {
53 // Ensure that no other function populates this entry.
54 // LCOV_EXCL_START
55 UNREACHABLE(
56 "ripple::perf::PerfLogImp::Counters::Counters : failed to "
57 "insert job type");
58 // LCOV_EXCL_STOP
59 }
60 }
61 }
62}
63
// Body that builds the JSON summary of the RPC and job-queue counters and
// returns it as `counters`, with sub-objects under jss::rpc and
// jss::job_queue.
// NOTE(review): the signature lines (64-65) are absent from this listing;
// this is presumably Counters::countersJson, the function report() calls --
// confirm against the original file.
// NOTE(review): the listing also skips lines 67, 83, 106, 122, 130, 133,
// 145, 147 and 151. Those presumably declare rpcobj, p, jqobj, j and
// counters and supply the right-hand sides of the *_duration_us assignments
// -- confirm before editing.
//
// Every numeric value is written as a string via std::to_string.
66{
68 // totalRpc represents all rpc methods. All that started, finished, etc.
69 Rpc totalRpc;
70 for (auto const& proc : rpc_)
71 {
72 Rpc value;
73 {
// Snapshot the entry under its own mutex so the JSON work below runs
// without holding the lock.
74 std::lock_guard lock(proc.second.mutex);
// Skip methods that have never started, finished or errored.
75 if (!proc.second.value.started && !proc.second.value.finished &&
76 !proc.second.value.errored)
77 {
78 continue;
79 }
80 value = proc.second.value;
81 }
82
// Emit the per-method values and fold them into the running total.
84 p[jss::started] = std::to_string(value.started);
85 totalRpc.started += value.started;
86 p[jss::finished] = std::to_string(value.finished);
87 totalRpc.finished += value.finished;
88 p[jss::errored] = std::to_string(value.errored);
89 totalRpc.errored += value.errored;
90 p[jss::duration_us] = std::to_string(value.duration.count());
91 totalRpc.duration += value.duration;
92 rpcobj[proc.first] = p;
93 }
94
// The aggregate entry is added only when at least one RPC has started.
95 if (totalRpc.started)
96 {
97 Json::Value totalRpcJson(Json::objectValue);
98 totalRpcJson[jss::started] = std::to_string(totalRpc.started);
99 totalRpcJson[jss::finished] = std::to_string(totalRpc.finished);
100 totalRpcJson[jss::errored] = std::to_string(totalRpc.errored);
101 totalRpcJson[jss::duration_us] =
102 std::to_string(totalRpc.duration.count());
103 rpcobj[jss::total] = totalRpcJson;
104 }
105
107 // totalJq represents all jobs. All enqueued, started, finished, etc.
108 Jq totalJq;
109 for (auto const& proc : jq_)
110 {
111 Jq value;
112 {
// Same snapshot-under-lock approach as the RPC loop above.
113 std::lock_guard lock(proc.second.mutex);
// Skip job types that were never queued, started or finished.
114 if (!proc.second.value.queued && !proc.second.value.started &&
115 !proc.second.value.finished)
116 {
117 continue;
118 }
119 value = proc.second.value;
120 }
121
123 j[jss::queued] = std::to_string(value.queued);
124 totalJq.queued += value.queued;
125 j[jss::started] = std::to_string(value.started);
126 totalJq.started += value.started;
127 j[jss::finished] = std::to_string(value.finished);
128 totalJq.finished += value.finished;
// The right-hand side of this assignment (line 130) is absent from the
// listing.
129 j[jss::queued_duration_us] =
131 totalJq.queuedDuration += value.queuedDuration;
// The right-hand side of this assignment (line 133) is absent from the
// listing.
132 j[jss::running_duration_us] =
134 totalJq.runningDuration += value.runningDuration;
// Job types are keyed by their display name rather than the enum value.
135 jqobj[JobTypes::name(proc.first)] = j;
136 }
137
// The aggregate entry is added only when at least one job has been queued.
138 if (totalJq.queued)
139 {
140 Json::Value totalJqJson(Json::objectValue);
141 totalJqJson[jss::queued] = std::to_string(totalJq.queued);
142 totalJqJson[jss::started] = std::to_string(totalJq.started);
143 totalJqJson[jss::finished] = std::to_string(totalJq.finished);
// Lines 145 and 147 (the assigned values) are absent from the listing.
144 totalJqJson[jss::queued_duration_us] =
146 totalJqJson[jss::running_duration_us] =
148 jqobj[jss::total] = totalJqJson;
149 }
150
152 // Be kind to reporting tools and let them expect rpc and jq objects
153 // even if empty.
154 counters[jss::rpc] = rpcobj;
155 counters[jss::job_queue] = jqobj;
156 return counters;
157}
158
// Body that builds the JSON description of the work currently in progress and
// returns it as `current`: an array of running jobs under jss::jobs and an
// array of in-flight RPC methods under jss::methods. Each entry carries its
// elapsed time in microseconds, written as a string.
// NOTE(review): the signature lines (159-160) are absent from this listing;
// this is presumably Counters::currentJson, the function report() calls --
// confirm against the original file.
// NOTE(review): the listing also skips lines 174, 183, 192 and 200, which
// presumably declare jobj, methods, methodobj and current -- confirm.
161{
// One timestamp is taken up front so every duration uses the same reference.
162 auto const present = steady_clock::now();
163
164 Json::Value jobsArray(Json::arrayValue);
// Copy jobs_ while holding jobsMutex_; the lock is released when the lambda
// returns, before any JSON is built.
165 auto const jobs = [this] {
166 std::lock_guard lock(jobsMutex_);
167 return jobs_;
168 }();
169
170 for (auto const& j : jobs)
171 {
// Slots marked jtINVALID are omitted from the output.
172 if (j.first == jtINVALID)
173 continue;
175 jobj[jss::job] = JobTypes::name(j.first);
176 jobj[jss::duration_us] = std::to_string(
177 std::chrono::duration_cast<microseconds>(present - j.second)
178 .count());
179 jobsArray.append(jobj);
180 }
181
182 Json::Value methodsArray(Json::arrayValue);
184 {
// Copy the mapped values of methods_ while holding methodsMutex_.
185 std::lock_guard lock(methodsMutex_);
186 methods.reserve(methods_.size());
187 for (auto const& m : methods_)
188 methods.push_back(m.second);
189 }
// m.first is the method name and m.second its start time.
190 for (auto m : methods)
191 {
193 methodobj[jss::method] = m.first;
194 methodobj[jss::duration_us] = std::to_string(
195 std::chrono::duration_cast<microseconds>(present - m.second)
196 .count());
197 methodsArray.append(methodobj);
198 }
199
201 current[jss::jobs] = jobsArray;
202 current[jss::methods] = methodsArray;
203 return current;
204}
205
206//-----------------------------------------------------------------------------
207
// Opens (or reopens) the performance log file named by setup_.perfLog in
// append mode, creating the parent directory if needed.
// On failure to create the directory or open the file, a fatal message is
// logged and signalStop_() is invoked.
// NOTE(review): the line naming this function (209) is absent from this
// listing; it is presumably PerfLogImp::openLog, which the constructor and
// the rotate path call -- confirm.
208void
210{
// Nothing to do when no perf log path is configured.
211 if (setup_.perfLog.empty())
212 return;
213
// Close any previously opened stream so the open() below starts fresh.
214 if (logFile_.is_open())
215 logFile_.close();
216
217 auto logDir = setup_.perfLog.parent_path();
218 if (!boost::filesystem::is_directory(logDir))
219 {
// The error_code overload is used, so failure is reported through ec.
220 boost::system::error_code ec;
221 boost::filesystem::create_directories(logDir, ec);
222 if (ec)
223 {
224 JLOG(j_.fatal()) << "Unable to create performance log "
225 "directory "
226 << logDir << ": " << ec.message();
227 signalStop_();
228 return;
229 }
230 }
231
// std::ios::app: existing file content is kept and new reports are appended.
232 logFile_.open(setup_.perfLog.c_str(), std::ios::out | std::ios::app);
233
234 if (!logFile_)
235 {
236 JLOG(j_.fatal()) << "Unable to open performance log " << setup_.perfLog
237 << ".";
238 signalStop_();
239 }
240}
241
// Logging loop: waits until the next report is due or a stop is requested,
// reopens the log when rotation was requested, then writes a report.
// Returns when stop_ becomes true.
// NOTE(review): lines 243, 245, 246 and 251 are absent from this listing.
// They presumably hold the function name (PerfLogImp::run), thread naming
// and the declaration of `lock` used below -- confirm against the original.
242void
244{
247
248 while (true)
249 {
250 {
// wait_until with a predicate returns the predicate's value, so a true
// result here means stop_ was set.
252 if (cond_.wait_until(
253 lock, lastLog_ + setup_.logInterval, [&] { return stop_; }))
254 {
255 return;
256 }
// A pending rotation request is served by reopening the log file.
257 if (rotate_)
258 {
259 openLog();
260 rotate_ = false;
261 }
262 }
// report() is called after the inner scope above has closed.
263 report();
264 }
265}
266
// Writes one compact JSON report line to logFile_ containing the time, worker
// count, host id, counters, node store counts and current activities.
// Does nothing when the log stream is unusable or the log interval has not
// yet elapsed since lastLog_.
// NOTE(review): lines 268, 279, 282 and 291 are absent from this listing.
// They presumably hold the function name (PerfLogImp::report), the
// declaration of `report`, a lock taken before reading jobs_, and a call
// that fills a further field -- confirm against the original.
267void
269{
270 if (!logFile_)
271 // If logFile_ is not writable do no further work.
272 return;
273
274 auto const present = system_clock::now();
// Guard against reporting more often than setup_.logInterval.
275 if (present < lastLog_ + setup_.logInterval)
276 return;
277 lastLog_ = present;
278
// The timestamp is rounded down to whole microseconds.
280 report[jss::time] = to_string(std::chrono::floor<microseconds>(present));
281 {
// The worker count is the current size of the jobs_ vector.
283 report[jss::workers] =
284 static_cast<unsigned int>(counters_.jobs_.size());
285 }
286 report[jss::hostid] = hostname_;
287 report[jss::counters] = counters_.countersJson();
// An empty object is created first; getCountsJson then fills it in place.
288 report[jss::nodestore] = Json::objectValue;
289 app_.getNodeStore().getCountsJson(report[jss::nodestore]);
290 report[jss::current_activities] = counters_.currentJson();
292
// std::endl flushes the stream after each report line.
293 logFile_ << Json::Compact{std::move(report)} << std::endl;
294}
295
// PerfLogImp constructor (the line naming it, 296, is absent from this
// listing; the member initializers and parameters match the PerfLogImp
// signature shown in the Doxygen residue at the end of the page).
// Stores the configuration, application reference, journal and stop callback,
// then opens the log file.
//
// setup      - perf log configuration, copied into setup_.
// app        - application, kept by reference in app_.
// journal    - destination for diagnostic messages.
// signalStop - callback moved into signalStop_; openLog() invokes it on
//              failure.
297 Setup const& setup,
298 Application& app,
299 beast::Journal journal,
300 std::function<void()>&& signalStop)
301 : setup_(setup), app_(app), j_(journal), signalStop_(std::move(signalStop))
302{
303 openLog();
304}
305
// Body that only calls stop().
// NOTE(review): the declaration line (306) is absent from this listing; its
// position right after the constructor suggests this is the PerfLogImp
// destructor -- confirm against the original file.
307{
308 stop();
309}
310
// Records the start of an RPC call: increments the method's `started`
// counter and stores the method name with its start time in
// counters_.methods_ under requestId.
//
// method    - RPC method name; expected to be a key of counters_.rpc_.
// requestId - key used later by rpcEnd to find this entry.
311void
312PerfLogImp::rpcStart(std::string const& method, std::uint64_t const requestId)
313{
314 auto counter = counters_.rpc_.find(method);
315 if (counter == counters_.rpc_.end())
316 {
317 // LCOV_EXCL_START
318 UNREACHABLE("ripple::perf::PerfLogImp::rpcStart : valid method input");
319 return;
320 // LCOV_EXCL_STOP
321 }
322
323 {
// Only the counter increment happens under this per-method mutex.
324 std::lock_guard lock(counter->second.mutex);
325 ++counter->second.value.started;
326 }
// NOTE(review): line 327 is absent from this listing; it presumably takes
// the lock that guards methods_ before the write below -- confirm.
// The stored name pointer refers to the key string inside counters_.rpc_,
// not to the caller's `method` argument.
328 counters_.methods_[requestId] = {
329 counter->first.c_str(), steady_clock::now()};
330}
331
// Records the end of an RPC call: removes the request's entry from
// counters_.methods_, increments `finished` or `errored`, and adds the
// elapsed time to the method's accumulated duration.
// NOTE(review): the line naming this function (333) is absent from this
// listing; the diagnostic strings below identify it as PerfLogImp::rpcEnd.
//
// method    - RPC method name; expected to be a key of counters_.rpc_.
// requestId - key under which rpcStart stored the start time.
// finish    - true counts the call as finished, false as errored.
332void
334 std::string const& method,
335 std::uint64_t const requestId,
336 bool finish)
337{
338 auto counter = counters_.rpc_.find(method);
339 if (counter == counters_.rpc_.end())
340 {
341 // LCOV_EXCL_START
342 UNREACHABLE("ripple::perf::PerfLogImp::rpcEnd : valid method input");
343 return;
344 // LCOV_EXCL_STOP
345 }
346 steady_time_point startTime;
347 {
// NOTE(review): line 348 is absent from this listing; it presumably takes
// the lock that guards methods_ for this scope -- confirm.
349 auto const e = counters_.methods_.find(requestId);
350 if (e != counters_.methods_.end())
351 {
// e->second.second is the start time stored by rpcStart.
352 startTime = e->second.second;
353 counters_.methods_.erase(e);
354 }
355 else
356 {
// NOTE(review): this branch has no return. If execution continues past
// UNREACHABLE, startTime keeps its default value and the duration added
// below is measured from that default -- confirm this is intended.
357 // LCOV_EXCL_START
358 UNREACHABLE(
359 "ripple::perf::PerfLogImp::rpcEnd : valid requestId input");
360 // LCOV_EXCL_STOP
361 }
362 }
// The per-method mutex is held for the counter and duration updates.
363 std::lock_guard lock(counter->second.mutex);
364 if (finish)
365 ++counter->second.value.finished;
366 else
367 ++counter->second.value.errored;
368 counter->second.value.duration += std::chrono::duration_cast<microseconds>(
369 steady_clock::now() - startTime);
370}
371
// Records that a job of the given type was queued by incrementing that job
// type's `queued` counter under its mutex.
// NOTE(review): the line naming this function (373) is absent from this
// listing; the diagnostic string below identifies it as PerfLogImp::jobQueue.
372void
374{
375 auto counter = counters_.jq_.find(type);
376 if (counter == counters_.jq_.end())
377 {
378 // LCOV_EXCL_START
379 UNREACHABLE(
380 "ripple::perf::PerfLogImp::jobQueue : valid job type input");
381 return;
382 // LCOV_EXCL_STOP
383 }
384 std::lock_guard lock(counter->second.mutex);
385 ++counter->second.value.queued;
386}
387
// Records that a job began executing: increments the job type's `started`
// counter, adds the time it spent queued, and stores the job in the
// per-worker slot of counters_.jobs_.
// NOTE(review): the line naming this function (389) is absent from this
// listing; the diagnostic string below identifies it as PerfLogImp::jobStart.
//
// type      - job type; expected to be a key of counters_.jq_.
// dur       - added to queuedDuration.
// startTime - stored with the job in counters_.jobs_.
// instance  - index into counters_.jobs_; out-of-range values are ignored.
388void
390 JobType const type,
391 microseconds dur,
392 steady_time_point startTime,
393 int instance)
394{
395 auto counter = counters_.jq_.find(type);
396 if (counter == counters_.jq_.end())
397 {
398 // LCOV_EXCL_START
399 UNREACHABLE(
400 "ripple::perf::PerfLogImp::jobStart : valid job type input");
401 return;
402 // LCOV_EXCL_STOP
403 }
404
405 {
406 std::lock_guard lock(counter->second.mutex);
407 ++counter->second.value.started;
408 counter->second.value.queuedDuration += dur;
409 }
// NOTE(review): line 410 is absent from this listing; it presumably takes
// the lock that guards jobs_ before the write below -- confirm.
// The `instance >= 0` test runs first, so the signed/unsigned comparison
// with size() only ever sees a non-negative value.
411 if (instance >= 0 && instance < counters_.jobs_.size())
412 counters_.jobs_[instance] = {type, startTime};
413}
414
// Records that a job finished: increments the job type's `finished` counter
// and adds its running time to runningDuration.
//
// type     - job type; expected to be a key of counters_.jq_.
// dur      - added to runningDuration.
// instance - index into counters_.jobs_; checked against its bounds below.
415void
416PerfLogImp::jobFinish(JobType const type, microseconds dur, int instance)
417{
418 auto counter = counters_.jq_.find(type);
419 if (counter == counters_.jq_.end())
420 {
421 // LCOV_EXCL_START
422 UNREACHABLE(
423 "ripple::perf::PerfLogImp::jobFinish : valid job type input");
424 return;
425 // LCOV_EXCL_STOP
426 }
427
428 {
429 std::lock_guard lock(counter->second.mutex);
430 ++counter->second.value.finished;
431 counter->second.value.runningDuration += dur;
432 }
// NOTE(review): lines 433 and 435 are absent from this listing. They
// presumably take the lock guarding jobs_ and reset the worker's slot
// (the statement controlled by the `if` below) -- confirm against the
// original file.
434 if (instance >= 0 && instance < counters_.jobs_.size())
436}
437
// Per the Doxygen summary for this function: ensures enough room to store
// each currently executing job.
// NOTE(review): lines 441 and 443 are absent from this listing. They
// presumably take the lock guarding jobs_ and grow the vector (the statement
// controlled by the `if` below) -- confirm against the original file.
438void
439PerfLogImp::resizeJobs(int const resize)
440{
// NOTE(review): `resize` is a signed int compared with an unsigned size().
// A negative argument would convert to a very large unsigned value and make
// this condition true -- confirm callers never pass a negative count.
442 if (resize > counters_.jobs_.size())
444}
445
// Requests a log rotation by setting rotate_, which the logging loop checks
// in order to reopen the log file. Does nothing when no perf log path is
// configured.
// NOTE(review): lines 447, 452 and 454 are absent from this listing. They
// presumably hold the function name (PerfLogImp::rotate), a lock taken
// before the flag is set, and a condition-variable notification -- confirm
// against the original file.
446void
448{
449 if (setup_.perfLog.empty())
450 return;
451
453 rotate_ = true;
455}
456
// Acts only when a perf log path is configured (setup_.perfLog is non-empty).
// NOTE(review): lines 458 and 461 are absent from this listing. This is
// presumably PerfLogImp::start, with the missing statement under the `if`
// launching the logging thread that stop() later joins -- confirm against
// the original file.
457void
459{
460 if (setup_.perfLog.size())
462}
463
// Shuts down the logging thread: sets stop_ and joins thread_. Does nothing
// when thread_ is not joinable, so repeated calls are harmless once the
// thread has been joined.
// NOTE(review): lines 465, 470 and 472 are absent from this listing. They
// presumably hold the function name (PerfLogImp::stop), a lock taken before
// stop_ is set, and a condition-variable notification that wakes the logging
// loop -- confirm against the original file.
464void
466{
467 if (thread_.joinable())
468 {
// The inner scope ends before join() is called.
469 {
471 stop_ = true;
473 }
474 thread_.join();
475 }
476}
477
478//-----------------------------------------------------------------------------
479
// Builds a PerfLog::Setup from a configuration section.
// NOTE(review): the return-type line (480) is absent from this listing; the
// Doxygen residue at the end of the page gives it as PerfLog::Setup.
//
// section   - configuration section read for "perf_log" and "log_interval".
// configDir - base directory used to resolve a relative "perf_log" path.
//
// Returns the populated Setup; fields whose keys are missing keep whatever
// values PerfLog::Setup default-constructs them with.
481setup_PerfLog(Section const& section, boost::filesystem::path const& configDir)
482{
483 PerfLog::Setup setup;
484 std::string perfLog;
485 set(perfLog, "perf_log", section);
// An empty string leaves setup.perfLog untouched.
486 if (perfLog.size())
487 {
488 setup.perfLog = boost::filesystem::path(perfLog);
// Relative paths are made absolute against configDir.
489 if (setup.perfLog.is_relative())
490 {
491 setup.perfLog =
492 boost::filesystem::absolute(setup.perfLog, configDir);
493 }
494 }
495
// logInterval is left uninitialized and is read only when get_if_exists
// returns true.
496 std::uint64_t logInterval;
// The configured value is interpreted as a number of seconds.
497 if (get_if_exists(section, "log_interval", logInterval))
498 setup.logInterval = std::chrono::seconds(logInterval);
499 return setup;
500}
501
// Factory taking the same four arguments as the PerfLogImp constructor and
// forwarding them, moving signalStop.
// NOTE(review): lines 502, 503 and 509 are absent from this listing. The
// Doxygen residue at the end of the page gives the signature as
// std::unique_ptr<PerfLog> make_PerfLog(...); the missing line 509
// presumably begins a return statement constructing a PerfLogImp -- confirm
// against the original file.
504 PerfLog::Setup const& setup,
505 Application& app,
506 beast::Journal journal,
507 std::function<void()>&& signalStop)
508{
510 setup, app, journal, std::move(signalStop));
511}
512
513} // namespace perf
514} // namespace ripple
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Definition Journal.h:41
Stream fatal() const
Definition Journal.h:333
virtual NodeStore::Database & getNodeStore()=0
virtual NetworkOPs & getOPs()=0
static std::string const & name(JobType jt)
Definition JobTypes.h:116
Map::size_type size() const
Definition JobTypes.h:140
virtual void stateAccounting(Json::Value &obj)=0
void getCountsJson(Json::Value &obj)
Definition Database.cpp:248
Holds a collection of configuration values.
Definition BasicConfig.h:26
std::string const hostname_
Definition PerfLogImp.h:112
void rpcEnd(std::string const &method, std::uint64_t const requestId, bool finish)
void resizeJobs(int const resize) override
Ensure enough room to store each currently executing job.
beast::Journal const j_
Definition PerfLogImp.h:104
std::function< void()> const signalStop_
Definition PerfLogImp.h:105
void jobQueue(JobType const type) override
Log queued job.
void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance) override
Log job executing.
void rpcStart(std::string const &method, std::uint64_t const requestId) override
Log start of RPC call.
void rotate() override
Rotate perf log file.
std::condition_variable cond_
Definition PerfLogImp.h:110
PerfLogImp(Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
void jobFinish(JobType const type, microseconds dur, int instance) override
Log job finishing.
system_time_point lastLog_
Definition PerfLogImp.h:111
std::chrono::time_point< steady_clock > steady_time_point
Definition PerfLog.h:37
T close(T... args)
T endl(T... args)
T is_open(T... args)
T is_same_v
T join(T... args)
T joinable(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
PerfLog::Setup setup_PerfLog(Section const &section, boost::filesystem::path const &configDir)
std::unique_ptr< PerfLog > make_PerfLog(PerfLog::Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
bool get_if_exists(Section const &section, std::string const &name, T &v)
@ current
This was a new validation and was added.
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
JobType
Definition Job.h:16
@ jtINVALID
Definition Job.h:18
STL namespace.
T open(T... args)
T push_back(T... args)
T reserve(T... args)
T resize(T... args)
T size(T... args)
Job Queue task performance counters.
Definition PerfLogImp.h:75
RPC performance counters.
Definition PerfLogImp.h:61
std::unordered_map< std::string, Locked< Rpc > > rpc_
Definition PerfLogImp.h:88
std::vector< std::pair< JobType, steady_time_point > > jobs_
Definition PerfLogImp.h:90
std::unordered_map< std::uint64_t, MethodStart > methods_
Definition PerfLogImp.h:92
Counters(std::set< char const * > const &labels, JobTypes const &jobTypes)
std::unordered_map< JobType, Locked< Jq > > jq_
Definition PerfLogImp.h:89
Configuration from [perf] section of rippled.cfg.
Definition PerfLog.h:47
boost::filesystem::path perfLog
Definition PerfLog.h:48
T to_string(T... args)