rippled
Loading...
Searching...
No Matches
OverlayImpl.h
1#ifndef XRPL_OVERLAY_OVERLAYIMPL_H_INCLUDED
2#define XRPL_OVERLAY_OVERLAYIMPL_H_INCLUDED
3
4#include <xrpld/app/main/Application.h>
5#include <xrpld/core/Job.h>
6#include <xrpld/overlay/Message.h>
7#include <xrpld/overlay/Overlay.h>
8#include <xrpld/overlay/Slot.h>
9#include <xrpld/overlay/detail/Handshake.h>
10#include <xrpld/overlay/detail/TrafficCount.h>
11#include <xrpld/overlay/detail/TxMetrics.h>
12#include <xrpld/peerfinder/PeerfinderManager.h>
13#include <xrpld/rpc/ServerHandler.h>
14
15#include <xrpl/basics/Resolver.h>
16#include <xrpl/basics/UnorderedContainers.h>
17#include <xrpl/basics/chrono.h>
18#include <xrpl/beast/utility/instrumentation.h>
19#include <xrpl/resource/ResourceManager.h>
20#include <xrpl/server/Handoff.h>
21
22#include <boost/algorithm/string/predicate.hpp>
23#include <boost/asio/basic_waitable_timer.hpp>
24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/ssl/context.hpp>
26#include <boost/asio/strand.hpp>
27#include <boost/container/flat_map.hpp>
28
29#include <atomic>
30#include <chrono>
31#include <condition_variable>
32#include <cstdint>
33#include <memory>
34#include <mutex>
35#include <optional>
36#include <unordered_map>
37
38namespace ripple {
39
40class PeerImp;
41class BasicConfig;
42
44{
45public:
46 class Child
47 {
48 protected:
50
51 explicit Child(OverlayImpl& overlay);
52
53 virtual ~Child();
54
55 public:
56 virtual void
57 stop() = 0;
58 };
59
60private:
62 using socket_type = boost::asio::ip::tcp::socket;
63 using address_type = boost::asio::ip::address;
64 using endpoint_type = boost::asio::ip::tcp::endpoint;
65 using error_code = boost::system::error_code;
66
68 {
69 boost::asio::basic_waitable_timer<clock_type> timer_;
70 bool stopping_{false};
71
72 explicit Timer(OverlayImpl& overlay);
73
74 void
75 stop() override;
76
77 void
78 async_wait();
79
80 void
82 };
83
85 boost::asio::io_context& io_context_;
86 std::optional<boost::asio::executor_work_guard<
87 boost::asio::io_context::executor_type>>
89 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
90 mutable std::recursive_mutex mutex_; // VFALCO use std::mutex
93 boost::container::flat_map<Child*, std::weak_ptr<Child>> list_;
108
110
111 // Transaction reduce-relay metrics
113
114 // A message with the list of manifests we send to peers
116 // Used to track whether we need to update the cached list of manifests
118 // Protects the message and the sequence list of manifests
120
121 //--------------------------------------------------------------------------
122
123public:
125 Application& app,
126 Setup const& setup,
127 ServerHandler& serverHandler,
129 Resolver& resolver,
130 boost::asio::io_context& io_context,
131 BasicConfig const& config,
132 beast::insight::Collector::ptr const& collector);
133
134 OverlayImpl(OverlayImpl const&) = delete;
136 operator=(OverlayImpl const&) = delete;
137
138 void
139 start() override;
140
141 void
142 stop() override;
143
146 {
147 return *m_peerFinder;
148 }
149
152 {
153 return m_resourceManager;
154 }
155
// Read-only accessor: hands back setup_ by const reference, so callers see
// the overlay's stored Setup without copying it.
156 Setup const&
157 setup() const
158 {
159 return setup_;
160 }
161
162 Handoff
163 onHandoff(
165 http_request_type&& request,
166 endpoint_type remote_endpoint) override;
167
168 void
169 connect(beast::IP::Endpoint const& remote_endpoint) override;
170
171 int
172 limit() override;
173
175 size() const override;
176
178 json() override;
179
181 getActivePeers() const override;
182
194 std::set<Peer::id_t> const& toSkip,
195 std::size_t& active,
196 std::size_t& disabled,
197 std::size_t& enabledInSkip) const;
198
199 void checkTracking(std::uint32_t) override;
200
202 findPeerByShortID(Peer::id_t const& id) const override;
203
205 findPeerByPublicKey(PublicKey const& pubKey) override;
206
207 void
208 broadcast(protocol::TMProposeSet& m) override;
209
210 void
211 broadcast(protocol::TMValidation& m) override;
212
214 relay(
215 protocol::TMProposeSet& m,
216 uint256 const& uid,
217 PublicKey const& validator) override;
218
220 relay(
221 protocol::TMValidation& m,
222 uint256 const& uid,
223 PublicKey const& validator) override;
224
225 void
226 relay(
227 uint256 const&,
229 std::set<Peer::id_t> const& skip) override;
230
233
234 //--------------------------------------------------------------------------
235 //
236 // OverlayImpl
237 //
238
239 void
241
242 void
244
250 void
252
253 // Called when an active peer is destroyed.
254 void
256
257 // UnaryFunc will be called as
258 // void(std::shared_ptr<PeerImp>&&)
259 //
// Invokes f once for every entry of ids_ whose weak pointer can still be
// locked, passing the resulting shared pointer by rvalue.
// NOTE(review): listing lines 264 and 266 are absent from this extract;
// presumably they declare `wp` and take a lock for the inner scope --
// confirm against the real header.
260 template <class UnaryFunc>
261 void
262 for_each(UnaryFunc&& f) const
263 {
265 {
267
268 // Iterate over a copy of the peer list because peer
269 // destruction can invalidate iterators.
270 wp.reserve(ids_.size());
271
// Snapshot only the mapped weak pointers; the keys are not needed.
272 for (auto& x : ids_)
273 wp.push_back(x.second);
274 }
275
// The callback runs after the inner scope has closed, on the snapshot.
// Entries whose lock() yields an empty pointer are skipped.
276 for (auto& w : wp)
277 {
278 if (auto p = w.lock())
279 f(std::move(p));
280 }
281 }
282
283 // Called when TMManifests is received from a peer
284 void
287 std::shared_ptr<PeerImp> const& from);
288
289 static bool
290 isPeerUpgrade(http_request_type const& request);
291
// Returns true only when the HTTP response both passes is_upgrade() and
// carries the switching_protocols status; any other response yields false.
292 template <class Body>
293 static bool
294 isPeerUpgrade(boost::beast::http::response<Body> const& response)
295 {
// Reject early if the header is not an upgrade at all.
296 if (!is_upgrade(response))
297 return false;
298 return response.result() ==
299 boost::beast::http::status::switching_protocols;
300 }
301
// Overload for header<true, Fields> (Boost.Beast's request-header form).
// Returns true when all three checks pass, evaluated in this order:
//   1. version() is at least 11,
//   2. the method is GET,
//   3. the "Connection" field's token list contains "upgrade".
// The first failing check returns false without evaluating the rest.
302 template <class Fields>
303 static bool
304 is_upgrade(boost::beast::http::header<true, Fields> const& req)
305 {
// Beast encodes the HTTP version as major * 10 + minor, so 11 is HTTP/1.1.
306 if (req.version() < 11)
307 return false;
308 if (req.method() != boost::beast::http::verb::get)
309 return false;
310 if (!boost::beast::http::token_list{req["Connection"]}.exists(
311 "upgrade"))
312 return false;
313 return true;
314 }
315
// Overload for header<false, Fields> (Boost.Beast's response-header form).
// Same checks as the header<true, ...> overload except that no method is
// tested: returns true when version() is at least 11 and the "Connection"
// field's token list contains "upgrade".
316 template <class Fields>
317 static bool
318 is_upgrade(boost::beast::http::header<false, Fields> const& req)
319 {
// Beast encodes the HTTP version as major * 10 + minor, so 11 is HTTP/1.1.
320 if (req.version() < 11)
321 return false;
322 if (!boost::beast::http::token_list{req["Connection"]}.exists(
323 "upgrade"))
324 return false;
325 return true;
326 }
327
328 static std::string
330
331 void
333
334 void
336
337 void
339 {
341 }
342
344 getJqTransOverflow() const override
345 {
346 return jqTransOverflow_;
347 }
348
349 void
351 {
353 }
354
356 getPeerDisconnect() const override
357 {
358 return peerDisconnects_;
359 }
360
361 void
366
369 {
371 }
372
374 networkID() const override
375 {
376 return setup_.networkID;
377 }
378
388 void
390 uint256 const& key,
391 PublicKey const& validator,
392 std::set<Peer::id_t>&& peers,
393 protocol::MessageType type);
394
397 void
399 uint256 const& key,
400 PublicKey const& validator,
401 Peer::id_t peer,
402 protocol::MessageType type);
403
409 void
411
413 txMetrics() const override
414 {
415 return txMetrics_.json();
416 }
417
// Forwards args to txMetrics_.addMetrics(), but only while executing on
// strand_. When called from anywhere else it posts a bound call to itself
// onto strand_ and returns immediately; the metrics are then added when
// that handler runs.
// Args are taken by value and std::bind stores its own copies, so the
// deferred call does not refer to the caller's arguments.
419 template <typename... Args>
420 void
421 addTxMetrics(Args... args)
422 {
423 if (!strand_.running_in_this_thread())
// NOTE(review): the bound handler holds a raw `this`; presumably the
// overlay outlives any work pending on strand_ -- confirm.
424 return post(
425 strand_,
426 std::bind(&OverlayImpl::addTxMetrics<Args...>, this, args...));
427
// Reached only when already running on strand_.
428 txMetrics_.addMetrics(args...);
429 }
430
431private:
432 void
433 squelch(
434 PublicKey const& validator,
435 Peer::id_t const id,
436 std::uint32_t squelchDuration) const override;
437
438 void
439 unsquelch(PublicKey const& validator, Peer::id_t id) const override;
440
444 http_request_type const& request,
445 address_type remote_address);
446
450 http_request_type const& request,
451 address_type remote_address,
452 std::string msg);
453
459 bool
460 processCrawl(http_request_type const& req, Handoff& handoff);
461
469 bool
470 processValidatorList(http_request_type const& req, Handoff& handoff);
471
477 bool
478 processHealth(http_request_type const& req, Handoff& handoff);
479
484 bool
485 processRequest(http_request_type const& req, Handoff& handoff);
486
493
500
507
513 getUnlInfo();
514
515 //--------------------------------------------------------------------------
516
517 //
518 // PropertyStream
519 //
520
521 void
522 onWrite(beast::PropertyStream::Map& stream) override;
523
524 //--------------------------------------------------------------------------
525
526 void
527 remove(Child& child);
528
529 void
530 stopChildren();
531
532 void
533 autoConnect();
534
535 void
537
539 void
540 sendTxQueue();
541
544 void
546
547private:
549 {
551 std::string const& name,
552 beast::insight::Collector::ptr const& collector)
553 : name(name)
554 , bytesIn(collector->make_gauge(name, "Bytes_In"))
555 , bytesOut(collector->make_gauge(name, "Bytes_Out"))
556 , messagesIn(collector->make_gauge(name, "Messages_In"))
557 , messagesOut(collector->make_gauge(name, "Messages_Out"))
558 {
559 }
565 };
566
567 struct Stats
568 {
569 template <class Handler>
571 Handler const& handler,
572 beast::insight::Collector::ptr const& collector,
574 trafficGauges_)
576 collector->make_gauge("Overlay", "Peer_Disconnects"))
577 , trafficGauges(std::move(trafficGauges_))
578 , hook(collector->make_hook(handler))
579 {
580 }
581
585 };
586
589
590private:
// Copies the current traffic counters into the matching insight gauges.
// NOTE(review): the function-name line (listing line 592) is missing from
// this extract; the assertion messages below identify the function as
// ripple::OverlayImpl::collect_metrics. Listing lines 595 and 619 are also
// absent, so any locking or further gauge updates they perform are not
// visible here -- confirm against the real header.
591 void
593 {
594 auto counts = m_traffic.getCounts();
// Expect exactly one gauge set per traffic counter.
// NOTE(review): the message wording "counts size do match" looks like a
// typo for "does match"; it is a runtime string and is left unchanged.
596 XRPL_ASSERT(
597 counts.size() == m_stats.trafficGauges.size(),
598 "ripple::OverlayImpl::collect_metrics : counts size do match");
599
600 for (auto const& [key, value] : counts)
601 {
// A category without a registered gauge is skipped rather than
// treated as an error.
602 auto it = m_stats.trafficGauges.find(key);
603 if (it == m_stats.trafficGauges.end())
604 continue;
605
606 auto& gauge = it->second;
607
// The gauge found under this key should carry the counter's name.
608 XRPL_ASSERT(
609 gauge.name == value.name,
610 "ripple::OverlayImpl::collect_metrics : gauge and counter "
611 "match");
612
// Publish the four per-category values to their gauges.
613 gauge.bytesIn = value.bytesIn;
614 gauge.bytesOut = value.bytesOut;
615 gauge.messagesIn = value.messagesIn;
616 gauge.messagesOut = value.messagesOut;
617 }
618
620 }
621};
622
623} // namespace ripple
624
625#endif
T bind(T... args)
Represents a JSON value.
Definition json_value.h:130
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
A generic endpoint for log messages.
Definition Journal.h:41
A metric for measuring an integral value.
Definition Gauge.h:21
A reference to a handler for performing polled collection.
Definition Hook.h:13
Holds unparsed configuration information.
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
Definition OverlayImpl.h:88
boost::system::error_code error_code
Definition OverlayImpl.h:65
Json::Value getUnlInfo()
Returns information about the local server's UNL.
void stop() override
static std::string makePrefix(std::uint32_t id)
PeerFinder::Manager & peerFinder()
boost::asio::ip::tcp::endpoint endpoint_type
Definition OverlayImpl.h:64
std::atomic< uint64_t > peerDisconnects_
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
boost::asio::ip::address address_type
Definition OverlayImpl.h:63
boost::asio::io_context & io_context_
Definition OverlayImpl.h:85
static bool is_upgrade(boost::beast::http::header< true, Fields > const &req)
std::condition_variable_any cond_
Definition OverlayImpl.h:91
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
Json::Value txMetrics() const override
Returns tx reduce-relay metrics.
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
Resolver & m_resolver
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully. This is called after the peer handshake has been comple...
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
void start() override
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > m_peers
void add_active(std::shared_ptr< PeerImp > const &peer)
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
Resource::Manager & m_resourceManager
Definition OverlayImpl.h:97
std::shared_ptr< Message > manifestMessage_
std::optional< std::uint32_t > manifestListSeq_
OverlayImpl & operator=(OverlayImpl const &)=delete
TrafficCount m_traffic
Definition OverlayImpl.h:99
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address, std::string msg)
reduce_relay::Slots< UptimeClock > slots_
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
std::atomic< Peer::id_t > next_id_
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
Application & app_
Definition OverlayImpl.h:84
std::optional< std::uint32_t > networkID() const override
Returns the ID of the network this server is configured for, if any.
std::weak_ptr< Timer > timer_
Definition OverlayImpl.h:92
std::atomic< uint64_t > jqTransOverflow_
metrics::TxMetrics txMetrics_
void broadcast(protocol::TMProposeSet &m) override
Broadcast a proposal.
void onPeerDeactivate(Peer::id_t id)
std::mutex manifestLock_
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
std::recursive_mutex mutex_
Definition OverlayImpl.h:90
std::uint64_t getPeerDisconnectCharges() const override
boost::asio::ip::tcp::socket socket_type
Definition OverlayImpl.h:62
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void sendTxQueue()
Sends, once a second, the transaction hashes aggregated by peers.
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
std::set< Peer::id_t > relay(protocol::TMProposeSet &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
std::size_t size() const override
The number of active peers on the network. Active peers are only those peers that have completed the h...
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition OverlayImpl.h:89
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address)
void for_each(UnaryFunc &&f) const
static bool isPeerUpgrade(boost::beast::http::response< Body > const &response)
OverlayImpl(OverlayImpl const &)=delete
Json::Value getOverlayInfo()
Returns information about peers on the overlay network.
Resource::Manager & resourceManager()
static bool isPeerUpgrade(http_request_type const &request)
Json::Value getServerCounts()
Returns information about the local server's performance counters.
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
std::unique_ptr< PeerFinder::Manager > m_peerFinder
Definition OverlayImpl.h:98
std::uint64_t getJqTransOverflow() const override
void connect(beast::IP::Endpoint const &remote_endpoint) override
Establish a peer connection to the specified endpoint.
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remote_endpoint) override
Conditionally accept an incoming HTTP request.
Setup const & setup() const
std::atomic< uint64_t > peerDisconnectsCharges_
std::shared_ptr< Message > getManifestsMessage()
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
Json::Value getServerInfo()
Returns information about the local server.
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
Json::Value json() override
Return diagnostics on the status of all peers.
std::mutex m_statsMutex
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
void incPeerDisconnectCharges() override
ServerHandler & serverHandler_
Definition OverlayImpl.h:96
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
static bool is_upgrade(boost::beast::http::header< false, Fields > const &req)
int limit() override
Returns the maximum number of peers we are configured to allow.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
beast::Journal const journal_
Definition OverlayImpl.h:95
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
Definition OverlayImpl.h:93
std::uint64_t getPeerDisconnect() const override
Manages the set of connected peers.
Definition Overlay.h:30
std::vector< std::shared_ptr< Peer > > PeerSequence
Definition Overlay.h:57
Maintains a set of IP addresses used for getting into the network.
A public key.
Definition PublicKey.h:43
Tracks load and resource consumption.
TrafficCount is used to count ingress and egress wire bytes and number of messages.
auto const & getCounts() const
An up-to-date copy of all the counters.
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:14
STL namespace.
T push_back(T... args)
T reserve(T... args)
Used to indicate the result of a server connection handoff.
Definition Handoff.h:21
beast::insight::Gauge peerDisconnects
std::unordered_map< TrafficCount::category, TrafficGauges > trafficGauges
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector, std::unordered_map< TrafficCount::category, TrafficGauges > &&trafficGauges_)
beast::insight::Hook hook
void on_timer(error_code ec)
boost::asio::basic_waitable_timer< clock_type > timer_
Definition OverlayImpl.h:69
beast::insight::Gauge messagesIn
beast::insight::Gauge bytesIn
beast::insight::Gauge messagesOut
beast::insight::Gauge bytesOut
TrafficGauges(std::string const &name, beast::insight::Collector::ptr const &collector)
std::optional< std::uint32_t > networkID
Definition Overlay.h:53
Run transaction reduce-relay feature related metrics.
Definition TxMetrics.h:70
void addMetrics(protocol::MessageType type, std::uint32_t val)
Add protocol message metrics.
Definition TxMetrics.cpp:12
Json::Value json() const
Get json representation of the metrics.
Definition TxMetrics.cpp:99