rippled
Loading...
Searching...
No Matches
OverlayImpl.h
1#pragma once
2
3#include <xrpld/app/main/Application.h>
4#include <xrpld/overlay/Message.h>
5#include <xrpld/overlay/Overlay.h>
6#include <xrpld/overlay/Slot.h>
7#include <xrpld/overlay/detail/Handshake.h>
8#include <xrpld/overlay/detail/TrafficCount.h>
9#include <xrpld/overlay/detail/TxMetrics.h>
10#include <xrpld/peerfinder/PeerfinderManager.h>
11#include <xrpld/rpc/ServerHandler.h>
12
13#include <xrpl/basics/Resolver.h>
14#include <xrpl/basics/UnorderedContainers.h>
15#include <xrpl/basics/chrono.h>
16#include <xrpl/beast/utility/instrumentation.h>
17#include <xrpl/core/Job.h>
18#include <xrpl/resource/ResourceManager.h>
19#include <xrpl/server/Handoff.h>
20
21#include <boost/algorithm/string/predicate.hpp>
22#include <boost/asio/basic_waitable_timer.hpp>
23#include <boost/asio/ip/tcp.hpp>
24#include <boost/asio/ssl/context.hpp>
25#include <boost/asio/strand.hpp>
26#include <boost/container/flat_map.hpp>
27
28#include <atomic>
29#include <chrono>
30#include <condition_variable>
31#include <cstdint>
32#include <memory>
33#include <mutex>
34#include <optional>
35#include <unordered_map>
36
37namespace xrpl {
38
39class PeerImp;
40class BasicConfig;
41
43{
44public:
45 class Child
46 {
47 protected:
49
50 explicit Child(OverlayImpl& overlay);
51
52 public:
53 virtual ~Child();
54 virtual void
55 stop() = 0;
56 };
57
58private:
60 using socket_type = boost::asio::ip::tcp::socket;
61 using address_type = boost::asio::ip::address;
62 using endpoint_type = boost::asio::ip::tcp::endpoint;
63 using error_code = boost::system::error_code;
64
66 {
67 boost::asio::basic_waitable_timer<clock_type> timer_;
68 bool stopping_{false};
69
70 explicit Timer(OverlayImpl& overlay);
71
72 void
73 stop() override;
74
75 void
76 async_wait();
77
78 void
80 };
81
83 boost::asio::io_context& io_context_;
85 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
86 mutable std::recursive_mutex mutex_; // VFALCO use std::mutex
89 boost::container::flat_map<Child*, std::weak_ptr<Child>> list_;
104
106
107 // Transaction reduce-relay metrics
109
110 // A message with the list of manifests we send to peers
112 // Used to track whether we need to update the cached list of manifests
114 // Protects the message and the sequence list of manifests
116
117 //--------------------------------------------------------------------------
118
119public:
121 Application& app,
122 Setup const& setup,
123 ServerHandler& serverHandler,
125 Resolver& resolver,
126 boost::asio::io_context& io_context,
127 BasicConfig const& config,
128 beast::insight::Collector::ptr const& collector);
129
130 OverlayImpl(OverlayImpl const&) = delete;
132 operator=(OverlayImpl const&) = delete;
133
134 void
135 start() override;
136
137 void
138 stop() override;
139
142 {
143 return *m_peerFinder;
144 }
145
148 {
149 return m_resourceManager;
150 }
151
152 Setup const&
153 setup() const
154 {
155 return setup_;
156 }
157
158 Handoff
159 onHandoff(
161 http_request_type&& request,
162 endpoint_type remote_endpoint) override;
163
164 void
165 connect(beast::IP::Endpoint const& remote_endpoint) override;
166
167 int
168 limit() override;
169
171 size() const override;
172
174 json() override;
175
177 getActivePeers() const override;
178
190 std::set<Peer::id_t> const& toSkip,
191 std::size_t& active,
192 std::size_t& disabled,
193 std::size_t& enabledInSkip) const;
194
195 void
197
199 findPeerByShortID(Peer::id_t const& id) const override;
200
202 findPeerByPublicKey(PublicKey const& pubKey) override;
203
204 void
205 broadcast(protocol::TMProposeSet& m) override;
206
207 void
208 broadcast(protocol::TMValidation& m) override;
209
211 relay(protocol::TMProposeSet& m, uint256 const& uid, PublicKey const& validator) override;
212
214 relay(protocol::TMValidation& m, uint256 const& uid, PublicKey const& validator) override;
215
216 void
217 relay(
218 uint256 const&,
220 std::set<Peer::id_t> const& skip) override;
221
224
225 //--------------------------------------------------------------------------
226 //
227 // OverlayImpl
228 //
229
230 void
232
233 void
235
241 void
243
244 // Called when an active peer is destroyed.
245 void
247
248 // UnaryFunc will be called as
249 // void(std::shared_ptr<PeerImp>&&)
250 //
251 template <class UnaryFunc>
252 void
253 for_each(UnaryFunc&& f) const
254 {
256 {
257 std::lock_guard const lock(mutex_);
258
259 // Iterate over a copy of the peer list because peer
260 // destruction can invalidate iterators.
261 wp.reserve(ids_.size());
262
263 for (auto& x : ids_)
264 wp.push_back(x.second);
265 }
266
267 for (auto& w : wp)
268 {
269 if (auto p = w.lock())
270 f(std::move(p));
271 }
272 }
273
274 // Called when TMManifests is received from a peer
275 void
278 std::shared_ptr<PeerImp> const& from);
279
280 static bool
281 isPeerUpgrade(http_request_type const& request);
282
283 template <class Body>
284 static bool
285 isPeerUpgrade(boost::beast::http::response<Body> const& response)
286 {
287 if (!is_upgrade(response))
288 return false;
289 return response.result() == boost::beast::http::status::switching_protocols;
290 }
291
292 template <class Fields>
293 static bool
294 is_upgrade(boost::beast::http::header<true, Fields> const& req)
295 {
296 if (req.version() < 11)
297 return false;
298 if (req.method() != boost::beast::http::verb::get)
299 return false;
300 if (!boost::beast::http::token_list{req["Connection"]}.exists("upgrade"))
301 return false;
302 return true;
303 }
304
305 template <class Fields>
306 static bool
307 is_upgrade(boost::beast::http::header<false, Fields> const& req)
308 {
309 if (req.version() < 11)
310 return false;
311 if (!boost::beast::http::token_list{req["Connection"]}.exists("upgrade"))
312 return false;
313 return true;
314 }
315
316 static std::string
318
319 void
321
322 void
324
325 void
327 {
329 }
330
332 getJqTransOverflow() const override
333 {
334 return jqTransOverflow_;
335 }
336
337 void
339 {
341 }
342
344 getPeerDisconnect() const override
345 {
346 return peerDisconnects_;
347 }
348
349 void
354
357 {
359 }
360
362 networkID() const override
363 {
364 return setup_.networkID;
365 }
366
376 void
378 uint256 const& key,
379 PublicKey const& validator,
380 std::set<Peer::id_t>&& peers,
381 protocol::MessageType type);
382
385 void
387 uint256 const& key,
388 PublicKey const& validator,
389 Peer::id_t peer,
390 protocol::MessageType type);
391
397 void
399
401 txMetrics() const override
402 {
403 return txMetrics_.json();
404 }
405
407 template <typename... Args>
408 void
409 addTxMetrics(Args... args)
410 {
411 if (!strand_.running_in_this_thread())
412 return post(strand_, std::bind(&OverlayImpl::addTxMetrics<Args...>, this, args...));
413
414 txMetrics_.addMetrics(args...);
415 }
416
417private:
418 void
419 squelch(PublicKey const& validator, Peer::id_t const id, std::uint32_t squelchDuration)
420 const override;
421
422 void
423 unsquelch(PublicKey const& validator, Peer::id_t id) const override;
424
428 http_request_type const& request,
429 address_type remote_address);
430
434 http_request_type const& request,
435 address_type remote_address,
436 std::string msg);
437
443 bool
444 processCrawl(http_request_type const& req, Handoff& handoff);
445
453 bool
454 processValidatorList(http_request_type const& req, Handoff& handoff);
455
461 bool
462 processHealth(http_request_type const& req, Handoff& handoff);
463
468 bool
469 processRequest(http_request_type const& req, Handoff& handoff);
470
476 getOverlayInfo() const;
477
484
491
497 getUnlInfo();
498
499 //--------------------------------------------------------------------------
500
501 //
502 // PropertyStream
503 //
504
505 void
506 onWrite(beast::PropertyStream::Map& stream) override;
507
508 //--------------------------------------------------------------------------
509
510 void
511 remove(Child& child);
512
513 void
514 stopChildren();
515
516 void
517 autoConnect();
518
519 void
521
523 void
524 sendTxQueue() const;
525
528 void
530
531private:
533 {
535 : name(name)
536 , bytesIn(collector->make_gauge(name, "Bytes_In"))
537 , bytesOut(collector->make_gauge(name, "Bytes_Out"))
538 , messagesIn(collector->make_gauge(name, "Messages_In"))
539 , messagesOut(collector->make_gauge(name, "Messages_Out"))
540 {
541 }
547 };
548
549 struct Stats
550 {
551 template <class Handler>
553 Handler const& handler,
554 beast::insight::Collector::ptr const& collector,
556 : peerDisconnects(collector->make_gauge("Overlay", "Peer_Disconnects"))
557 , trafficGauges(std::move(trafficGauges_))
558 , hook(collector->make_hook(handler))
559 {
560 }
561
565 };
566
569
570private:
571 void
573 {
574 auto counts = m_traffic.getCounts();
575 std::lock_guard const lock(m_statsMutex);
576 XRPL_ASSERT(
577 counts.size() == m_stats.trafficGauges.size(),
578 "xrpl::OverlayImpl::collect_metrics : counts size do match");
579
580 for (auto const& [key, value] : counts)
581 {
582 auto it = m_stats.trafficGauges.find(key);
583 if (it == m_stats.trafficGauges.end())
584 continue;
585
586 auto& gauge = it->second;
587
588 XRPL_ASSERT(
589 gauge.name == value.name,
590 "xrpl::OverlayImpl::collect_metrics : gauge and counter "
591 "match");
592
593 gauge.bytesIn = value.bytesIn;
594 gauge.bytesOut = value.bytesOut;
595 gauge.messagesIn = value.messagesIn;
596 gauge.messagesOut = value.messagesOut;
597 }
598
600 }
601};
602
603} // namespace xrpl
T bind(T... args)
Represents a JSON value.
Definition json_value.h:130
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
A generic endpoint for log messages.
Definition Journal.h:40
A metric for measuring an integral value.
Definition Gauge.h:20
A reference to a handler for performing polled collection.
Definition Hook.h:12
Holds unparsed configuration information.
virtual void stop()=0
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
std::weak_ptr< Timer > timer_
Definition OverlayImpl.h:88
boost::asio::io_context & io_context_
Definition OverlayImpl.h:83
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
boost::asio::ip::address address_type
Definition OverlayImpl.h:61
static bool isPeerUpgrade(http_request_type const &request)
Resource::Manager & m_resourceManager
Definition OverlayImpl.h:93
boost::system::error_code error_code
Definition OverlayImpl.h:63
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
OverlayImpl(OverlayImpl const &)=delete
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
Json::Value getServerCounts()
Returns information about the local server's performance counters.
std::mutex m_statsMutex
void incPeerDisconnectCharges() override
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remote_endpoint) override
Conditionally accept an incoming HTTP request.
std::atomic< uint64_t > peerDisconnectsCharges_
boost::asio::ip::tcp::socket socket_type
Definition OverlayImpl.h:60
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
Definition OverlayImpl.h:84
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
static bool is_upgrade(boost::beast::http::header< false, Fields > const &req)
void for_each(UnaryFunc &&f) const
void stop() override
void connect(beast::IP::Endpoint const &remote_endpoint) override
Establish a peer connection to the specified endpoint.
OverlayImpl & operator=(OverlayImpl const &)=delete
std::size_t size() const override
The number of active peers on the network Active peers are only those peers that have completed the h...
ServerHandler & serverHandler_
Definition OverlayImpl.h:92
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
std::uint64_t getPeerDisconnectCharges() const override
void broadcast(protocol::TMProposeSet &m) override
Broadcast a proposal.
static std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address, std::string msg)
reduce_relay::Slots< UptimeClock > slots_
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
Definition OverlayImpl.h:97
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
TrafficCount m_traffic
Definition OverlayImpl.h:95
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
std::shared_ptr< Message > manifestMessage_
void sendTxQueue() const
Send once a second transactions' hashes aggregated by peers.
std::unique_ptr< PeerFinder::Manager > m_peerFinder
Definition OverlayImpl.h:94
std::uint64_t getPeerDisconnect() const override
std::optional< std::uint32_t > manifestListSeq_
Json::Value txMetrics() const override
Returns tx reduce-relay metrics.
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
void add_active(std::shared_ptr< PeerImp > const &peer)
PeerFinder::Manager & peerFinder()
Application & app_
Definition OverlayImpl.h:82
std::recursive_mutex mutex_
Definition OverlayImpl.h:86
Resource::Manager & resourceManager()
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
beast::Journal const journal_
Definition OverlayImpl.h:91
boost::asio::ip::tcp::endpoint endpoint_type
Definition OverlayImpl.h:62
void onPeerDeactivate(Peer::id_t id)
std::mutex manifestLock_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition OverlayImpl.h:85
Json::Value json() override
Return diagnostics on the status of all peers.
static std::string makePrefix(std::uint32_t id)
Setup const & setup() const
std::atomic< uint64_t > peerDisconnects_
std::set< Peer::id_t > relay(protocol::TMProposeSet &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
static bool is_upgrade(boost::beast::http::header< true, Fields > const &req)
metrics::TxMetrics txMetrics_
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
Definition OverlayImpl.h:89
int limit() override
Returns the maximum number of peers we are configured to allow.
std::condition_variable_any cond_
Definition OverlayImpl.h:87
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > m_peers
Definition OverlayImpl.h:96
std::shared_ptr< Message > getManifestsMessage()
Json::Value getUnlInfo()
Returns information about the local server's UNL.
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
std::atomic< uint64_t > jqTransOverflow_
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address)
std::optional< std::uint32_t > networkID() const override
Returns the ID of the network this server is configured for, if any.
std::atomic< Peer::id_t > next_id_
Definition OverlayImpl.h:99
void reportInboundTraffic(TrafficCount::category cat, int bytes)
Json::Value getOverlayInfo() const
Returns information about peers on the overlay network.
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
Resolver & m_resolver
Definition OverlayImpl.h:98
Json::Value getServerInfo()
Returns information about the local server.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
std::uint64_t getJqTransOverflow() const override
static bool isPeerUpgrade(boost::beast::http::response< Body > const &response)
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
void start() override
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
Manages the set of connected peers.
Definition Overlay.h:29
std::vector< std::shared_ptr< Peer > > PeerSequence
Definition Overlay.h:56
Maintains a set of IP addresses used for getting into the network.
A public key.
Definition PublicKey.h:42
Tracks load and resource consumption.
TrafficCount is used to count ingress and egress wire bytes and number of messages.
auto const & getCounts() const
An up-to-date copy of all the counters.
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
T push_back(T... args)
T reserve(T... args)
Used to indicate the result of a server connection handoff.
Definition Handoff.h:18
beast::insight::Gauge peerDisconnects
std::unordered_map< TrafficCount::category, TrafficGauges > trafficGauges
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector, std::unordered_map< TrafficCount::category, TrafficGauges > &&trafficGauges_)
beast::insight::Hook hook
void on_timer(error_code ec)
boost::asio::basic_waitable_timer< clock_type > timer_
Definition OverlayImpl.h:67
beast::insight::Gauge messagesOut
beast::insight::Gauge bytesOut
TrafficGauges(std::string const &name, beast::insight::Collector::ptr const &collector)
beast::insight::Gauge messagesIn
beast::insight::Gauge bytesIn
std::optional< std::uint32_t > networkID
Definition Overlay.h:52
Run transaction reduce-relay feature related metrics.
Definition TxMetrics.h:69
void addMetrics(protocol::MessageType type, std::uint32_t val)
Add protocol message metrics.
Definition TxMetrics.cpp:12
Json::Value json() const
Get json representation of the metrics.
Definition TxMetrics.cpp:95