3#include <xrpld/app/main/Application.h>
4#include <xrpld/overlay/Message.h>
5#include <xrpld/overlay/Overlay.h>
6#include <xrpld/overlay/Slot.h>
7#include <xrpld/overlay/detail/Handshake.h>
8#include <xrpld/overlay/detail/TrafficCount.h>
9#include <xrpld/overlay/detail/TxMetrics.h>
10#include <xrpld/peerfinder/PeerfinderManager.h>
11#include <xrpld/rpc/ServerHandler.h>
13#include <xrpl/basics/Resolver.h>
14#include <xrpl/basics/UnorderedContainers.h>
15#include <xrpl/basics/chrono.h>
16#include <xrpl/beast/utility/instrumentation.h>
17#include <xrpl/core/Job.h>
18#include <xrpl/resource/ResourceManager.h>
19#include <xrpl/server/Handoff.h>
21#include <boost/algorithm/string/predicate.hpp>
22#include <boost/asio/basic_waitable_timer.hpp>
23#include <boost/asio/ip/tcp.hpp>
24#include <boost/asio/ssl/context.hpp>
25#include <boost/asio/strand.hpp>
26#include <boost/container/flat_map.hpp>
67 boost::asio::basic_waitable_timer<clock_type>
timer;
85 boost::asio::strand<boost::asio::io_context::executor_type>
strand_;
89 boost::container::flat_map<Child*, std::weak_ptr<Child>>
list_;
126 boost::asio::io_context& ioContext,
171 size()
const override;
205 broadcast(protocol::TMProposeSet
const& m)
override;
208 broadcast(protocol::TMValidation
const& m)
override;
251 template <
class UnaryFunc>
269 if (
auto p = w.lock())
283 template <
class Body>
289 return response.result() == boost::beast::http::status::switching_protocols;
292 template <
class Fields>
294 isUpgrade(boost::beast::http::header<true, Fields>
const& req)
296 if (req.version() < 11)
298 if (req.method() != boost::beast::http::verb::get)
300 if (!boost::beast::http::token_list{req[
"Connection"]}.exists(
"upgrade"))
305 template <
class Fields>
307 isUpgrade(boost::beast::http::header<false, Fields>
const& req)
309 if (req.version() < 11)
311 if (!boost::beast::http::token_list{req[
"Connection"]}.exists(
"upgrade"))
381 protocol::MessageType type);
390 protocol::MessageType type);
407 template <
typename... Args>
411 if (!
strand_.running_in_this_thread())
551 template <
class Handler>
553 Handler
const& handler,
558 ,
hook(collector->makeHook(handler))
577 counts.size() ==
stats_.trafficGauges.size(),
578 "xrpl::OverlayImpl::collect_metrics : counts size do match");
580 for (
auto const& [key, value] : counts)
582 auto it =
stats_.trafficGauges.find(key);
583 if (it ==
stats_.trafficGauges.end())
586 auto& gauge = it->second;
589 gauge.name == value.name,
590 "xrpl::OverlayImpl::collect_metrics : gauge and counter "
593 gauge.bytesIn = value.bytesIn;
594 gauge.bytesOut = value.bytesOut;
595 gauge.messagesIn = value.messagesIn;
596 gauge.messagesOut = value.messagesOut;
A version-independent IP address and port combination.
A generic endpoint for log messages.
std::shared_ptr< Collector > ptr
A metric for measuring an integral value.
A reference to a handler for performing polled collection.
Holds unparsed configuration information.
Child(OverlayImpl &overlay)
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
std::weak_ptr< Timer > timer_
boost::asio::io_context & ioContext_
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
Resource::Manager & resourceManager_
json::Value getOverlayInfo() const
Returns information about peers on the overlay network.
boost::asio::ip::address address_type
static bool isPeerUpgrade(http_request_type const &request)
void addActive(std::shared_ptr< PeerImp > const &peer)
boost::system::error_code error_code
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
void broadcast(protocol::TMProposeSet const &m) override
Broadcast a proposal.
OverlayImpl(OverlayImpl const &)=delete
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
void incPeerDisconnectCharges() override
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully. This is called after the peer handshake has been comple...
std::atomic< uint64_t > peerDisconnectsCharges_
boost::asio::ip::tcp::socket socket_type
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remoteAddress)
static bool isUpgrade(boost::beast::http::header< false, Fields > const &req)
static std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remoteAddress, std::string const &msg)
json::Value txMetrics() const override
Returns tx reduce-relay metrics.
std::set< Peer::id_t > relay(protocol::TMProposeSet const &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
void connect(beast::IP::Endpoint const &remoteEndpoint) override
Establish a peer connection to the specified endpoint.
OverlayImpl & operator=(OverlayImpl const &)=delete
std::size_t size() const override
The number of active peers on the network. Active peers are only those peers that have completed the h...
static bool isUpgrade(boost::beast::http::header< true, Fields > const &req)
std::chrono::steady_clock clock_type
ServerHandler & serverHandler_
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
std::uint64_t getPeerDisconnectCharges() const override
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remoteEndpoint) override
Conditionally accept an incoming HTTP request.
reduce_relay::Slots< UptimeClock > slots_
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
OverlayImpl(Application &app, Setup setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &ioContext, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
void reportInboundTraffic(TrafficCount::Category cat, int bytes)
std::shared_ptr< Message > manifestMessage_
void sendTxQueue() const
Once a second, send the transactions' hashes aggregated by peers.
std::uint64_t getPeerDisconnect() const override
std::optional< std::uint32_t > manifestListSeq_
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
PeerFinder::Manager & peerFinder()
std::atomic< Peer::id_t > nextId_
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > peers_
json::Value getServerCounts()
Returns information about the local server's performance counters.
std::recursive_mutex mutex_
Resource::Manager & resourceManager()
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
beast::Journal const journal_
json::Value json() override
Return diagnostics on the status of all peers.
boost::asio::ip::tcp::endpoint endpoint_type
void forEach(UnaryFunc &&f) const
void onPeerDeactivate(Peer::id_t id)
boost::asio::strand< boost::asio::io_context::executor_type > strand_
static std::string makePrefix(std::uint32_t id)
Setup const & setup() const
std::unique_ptr< PeerFinder::Manager > peerFinder_
std::atomic< uint64_t > peerDisconnects_
metrics::TxMetrics txMetrics_
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
int limit() override
Returns the maximum number of peers we are configured to allow.
std::condition_variable_any cond_
json::Value getUnlInfo()
Returns information about the local server's UNL.
std::shared_ptr< Message > getManifestsMessage()
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
std::atomic< uint64_t > jqTransOverflow_
std::optional< std::uint32_t > networkID() const override
Returns the ID of the network this server is configured for, if any.
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
json::Value getServerInfo()
Returns information about the local server.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
void reportOutboundTraffic(TrafficCount::Category cat, int bytes)
std::uint64_t getJqTransOverflow() const override
static bool isPeerUpgrade(boost::beast::http::response< Body > const &response)
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
std::vector< std::shared_ptr< Peer > > PeerSequence
Maintains a set of IP addresses used for getting into the network.
std::uint32_t id_t
Uniquely identifies a peer.
Tracks load and resource consumption.
TrafficCount is used to count ingress and egress wire bytes and number of messages.
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::unordered_map< Key, Value, Hash, Pred, Allocator > hash_map
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Used to indicate the result of a server connection handoff.
beast::insight::Gauge peerDisconnects
std::unordered_map< TrafficCount::Category, TrafficGauges > trafficGauges
beast::insight::Hook hook
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector, std::unordered_map< TrafficCount::Category, TrafficGauges > &&trafficGauges)
void onTimer(error_code ec)
boost::asio::basic_waitable_timer< clock_type > timer
Timer(OverlayImpl &overlay)
beast::insight::Gauge messagesOut
beast::insight::Gauge bytesOut
TrafficGauges(std::string const &name, beast::insight::Collector::ptr const &collector)
beast::insight::Gauge messagesIn
beast::insight::Gauge bytesIn
Run transaction reduce-relay feature-related metrics.