rippled
Loading...
Searching...
No Matches
StatsDCollector.cpp
1#include <xrpl/beast/core/List.h>
2#include <xrpl/beast/insight/CounterImpl.h>
3#include <xrpl/beast/insight/EventImpl.h>
4#include <xrpl/beast/insight/GaugeImpl.h>
5#include <xrpl/beast/insight/Hook.h>
6#include <xrpl/beast/insight/HookImpl.h>
7#include <xrpl/beast/insight/MeterImpl.h>
8#include <xrpl/beast/insight/StatsDCollector.h>
9#include <xrpl/beast/net/IPEndpoint.h>
10#include <xrpl/beast/utility/Journal.h>
11#include <xrpl/beast/utility/instrumentation.h>
12
13#include <boost/asio/basic_waitable_timer.hpp>
14#include <boost/asio/bind_executor.hpp>
15#include <boost/asio/buffer.hpp>
16#include <boost/asio/error.hpp>
17#include <boost/asio/executor_work_guard.hpp>
18#include <boost/asio/io_context.hpp>
19#include <boost/asio/ip/udp.hpp>
20#include <boost/asio/strand.hpp>
21#include <boost/system/detail/error_code.hpp>
22
23#include <chrono>
24#include <cstddef>
25#include <deque>
26#include <functional>
27#include <limits>
28#include <memory>
29#include <mutex>
30#include <optional>
31#include <sstream>
32#include <string>
33#include <thread>
34#include <utility>
35#include <vector>
36
37#ifndef BEAST_STATSDCOLLECTOR_TRACING_ENABLED
38#define BEAST_STATSDCOLLECTOR_TRACING_ENABLED 0
39#endif
40
41namespace beast {
42namespace insight {
43
44namespace detail {
45
46class StatsDCollectorImp;
47
48//------------------------------------------------------------------------------
49
50class StatsDMetricBase : public List<StatsDMetricBase>::Node
51{
52public:
53 virtual void
55 virtual ~StatsDMetricBase() = default;
56 StatsDMetricBase() = default;
59 operator=(StatsDMetricBase const&) = delete;
60};
61
62//------------------------------------------------------------------------------
63
65{
66public:
68
69 ~StatsDHookImpl() override;
70
71 void
72 do_process() override;
73
74private:
77
80};
81
82//------------------------------------------------------------------------------
83
85{
86public:
88
89 ~StatsDCounterImpl() override;
90
91 void
92 increment(CounterImpl::value_type amount) override;
93
94 void
95 flush();
96 void
98 void
99 do_process() override;
100
101private:
104
108 bool m_dirty{false};
109};
110
111//------------------------------------------------------------------------------
112
114{
115public:
117
118 ~StatsDEventImpl() = default;
119
120 void
121 notify(EventImpl::value_type const& value) override;
122
123 void
124 do_notify(EventImpl::value_type const& value);
125 void
127
128private:
131
134};
135
136//------------------------------------------------------------------------------
137
139{
140public:
142
143 ~StatsDGaugeImpl() override;
144
145 void
146 set(GaugeImpl::value_type value) override;
147 void
148 increment(GaugeImpl::difference_type amount) override;
149
150 void
151 flush();
152 void
154 void
156 void
157 do_process() override;
158
159private:
162
167 bool m_dirty{false};
168};
169
170//------------------------------------------------------------------------------
171
173{
174public:
175 explicit StatsDMeterImpl(
176 std::string const& name,
178
179 ~StatsDMeterImpl() override;
180
181 void
182 increment(MeterImpl::value_type amount) override;
183
184 void
185 flush();
186 void
188 void
189 do_process() override;
190
191private:
194
198 bool m_dirty{false};
199};
200
201//------------------------------------------------------------------------------
202
204 public std::enable_shared_from_this<StatsDCollectorImp>
205{
206private:
207 enum {
208 // max_packet_size = 484
209 max_packet_size = 1472
210 };
211
215 boost::asio::io_context m_io_context;
217 boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
218 boost::asio::basic_waitable_timer<std::chrono::steady_clock> m_timer;
219 boost::asio::ip::udp::socket m_socket;
223
224 // Must come last for order of init
226
227 static boost::asio::ip::udp::endpoint
229 {
230 return boost::asio::ip::udp::endpoint(ep.address(), ep.port());
231 }
232
233public:
235 : m_journal(journal)
236 , m_address(address)
238 , m_work(boost::asio::make_work_guard(m_io_context))
239 , m_strand(boost::asio::make_strand(m_io_context))
243 {
244 }
245
247 {
248 try
249 {
250 m_timer.cancel();
251 }
252 catch (boost::system::system_error const&) // NOLINT(bugprone-empty-catch)
253 {
254 // ignored
255 }
256
257 m_work.reset();
258 m_thread.join();
259 }
260
261 Hook
262 make_hook(HookImpl::HandlerType const& handler) override
263 {
265 }
266
267 Counter
268 make_counter(std::string const& name) override
269 {
271 }
272
273 Event
274 make_event(std::string const& name) override
275 {
277 }
278
279 Gauge
280 make_gauge(std::string const& name) override
281 {
283 }
284
285 Meter
286 make_meter(std::string const& name) override
287 {
289 }
290
291 //--------------------------------------------------------------------------
292
293 void
295 {
297 metrics_.push_back(metric);
298 }
299
300 void
302 {
304 metrics_.erase(metrics_.iterator_to(metric));
305 }
306
307 //--------------------------------------------------------------------------
308
309 boost::asio::io_context&
311 {
312 return m_io_context;
313 }
314
315 std::string const&
316 prefix() const
317 {
318 return m_prefix;
319 }
320
321 void
323 {
324 m_data.emplace_back(buffer);
325 }
326
327 void
329 {
330 boost::asio::dispatch(
332 boost::asio::bind_executor(
333 m_strand, std::bind(&StatsDCollectorImp::do_post_buffer, this, std::move(buffer))));
334 }
335
336 // The keepAlive parameter makes sure the buffers sent to
337 // boost::asio::async_send do not go away until the call is finished
338 void
341 boost::system::error_code ec,
343 {
344 if (ec == boost::asio::error::operation_aborted)
345 return;
346
347 if (ec)
348 {
349 if (auto stream = m_journal.error())
350 stream << "async_send failed: " << ec.message();
351 return;
352 }
353 }
354
355 static void
357 {
358 (void)buffers;
359#if BEAST_STATSDCOLLECTOR_TRACING_ENABLED
360 for (auto const& buffer : buffers)
361 {
362 std::string const s(buffer.data(), boost::asio::buffer_size(buffer));
363 std::cerr << s;
364 }
365 std::cerr << '\n';
366#endif
367 }
368
369 // Send what we have
370 void
372 {
373 if (m_data.empty())
374 return;
375
376 // Break up the array of strings into blocks
377 // that each fit into one UDP packet.
378 //
380 buffers.reserve(m_data.size());
381 std::size_t size(0);
382
383 auto keepAlive = std::make_shared<std::deque<std::string>>(std::move(m_data));
384 m_data.clear();
385
386 for (auto const& s : *keepAlive)
387 {
388 std::size_t const length(s.size());
389 XRPL_ASSERT(
390 !s.empty(),
391 "beast::insight::detail::StatsDCollectorImp::send_buffers : "
392 "non-empty payload");
393 if (!buffers.empty() && (size + length) > max_packet_size)
394 {
395 log(buffers);
396 m_socket.async_send(
397 buffers,
398 std::bind(
400 this,
401 keepAlive,
402 std::placeholders::_1,
403 std::placeholders::_2));
404 buffers.clear();
405 size = 0;
406 }
407
408 buffers.emplace_back(&s[0], length);
409 size += length;
410 }
411
412 if (!buffers.empty())
413 {
414 log(buffers);
415 m_socket.async_send(
416 buffers,
417 std::bind(
419 this,
420 keepAlive,
421 std::placeholders::_1,
422 std::placeholders::_2));
423 }
424 }
425
426 void
428 {
429 using namespace std::chrono_literals;
430 m_timer.expires_after(1s);
431 m_timer.async_wait(std::bind(&StatsDCollectorImp::on_timer, this, std::placeholders::_1));
432 }
433
434 void
435 on_timer(boost::system::error_code ec)
436 {
437 if (ec == boost::asio::error::operation_aborted)
438 return;
439
440 if (ec)
441 {
442 if (auto stream = m_journal.error())
443 stream << "on_timer failed: " << ec.message();
444 return;
445 }
446
448
449 for (auto& m : metrics_)
450 m.do_process();
451
452 send_buffers();
453
454 set_timer();
455 }
456
457 void
459 {
460 boost::system::error_code ec;
461
462 if (m_socket.connect(to_endpoint(m_address), ec))
463 {
464 if (auto stream = m_journal.error())
465 stream << "Connect failed: " << ec.message();
466 return;
467 }
468
469 set_timer();
470
471 m_io_context.run();
472
473 // NOLINTNEXTLINE(bugprone-unused-return-value)
474 m_socket.shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
475
476 m_socket.close();
477
478 m_io_context.poll();
479 }
480};
481
482//------------------------------------------------------------------------------
483
485 HandlerType const& handler,
487 : m_impl(impl), m_handler(handler)
488{
489 m_impl->add(*this);
490}
491
493{
494 m_impl->remove(*this);
495}
496
497void
502
503//------------------------------------------------------------------------------
504
506 std::string const& name,
508 : m_impl(impl), m_name(name)
509{
510 m_impl->add(*this);
511}
512
514{
515 m_impl->remove(*this);
516}
517
518void
520{
521 boost::asio::dispatch(
522 m_impl->get_io_context(),
523 std::bind(
526 amount));
527}
528
529void
531{
532 if (m_dirty)
533 {
534 m_dirty = false;
536 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|c"
537 << "\n";
538 m_value = 0;
539 m_impl->post_buffer(ss.str());
540 }
541}
542
543void
545{
546 m_value += amount;
547 m_dirty = true;
548}
549
550void
555
556//------------------------------------------------------------------------------
557
559 std::string const& name,
561 : m_impl(impl), m_name(name)
562{
563}
564
565void
567{
568 boost::asio::dispatch(
569 m_impl->get_io_context(),
570 std::bind(
573 value));
574}
575
576void
578{
580 ss << m_impl->prefix() << "." << m_name << ":" << value.count() << "|ms"
581 << "\n";
582 m_impl->post_buffer(ss.str());
583}
584
585//------------------------------------------------------------------------------
586
588 std::string const& name,
590 : m_impl(impl), m_name(name)
591{
592 m_impl->add(*this);
593}
594
596{
597 m_impl->remove(*this);
598}
599
600void
602{
603 boost::asio::dispatch(
604 m_impl->get_io_context(),
605 std::bind(
608 value));
609}
610
611void
613{
614 boost::asio::dispatch(
615 m_impl->get_io_context(),
616 std::bind(
619 amount));
620}
621
622void
624{
625 if (m_dirty)
626 {
627 m_dirty = false;
629 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|g"
630 << "\n";
631 m_impl->post_buffer(ss.str());
632 }
633}
634
635void
637{
638 m_value = value;
639
640 if (m_value != m_last_value)
641 {
643 m_dirty = true;
644 }
645}
646
647void
649{
651
652 if (amount > 0)
653 {
654 GaugeImpl::value_type const d(static_cast<GaugeImpl::value_type>(amount));
657 : d;
658 }
659 else if (amount < 0)
660 {
661 GaugeImpl::value_type const d(static_cast<GaugeImpl::value_type>(-amount));
662 value = (d >= value) ? 0 : value - d;
663 }
664
665 do_set(value);
666}
667
668void
673
674//------------------------------------------------------------------------------
675
677 std::string const& name,
679 : m_impl(impl), m_name(name)
680{
681 m_impl->add(*this);
682}
683
685{
686 m_impl->remove(*this);
687}
688
689void
691{
692 boost::asio::dispatch(
693 m_impl->get_io_context(),
694 std::bind(
697 amount));
698}
699
700void
702{
703 if (m_dirty)
704 {
705 m_dirty = false;
707 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|m"
708 << "\n";
709 m_value = 0;
710 m_impl->post_buffer(ss.str());
711 }
712}
713
714void
716{
717 m_value += amount;
718 m_dirty = true;
719}
720
721void
726
727} // namespace detail
728
729//------------------------------------------------------------------------------
730
732StatsDCollector::New(IP::Endpoint const& address, std::string const& prefix, Journal journal)
733{
734 return std::make_shared<detail::StatsDCollectorImp>(address, prefix, journal);
735}
736
737} // namespace insight
738} // namespace beast
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:55
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:41
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Intrusive doubly linked list.
Definition List.h:258
A metric for measuring an integral value.
Definition Counter.h:19
A metric for reporting event timing.
Definition Event.h:21
A metric for measuring an integral value.
Definition Gauge.h:20
A reference to a handler for performing polled collection.
Definition Hook.h:12
A metric for measuring an integral value.
Definition Meter.h:18
A Collector that reports metrics to a StatsD server.
static std::shared_ptr< StatsDCollector > New(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Create a StatsD collector.
static boost::asio::ip::udp::endpoint to_endpoint(IP::Endpoint const &ep)
Counter make_counter(std::string const &name) override
Create a counter with the specified name.
void do_post_buffer(std::string const &buffer)
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > m_work
StatsDCollectorImp(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Meter make_meter(std::string const &name) override
Create a meter with the specified name.
Event make_event(std::string const &name) override
Create an event with the specified name.
void on_timer(boost::system::error_code ec)
Hook make_hook(HookImpl::HandlerType const &handler) override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > m_timer
static void log(std::vector< boost::asio::const_buffer > const &buffers)
Gauge make_gauge(std::string const &name) override
Create a gauge with the specified name.
void on_send(std::shared_ptr< std::deque< std::string > >, boost::system::error_code ec, std::size_t)
boost::asio::strand< boost::asio::io_context::executor_type > m_strand
void do_increment(CounterImpl::value_type amount)
StatsDCounterImpl & operator=(StatsDCounterImpl const &)
void increment(CounterImpl::value_type amount) override
StatsDCounterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
void notify(EventImpl::value_type const &value) override
StatsDEventImpl & operator=(StatsDEventImpl const &)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDEventImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
void do_notify(EventImpl::value_type const &value)
void do_increment(GaugeImpl::difference_type amount)
void increment(GaugeImpl::difference_type amount) override
std::shared_ptr< StatsDCollectorImp > m_impl
void set(GaugeImpl::value_type value) override
void do_set(GaugeImpl::value_type value)
StatsDGaugeImpl & operator=(StatsDGaugeImpl const &)
StatsDGaugeImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDHookImpl & operator=(StatsDHookImpl const &)
StatsDHookImpl(HandlerType const &handler, std::shared_ptr< StatsDCollectorImp > const &impl)
StatsDMeterImpl & operator=(StatsDMeterImpl const &)
void increment(MeterImpl::value_type amount) override
void do_increment(MeterImpl::value_type amount)
StatsDMeterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDMetricBase & operator=(StatsDMetricBase const &)=delete
StatsDMetricBase(StatsDMetricBase const &)=delete
T clear(T... args)
T emplace_back(T... args)
T empty(T... args)
T is_same_v
T join(T... args)
T max(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)