1#include <xrpl/beast/core/List.h>
2#include <xrpl/beast/insight/CounterImpl.h>
3#include <xrpl/beast/insight/EventImpl.h>
4#include <xrpl/beast/insight/GaugeImpl.h>
5#include <xrpl/beast/insight/Hook.h>
6#include <xrpl/beast/insight/HookImpl.h>
7#include <xrpl/beast/insight/MeterImpl.h>
8#include <xrpl/beast/insight/StatsDCollector.h>
9#include <xrpl/beast/net/IPEndpoint.h>
10#include <xrpl/beast/utility/Journal.h>
11#include <xrpl/beast/utility/instrumentation.h>
13#include <boost/asio/basic_waitable_timer.hpp>
14#include <boost/asio/bind_executor.hpp>
15#include <boost/asio/buffer.hpp>
16#include <boost/asio/error.hpp>
17#include <boost/asio/executor_work_guard.hpp>
18#include <boost/asio/io_context.hpp>
19#include <boost/asio/ip/udp.hpp>
20#include <boost/asio/strand.hpp>
21#include <boost/system/detail/error_code.hpp>
37#ifndef BEAST_STATSDCOLLECTOR_TRACING_ENABLED
38#define BEAST_STATSDCOLLECTOR_TRACING_ENABLED 0
46class StatsDCollectorImp;
226 boost::asio::io_context::executor_type>>
228 boost::asio::strand<boost::asio::io_context::executor_type>
m_strand;
229 boost::asio::basic_waitable_timer<std::chrono::steady_clock>
m_timer;
238 static boost::asio::ip::udp::endpoint
241 return boost::asio::ip::udp::endpoint(ep.
address(), ep.
port());
266 catch (boost::system::system_error
const&)
328 boost::asio::io_context&
349 boost::asio::dispatch(
351 boost::asio::bind_executor(
356 std::move(buffer))));
364 boost::system::error_code ec,
367 if (ec == boost::asio::error::operation_aborted)
373 stream <<
"async_send failed: " << ec.message();
382#if BEAST_STATSDCOLLECTOR_TRACING_ENABLED
383 for (
auto const& buffer : buffers)
386 buffer.data(), boost::asio::buffer_size(buffer));
411 for (
auto const& s : *keepAlive)
416 "beast::insight::detail::StatsDCollectorImp::send_buffers : "
417 "non-empty payload");
427 std::placeholders::_1,
428 std::placeholders::_2));
437 if (!buffers.
empty())
446 std::placeholders::_1,
447 std::placeholders::_2));
454 using namespace std::chrono_literals;
463 if (ec == boost::asio::error::operation_aborted)
469 stream <<
"on_timer failed: " << ec.message();
486 boost::system::error_code ec;
491 stream <<
"Connect failed: " << ec.message();
499 m_socket.shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
512 : m_impl(impl), m_handler(handler)
533 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
546 boost::asio::dispatch(
586 : m_impl(impl), m_name(name)
593 boost::asio::dispatch(
615 : m_impl(impl), m_name(name), m_last_value(0), m_value(0), m_dirty(false)
628 boost::asio::dispatch(
639 boost::asio::dispatch(
690 value = (d >= value) ? 0 : value - d;
707 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
720 boost::asio::dispatch(
766 address, prefix, journal);
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
Port port() const
Returns the port number on the endpoint.
A generic endpoint for log messages.
Intrusive doubly linked list.
A metric for measuring an integral value.
A metric for reporting event timing.
A metric for measuring an integral value.
A reference to a handler for performing polled collection.
A metric for measuring an integral value.
A Collector that reports metrics to a StatsD server.
static std::shared_ptr< StatsDCollector > New(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Create a StatsD collector.
void remove(StatsDMetricBase &metric)
static boost::asio::ip::udp::endpoint to_endpoint(IP::Endpoint const &ep)
std::deque< std::string > m_data
void add(StatsDMetricBase &metric)
Counter make_counter(std::string const &name) override
Create a counter with the specified name.
void do_post_buffer(std::string const &buffer)
StatsDCollectorImp(IP::Endpoint const &address, std::string const &prefix, Journal journal)
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > m_work
std::recursive_mutex metricsLock_
boost::asio::ip::udp::socket m_socket
Meter make_meter(std::string const &name) override
Create a meter with the specified name.
Event make_event(std::string const &name) override
Create an event with the specified name.
void on_timer(boost::system::error_code ec)
boost::asio::io_context & get_io_context()
std::string const & prefix() const
Hook make_hook(HookImpl::HandlerType const &handler) override
List< StatsDMetricBase > metrics_
~StatsDCollectorImp() override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > m_timer
void log(std::vector< boost::asio::const_buffer > const &buffers)
boost::asio::io_context m_io_context
Gauge make_gauge(std::string const &name) override
Create a gauge with the specified name.
void post_buffer(std::string &&buffer)
void on_send(std::shared_ptr< std::deque< std::string > >, boost::system::error_code ec, std::size_t)
boost::asio::strand< boost::asio::io_context::executor_type > m_strand
void do_increment(CounterImpl::value_type amount)
StatsDCounterImpl & operator=(StatsDCounterImpl const &)
void increment(CounterImpl::value_type amount) override
void do_process() override
StatsDCounterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
CounterImpl::value_type m_value
~StatsDCounterImpl() override
std::shared_ptr< StatsDCollectorImp > m_impl
void notify(EventImpl::value_type const &value) override
StatsDEventImpl & operator=(StatsDEventImpl const &)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDEventImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
void do_notify(EventImpl::value_type const &value)
~StatsDEventImpl()=default
void do_increment(GaugeImpl::difference_type amount)
GaugeImpl::value_type m_last_value
void increment(GaugeImpl::difference_type amount) override
~StatsDGaugeImpl() override
std::shared_ptr< StatsDCollectorImp > m_impl
void set(GaugeImpl::value_type value) override
void do_set(GaugeImpl::value_type value)
void do_process() override
StatsDGaugeImpl & operator=(StatsDGaugeImpl const &)
StatsDGaugeImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
GaugeImpl::value_type m_value
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDHookImpl & operator=(StatsDHookImpl const &)
~StatsDHookImpl() override
StatsDHookImpl(HandlerType const &handler, std::shared_ptr< StatsDCollectorImp > const &impl)
void do_process() override
~StatsDMeterImpl() override
StatsDMeterImpl & operator=(StatsDMeterImpl const &)
void increment(MeterImpl::value_type amount) override
void do_process() override
void do_increment(MeterImpl::value_type amount)
StatsDMeterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
MeterImpl::value_type m_value
virtual ~StatsDMetricBase()=default
StatsDMetricBase & operator=(StatsDMetricBase const &)=delete
StatsDMetricBase()=default
StatsDMetricBase(StatsDMetricBase const &)=delete
virtual void do_process()=0
T emplace_back(T... args)
T shared_from_this(T... args)