xrpld
Loading...
Searching...
No Matches
StatsDCollector.cpp
1#include <xrpl/beast/insight/StatsDCollector.h>
2
3#include <xrpl/beast/core/List.h>
4#include <xrpl/beast/insight/CounterImpl.h>
5#include <xrpl/beast/insight/EventImpl.h>
6#include <xrpl/beast/insight/GaugeImpl.h>
7#include <xrpl/beast/insight/Hook.h>
8#include <xrpl/beast/insight/HookImpl.h>
9#include <xrpl/beast/insight/MeterImpl.h>
10#include <xrpl/beast/net/IPEndpoint.h>
11#include <xrpl/beast/utility/Journal.h>
12#include <xrpl/beast/utility/instrumentation.h>
13
14#include <boost/asio/basic_waitable_timer.hpp>
15#include <boost/asio/bind_executor.hpp>
16#include <boost/asio/buffer.hpp>
17#include <boost/asio/dispatch.hpp>
18#include <boost/asio/error.hpp>
19#include <boost/asio/executor_work_guard.hpp>
20#include <boost/asio/io_context.hpp>
21#include <boost/asio/ip/udp.hpp>
22#include <boost/asio/strand.hpp>
23#include <boost/system/detail/error_code.hpp>
24#include <boost/system/system_error.hpp>
25
26#include <chrono>
27#include <cstddef>
28#include <deque>
29#include <functional>
30#include <limits>
31#include <memory>
32#include <mutex>
33#include <optional>
34#include <sstream>
35#include <string>
36#include <thread>
37#include <utility>
38#include <vector>
39
40#ifndef BEAST_STATSDCOLLECTOR_TRACING_ENABLED
41#define BEAST_STATSDCOLLECTOR_TRACING_ENABLED 0
42#endif
43
44namespace beast::insight {
45
46namespace detail {
47
49
50//------------------------------------------------------------------------------
51
52class StatsDMetricBase : public List<StatsDMetricBase>::Node
53{
54public:
55 virtual void
56 doProcess() = 0;
57 virtual ~StatsDMetricBase() = default;
58 StatsDMetricBase() = default;
61 operator=(StatsDMetricBase const&) = delete;
62};
63
64//------------------------------------------------------------------------------
65
67{
68public:
70
71 ~StatsDHookImpl() override;
72
73 void
74 doProcess() override;
75
77 operator=(StatsDHookImpl const&) = delete;
78
79private:
82};
83
84//------------------------------------------------------------------------------
85
87{
88public:
90
91 ~StatsDCounterImpl() override;
92
93 void
94 increment(CounterImpl::value_type amount) override;
95
96 void
97 flush();
98 void
100 void
101 doProcess() override;
102
104 operator=(StatsDCounterImpl const&) = delete;
105
106private:
110 bool dirty_{false};
111};
112
113//------------------------------------------------------------------------------
114
116{
117public:
119
120 ~StatsDEventImpl() override = default;
121
122 void
123 notify(EventImpl::value_type const& value) override;
124
125 void
126 doNotify(EventImpl::value_type const& value);
127 void
129
130private:
133
136};
137
138//------------------------------------------------------------------------------
139
141{
142public:
144
145 ~StatsDGaugeImpl() override;
146
147 void
148 set(GaugeImpl::value_type value) override;
149 void
150 increment(GaugeImpl::difference_type amount) override;
151
152 void
153 flush();
154 void
156 void
158 void
159 doProcess() override;
160
162 operator=(StatsDGaugeImpl const&) = delete;
163
164private:
169 bool dirty_{false};
170};
171
172//------------------------------------------------------------------------------
173
175{
176public:
178
179 ~StatsDMeterImpl() override;
180
181 void
182 increment(MeterImpl::value_type amount) override;
183
184 void
185 flush();
186 void
188 void
189 doProcess() override;
190
192 operator=(StatsDMeterImpl const&) = delete;
193
194private:
198 bool dirty_{false};
199};
200
201//------------------------------------------------------------------------------
202
204 public std::enable_shared_from_this<StatsDCollectorImp>
205{
206private:
207 static constexpr auto kMaxPacketSize = 1472;
208
212 boost::asio::io_context ioContext_;
214 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
215 boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_;
216 boost::asio::ip::udp::socket socket_;
220
221 // Must come last for order of init
223
224 static boost::asio::ip::udp::endpoint
226 {
227 return boost::asio::ip::udp::endpoint(ep.address(), ep.port());
228 }
229
230public:
232 : journal_(journal)
233 , address_(std::move(address))
234 , prefix_(std::move(prefix))
235 , work_(boost::asio::make_work_guard(ioContext_))
236 , strand_(boost::asio::make_strand(ioContext_))
240 {
241 }
242
244 {
245 try
246 {
247 timer_.cancel();
248 }
249 catch (boost::system::system_error const&) // NOLINT(bugprone-empty-catch)
250 {
251 // ignored
252 }
253
254 work_.reset();
255 thread_.join();
256 }
257
258 Hook
259 makeHook(HookImpl::HandlerType const& handler) override
260 {
262 }
263
264 Counter
265 makeCounter(std::string const& name) override
266 {
268 }
269
270 Event
271 makeEvent(std::string const& name) override
272 {
274 }
275
276 Gauge
277 makeGauge(std::string const& name) override
278 {
280 }
281
282 Meter
283 makeMeter(std::string const& name) override
284 {
286 }
287
288 //--------------------------------------------------------------------------
289
290 void
292 {
294 metrics_.pushBack(metric);
295 }
296
297 void
299 {
301 metrics_.erase(metrics_.iteratorTo(metric));
302 }
303
304 //--------------------------------------------------------------------------
305
306 boost::asio::io_context&
308 {
309 return ioContext_;
310 }
311
312 std::string const&
313 prefix() const
314 {
315 return prefix_;
316 }
317
318 void
320 {
321 data_.emplace_back(buffer);
322 }
323
324 void
326 {
327 boost::asio::dispatch(
329 boost::asio::bind_executor(
330 strand_, std::bind(&StatsDCollectorImp::doPostBuffer, this, std::move(buffer))));
331 }
332
333 // The keepAlive parameter makes sure the buffers sent to
334 // boost::asio::async_send do not go away until the call is finished
335 void
338 boost::system::error_code ec,
340 {
341 if (ec == boost::asio::error::operation_aborted)
342 return;
343
344 if (ec)
345 {
346 if (auto stream = journal_.error())
347 stream << "async_send failed: " << ec.message();
348 return;
349 }
350 }
351
352 static void
354 {
355 (void)buffers;
356#if BEAST_STATSDCOLLECTOR_TRACING_ENABLED
357 for (auto const& buffer : buffers)
358 {
359 std::string const s(buffer.data(), boost::asio::buffer_size(buffer));
360 std::cerr << s;
361 }
362 std::cerr << '\n';
363#endif
364 }
365
366 // Send what we have
367 void
369 {
370 if (data_.empty())
371 return;
372
373 // Break up the array of strings into blocks
374 // that each fit into one UDP packet.
375 //
377 buffers.reserve(data_.size());
378 std::size_t size(0);
379
380 auto keepAlive = std::make_shared<std::deque<std::string>>(std::move(data_));
381 data_.clear();
382
383 for (auto const& s : *keepAlive)
384 {
385 std::size_t const length(s.size());
386 XRPL_ASSERT(
387 !s.empty(),
388 "beast::insight::detail::StatsDCollectorImp::sendBuffers : "
389 "non-empty payload");
390 if (!buffers.empty() && (size + length) > kMaxPacketSize)
391 {
392 log(buffers);
393 socket_.async_send(
394 buffers,
395 std::bind(
397 this,
398 keepAlive,
399 std::placeholders::_1,
400 std::placeholders::_2));
401 buffers.clear();
402 size = 0;
403 }
404
405 buffers.emplace_back(&s[0], length);
406 size += length;
407 }
408
409 if (!buffers.empty())
410 {
411 log(buffers);
412 socket_.async_send(
413 buffers,
414 std::bind(
416 this,
417 keepAlive,
418 std::placeholders::_1,
419 std::placeholders::_2));
420 }
421 }
422
423 void
425 {
426 using namespace std::chrono_literals;
427 timer_.expires_after(1s);
428 timer_.async_wait(std::bind(&StatsDCollectorImp::onTimer, this, std::placeholders::_1));
429 }
430
431 void
432 onTimer(boost::system::error_code ec)
433 {
434 if (ec == boost::asio::error::operation_aborted)
435 return;
436
437 if (ec)
438 {
439 if (auto stream = journal_.error())
440 stream << "onTimer failed: " << ec.message();
441 return;
442 }
443
445
446 for (auto& m : metrics_)
447 m.doProcess();
448
449 sendBuffers();
450
451 setTimer();
452 }
453
454 void
456 {
457 boost::system::error_code ec;
458
459 if (socket_.connect(toEndpoint(address_), ec))
460 {
461 if (auto stream = journal_.error())
462 stream << "Connect failed: " << ec.message();
463 return;
464 }
465
466 setTimer();
467
468 ioContext_.run();
469
470 // NOLINTNEXTLINE(bugprone-unused-return-value)
471 socket_.shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
472
473 socket_.close();
474
475 ioContext_.poll();
476 }
477};
478
479//------------------------------------------------------------------------------
480
482 : impl_(std::move(impl)), handler_(std::move(handler))
483{
484 impl_->add(*this);
485}
486
488{
489 impl_->remove(*this);
490}
491
492void
497
498//------------------------------------------------------------------------------
499
501 : impl_(std::move(impl)), name_(std::move(name))
502{
503 impl_->add(*this);
504}
505
507{
508 impl_->remove(*this);
509}
510
511void
513{
514 boost::asio::dispatch(
515 impl_->getIoContext(),
516 std::bind(
519 amount));
520}
521
522void
524{
525 if (dirty_)
526 {
527 dirty_ = false;
529 ss << impl_->prefix() << "." << name_ << ":" << value_ << "|c"
530 << "\n";
531 value_ = 0;
532 impl_->postBuffer(ss.str());
533 }
534}
535
536void
538{
539 value_ += amount;
540 dirty_ = true;
541}
542
543void
548
549//------------------------------------------------------------------------------
550
555
556void
558{
559 boost::asio::dispatch(
560 impl_->getIoContext(),
561 std::bind(
564 value));
565}
566
567void
569{
571 ss << impl_->prefix() << "." << name_ << ":" << value.count() << "|ms"
572 << "\n";
573 impl_->postBuffer(ss.str());
574}
575
576//------------------------------------------------------------------------------
577
579 : impl_(std::move(impl)), name_(std::move(name))
580{
581 impl_->add(*this);
582}
583
585{
586 impl_->remove(*this);
587}
588
589void
591{
592 boost::asio::dispatch(
593 impl_->getIoContext(),
594 std::bind(
597 value));
598}
599
600void
602{
603 boost::asio::dispatch(
604 impl_->getIoContext(),
605 std::bind(
608 amount));
609}
610
611void
613{
614 if (dirty_)
615 {
616 dirty_ = false;
618 ss << impl_->prefix() << "." << name_ << ":" << value_ << "|g"
619 << "\n";
620 impl_->postBuffer(ss.str());
621 }
622}
623
624void
626{
627 value_ = value;
628
629 if (value_ != lastValue_)
630 {
632 dirty_ = true;
633 }
634}
635
636void
638{
640
641 if (amount > 0)
642 {
643 GaugeImpl::value_type const d(static_cast<GaugeImpl::value_type>(amount));
646 : d;
647 }
648 else if (amount < 0)
649 {
650 GaugeImpl::value_type const d(static_cast<GaugeImpl::value_type>(-amount));
651 value = (d >= value) ? 0 : value - d;
652 }
653
654 doSet(value);
655}
656
657void
662
663//------------------------------------------------------------------------------
664
666 : impl_(std::move(impl)), name_(std::move(name))
667{
668 impl_->add(*this);
669}
670
672{
673 impl_->remove(*this);
674}
675
676void
678{
679 boost::asio::dispatch(
680 impl_->getIoContext(),
681 std::bind(
684 amount));
685}
686
687void
689{
690 if (dirty_)
691 {
692 dirty_ = false;
694 ss << impl_->prefix() << "." << name_ << ":" << value_ << "|m"
695 << "\n";
696 value_ = 0;
697 impl_->postBuffer(ss.str());
698 }
699}
700
701void
703{
704 value_ += amount;
705 dirty_ = true;
706}
707
708void
713
714} // namespace detail
715
716//------------------------------------------------------------------------------
717
719StatsDCollector::make(IP::Endpoint const& address, std::string const& prefix, Journal journal)
720{
721 return std::make_shared<detail::StatsDCollectorImp>(address, prefix, journal);
722}
723
724} // namespace beast::insight
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:17
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:54
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:40
A generic endpoint for log messages.
Definition Journal.h:38
Intrusive doubly linked list.
Definition List.h:260
A metric for measuring an integral value.
Definition Counter.h:19
std::chrono::milliseconds value_type
Definition EventImpl.h:13
A metric for reporting event timing.
Definition Event.h:21
std::uint64_t value_type
Definition GaugeImpl.h:13
std::int64_t difference_type
Definition GaugeImpl.h:14
A metric for measuring an integral value.
Definition Gauge.h:20
std::function< void(void)> HandlerType
Definition HookImpl.h:11
A reference to a handler for performing polled collection.
Definition Hook.h:12
std::uint64_t value_type
Definition MeterImpl.h:13
A metric for measuring an integral value.
Definition Meter.h:18
static std::shared_ptr< StatsDCollector > make(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Create a StatsD collector.
void doPostBuffer(std::string const &buffer)
Gauge makeGauge(std::string const &name) override
Create a gauge with the specified name.
void onTimer(boost::system::error_code ec)
static boost::asio::ip::udp::endpoint toEndpoint(IP::Endpoint const &ep)
Hook makeHook(HookImpl::HandlerType const &handler) override
Counter makeCounter(std::string const &name) override
Create a counter with the specified name.
Meter makeMeter(std::string const &name) override
Create a meter with the specified name.
Event makeEvent(std::string const &name) override
Create an event with the specified name.
StatsDCollectorImp(IP::Endpoint address, std::string prefix, Journal journal)
void onSend(std::shared_ptr< std::deque< std::string > >, boost::system::error_code ec, std::size_t)
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
static void log(std::vector< boost::asio::const_buffer > const &buffers)
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
StatsDCounterImpl & operator=(StatsDCounterImpl const &)=delete
void doIncrement(CounterImpl::value_type amount)
void increment(CounterImpl::value_type amount) override
StatsDCounterImpl(std::string name, std::shared_ptr< StatsDCollectorImp > impl)
std::shared_ptr< StatsDCollectorImp > impl_
void notify(EventImpl::value_type const &value) override
StatsDEventImpl(std::string name, std::shared_ptr< StatsDCollectorImp > impl)
StatsDEventImpl & operator=(StatsDEventImpl const &)
void doNotify(EventImpl::value_type const &value)
std::shared_ptr< StatsDCollectorImp > impl_
void increment(GaugeImpl::difference_type amount) override
void set(GaugeImpl::value_type value) override
void doSet(GaugeImpl::value_type value)
std::shared_ptr< StatsDCollectorImp > impl_
void doIncrement(GaugeImpl::difference_type amount)
StatsDGaugeImpl & operator=(StatsDGaugeImpl const &)=delete
StatsDGaugeImpl(std::string name, std::shared_ptr< StatsDCollectorImp > impl)
StatsDHookImpl(HandlerType handler, std::shared_ptr< StatsDCollectorImp > impl)
std::shared_ptr< StatsDCollectorImp > impl_
StatsDHookImpl & operator=(StatsDHookImpl const &)=delete
void doIncrement(MeterImpl::value_type amount)
StatsDMeterImpl & operator=(StatsDMeterImpl const &)=delete
StatsDMeterImpl(std::string name, std::shared_ptr< StatsDCollectorImp > impl)
void increment(MeterImpl::value_type amount) override
std::shared_ptr< StatsDCollectorImp > impl_
StatsDMetricBase & operator=(StatsDMetricBase const &)=delete
StatsDMetricBase(StatsDMetricBase const &)=delete
T clear(T... args)
T emplace_back(T... args)
T empty(T... args)
T make_shared(T... args)
T max(T... args)
STL namespace.
T static_pointer_cast(T... args)
T reserve(T... args)
T str(T... args)