rippled
Loading...
Searching...
No Matches
OverlayImpl.h
1#pragma once
2
3#include <xrpld/app/main/Application.h>
4#include <xrpld/overlay/Message.h>
5#include <xrpld/overlay/Overlay.h>
6#include <xrpld/overlay/Slot.h>
7#include <xrpld/overlay/detail/Handshake.h>
8#include <xrpld/overlay/detail/TrafficCount.h>
9#include <xrpld/overlay/detail/TxMetrics.h>
10#include <xrpld/peerfinder/PeerfinderManager.h>
11#include <xrpld/rpc/ServerHandler.h>
12
13#include <xrpl/basics/Resolver.h>
14#include <xrpl/basics/UnorderedContainers.h>
15#include <xrpl/basics/chrono.h>
16#include <xrpl/beast/utility/instrumentation.h>
17#include <xrpl/core/Job.h>
18#include <xrpl/resource/ResourceManager.h>
19#include <xrpl/server/Handoff.h>
20
21#include <boost/algorithm/string/predicate.hpp>
22#include <boost/asio/basic_waitable_timer.hpp>
23#include <boost/asio/ip/tcp.hpp>
24#include <boost/asio/ssl/context.hpp>
25#include <boost/asio/strand.hpp>
26#include <boost/container/flat_map.hpp>
27
28#include <atomic>
29#include <chrono>
30#include <condition_variable>
31#include <cstdint>
32#include <memory>
33#include <mutex>
34#include <optional>
35#include <unordered_map>
36
37namespace xrpl {
38
39class PeerImp;
40class BasicConfig;
41
43{
44public:
45 class Child
46 {
47 protected:
49
50 explicit Child(OverlayImpl& overlay);
51
52 virtual ~Child();
53
54 public:
55 virtual void
56 stop() = 0;
57 };
58
59private:
61 using socket_type = boost::asio::ip::tcp::socket;
62 using address_type = boost::asio::ip::address;
63 using endpoint_type = boost::asio::ip::tcp::endpoint;
64 using error_code = boost::system::error_code;
65
67 {
68 boost::asio::basic_waitable_timer<clock_type> timer_;
69 bool stopping_{false};
70
71 explicit Timer(OverlayImpl& overlay);
72
73 void
74 stop() override;
75
76 void
77 async_wait();
78
79 void
81 };
82
84 boost::asio::io_context& io_context_;
86 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
87 mutable std::recursive_mutex mutex_; // VFALCO use std::mutex
90 boost::container::flat_map<Child*, std::weak_ptr<Child>> list_;
105
107
108 // Transaction reduce-relay metrics
110
111 // A message with the list of manifests we send to peers
113 // Used to track whether we need to update the cached list of manifests
115 // Protects the message and the sequence list of manifests
117
118 //--------------------------------------------------------------------------
119
120public:
122 Application& app,
123 Setup const& setup,
124 ServerHandler& serverHandler,
126 Resolver& resolver,
127 boost::asio::io_context& io_context,
128 BasicConfig const& config,
129 beast::insight::Collector::ptr const& collector);
130
131 OverlayImpl(OverlayImpl const&) = delete;
133 operator=(OverlayImpl const&) = delete;
134
135 void
136 start() override;
137
138 void
139 stop() override;
140
143 {
144 return *m_peerFinder;
145 }
146
149 {
150 return m_resourceManager;
151 }
152
153 Setup const&
154 setup() const
155 {
156 return setup_;
157 }
158
159 Handoff
160 onHandoff(std::unique_ptr<stream_type>&& bundle, http_request_type&& request, endpoint_type remote_endpoint)
161 override;
162
163 void
164 connect(beast::IP::Endpoint const& remote_endpoint) override;
165
166 int
167 limit() override;
168
170 size() const override;
171
173 json() override;
174
176 getActivePeers() const override;
177
189 std::set<Peer::id_t> const& toSkip,
190 std::size_t& active,
191 std::size_t& disabled,
192 std::size_t& enabledInSkip) const;
193
194 void checkTracking(std::uint32_t) override;
195
197 findPeerByShortID(Peer::id_t const& id) const override;
198
200 findPeerByPublicKey(PublicKey const& pubKey) override;
201
202 void
203 broadcast(protocol::TMProposeSet& m) override;
204
205 void
206 broadcast(protocol::TMValidation& m) override;
207
209 relay(protocol::TMProposeSet& m, uint256 const& uid, PublicKey const& validator) override;
210
212 relay(protocol::TMValidation& m, uint256 const& uid, PublicKey const& validator) override;
213
214 void
215 relay(
216 uint256 const&,
218 std::set<Peer::id_t> const& skip) override;
219
222
223 //--------------------------------------------------------------------------
224 //
225 // OverlayImpl
226 //
227
228 void
230
231 void
233
239 void
241
242 // Called when an active peer is destroyed.
243 void
245
246 // UnaryFunc will be called as
247 // void(std::shared_ptr<PeerImp>&&)
248 //
249 template <class UnaryFunc>
250 void
251 for_each(UnaryFunc&& f) const
252 {
254 {
256
257 // Iterate over a copy of the peer list because peer
258 // destruction can invalidate iterators.
259 wp.reserve(ids_.size());
260
261 for (auto& x : ids_)
262 wp.push_back(x.second);
263 }
264
265 for (auto& w : wp)
266 {
267 if (auto p = w.lock())
268 f(std::move(p));
269 }
270 }
271
272 // Called when TMManifests is received from a peer
273 void
275
276 static bool
277 isPeerUpgrade(http_request_type const& request);
278
279 template <class Body>
280 static bool
281 isPeerUpgrade(boost::beast::http::response<Body> const& response)
282 {
283 if (!is_upgrade(response))
284 return false;
285 return response.result() == boost::beast::http::status::switching_protocols;
286 }
287
288 template <class Fields>
289 static bool
290 is_upgrade(boost::beast::http::header<true, Fields> const& req)
291 {
292 if (req.version() < 11)
293 return false;
294 if (req.method() != boost::beast::http::verb::get)
295 return false;
296 if (!boost::beast::http::token_list{req["Connection"]}.exists("upgrade"))
297 return false;
298 return true;
299 }
300
301 template <class Fields>
302 static bool
303 is_upgrade(boost::beast::http::header<false, Fields> const& req)
304 {
305 if (req.version() < 11)
306 return false;
307 if (!boost::beast::http::token_list{req["Connection"]}.exists("upgrade"))
308 return false;
309 return true;
310 }
311
312 static std::string
314
315 void
317
318 void
320
321 void
323 {
325 }
326
328 getJqTransOverflow() const override
329 {
330 return jqTransOverflow_;
331 }
332
333 void
335 {
337 }
338
340 getPeerDisconnect() const override
341 {
342 return peerDisconnects_;
343 }
344
345 void
350
353 {
355 }
356
358 networkID() const override
359 {
360 return setup_.networkID;
361 }
362
372 void
374 uint256 const& key,
375 PublicKey const& validator,
376 std::set<Peer::id_t>&& peers,
377 protocol::MessageType type);
378
381 void
382 updateSlotAndSquelch(uint256 const& key, PublicKey const& validator, Peer::id_t peer, protocol::MessageType type);
383
389 void
391
393 txMetrics() const override
394 {
395 return txMetrics_.json();
396 }
397
399 template <typename... Args>
400 void
401 addTxMetrics(Args... args)
402 {
403 if (!strand_.running_in_this_thread())
404 return post(strand_, std::bind(&OverlayImpl::addTxMetrics<Args...>, this, args...));
405
406 txMetrics_.addMetrics(args...);
407 }
408
409private:
410 void
411 squelch(PublicKey const& validator, Peer::id_t const id, std::uint32_t squelchDuration) const override;
412
413 void
414 unsquelch(PublicKey const& validator, Peer::id_t id) const override;
415
419 http_request_type const& request,
420 address_type remote_address);
421
425 http_request_type const& request,
426 address_type remote_address,
427 std::string msg);
428
434 bool
435 processCrawl(http_request_type const& req, Handoff& handoff);
436
444 bool
445 processValidatorList(http_request_type const& req, Handoff& handoff);
446
452 bool
453 processHealth(http_request_type const& req, Handoff& handoff);
454
459 bool
460 processRequest(http_request_type const& req, Handoff& handoff);
461
468
475
482
488 getUnlInfo();
489
490 //--------------------------------------------------------------------------
491
492 //
493 // PropertyStream
494 //
495
496 void
497 onWrite(beast::PropertyStream::Map& stream) override;
498
499 //--------------------------------------------------------------------------
500
501 void
502 remove(Child& child);
503
504 void
505 stopChildren();
506
507 void
508 autoConnect();
509
510 void
512
514 void
515 sendTxQueue();
516
519 void
521
522private:
524 {
526 : name(name)
527 , bytesIn(collector->make_gauge(name, "Bytes_In"))
528 , bytesOut(collector->make_gauge(name, "Bytes_Out"))
529 , messagesIn(collector->make_gauge(name, "Messages_In"))
530 , messagesOut(collector->make_gauge(name, "Messages_Out"))
531 {
532 }
538 };
539
540 struct Stats
541 {
542 template <class Handler>
544 Handler const& handler,
545 beast::insight::Collector::ptr const& collector,
547 : peerDisconnects(collector->make_gauge("Overlay", "Peer_Disconnects"))
548 , trafficGauges(std::move(trafficGauges_))
549 , hook(collector->make_hook(handler))
550 {
551 }
552
556 };
557
560
561private:
562 void
564 {
565 auto counts = m_traffic.getCounts();
567 XRPL_ASSERT(
568 counts.size() == m_stats.trafficGauges.size(), "xrpl::OverlayImpl::collect_metrics : counts size do match");
569
570 for (auto const& [key, value] : counts)
571 {
572 auto it = m_stats.trafficGauges.find(key);
573 if (it == m_stats.trafficGauges.end())
574 continue;
575
576 auto& gauge = it->second;
577
578 XRPL_ASSERT(
579 gauge.name == value.name,
580 "xrpl::OverlayImpl::collect_metrics : gauge and counter "
581 "match");
582
583 gauge.bytesIn = value.bytesIn;
584 gauge.bytesOut = value.bytesOut;
585 gauge.messagesIn = value.messagesIn;
586 gauge.messagesOut = value.messagesOut;
587 }
588
590 }
591};
592
593} // namespace xrpl
T bind(T... args)
Represents a JSON value.
Definition json_value.h:130
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
A generic endpoint for log messages.
Definition Journal.h:40
A metric for measuring an integral value.
Definition Gauge.h:20
A reference to a handler for performing polled collection.
Definition Hook.h:12
Holds unparsed configuration information.
virtual void stop()=0
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
std::weak_ptr< Timer > timer_
Definition OverlayImpl.h:89
boost::asio::io_context & io_context_
Definition OverlayImpl.h:84
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
boost::asio::ip::address address_type
Definition OverlayImpl.h:62
static bool isPeerUpgrade(http_request_type const &request)
Resource::Manager & m_resourceManager
Definition OverlayImpl.h:94
boost::system::error_code error_code
Definition OverlayImpl.h:64
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
OverlayImpl(OverlayImpl const &)=delete
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
Json::Value getServerCounts()
Returns information about the local server's performance counters.
std::mutex m_statsMutex
void incPeerDisconnectCharges() override
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully. This is called after the peer handshake has been comple...
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remote_endpoint) override
Conditionally accept an incoming HTTP request.
std::atomic< uint64_t > peerDisconnectsCharges_
boost::asio::ip::tcp::socket socket_type
Definition OverlayImpl.h:61
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
Definition OverlayImpl.h:85
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
static bool is_upgrade(boost::beast::http::header< false, Fields > const &req)
void for_each(UnaryFunc &&f) const
void stop() override
void connect(beast::IP::Endpoint const &remote_endpoint) override
Establish a peer connection to the specified endpoint.
OverlayImpl & operator=(OverlayImpl const &)=delete
std::size_t size() const override
The number of active peers on the network. Active peers are only those peers that have completed the h...
ServerHandler & serverHandler_
Definition OverlayImpl.h:93
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
std::uint64_t getPeerDisconnectCharges() const override
void broadcast(protocol::TMProposeSet &m) override
Broadcast a proposal.
std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address, std::string msg)
reduce_relay::Slots< UptimeClock > slots_
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
Definition OverlayImpl.h:98
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
TrafficCount m_traffic
Definition OverlayImpl.h:96
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
void sendTxQueue()
Once a second, send the transactions' hashes aggregated by peers.
std::shared_ptr< Message > manifestMessage_
std::unique_ptr< PeerFinder::Manager > m_peerFinder
Definition OverlayImpl.h:95
std::uint64_t getPeerDisconnect() const override
std::optional< std::uint32_t > manifestListSeq_
Json::Value txMetrics() const override
Returns tx reduce-relay metrics.
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
void add_active(std::shared_ptr< PeerImp > const &peer)
PeerFinder::Manager & peerFinder()
Application & app_
Definition OverlayImpl.h:83
std::recursive_mutex mutex_
Definition OverlayImpl.h:87
Resource::Manager & resourceManager()
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
beast::Journal const journal_
Definition OverlayImpl.h:92
boost::asio::ip::tcp::endpoint endpoint_type
Definition OverlayImpl.h:63
void onPeerDeactivate(Peer::id_t id)
std::mutex manifestLock_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition OverlayImpl.h:86
Json::Value json() override
Return diagnostics on the status of all peers.
static std::string makePrefix(std::uint32_t id)
Setup const & setup() const
std::atomic< uint64_t > peerDisconnects_
std::set< Peer::id_t > relay(protocol::TMProposeSet &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
static bool is_upgrade(boost::beast::http::header< true, Fields > const &req)
metrics::TxMetrics txMetrics_
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
Definition OverlayImpl.h:90
int limit() override
Returns the maximum number of peers we are configured to allow.
std::condition_variable_any cond_
Definition OverlayImpl.h:88
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > m_peers
Definition OverlayImpl.h:97
std::shared_ptr< Message > getManifestsMessage()
Json::Value getUnlInfo()
Returns information about the local server's UNL.
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
std::atomic< uint64_t > jqTransOverflow_
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address)
std::optional< std::uint32_t > networkID() const override
Returns the ID of the network this server is configured for, if any.
std::atomic< Peer::id_t > next_id_
void reportInboundTraffic(TrafficCount::category cat, int bytes)
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
Resolver & m_resolver
Definition OverlayImpl.h:99
Json::Value getServerInfo()
Returns information about the local server.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
std::uint64_t getJqTransOverflow() const override
static bool isPeerUpgrade(boost::beast::http::response< Body > const &response)
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
void start() override
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
Json::Value getOverlayInfo()
Returns information about peers on the overlay network.
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
Manages the set of connected peers.
Definition Overlay.h:29
std::vector< std::shared_ptr< Peer > > PeerSequence
Definition Overlay.h:56
Maintains a set of IP addresses used for getting into the network.
A public key.
Definition PublicKey.h:42
Tracks load and resource consumption.
TrafficCount is used to count ingress and egress wire bytes and number of messages.
auto const & getCounts() const
An up-to-date copy of all the counters.
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
T push_back(T... args)
T reserve(T... args)
Used to indicate the result of a server connection handoff.
Definition Handoff.h:18
beast::insight::Gauge peerDisconnects
std::unordered_map< TrafficCount::category, TrafficGauges > trafficGauges
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector, std::unordered_map< TrafficCount::category, TrafficGauges > &&trafficGauges_)
beast::insight::Hook hook
void on_timer(error_code ec)
boost::asio::basic_waitable_timer< clock_type > timer_
Definition OverlayImpl.h:68
beast::insight::Gauge messagesOut
beast::insight::Gauge bytesOut
TrafficGauges(std::string const &name, beast::insight::Collector::ptr const &collector)
beast::insight::Gauge messagesIn
beast::insight::Gauge bytesIn
std::optional< std::uint32_t > networkID
Definition Overlay.h:52
Run transaction reduce-relay feature related metrics.
Definition TxMetrics.h:69
void addMetrics(protocol::MessageType type, std::uint32_t val)
Add protocol message metrics.
Definition TxMetrics.cpp:12
Json::Value json() const
Get json representation of the metrics.
Definition TxMetrics.cpp:94