1#include <xrpl/beast/insight/StatsDCollector.h>
3#include <xrpl/beast/core/List.h>
4#include <xrpl/beast/insight/CounterImpl.h>
5#include <xrpl/beast/insight/EventImpl.h>
6#include <xrpl/beast/insight/GaugeImpl.h>
7#include <xrpl/beast/insight/Hook.h>
8#include <xrpl/beast/insight/HookImpl.h>
9#include <xrpl/beast/insight/MeterImpl.h>
10#include <xrpl/beast/net/IPEndpoint.h>
11#include <xrpl/beast/utility/Journal.h>
12#include <xrpl/beast/utility/instrumentation.h>
14#include <boost/asio/basic_waitable_timer.hpp>
15#include <boost/asio/bind_executor.hpp>
16#include <boost/asio/buffer.hpp>
17#include <boost/asio/dispatch.hpp>
18#include <boost/asio/error.hpp>
19#include <boost/asio/executor_work_guard.hpp>
20#include <boost/asio/io_context.hpp>
21#include <boost/asio/ip/udp.hpp>
22#include <boost/asio/strand.hpp>
23#include <boost/system/detail/error_code.hpp>
24#include <boost/system/system_error.hpp>
40#ifndef BEAST_STATSDCOLLECTOR_TRACING_ENABLED
41#define BEAST_STATSDCOLLECTOR_TRACING_ENABLED 0
214 boost::asio::strand<boost::asio::io_context::executor_type>
strand_;
215 boost::asio::basic_waitable_timer<std::chrono::steady_clock>
timer_;
224 static boost::asio::ip::udp::endpoint
227 return boost::asio::ip::udp::endpoint(ep.
address(), ep.
port());
249 catch (boost::system::system_error
const&)
306 boost::asio::io_context&
321 data_.emplace_back(buffer);
327 boost::asio::dispatch(
329 boost::asio::bind_executor(
338 boost::system::error_code ec,
341 if (ec == boost::asio::error::operation_aborted)
347 stream <<
"async_send failed: " << ec.message();
356#if BEAST_STATSDCOLLECTOR_TRACING_ENABLED
357 for (
auto const& buffer : buffers)
359 std::string const s(buffer.data(), boost::asio::buffer_size(buffer));
383 for (
auto const& s : *keepAlive)
388 "beast::insight::detail::StatsDCollectorImp::sendBuffers : "
389 "non-empty payload");
399 std::placeholders::_1,
400 std::placeholders::_2));
409 if (!buffers.
empty())
418 std::placeholders::_1,
419 std::placeholders::_2));
426 using namespace std::chrono_literals;
434 if (ec == boost::asio::error::operation_aborted)
440 stream <<
"onTimer failed: " << ec.message();
457 boost::system::error_code ec;
462 stream <<
"Connect failed: " << ec.message();
471 socket_.shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
489 impl_->remove(*
this);
508 impl_->remove(*
this);
514 boost::asio::dispatch(
515 impl_->getIoContext(),
559 boost::asio::dispatch(
560 impl_->getIoContext(),
571 ss <<
impl_->prefix() <<
"." <<
name_ <<
":" << value.count() <<
"|ms"
586 impl_->remove(*
this);
592 boost::asio::dispatch(
593 impl_->getIoContext(),
603 boost::asio::dispatch(
604 impl_->getIoContext(),
651 value = (d >= value) ? 0 : value - d;
673 impl_->remove(*
this);
679 boost::asio::dispatch(
680 impl_->getIoContext(),
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
Port port() const
Returns the port number on the endpoint.
A generic endpoint for log messages.
Intrusive doubly linked list.
A metric for measuring an integral value.
std::chrono::milliseconds value_type
A metric for reporting event timing.
std::int64_t difference_type
A metric for measuring an integral value.
std::function< void(void)> HandlerType
A reference to a handler for performing polled collection.
A metric for measuring an integral value.
StatsDCollector()=default
static std::shared_ptr< StatsDCollector > make(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Create a StatsD collector.
void doPostBuffer(std::string const &buffer)
Gauge makeGauge(std::string const &name) override
Create a gauge with the specified name.
void remove(StatsDMetricBase &metric)
void postBuffer(std::string &&buffer)
boost::asio::io_context & getIoContext()
void onTimer(boost::system::error_code ec)
void add(StatsDMetricBase &metric)
boost::asio::io_context ioContext_
static boost::asio::ip::udp::endpoint toEndpoint(IP::Endpoint const &ep)
Hook makeHook(HookImpl::HandlerType const &handler) override
Counter makeCounter(std::string const &name) override
Create a counter with the specified name.
std::recursive_mutex metricsLock_
Meter makeMeter(std::string const &name) override
Create a meter with the specified name.
Event makeEvent(std::string const &name) override
Create an event with the specified name.
StatsDCollectorImp(IP::Endpoint address, std::string prefix, Journal journal)
void onSend(std::shared_ptr< std::deque< std::string > >, boost::system::error_code ec, std::size_t)
static constexpr auto kMaxPacketSize
std::string const & prefix() const
List< StatsDMetricBase > metrics_
~StatsDCollectorImp() override
boost::asio::ip::udp::socket socket_
std::deque< std::string > data_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
static void log(std::vector< boost::asio::const_buffer > const &buffers)
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
StatsDCounterImpl & operator=(StatsDCounterImpl const &)=delete
void doIncrement(CounterImpl::value_type amount)
void increment(CounterImpl::value_type amount) override
CounterImpl::value_type value_
StatsDCounterImpl(std::string name, std::shared_ptr< StatsDCollectorImp > impl)
~StatsDCounterImpl() override
void doProcess() override
std::shared_ptr< StatsDCollectorImp > impl_
void notify(EventImpl::value_type const &value) override
StatsDEventImpl(std::string name, std::shared_ptr< StatsDCollectorImp > impl)
StatsDEventImpl & operator=(StatsDEventImpl const &)
void doNotify(EventImpl::value_type const &value)
std::shared_ptr< StatsDCollectorImp > impl_
~StatsDEventImpl() override=default
void increment(GaugeImpl::difference_type amount) override
~StatsDGaugeImpl() override
void set(GaugeImpl::value_type value) override
void doSet(GaugeImpl::value_type value)
GaugeImpl::value_type lastValue_
void doProcess() override
GaugeImpl::value_type value_
std::shared_ptr< StatsDCollectorImp > impl_
void doIncrement(GaugeImpl::difference_type amount)
StatsDGaugeImpl & operator=(StatsDGaugeImpl const &)=delete
StatsDGaugeImpl(std::string name, std::shared_ptr< StatsDCollectorImp > impl)
StatsDHookImpl(HandlerType handler, std::shared_ptr< StatsDCollectorImp > impl)
std::shared_ptr< StatsDCollectorImp > impl_
void doProcess() override
~StatsDHookImpl() override
StatsDHookImpl & operator=(StatsDHookImpl const &)=delete
~StatsDMeterImpl() override
void doIncrement(MeterImpl::value_type amount)
StatsDMeterImpl & operator=(StatsDMeterImpl const &)=delete
void doProcess() override
StatsDMeterImpl(std::string name, std::shared_ptr< StatsDCollectorImp > impl)
void increment(MeterImpl::value_type amount) override
std::shared_ptr< StatsDCollectorImp > impl_
MeterImpl::value_type value_
virtual void doProcess()=0
virtual ~StatsDMetricBase()=default
StatsDMetricBase & operator=(StatsDMetricBase const &)=delete
StatsDMetricBase()=default
StatsDMetricBase(StatsDMetricBase const &)=delete
T emplace_back(T... args)
T static_pointer_cast(T... args)
T shared_from_this(T... args)