rippled
Loading...
Searching...
No Matches
StatsDCollector.cpp
1#include <xrpl/beast/core/List.h>
2#include <xrpl/beast/insight/CounterImpl.h>
3#include <xrpl/beast/insight/EventImpl.h>
4#include <xrpl/beast/insight/GaugeImpl.h>
5#include <xrpl/beast/insight/Hook.h>
6#include <xrpl/beast/insight/HookImpl.h>
7#include <xrpl/beast/insight/MeterImpl.h>
8#include <xrpl/beast/insight/StatsDCollector.h>
9#include <xrpl/beast/net/IPEndpoint.h>
10#include <xrpl/beast/utility/Journal.h>
11#include <xrpl/beast/utility/instrumentation.h>
12
13#include <boost/asio/basic_waitable_timer.hpp>
14#include <boost/asio/bind_executor.hpp>
15#include <boost/asio/buffer.hpp>
16#include <boost/asio/error.hpp>
17#include <boost/asio/executor_work_guard.hpp>
18#include <boost/asio/io_context.hpp>
19#include <boost/asio/ip/udp.hpp>
20#include <boost/asio/strand.hpp>
21#include <boost/system/detail/error_code.hpp>
22
23#include <chrono>
24#include <cstddef>
25#include <deque>
26#include <functional>
27#include <limits>
28#include <memory>
29#include <mutex>
30#include <optional>
31#include <sstream>
32#include <string>
33#include <thread>
34#include <utility>
35#include <vector>
36
37#ifndef BEAST_STATSDCOLLECTOR_TRACING_ENABLED
38#define BEAST_STATSDCOLLECTOR_TRACING_ENABLED 0
39#endif
40
41namespace beast {
42namespace insight {
43
44namespace detail {
45
46class StatsDCollectorImp;
47
48//------------------------------------------------------------------------------
49
50class StatsDMetricBase : public List<StatsDMetricBase>::Node
51{
52public:
53 virtual void
55 virtual ~StatsDMetricBase() = default;
56 StatsDMetricBase() = default;
59 operator=(StatsDMetricBase const&) = delete;
60};
61
62//------------------------------------------------------------------------------
63
65{
66public:
68 HandlerType const& handler,
70
71 ~StatsDHookImpl() override;
72
73 void
74 do_process() override;
75
76private:
79
82};
83
84//------------------------------------------------------------------------------
85
87{
88public:
90 std::string const& name,
92
93 ~StatsDCounterImpl() override;
94
95 void
96 increment(CounterImpl::value_type amount) override;
97
98 void
99 flush();
100 void
102 void
103 do_process() override;
104
105private:
108
113};
114
115//------------------------------------------------------------------------------
116
118{
119public:
121 std::string const& name,
123
124 ~StatsDEventImpl() = default;
125
126 void
127 notify(EventImpl::value_type const& value) override;
128
129 void
130 do_notify(EventImpl::value_type const& value);
131 void
133
134private:
137
140};
141
142//------------------------------------------------------------------------------
143
145{
146public:
148 std::string const& name,
150
151 ~StatsDGaugeImpl() override;
152
153 void
154 set(GaugeImpl::value_type value) override;
155 void
156 increment(GaugeImpl::difference_type amount) override;
157
158 void
159 flush();
160 void
162 void
164 void
165 do_process() override;
166
167private:
170
176};
177
178//------------------------------------------------------------------------------
179
181{
182public:
183 explicit StatsDMeterImpl(
184 std::string const& name,
186
187 ~StatsDMeterImpl() override;
188
189 void
190 increment(MeterImpl::value_type amount) override;
191
192 void
193 flush();
194 void
196 void
197 do_process() override;
198
199private:
202
207};
208
209//------------------------------------------------------------------------------
210
212 : public StatsDCollector,
213 public std::enable_shared_from_this<StatsDCollectorImp>
214{
215private:
216 enum {
217 // max_packet_size = 484
218 max_packet_size = 1472
219 };
220
224 boost::asio::io_context m_io_context;
225 std::optional<boost::asio::executor_work_guard<
226 boost::asio::io_context::executor_type>>
228 boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
229 boost::asio::basic_waitable_timer<std::chrono::steady_clock> m_timer;
230 boost::asio::ip::udp::socket m_socket;
234
235 // Must come last for order of init
237
238 static boost::asio::ip::udp::endpoint
240 {
241 return boost::asio::ip::udp::endpoint(ep.address(), ep.port());
242 }
243
244public:
246 IP::Endpoint const& address,
247 std::string const& prefix,
248 Journal journal)
249 : m_journal(journal)
250 , m_address(address)
252 , m_work(boost::asio::make_work_guard(m_io_context))
253 , m_strand(boost::asio::make_strand(m_io_context))
257 {
258 }
259
261 {
262 try
263 {
264 m_timer.cancel();
265 }
266 catch (boost::system::system_error const&)
267 {
268 // ignored
269 }
270
271 m_work.reset();
272 m_thread.join();
273 }
274
275 Hook
276 make_hook(HookImpl::HandlerType const& handler) override
277 {
279 handler, shared_from_this()));
280 }
281
282 Counter
283 make_counter(std::string const& name) override
284 {
286 name, shared_from_this()));
287 }
288
289 Event
290 make_event(std::string const& name) override
291 {
293 name, shared_from_this()));
294 }
295
296 Gauge
297 make_gauge(std::string const& name) override
298 {
300 name, shared_from_this()));
301 }
302
303 Meter
304 make_meter(std::string const& name) override
305 {
307 name, shared_from_this()));
308 }
309
310 //--------------------------------------------------------------------------
311
312 void
314 {
316 metrics_.push_back(metric);
317 }
318
319 void
321 {
323 metrics_.erase(metrics_.iterator_to(metric));
324 }
325
326 //--------------------------------------------------------------------------
327
328 boost::asio::io_context&
330 {
331 return m_io_context;
332 }
333
334 std::string const&
335 prefix() const
336 {
337 return m_prefix;
338 }
339
340 void
342 {
343 m_data.emplace_back(buffer);
344 }
345
346 void
348 {
349 boost::asio::dispatch(
351 boost::asio::bind_executor(
352 m_strand,
353 std::bind(
355 this,
356 std::move(buffer))));
357 }
358
359 // The keepAlive parameter makes sure the buffers sent to
360 // boost::asio::async_send do not go away until the call is finished
361 void
364 boost::system::error_code ec,
366 {
367 if (ec == boost::asio::error::operation_aborted)
368 return;
369
370 if (ec)
371 {
372 if (auto stream = m_journal.error())
373 stream << "async_send failed: " << ec.message();
374 return;
375 }
376 }
377
378 void
380 {
381 (void)buffers;
382#if BEAST_STATSDCOLLECTOR_TRACING_ENABLED
383 for (auto const& buffer : buffers)
384 {
385 std::string const s(
386 buffer.data(), boost::asio::buffer_size(buffer));
387 std::cerr << s;
388 }
389 std::cerr << '\n';
390#endif
391 }
392
393 // Send what we have
394 void
396 {
397 if (m_data.empty())
398 return;
399
400 // Break up the array of strings into blocks
401 // that each fit into one UDP packet.
402 //
404 buffers.reserve(m_data.size());
405 std::size_t size(0);
406
407 auto keepAlive =
409 m_data.clear();
410
411 for (auto const& s : *keepAlive)
412 {
413 std::size_t const length(s.size());
414 XRPL_ASSERT(
415 !s.empty(),
416 "beast::insight::detail::StatsDCollectorImp::send_buffers : "
417 "non-empty payload");
418 if (!buffers.empty() && (size + length) > max_packet_size)
419 {
420 log(buffers);
421 m_socket.async_send(
422 buffers,
423 std::bind(
425 this,
426 keepAlive,
427 std::placeholders::_1,
428 std::placeholders::_2));
429 buffers.clear();
430 size = 0;
431 }
432
433 buffers.emplace_back(&s[0], length);
434 size += length;
435 }
436
437 if (!buffers.empty())
438 {
439 log(buffers);
440 m_socket.async_send(
441 buffers,
442 std::bind(
444 this,
445 keepAlive,
446 std::placeholders::_1,
447 std::placeholders::_2));
448 }
449 }
450
// Arms m_timer for the next pass: the timer is set to expire one second
// from now and on_timer is registered as the completion handler.
// NOTE(review): the declarator line (original line 452) is absent from this
// listing; presumably this is set_timer(), which on_timer and the
// connect path both call -- confirm against the real file.
451 void
453 {
454 using namespace std::chrono_literals;
455 m_timer.expires_after(1s);
// The handler is bound to the raw `this` pointer, not a shared_ptr.
456 m_timer.async_wait(std::bind(
457 &StatsDCollectorImp::on_timer, this, std::placeholders::_1));
458 }
459
// Completion handler for the wait started on m_timer.
//
// On success it calls do_process() on every entry of metrics_, then
// send_buffers(), then set_timer() to schedule the next pass.
//
// ec: result of the timer wait.
//
// NOTE(review): original line 473 is absent from this listing, so one
// statement between the error handling and the loop is not shown here.
460 void
461 on_timer(boost::system::error_code ec)
462 {
// A cancelled wait returns silently and does not re-arm the timer.
463 if (ec == boost::asio::error::operation_aborted)
464 return;
465
// Any other error is reported on the journal's error stream; the early
// return also skips set_timer(), so no further pass is scheduled.
466 if (ec)
467 {
468 if (auto stream = m_journal.error())
469 stream << "on_timer failed: " << ec.message();
470 return;
471 }
472
474
475 for (auto& m : metrics_)
476 m.do_process();
477
478 send_buffers();
479
480 set_timer();
481 }
482
// Connects m_socket to the endpoint derived from m_address, arms the timer,
// and runs m_io_context; once run() returns, the socket is shut down and
// closed and the context is polled one more time.
// NOTE(review): the declarator line (original line 484) is absent from this
// listing; presumably this is the worker-thread entry point -- confirm.
483 void
485 {
486 boost::system::error_code ec;
487
// On connect failure the error is logged and the function returns before
// the timer is armed or the io_context is run.
488 if (m_socket.connect(to_endpoint(m_address), ec))
489 {
490 if (auto stream = m_journal.error())
491 stream << "Connect failed: " << ec.message();
492 return;
493 }
494
495 set_timer();
496
497 m_io_context.run();
498
// The error code written by shutdown() is not inspected afterwards.
499 m_socket.shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
500
// NOTE(review): close() is called without an error_code argument, unlike
// connect() and shutdown() above; presumably this is the throwing overload
// -- consider close(ec) so a failure cannot escape this function.
501 m_socket.close();
502
503 m_io_context.poll();
504 }
505};
506
507//------------------------------------------------------------------------------
508
510 HandlerType const& handler,
512 : m_impl(impl), m_handler(handler)
513{
514 m_impl->add(*this);
515}
516
518{
519 m_impl->remove(*this);
520}
521
522void
527
528//------------------------------------------------------------------------------
529
531 std::string const& name,
533 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
534{
535 m_impl->add(*this);
536}
537
539{
540 m_impl->remove(*this);
541}
542
543void
545{
546 boost::asio::dispatch(
547 m_impl->get_io_context(),
548 std::bind(
551 amount));
552}
553
554void
556{
557 if (m_dirty)
558 {
559 m_dirty = false;
561 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|c"
562 << "\n";
563 m_value = 0;
564 m_impl->post_buffer(ss.str());
565 }
566}
567
568void
570{
571 m_value += amount;
572 m_dirty = true;
573}
574
575void
580
581//------------------------------------------------------------------------------
582
584 std::string const& name,
586 : m_impl(impl), m_name(name)
587{
588}
589
590void
592{
593 boost::asio::dispatch(
594 m_impl->get_io_context(),
595 std::bind(
598 value));
599}
600
601void
603{
605 ss << m_impl->prefix() << "." << m_name << ":" << value.count() << "|ms"
606 << "\n";
607 m_impl->post_buffer(ss.str());
608}
609
610//------------------------------------------------------------------------------
611
613 std::string const& name,
615 : m_impl(impl), m_name(name), m_last_value(0), m_value(0), m_dirty(false)
616{
617 m_impl->add(*this);
618}
619
621{
622 m_impl->remove(*this);
623}
624
625void
627{
628 boost::asio::dispatch(
629 m_impl->get_io_context(),
630 std::bind(
633 value));
634}
635
636void
638{
639 boost::asio::dispatch(
640 m_impl->get_io_context(),
641 std::bind(
644 amount));
645}
646
647void
649{
650 if (m_dirty)
651 {
652 m_dirty = false;
654 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|g"
655 << "\n";
656 m_impl->post_buffer(ss.str());
657 }
658}
659
660void
662{
663 m_value = value;
664
665 if (m_value != m_last_value)
666 {
668 m_dirty = true;
669 }
670}
671
672void
674{
676
677 if (amount > 0)
678 {
679 GaugeImpl::value_type const d(
680 static_cast<GaugeImpl::value_type>(amount));
681 value +=
684 : d;
685 }
686 else if (amount < 0)
687 {
688 GaugeImpl::value_type const d(
689 static_cast<GaugeImpl::value_type>(-amount));
690 value = (d >= value) ? 0 : value - d;
691 }
692
693 do_set(value);
694}
695
696void
701
702//------------------------------------------------------------------------------
703
705 std::string const& name,
707 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
708{
709 m_impl->add(*this);
710}
711
713{
714 m_impl->remove(*this);
715}
716
717void
719{
720 boost::asio::dispatch(
721 m_impl->get_io_context(),
722 std::bind(
725 amount));
726}
727
728void
730{
731 if (m_dirty)
732 {
733 m_dirty = false;
735 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|m"
736 << "\n";
737 m_value = 0;
738 m_impl->post_buffer(ss.str());
739 }
740}
741
742void
744{
745 m_value += amount;
746 m_dirty = true;
747}
748
749void
754
755} // namespace detail
756
757//------------------------------------------------------------------------------
758
761 IP::Endpoint const& address,
762 std::string const& prefix,
763 Journal journal)
764{
766 address, prefix, journal);
767}
768
769} // namespace insight
770} // namespace beast
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:56
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:42
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Intrusive doubly linked list.
Definition List.h:260
A metric for measuring an integral value.
Definition Counter.h:20
A metric for reporting event timing.
Definition Event.h:22
A metric for measuring an integral value.
Definition Gauge.h:21
A reference to a handler for performing polled collection.
Definition Hook.h:13
A metric for measuring an integral value.
Definition Meter.h:19
A Collector that reports metrics to a StatsD server.
static std::shared_ptr< StatsDCollector > New(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Create a StatsD collector.
static boost::asio::ip::udp::endpoint to_endpoint(IP::Endpoint const &ep)
Counter make_counter(std::string const &name) override
Create a counter with the specified name.
void do_post_buffer(std::string const &buffer)
StatsDCollectorImp(IP::Endpoint const &address, std::string const &prefix, Journal journal)
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > m_work
Meter make_meter(std::string const &name) override
Create a meter with the specified name.
Event make_event(std::string const &name) override
Create an event with the specified name.
void on_timer(boost::system::error_code ec)
Hook make_hook(HookImpl::HandlerType const &handler) override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > m_timer
void log(std::vector< boost::asio::const_buffer > const &buffers)
Gauge make_gauge(std::string const &name) override
Create a gauge with the specified name.
void on_send(std::shared_ptr< std::deque< std::string > >, boost::system::error_code ec, std::size_t)
boost::asio::strand< boost::asio::io_context::executor_type > m_strand
void do_increment(CounterImpl::value_type amount)
StatsDCounterImpl & operator=(StatsDCounterImpl const &)
void increment(CounterImpl::value_type amount) override
StatsDCounterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
void notify(EventImpl::value_type const &value) override
StatsDEventImpl & operator=(StatsDEventImpl const &)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDEventImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
void do_notify(EventImpl::value_type const &value)
void do_increment(GaugeImpl::difference_type amount)
void increment(GaugeImpl::difference_type amount) override
std::shared_ptr< StatsDCollectorImp > m_impl
void set(GaugeImpl::value_type value) override
void do_set(GaugeImpl::value_type value)
StatsDGaugeImpl & operator=(StatsDGaugeImpl const &)
StatsDGaugeImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDHookImpl & operator=(StatsDHookImpl const &)
StatsDHookImpl(HandlerType const &handler, std::shared_ptr< StatsDCollectorImp > const &impl)
StatsDMeterImpl & operator=(StatsDMeterImpl const &)
void increment(MeterImpl::value_type amount) override
void do_increment(MeterImpl::value_type amount)
StatsDMeterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDMetricBase & operator=(StatsDMetricBase const &)=delete
StatsDMetricBase(StatsDMetricBase const &)=delete
T clear(T... args)
T emplace_back(T... args)
T empty(T... args)
T is_same_v
T join(T... args)
T max(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)