rippled
Loading...
Searching...
No Matches
PerfLogImp.cpp
1#include <xrpld/perflog/detail/PerfLogImp.h>
2
3#include <xrpl/basics/BasicConfig.h>
4#include <xrpl/beast/core/CurrentThreadName.h>
5#include <xrpl/beast/utility/Journal.h>
6#include <xrpl/core/JobTypes.h>
7#include <xrpl/json/json_writer.h>
8
9#include <atomic>
10#include <cstdint>
11#include <cstdlib>
12#include <iterator>
13#include <mutex>
14#include <optional>
15#include <sstream>
16#include <stdexcept>
17#include <string>
18#include <unordered_map>
19#include <utility>
20
21namespace xrpl {
22namespace perf {
23
// Function body whose signature line is missing from this listing (the
// embedded numbering jumps from 23 to 25). The UNREACHABLE messages below
// identify it as xrpl::perf::PerfLogImp::Counters::Counters.
//
// Pre-populates rpc_ with one default-constructed Rpc entry per element of
// `labels`, and jq_ with one default-constructed Jq entry per key of
// `jobTypes`. An emplace that reports "not inserted" is treated as a
// should-never-happen condition.
25{
26 {
27 // populateRpc
 // Size the table for every label before inserting.
28 rpc_.reserve(labels.size());
 // Each element of `labels` is converted to a std::string key by value.
29 for (std::string const label : labels)
30 {
 // emplace().second is false when the key could not be inserted.
31 auto const inserted = rpc_.emplace(label, Rpc()).second;
32 if (!inserted)
33 {
34 // Ensure that no other function populates this entry.
35 // LCOV_EXCL_START
36 UNREACHABLE(
37 "xrpl::perf::PerfLogImp::Counters::Counters : failed to "
38 "insert label");
39 // LCOV_EXCL_STOP
40 }
41 }
42 }
43 {
44 // populateJq
 // Size the table for every job type before inserting.
45 jq_.reserve(jobTypes.size());
 // Only the key of each jobTypes entry is used; the mapped value is ignored.
46 for (auto const& [jobType, _] : jobTypes)
47 {
48 auto const inserted = jq_.emplace(jobType, Jq()).second;
49 if (!inserted)
50 {
51 // Ensure that no other function populates this entry.
52 // LCOV_EXCL_START
53 UNREACHABLE(
54 "xrpl::perf::PerfLogImp::Counters::Counters : failed to "
55 "insert job type");
56 // LCOV_EXCL_STOP
57 }
58 }
59 }
60}
61
// Function body whose signature line is missing from this listing (the
// embedded numbering jumps from 61 to 64); presumably
// PerfLogImp::Counters::countersJson -- confirm against the real file.
//
// Builds and returns a JSON object (`counters`) with two members:
//   jss::rpc       - per-method RPC counters plus an optional jss::total
//   jss::job_queue - per-job-type counters plus an optional jss::total
// All numeric values are written as strings via std::to_string.
//
// NOTE(review): the names rpcobj, p, j and counters are used below without a
// visible declaration; the embedded numbering skips lines 65, 81, 119 and
// 144, so their declarations appear to be among the omitted lines.
64{
66 // totalRpc represents all rpc methods. All that started, finished, etc.
67 Rpc totalRpc;
68 for (auto const& proc : rpc_)
69 {
 // Snapshot the entry while holding its own mutex, then format the copy
 // after the lock has been released.
70 Rpc value;
71 {
72 std::lock_guard const lock(proc.second.mutex);
 // Methods with no activity at all are left out of the report.
73 if ((proc.second.value.started == 0u) && (proc.second.value.finished == 0u) &&
74 (proc.second.value.errored == 0u))
75 {
76 continue;
77 }
78 value = proc.second.value;
79 }
80
 // Emit this method's counters and fold them into the running totals.
82 p[jss::started] = std::to_string(value.started);
83 totalRpc.started += value.started;
84 p[jss::finished] = std::to_string(value.finished);
85 totalRpc.finished += value.finished;
86 p[jss::errored] = std::to_string(value.errored);
87 totalRpc.errored += value.errored;
88 p[jss::duration_us] = std::to_string(value.duration.count());
89 totalRpc.duration += value.duration;
90 rpcobj[proc.first] = p;
91 }
92
 // The total entry is only written when at least one call was started.
93 if (totalRpc.started != 0u)
94 {
95 Json::Value totalRpcJson(Json::objectValue);
96 totalRpcJson[jss::started] = std::to_string(totalRpc.started);
97 totalRpcJson[jss::finished] = std::to_string(totalRpc.finished);
98 totalRpcJson[jss::errored] = std::to_string(totalRpc.errored);
99 totalRpcJson[jss::duration_us] = std::to_string(totalRpc.duration.count());
100 rpcobj[jss::total] = totalRpcJson;
101 }
102
103 Json::Value jobQueueObj(Json::objectValue);
104 // totalJq represents all jobs. All enqueued, started, finished, etc.
105 Jq totalJq;
106 for (auto const& proc : jq_)
107 {
 // Same snapshot-under-lock pattern as the RPC loop above.
108 Jq value;
109 {
110 std::lock_guard const lock(proc.second.mutex);
 // Job types with no activity at all are left out of the report.
111 if ((proc.second.value.queued == 0u) && (proc.second.value.started == 0u) &&
112 (proc.second.value.finished == 0u))
113 {
114 continue;
115 }
116 value = proc.second.value;
117 }
118
 // Emit this job type's counters and fold them into the running totals.
120 j[jss::queued] = std::to_string(value.queued);
121 totalJq.queued += value.queued;
122 j[jss::started] = std::to_string(value.started);
123 totalJq.started += value.started;
124 j[jss::finished] = std::to_string(value.finished);
125 totalJq.finished += value.finished;
126 j[jss::queued_duration_us] = std::to_string(value.queuedDuration.count());
127 totalJq.queuedDuration += value.queuedDuration;
128 j[jss::running_duration_us] = std::to_string(value.runningDuration.count());
129 totalJq.runningDuration += value.runningDuration;
 // Entries are keyed by the job type's name rather than its enum value.
130 jobQueueObj[JobTypes::name(proc.first)] = j;
131 }
132
 // The total entry is only written when at least one job was queued.
133 if (totalJq.queued != 0u)
134 {
135 Json::Value totalJqJson(Json::objectValue);
136 totalJqJson[jss::queued] = std::to_string(totalJq.queued);
137 totalJqJson[jss::started] = std::to_string(totalJq.started);
138 totalJqJson[jss::finished] = std::to_string(totalJq.finished);
139 totalJqJson[jss::queued_duration_us] = std::to_string(totalJq.queuedDuration.count());
140 totalJqJson[jss::running_duration_us] = std::to_string(totalJq.runningDuration.count());
141 jobQueueObj[jss::total] = totalJqJson;
142 }
143
145 // Be kind to reporting tools and let them expect rpc and jq objects
146 // even if empty.
147 counters[jss::rpc] = rpcobj;
148 counters[jss::job_queue] = jobQueueObj;
149 return counters;
150}
151
// Function body whose signature line is missing from this listing (the
// embedded numbering jumps from 151 to 154); presumably
// PerfLogImp::Counters::currentJson -- confirm against the real file.
//
// Builds and returns a JSON object (`current`) describing work that is in
// progress right now:
//   jss::jobs    - one entry per slot of jobs_ whose type is not jtINVALID
//   jss::methods - one entry per value stored in methods_
// Each entry carries the elapsed time since its recorded start, in
// microseconds, written as a string.
//
// NOTE(review): jobj, methods, methodobj and current are used without a
// visible declaration; the embedded numbering skips lines 167, 175, 184 and
// 191, so their declarations appear to be among the omitted lines.
154{
 // A single "now" is taken so every duration uses the same reference point.
155 auto const present = steady_clock::now();
156
157 Json::Value jobsArray(Json::arrayValue);
 // Copy jobs_ while holding jobsMutex_; the copy is formatted after the
 // lock is released.
158 auto const jobs = [this] {
159 std::lock_guard const lock(jobsMutex_);
160 return jobs_;
161 }();
162
163 for (auto const& j : jobs)
164 {
 // Slots marked jtINVALID are skipped.
165 if (j.first == jtINVALID)
166 continue;
168 jobj[jss::job] = JobTypes::name(j.first);
169 jobj[jss::duration_us] =
170 std::to_string(std::chrono::duration_cast<microseconds>(present - j.second).count());
171 jobsArray.append(jobj);
172 }
173
174 Json::Value methodsArray(Json::arrayValue);
 // Copy the mapped values of methods_ while holding methodsMutex_.
176 {
177 std::lock_guard const lock(methodsMutex_);
178 methods.reserve(methods_.size());
179 for (auto const& m : methods_)
180 methods.push_back(m.second);
181 }
 // m.first is the method label, m.second its start time.
182 for (auto m : methods)
183 {
185 methodobj[jss::method] = m.first;
186 methodobj[jss::duration_us] =
187 std::to_string(std::chrono::duration_cast<microseconds>(present - m.second).count());
188 methodsArray.append(methodobj);
189 }
190
192 current[jss::jobs] = jobsArray;
193 current[jss::methods] = methodsArray;
194 return current;
195}
196
197//-----------------------------------------------------------------------------
198
// (Re)opens the performance log file named by setup_.perfLog.
//
// NOTE(review): the line carrying the function name is missing from this
// listing (the embedded numbering jumps from 199 to 201); the callers below
// refer to it as openLog().
//
// Does nothing when no log path is configured. Otherwise closes any file
// that is already open, makes sure the parent directory exists, and opens
// the file for appending. If the directory cannot be created or the file
// cannot be opened, a fatal message is logged and signalStop_ is invoked.
199void
201{
 // An empty path means there is no log file to open.
202 if (setup_.perfLog.empty())
203 return;
204
 // Close first so the open() below starts from a closed stream.
205 if (logFile_.is_open())
206 logFile_.close();
207
208 auto logDir = setup_.perfLog.parent_path();
209 if (!boost::filesystem::is_directory(logDir))
210 {
 // The error_code overload reports failure through `ec`.
211 boost::system::error_code ec;
212 boost::filesystem::create_directories(logDir, ec);
213 if (ec)
214 {
215 JLOG(j_.fatal()) << "Unable to create performance log "
216 "directory "
217 << logDir << ": " << ec.message();
218 signalStop_();
219 return;
220 }
221 }
222
 // Append mode: existing file contents are kept.
223 logFile_.open(setup_.perfLog.c_str(), std::ios::out | std::ios::app);
224
225 if (!logFile_)
226 {
227 JLOG(j_.fatal()) << "Unable to open performance log " << setup_.perfLog << ".";
228 signalStop_();
229 }
230}
231
// Periodic reporting loop.
//
// NOTE(review): the line carrying the function name and the lines numbered
// 235, 236 and 241 are missing from this listing. `lock` is used below
// without a visible declaration, so it is presumably declared on omitted
// line 241 -- confirm against the real file.
//
// Each iteration waits on cond_ until lastLog_ + setup_.logInterval or until
// stop_ becomes true. When stop_ is set the function returns. Otherwise it
// reopens the log if rotate_ was requested, and then calls report().
232void
234{
237
238 while (true)
239 {
240 {
 // wait_until with a predicate returns the predicate's value, so a true
 // result here means stop_ was set.
242 if (cond_.wait_until(lock, lastLog_ + setup_.logInterval, [&] { return stop_; }))
243 {
244 return;
245 }
 // A pending rotation request is served by reopening the log file.
246 if (rotate_)
247 {
248 openLog();
249 rotate_ = false;
250 }
251 }
 // report() is called after the inner scope above has ended.
252 report();
253 }
254}
255
// Writes one line of compact JSON to logFile_ with the current counters.
//
// NOTE(review): the line carrying the function name and the lines numbered
// 270, 273 and 281 are missing from this listing. `report` is used as a JSON
// object without a visible declaration (presumably on omitted line 270), and
// the braced scope around jss::workers has lost its first statement
// (omitted line 273) -- confirm both against the real file.
//
// Returns without writing when logFile_ is in a failed state, or when less
// than setup_.logInterval has passed since lastLog_.
256void
258{
259 if (!logFile_)
260 {
261 // If logFile_ is not writable do no further work.
262 return;
263 }
264
 // Skip this call if the reporting interval has not elapsed yet.
265 auto const present = system_clock::now();
266 if (present < lastLog_ + setup_.logInterval)
267 return;
268 lastLog_ = present;
269
 // Timestamp is truncated to microseconds.
271 report[jss::time] = to_string(std::chrono::floor<microseconds>(present));
272 {
 // The worker count is reported as the number of slots in jobs_.
274 report[jss::workers] = static_cast<unsigned int>(counters_.jobs_.size());
275 }
276 report[jss::hostid] = hostname_;
277 report[jss::counters] = counters_.countersJson();
 // The node store fills in the object created on the previous line.
278 report[jss::nodestore] = Json::objectValue;
279 app_.getNodeStore().getCountsJson(report[jss::nodestore]);
280 report[jss::current_activities] = counters_.currentJson();
282
 // std::endl ends the line and flushes the stream.
283 logFile_ << Json::Compact{std::move(report)} << std::endl;
284}
285
// Parameter list, initializer list and body of a constructor.
//
// NOTE(review): the line carrying the constructor name is missing from this
// listing (the embedded numbering jumps from 285 to 287); presumably
// PerfLogImp::PerfLogImp( -- confirm against the real file.
//
// Stores the setup, application reference and journal, takes ownership of
// the signalStop callback by moving it into signalStop_, and then calls
// openLog().
287 Setup const& setup,
288 Application& app,
289 beast::Journal journal,
290 std::function<void()>&& signalStop)
291 : setup_(setup), app_(app), j_(journal), signalStop_(std::move(signalStop))
292{
293 openLog();
294}
295
// Function body whose signature line is missing from this listing (the
// embedded numbering jumps from 295 to 297); presumably the PerfLogImp
// destructor -- confirm against the real file.
// Its only action is to call stop().
297{
298 stop();
299}
300
// Records the start of an RPC call.
//
// method    - label looked up in counters_.rpc_.
// requestId - key under which the in-flight call is stored in
//             counters_.methods_.
//
// Increments the method's `started` counter and stores the method label
// together with the current steady_clock time under requestId. An unknown
// method is flagged via UNREACHABLE and otherwise ignored.
301void
302PerfLogImp::rpcStart(std::string const& method, std::uint64_t const requestId)
303{
304 auto counter = counters_.rpc_.find(method);
305 if (counter == counters_.rpc_.end())
306 {
307 // LCOV_EXCL_START
308 UNREACHABLE("xrpl::perf::PerfLogImp::rpcStart : valid method input");
309 return;
310 // LCOV_EXCL_STOP
311 }
312
313 {
 // The per-method mutex is held only for the increment.
314 std::lock_guard const lock(counter->second.mutex);
315 ++counter->second.value.started;
316 }
 // NOTE(review): the embedded numbering skips line 317 here, so a statement
 // is missing from this listing -- presumably a lock guarding methods_;
 // confirm against the real file.
 // The stored label pointer comes from the key held in counters_.rpc_, not
 // from the `method` argument.
318 counters_.methods_[requestId] = {counter->first.c_str(), steady_clock::now()};
319}
320
// Records the end of an RPC call.
//
// method    - label looked up in counters_.rpc_.
// requestId - key of the in-flight entry in counters_.methods_.
// finish    - true counts the call as finished, false as errored.
//
// Removes the in-flight entry for requestId, increments `finished` or
// `errored` for the method, and adds the elapsed time since the recorded
// start to the method's duration. An unknown method is flagged via
// UNREACHABLE and otherwise ignored.
321void
322PerfLogImp::rpcEnd(std::string const& method, std::uint64_t const requestId, bool finish)
323{
324 auto counter = counters_.rpc_.find(method);
325 if (counter == counters_.rpc_.end())
326 {
327 // LCOV_EXCL_START
328 UNREACHABLE("xrpl::perf::PerfLogImp::rpcEnd : valid method input");
329 return;
330 // LCOV_EXCL_STOP
331 }
 // Stays default-constructed if requestId is not found below.
332 steady_time_point startTime;
333 {
 // NOTE(review): the embedded numbering skips line 334 here, so a statement
 // is missing from this listing -- presumably a lock guarding methods_;
 // confirm against the real file.
335 auto const e = counters_.methods_.find(requestId);
336 if (e != counters_.methods_.end())
337 {
 // e->second.second is the start time stored alongside the label.
338 startTime = e->second.second;
339 counters_.methods_.erase(e);
340 }
341 else
342 {
343 // LCOV_EXCL_START
344 UNREACHABLE("xrpl::perf::PerfLogImp::rpcEnd : valid requestId input");
345 // LCOV_EXCL_STOP
346 }
347 }
 // The per-method mutex is held for the rest of the function.
348 std::lock_guard const lock(counter->second.mutex);
349 if (finish)
350 {
351 ++counter->second.value.finished;
352 }
353 else
354 {
355 ++counter->second.value.errored;
356 }
357 counter->second.value.duration +=
358 std::chrono::duration_cast<microseconds>(steady_clock::now() - startTime);
359}
360
// Records that a job of the given type was queued.
//
// NOTE(review): the line carrying the function name and parameter list is
// missing from this listing (the embedded numbering jumps from 361 to 363).
// The UNREACHABLE message names the function xrpl::perf::PerfLogImp::jobQueue
// and the body reads a parameter called `type`.
//
// Increments the `queued` counter for `type` under that entry's mutex. An
// unknown job type is flagged via UNREACHABLE and otherwise ignored.
361void
363{
364 auto counter = counters_.jq_.find(type);
365 if (counter == counters_.jq_.end())
366 {
367 // LCOV_EXCL_START
368 UNREACHABLE("xrpl::perf::PerfLogImp::jobQueue : valid job type input");
369 return;
370 // LCOV_EXCL_STOP
371 }
372 std::lock_guard const lock(counter->second.mutex);
373 ++counter->second.value.queued;
374}
375
// Records that a job has started executing.
//
// NOTE(review): the line carrying the function name is missing from this
// listing (the embedded numbering jumps from 376 to 378). The UNREACHABLE
// message names the function xrpl::perf::PerfLogImp::jobStart.
//
// type      - job type looked up in counters_.jq_.
// dur       - added to the type's queuedDuration.
// startTime - stored in the jobs_ slot for this instance.
// instance  - index into counters_.jobs_; out-of-range values leave jobs_
//             untouched.
376void
378 JobType const type,
379 microseconds dur,
380 steady_time_point startTime,
381 int instance)
382{
383 auto counter = counters_.jq_.find(type);
384 if (counter == counters_.jq_.end())
385 {
386 // LCOV_EXCL_START
387 UNREACHABLE("xrpl::perf::PerfLogImp::jobStart : valid job type input");
388 return;
389 // LCOV_EXCL_STOP
390 }
391
392 {
 // The per-type mutex is held only while the counters are updated.
393 std::lock_guard const lock(counter->second.mutex);
394 ++counter->second.value.started;
395 counter->second.value.queuedDuration += dur;
396 }
 // NOTE(review): the embedded numbering skips line 397 here, so a statement
 // is missing from this listing -- presumably a lock guarding jobs_; confirm
 // against the real file.
 // The instance >= 0 test comes first, so the comparison against the
 // unsigned size() is only evaluated for non-negative values.
398 if (instance >= 0 && instance < counters_.jobs_.size())
399 counters_.jobs_[instance] = {type, startTime};
400}
401
// Records that a job has finished executing.
//
// type     - job type looked up in counters_.jq_.
// dur      - added to the type's runningDuration.
// instance - index into counters_.jobs_, range-checked before use.
//
// Increments `finished` for the type. An unknown job type is flagged via
// UNREACHABLE and otherwise ignored.
402void
403PerfLogImp::jobFinish(JobType const type, microseconds dur, int instance)
404{
405 auto counter = counters_.jq_.find(type);
406 if (counter == counters_.jq_.end())
407 {
408 // LCOV_EXCL_START
409 UNREACHABLE("xrpl::perf::PerfLogImp::jobFinish : valid job type input");
410 return;
411 // LCOV_EXCL_STOP
412 }
413
414 {
 // The per-type mutex is held only while the counters are updated.
415 std::lock_guard const lock(counter->second.mutex);
416 ++counter->second.value.finished;
417 counter->second.value.runningDuration += dur;
418 }
 // NOTE(review): the embedded numbering skips lines 419 and 421, so two
 // statements are missing from this listing. The `if` below has lost the
 // statement it controls (omitted line 421) -- presumably it resets the
 // jobs_ slot for this instance; confirm against the real file.
420 if (instance >= 0 && instance < counters_.jobs_.size())
422}
423
// Acts only when `resize` compares greater than the current size of
// counters_.jobs_.
//
// NOTE(review): the embedded numbering skips lines 427 and 429, so two
// statements are missing from this listing, including the one controlled by
// the `if` below -- presumably a lock and the actual resize of jobs_;
// confirm against the real file.
// NOTE(review): `resize` is a signed int compared against an unsigned
// size(); a negative argument would convert to a large unsigned value and
// satisfy the condition -- confirm callers never pass a negative count.
424void
425PerfLogImp::resizeJobs(int const resize)
426{
428 if (resize > counters_.jobs_.size())
430}
431
// Requests that the log file be reopened.
//
// NOTE(review): the line carrying the function name is missing from this
// listing (the embedded numbering jumps from 432 to 434); presumably
// PerfLogImp::rotate() -- confirm against the real file. Line 440 is also
// omitted -- presumably a notification on cond_; confirm.
//
// Does nothing when no log path is configured. Otherwise sets rotate_ while
// holding mutex_.
432void
434{
435 if (setup_.perfLog.empty())
436 return;
437
438 std::lock_guard const lock(mutex_);
439 rotate_ = true;
441}
442
// Acts only when a log path is configured.
//
// NOTE(review): the line carrying the function name (444) and the statement
// controlled by the `if` below (447) are missing from this listing;
// presumably this is PerfLogImp::start() launching thread_ -- confirm
// against the real file.
443void
445{
446 if (!setup_.perfLog.empty())
448}
449
// Stops the worker thread if one is running.
//
// NOTE(review): the line carrying the function name (451) and line 458 are
// missing from this listing; presumably this is PerfLogImp::stop() and the
// omitted statement notifies cond_ -- confirm against the real file.
//
// When thread_ is joinable, sets stop_ while holding mutex_ and then joins
// thread_. The lock is released before join() is called.
450void
452{
453 if (thread_.joinable())
454 {
455 {
456 std::lock_guard const lock(mutex_);
457 stop_ = true;
459 }
460 thread_.join();
461 }
462}
463
464//-----------------------------------------------------------------------------
465
// Builds a PerfLog::Setup from a configuration section.
//
// NOTE(review): the return-type line is missing from this listing (the
// embedded numbering jumps from 465 to 467); the body returns a
// PerfLog::Setup.
//
// section   - configuration section read for "perf_log" and "log_interval".
// configDir - base directory used to resolve a relative "perf_log" path.
//
// Returns the populated setup. Fields whose keys are absent are left as
// default-constructed by PerfLog::Setup.
467setup_PerfLog(Section const& section, boost::filesystem::path const& configDir)
468{
469 PerfLog::Setup setup;
470 std::string perfLog;
471 set(perfLog, "perf_log", section);
472 if (!perfLog.empty())
473 {
474 setup.perfLog = boost::filesystem::path(perfLog);
 // A relative path is made absolute against configDir.
475 if (setup.perfLog.is_relative())
476 {
477 setup.perfLog = boost::filesystem::absolute(setup.perfLog, configDir);
478 }
479 }
480
 // "log_interval" is read as an integer and interpreted as seconds.
481 std::uint64_t logInterval = 0;
482 if (get_if_exists(section, "log_interval", logInterval))
483 setup.logInterval = std::chrono::seconds(logInterval);
484 return setup;
485}
486
// Parameter list and body of a factory function.
//
// NOTE(review): the lines carrying the return type and function name (487
// and 488) are missing from this listing; presumably make_PerfLog returning
// std::unique_ptr<PerfLog> -- confirm against the real file.
//
// Constructs a PerfLogImp from the given arguments with std::make_unique and
// returns it; signalStop is forwarded by move.
489 PerfLog::Setup const& setup,
490 Application& app,
491 beast::Journal journal,
492 std::function<void()>&& signalStop)
493{
494 return std::make_unique<PerfLogImp>(setup, app, journal, std::move(signalStop));
495}
496
497} // namespace perf
498} // namespace xrpl
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Definition Journal.h:40
Stream fatal() const
Definition Journal.h:325
Map::size_type size() const
Definition JobTypes.h:136
static std::string const & name(JobType jt)
Definition JobTypes.h:112
virtual void stateAccounting(Json::Value &obj)=0
void getCountsJson(Json::Value &obj)
Definition Database.cpp:236
Holds a collection of configuration values.
Definition BasicConfig.h:24
virtual NetworkOPs & getOPs()=0
virtual NodeStore::Database & getNodeStore()=0
std::condition_variable cond_
Definition PerfLogImp.h:109
void jobFinish(JobType const type, microseconds dur, int instance) override
Log job finishing.
void start() override
std::ofstream logFile_
Definition PerfLogImp.h:106
void rpcStart(std::string const &method, std::uint64_t const requestId) override
Log start of RPC call.
void jobQueue(JobType const type) override
Log queued job.
void resizeJobs(int const resize) override
Ensure enough room to store each currently executing job.
system_time_point lastLog_
Definition PerfLogImp.h:110
beast::Journal const j_
Definition PerfLogImp.h:103
void rpcEnd(std::string const &method, std::uint64_t const requestId, bool finish)
void stop() override
void rotate() override
Rotate perf log file.
std::function< void()> const signalStop_
Definition PerfLogImp.h:104
PerfLogImp(Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance) override
Log job executing.
std::string const hostname_
Definition PerfLogImp.h:111
std::chrono::time_point< steady_clock > steady_time_point
Definition PerfLog.h:35
T close(T... args)
T empty(T... args)
T endl(T... args)
T is_open(T... args)
T is_same_v
T join(T... args)
T joinable(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
std::unique_ptr< PerfLog > make_PerfLog(PerfLog::Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
PerfLog::Setup setup_PerfLog(Section const &section, boost::filesystem::path const &configDir)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
@ current
This was a new validation and was added.
JobType
Definition Job.h:14
@ jtINVALID
Definition Job.h:16
bool get_if_exists(Section const &section, std::string const &name, T &v)
T open(T... args)
T push_back(T... args)
T reserve(T... args)
T resize(T... args)
T size(T... args)
Job Queue task performance counters.
Definition PerfLogImp.h:74
RPC performance counters.
Definition PerfLogImp.h:60
std::unordered_map< JobType, Locked< Jq > > jq_
Definition PerfLogImp.h:88
std::vector< std::pair< JobType, steady_time_point > > jobs_
Definition PerfLogImp.h:89
Json::Value currentJson() const
std::unordered_map< std::uint64_t, MethodStart > methods_
Definition PerfLogImp.h:91
std::unordered_map< std::string, Locked< Rpc > > rpc_
Definition PerfLogImp.h:87
Json::Value countersJson() const
Counters(std::set< char const * > const &labels, JobTypes const &jobTypes)
Configuration from [perf] section of xrpld.cfg.
Definition PerfLog.h:45
boost::filesystem::path perfLog
Definition PerfLog.h:46
milliseconds logInterval
Definition PerfLog.h:48
T to_string(T... args)