rippled
Loading...
Searching...
No Matches
OverlayImpl.cpp
1#include <xrpld/app/misc/HashRouter.h>
2#include <xrpld/app/misc/NetworkOPs.h>
3#include <xrpld/app/misc/ValidatorList.h>
4#include <xrpld/app/misc/ValidatorSite.h>
5#include <xrpld/app/rdb/RelationalDatabase.h>
6#include <xrpld/app/rdb/Wallet.h>
7#include <xrpld/overlay/Cluster.h>
8#include <xrpld/overlay/detail/ConnectAttempt.h>
9#include <xrpld/overlay/detail/PeerImp.h>
10#include <xrpld/overlay/detail/TrafficCount.h>
11#include <xrpld/overlay/detail/Tuning.h>
12#include <xrpld/overlay/predicates.h>
13#include <xrpld/peerfinder/make_Manager.h>
14#include <xrpld/rpc/handlers/GetCounts.h>
15#include <xrpld/rpc/json_body.h>
16
17#include <xrpl/basics/base64.h>
18#include <xrpl/basics/make_SSLContext.h>
19#include <xrpl/basics/random.h>
20#include <xrpl/beast/core/LexicalCast.h>
21#include <xrpl/protocol/STTx.h>
22#include <xrpl/server/SimpleWriter.h>
23
24#include <boost/algorithm/string/predicate.hpp>
25#include <boost/asio/executor_work_guard.hpp>
26
27namespace xrpl {
28
29namespace CrawlOptions {
30enum {
32 Overlay = (1 << 0),
33 ServerInfo = (1 << 1),
34 ServerCounts = (1 << 2),
35 Unl = (1 << 3)
36};
37}
38
39//------------------------------------------------------------------------------
40
41OverlayImpl::Child::Child(OverlayImpl& overlay) : overlay_(overlay)
42{
43}
44
46{
47 overlay_.remove(*this);
48}
49
50//------------------------------------------------------------------------------
51
53 : Child(overlay), timer_(overlay_.io_context_)
54{
55}
56
57void
59{
60 // This method is only ever called from the same strand that calls
61 // Timer::on_timer, ensuring they never execute concurrently.
62 stopping_ = true;
63 timer_.cancel();
64}
65
66void
68{
69 timer_.expires_after(std::chrono::seconds(1));
70 timer_.async_wait(boost::asio::bind_executor(
71 overlay_.strand_,
73 &Timer::on_timer, shared_from_this(), std::placeholders::_1)));
74}
75
// Completion handler for the overlay's periodic timer wait.
// On a clean tick it runs the per-tick overlay maintenance calls and then
// calls async_wait() again; on error or once stopping_ is set it returns
// without scheduling another wait.
// NOTE(review): the declarator line for this definition is absent from this
// listing; the bind to &Timer::on_timer and the "on_timer: " log text
// indicate this is Timer::on_timer taking an error code `ec` - confirm
// against the header.
76void
78{
// Bail out on any error or when a stop was requested.
79 if (ec || stopping_)
80 {
// operation_aborted is deliberately not logged (presumably it is what a
// cancelled wait reports - confirm); any other error is.
81 if (ec && ec != boost::asio::error::operation_aborted)
82 {
83 JLOG(overlay_.journal_.error()) << "on_timer: " << ec.message();
84 }
85 return;
86 }
87
// Per-tick work, in this fixed order.
88 overlay_.m_peerFinder->once_per_second();
89 overlay_.sendEndpoints();
90 overlay_.autoConnect();
// The tx queue is only flushed when TX_REDUCE_RELAY_ENABLE is configured.
91 if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE)
92 overlay_.sendTxQueue();
93
// Every Tuning::checkIdlePeers ticks, also run deleteIdlePeers().
94 if ((++overlay_.timer_count_ % Tuning::checkIdlePeers) == 0)
95 overlay_.deleteIdlePeers();
96
// Queue the next wait.
97 async_wait();
98}
99
100//------------------------------------------------------------------------------
101
103 Application& app,
104 Setup const& setup,
105 ServerHandler& serverHandler,
107 Resolver& resolver,
108 boost::asio::io_context& io_context,
109 BasicConfig const& config,
110 beast::insight::Collector::ptr const& collector)
111 : app_(app)
112 , io_context_(io_context)
113 , work_(std::in_place, boost::asio::make_work_guard(io_context_))
114 , strand_(boost::asio::make_strand(io_context_))
115 , setup_(setup)
116 , journal_(app_.journal("Overlay"))
117 , serverHandler_(serverHandler)
119 , m_peerFinder(PeerFinder::make_Manager(
120 io_context,
121 stopwatch(),
122 app_.journal("PeerFinder"),
123 config,
124 collector))
125 , m_resolver(resolver)
126 , next_id_(1)
127 , timer_count_(0)
128 , slots_(app.logs(), *this, app.config())
129 , m_stats(
130 std::bind(&OverlayImpl::collect_metrics, this),
131 collector,
132 [counts = m_traffic.getCounts(), collector]() {
134
135 for (auto const& pair : counts)
136 ret.emplace(
137 pair.first, TrafficGauges(pair.second.name, collector));
138
139 return ret;
140 }())
141{
143}
144
145Handoff
147 std::unique_ptr<stream_type>&& stream_ptr,
148 http_request_type&& request,
149 endpoint_type remote_endpoint)
150{
151 auto const id = next_id_++;
152 beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id));
153 beast::Journal journal(sink);
154
155 Handoff handoff;
156 if (processRequest(request, handoff))
157 return handoff;
158 if (!isPeerUpgrade(request))
159 return handoff;
160
161 handoff.moved = true;
162
163 JLOG(journal.debug()) << "Peer connection upgrade from " << remote_endpoint;
164
165 error_code ec;
166 auto const local_endpoint(
167 stream_ptr->next_layer().socket().local_endpoint(ec));
168 if (ec)
169 {
170 JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
171 return handoff;
172 }
173
176 if (consumer.disconnect(journal))
177 return handoff;
178
179 auto const [slot, result] = m_peerFinder->new_inbound_slot(
182
183 if (slot == nullptr)
184 {
185 // connection refused either IP limit exceeded or self-connect
186 handoff.moved = false;
187 JLOG(journal.debug())
188 << "Peer " << remote_endpoint << " refused, " << to_string(result);
189 return handoff;
190 }
191
192 // Validate HTTP request
193
194 {
195 auto const types = beast::rfc2616::split_commas(request["Connect-As"]);
196 if (std::find_if(types.begin(), types.end(), [](std::string const& s) {
197 return boost::iequals(s, "peer");
198 }) == types.end())
199 {
200 handoff.moved = false;
201 handoff.response =
202 makeRedirectResponse(slot, request, remote_endpoint.address());
203 handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
204 return handoff;
205 }
206 }
207
208 auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
209 if (!negotiatedVersion)
210 {
211 m_peerFinder->on_closed(slot);
212 handoff.moved = false;
213 handoff.response = makeErrorResponse(
214 slot,
215 request,
216 remote_endpoint.address(),
217 "Unable to agree on a protocol version");
218 handoff.keep_alive = false;
219 return handoff;
220 }
221
222 auto const sharedValue = makeSharedValue(*stream_ptr, journal);
223 if (!sharedValue)
224 {
225 m_peerFinder->on_closed(slot);
226 handoff.moved = false;
227 handoff.response = makeErrorResponse(
228 slot,
229 request,
230 remote_endpoint.address(),
231 "Incorrect security cookie");
232 handoff.keep_alive = false;
233 return handoff;
234 }
235
236 try
237 {
238 auto publicKey = verifyHandshake(
239 request,
240 *sharedValue,
243 remote_endpoint.address(),
244 app_);
245
246 consumer.setPublicKey(publicKey);
247
248 {
249 // The node gets a reserved slot if it is in our cluster
250 // or if it has a reservation.
251 bool const reserved =
252 static_cast<bool>(app_.cluster().member(publicKey)) ||
253 app_.peerReservations().contains(publicKey);
254 auto const result =
255 m_peerFinder->activate(slot, publicKey, reserved);
256 if (result != PeerFinder::Result::success)
257 {
258 m_peerFinder->on_closed(slot);
259 JLOG(journal.debug()) << "Peer " << remote_endpoint
260 << " redirected, " << to_string(result);
261 handoff.moved = false;
263 slot, request, remote_endpoint.address());
264 handoff.keep_alive = false;
265 return handoff;
266 }
267 }
268
269 auto const peer = std::make_shared<PeerImp>(
270 app_,
271 id,
272 slot,
273 std::move(request),
274 publicKey,
275 *negotiatedVersion,
276 consumer,
277 std::move(stream_ptr),
278 *this);
279 {
280 // As we are not on the strand, run() must be called
281 // while holding the lock, otherwise new I/O can be
282 // queued after a call to stop().
283 std::lock_guard<decltype(mutex_)> lock(mutex_);
284 {
285 auto const result = m_peers.emplace(peer->slot(), peer);
286 XRPL_ASSERT(
287 result.second,
288 "xrpl::OverlayImpl::onHandoff : peer is inserted");
289 (void)result.second;
290 }
291 list_.emplace(peer.get(), peer);
292
293 peer->run();
294 }
295 handoff.moved = true;
296 return handoff;
297 }
298 catch (std::exception const& e)
299 {
300 JLOG(journal.debug()) << "Peer " << remote_endpoint
301 << " fails handshake (" << e.what() << ")";
302
303 m_peerFinder->on_closed(slot);
304 handoff.moved = false;
305 handoff.response = makeErrorResponse(
306 slot, request, remote_endpoint.address(), e.what());
307 handoff.keep_alive = false;
308 return handoff;
309 }
310}
311
312//------------------------------------------------------------------------------
313
314bool
316{
317 if (!is_upgrade(request))
318 return false;
319 auto const versions = parseProtocolVersions(request["Upgrade"]);
320 return !versions.empty();
321}
322
325{
327 ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
328 return ss.str();
329}
330
334 http_request_type const& request,
335 address_type remote_address)
336{
337 boost::beast::http::response<json_body> msg;
338 msg.version(request.version());
339 msg.result(boost::beast::http::status::service_unavailable);
340 msg.insert("Server", BuildInfo::getFullVersionString());
341 {
343 ostr << remote_address;
344 msg.insert("Remote-Address", ostr.str());
345 }
346 msg.insert("Content-Type", "application/json");
347 msg.insert(boost::beast::http::field::connection, "close");
348 msg.body() = Json::objectValue;
349 {
350 Json::Value& ips = (msg.body()["peer-ips"] = Json::arrayValue);
351 for (auto const& _ : m_peerFinder->redirect(slot))
352 ips.append(_.address.to_string());
353 }
354 msg.prepare_payload();
356}
357
361 http_request_type const& request,
362 address_type remote_address,
363 std::string text)
364{
365 boost::beast::http::response<boost::beast::http::empty_body> msg;
366 msg.version(request.version());
367 msg.result(boost::beast::http::status::bad_request);
368 msg.reason("Bad Request (" + text + ")");
369 msg.insert("Server", BuildInfo::getFullVersionString());
370 msg.insert("Remote-Address", remote_address.to_string());
371 msg.insert(boost::beast::http::field::connection, "close");
372 msg.prepare_payload();
374}
375
376//------------------------------------------------------------------------------
377
378void
380{
381 XRPL_ASSERT(work_, "xrpl::OverlayImpl::connect : work is set");
382
383 auto usage = resourceManager().newOutboundEndpoint(remote_endpoint);
384 if (usage.disconnect(journal_))
385 {
386 JLOG(journal_.info()) << "Over resource limit: " << remote_endpoint;
387 return;
388 }
389
390 auto const [slot, result] = peerFinder().new_outbound_slot(remote_endpoint);
391 if (slot == nullptr)
392 {
393 JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint
394 << ": " << to_string(result);
395 return;
396 }
397
399 app_,
402 usage,
404 next_id_++,
405 slot,
406 app_.journal("Peer"),
407 *this);
408
410 list_.emplace(p.get(), p);
411 p->run();
412}
413
414//------------------------------------------------------------------------------
415
416// Adds a peer that is already handshaked and active
417void
419{
420 beast::WrappedSink sink{journal_.sink(), peer->prefix()};
421 beast::Journal journal{sink};
422
424
425 {
426 auto const result = m_peers.emplace(peer->slot(), peer);
427 XRPL_ASSERT(
428 result.second, "xrpl::OverlayImpl::add_active : peer is inserted");
429 (void)result.second;
430 }
431
432 {
433 auto const result = ids_.emplace(
435 std::make_tuple(peer->id()),
436 std::make_tuple(peer));
437 XRPL_ASSERT(
438 result.second,
439 "xrpl::OverlayImpl::add_active : peer ID is inserted");
440 (void)result.second;
441 }
442
443 list_.emplace(peer.get(), peer);
444
445 JLOG(journal.debug()) << "activated";
446
447 // As we are not on the strand, run() must be called
448 // while holding the lock, otherwise new I/O can be
449 // queued after a call to stop().
450 peer->run();
451}
452
453void
455{
457 auto const iter = m_peers.find(slot);
458 XRPL_ASSERT(
459 iter != m_peers.end(), "xrpl::OverlayImpl::remove : valid input");
460 m_peers.erase(iter);
461}
462
463void
465{
467 app_.config(),
468 serverHandler_.setup().overlay.port(),
469 app_.getValidationPublicKey().has_value(),
471
472 m_peerFinder->setConfig(config);
473 m_peerFinder->start();
474
475 // Populate our boot cache: if there are no entries in [ips] then we use
476 // the entries in [ips_fixed].
477 auto bootstrapIps =
479
480 // If nothing is specified, default to several well-known high-capacity
481 // servers to serve as bootstrap:
482 if (bootstrapIps.empty())
483 {
484 // Pool of servers operated by Ripple Labs Inc. - https://ripple.com
485 bootstrapIps.push_back("r.ripple.com 51235");
486
487 // Pool of servers operated by ISRDC - https://isrdc.in
488 bootstrapIps.push_back("sahyadri.isrdc.in 51235");
489
490 // Pool of servers operated by @Xrpkuwait - https://xrpkuwait.com
491 bootstrapIps.push_back("hubs.xrpkuwait.com 51235");
492
493 // Pool of servers operated by XRPL Commons - https://xrpl-commons.org
494 bootstrapIps.push_back("hub.xrpl-commons.org 51235");
495 }
496
498 bootstrapIps,
499 [this](
500 std::string const& name,
501 std::vector<beast::IP::Endpoint> const& addresses) {
503 ips.reserve(addresses.size());
504 for (auto const& addr : addresses)
505 {
506 if (addr.port() == 0)
507 ips.push_back(to_string(addr.at_port(DEFAULT_PEER_PORT)));
508 else
509 ips.push_back(to_string(addr));
510 }
511
512 std::string const base("config: ");
513 if (!ips.empty())
514 m_peerFinder->addFallbackStrings(base + name, ips);
515 });
516
517 // Add the ips_fixed from the xrpld.cfg file
519 {
522 [this](
523 std::string const& name,
524 std::vector<beast::IP::Endpoint> const& addresses) {
526 ips.reserve(addresses.size());
527
528 for (auto& addr : addresses)
529 {
530 if (addr.port() == 0)
531 ips.emplace_back(addr.address(), DEFAULT_PEER_PORT);
532 else
533 ips.emplace_back(addr);
534 }
535
536 if (!ips.empty())
537 m_peerFinder->addFixedPeer(name, ips);
538 });
539 }
540 auto const timer = std::make_shared<Timer>(*this);
542 list_.emplace(timer.get(), timer);
543 timer_ = timer;
544 timer->async_wait();
545}
546
547void
549{
550 boost::asio::dispatch(strand_, std::bind(&OverlayImpl::stopChildren, this));
551 {
552 std::unique_lock<decltype(mutex_)> lock(mutex_);
553 cond_.wait(lock, [this] { return list_.empty(); });
554 }
555 m_peerFinder->stop();
556}
557
558//------------------------------------------------------------------------------
559//
560// PropertyStream
561//
562//------------------------------------------------------------------------------
563
564void
566{
567 beast::PropertyStream::Set set("traffic", stream);
568 auto const stats = m_traffic.getCounts();
569 for (auto const& pair : stats)
570 {
572 item["category"] = pair.second.name;
573 item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
574 item["messages_in"] = std::to_string(pair.second.messagesIn.load());
575 item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
576 item["messages_out"] = std::to_string(pair.second.messagesOut.load());
577 }
578}
579
580//------------------------------------------------------------------------------
586void
588{
589 beast::WrappedSink sink{journal_.sink(), peer->prefix()};
590 beast::Journal journal{sink};
591
592 // Now track this peer
593 {
595 auto const result(ids_.emplace(
597 std::make_tuple(peer->id()),
598 std::make_tuple(peer)));
599 XRPL_ASSERT(
600 result.second, "xrpl::OverlayImpl::activate : peer ID is inserted");
601 (void)result.second;
602 }
603
604 JLOG(journal.debug()) << "activated";
605
606 // We just accepted this peer so we have non-zero active peers
607 XRPL_ASSERT(size(), "xrpl::OverlayImpl::activate : nonzero peers");
608}
609
610void
616
// Handles a batch of manifests `m` received from peer `from`.
// Each list entry is deserialized and passed to
// app_.validatorManifests().applyManifest(). Entries whose disposition is
// `accepted` are published through app_.getOPs().pubManifest(), handed to
// addValidatorManifest() when app_.validators().listed() is true for their
// master key, and collected into one TMManifests message. That message, if
// non-empty, is sent to each peer visited by for_each().
// Entries that fail to deserialize are logged at debug level and skipped.
// NOTE(review): the declarator lines naming this function and its first
// parameter are absent from this listing; the assertion text below
// identifies it as OverlayImpl::onManifests - confirm `m`'s type against
// the header.
617void
620 std::shared_ptr<PeerImp> const& from)
621{
622 auto const n = m->list_size();
// Log through the sending peer's journal.
623 auto const& journal = from->pJournal();
624
// Accumulates the accepted manifests to forward.
625 protocol::TMManifests relay;
626
627 for (std::size_t i = 0; i < n; ++i)
628 {
629 auto& s = m->list().Get(i).stobject();
630
631 if (auto mo = deserializeManifest(s))
632 {
// Copy the serialized form first: *mo is moved from by applyManifest().
633 auto const serialized = mo->serialized;
634
635 auto const result =
636 app_.validatorManifests().applyManifest(std::move(*mo));
637
638 if (result == ManifestDisposition::accepted)
639 {
// Queue the original wire bytes for relaying.
640 relay.add_list()->set_stobject(s);
641
642 // N.B.: this is important; the applyManifest call above moves
643 // the loaded Manifest out of the optional so we need to
644 // reload it here.
645 mo = deserializeManifest(serialized);
646 XRPL_ASSERT(
647 mo,
648 "xrpl::OverlayImpl::onManifests : manifest "
649 "deserialization succeeded");
650
651 app_.getOPs().pubManifest(*mo);
652
// Only manifests whose master key is listed are passed on to
// addValidatorManifest() together with a wallet DB session.
653 if (app_.validators().listed(mo->masterKey))
654 {
655 auto db = app_.getWalletDB().checkoutDb();
656 addValidatorManifest(*db, serialized);
657 }
658 }
659 }
660 else
661 {
// The log uses a 1-based position (i + 1) for the bad entry.
662 JLOG(journal.debug())
663 << "Malformed manifest #" << i + 1 << ": " << strHex(s);
664 continue;
665 }
666 }
667
// Forward only if something was accepted. The Message is built once in the
// lambda's init-capture and the same shared pointer is passed to every send.
668 if (!relay.list().empty())
669 for_each([m2 = std::make_shared<Message>(relay, protocol::mtMANIFESTS)](
670 std::shared_ptr<PeerImp>&& p) { p->send(m2); });
671}
672
673void
678
679void
690{
692 return ids_.size();
693}
694
695int
697{
698 return m_peerFinder->config().maxPeers;
699}
700
703{
704 using namespace std::chrono;
705 Json::Value jv;
706 auto& av = jv["active"] = Json::Value(Json::arrayValue);
707
709 auto& pv = av.append(Json::Value(Json::objectValue));
710 pv[jss::public_key] = base64_encode(
711 sp->getNodePublic().data(), sp->getNodePublic().size());
712 pv[jss::type] = sp->slot()->inbound() ? "in" : "out";
713 pv[jss::uptime] = static_cast<std::uint32_t>(
714 duration_cast<seconds>(sp->uptime()).count());
715 if (sp->crawl())
716 {
717 pv[jss::ip] = sp->getRemoteAddress().address().to_string();
718 if (sp->slot()->inbound())
719 {
720 if (auto port = sp->slot()->listening_port())
721 pv[jss::port] = *port;
722 }
723 else
724 {
725 pv[jss::port] = std::to_string(sp->getRemoteAddress().port());
726 }
727 }
728
729 {
730 auto version{sp->getVersion()};
731 if (!version.empty())
732 // Could move here if Json::value supported moving from strings
733 pv[jss::version] = std::string{version};
734 }
735
736 std::uint32_t minSeq, maxSeq;
737 sp->ledgerRange(minSeq, maxSeq);
738 if (minSeq != 0 || maxSeq != 0)
739 pv[jss::complete_ledgers] =
740 std::to_string(minSeq) + "-" + std::to_string(maxSeq);
741 });
742
743 return jv;
744}
745
748{
749 bool const humanReadable = false;
750 bool const admin = false;
751 bool const counters = false;
752
753 Json::Value server_info =
754 app_.getOPs().getServerInfo(humanReadable, admin, counters);
755
756 // Filter out some information
757 server_info.removeMember(jss::hostid);
758 server_info.removeMember(jss::load_factor_fee_escalation);
759 server_info.removeMember(jss::load_factor_fee_queue);
760 server_info.removeMember(jss::validation_quorum);
761
762 if (server_info.isMember(jss::validated_ledger))
763 {
764 Json::Value& validated_ledger = server_info[jss::validated_ledger];
765
766 validated_ledger.removeMember(jss::base_fee);
767 validated_ledger.removeMember(jss::reserve_base_xrp);
768 validated_ledger.removeMember(jss::reserve_inc_xrp);
769 }
770
771 return server_info;
772}
773
779
782{
783 Json::Value validators = app_.validators().getJson();
784
785 if (validators.isMember(jss::publisher_lists))
786 {
787 Json::Value& publisher_lists = validators[jss::publisher_lists];
788
789 for (auto& publisher : publisher_lists)
790 {
791 publisher.removeMember(jss::list);
792 }
793 }
794
795 validators.removeMember(jss::signing_keys);
796 validators.removeMember(jss::trusted_validator_keys);
797 validators.removeMember(jss::validation_quorum);
798
799 Json::Value validatorSites = app_.validatorSites().getJson();
800
801 if (validatorSites.isMember(jss::validator_sites))
802 {
803 validators[jss::validator_sites] =
804 std::move(validatorSites[jss::validator_sites]);
805 }
806
807 return validators;
808}
809
810// Returns information on verified peers.
813{
815 for (auto const& peer : getActivePeers())
816 {
817 json.append(peer->json());
818 }
819 return json;
820}
821
822bool
824{
825 if (req.target() != "/crawl" ||
827 return false;
828
829 boost::beast::http::response<json_body> msg;
830 msg.version(req.version());
831 msg.result(boost::beast::http::status::ok);
832 msg.insert("Server", BuildInfo::getFullVersionString());
833 msg.insert("Content-Type", "application/json");
834 msg.insert("Connection", "close");
835 msg.body()["version"] = Json::Value(2u);
836
838 {
839 msg.body()["overlay"] = getOverlayInfo();
840 }
842 {
843 msg.body()["server"] = getServerInfo();
844 }
846 {
847 msg.body()["counts"] = getServerCounts();
848 }
850 {
851 msg.body()["unl"] = getUnlInfo();
852 }
853
854 msg.prepare_payload();
856 return true;
857}
858
859bool
861 http_request_type const& req,
862 Handoff& handoff)
863{
864 // If the target is in the form "/vl/<validator_list_public_key>",
865 // return the most recent validator list for that key.
866 constexpr std::string_view prefix("/vl/");
867
868 if (!req.target().starts_with(prefix.data()) || !setup_.vlEnabled)
869 return false;
870
871 std::uint32_t version = 1;
872
873 boost::beast::http::response<json_body> msg;
874 msg.version(req.version());
875 msg.insert("Server", BuildInfo::getFullVersionString());
876 msg.insert("Content-Type", "application/json");
877 msg.insert("Connection", "close");
878
879 auto fail = [&msg, &handoff](auto status) {
880 msg.result(status);
881 msg.insert("Content-Length", "0");
882
883 msg.body() = Json::nullValue;
884
885 msg.prepare_payload();
887 return true;
888 };
889
890 std::string_view key = req.target().substr(prefix.size());
891
892 if (auto slash = key.find('/'); slash != std::string_view::npos)
893 {
894 auto verString = key.substr(0, slash);
895 if (!boost::conversion::try_lexical_convert(verString, version))
896 return fail(boost::beast::http::status::bad_request);
897 key = key.substr(slash + 1);
898 }
899
900 if (key.empty())
901 return fail(boost::beast::http::status::bad_request);
902
903 // find the list
904 auto vl = app_.validators().getAvailable(key, version);
905
906 if (!vl)
907 {
908 // 404 not found
909 return fail(boost::beast::http::status::not_found);
910 }
911 else if (!*vl)
912 {
913 return fail(boost::beast::http::status::bad_request);
914 }
915 else
916 {
917 msg.result(boost::beast::http::status::ok);
918
919 msg.body() = *vl;
920
921 msg.prepare_payload();
923 return true;
924 }
925}
926
927bool
929{
930 if (req.target() != "/health")
931 return false;
932 boost::beast::http::response<json_body> msg;
933 msg.version(req.version());
934 msg.insert("Server", BuildInfo::getFullVersionString());
935 msg.insert("Content-Type", "application/json");
936 msg.insert("Connection", "close");
937
938 auto info = getServerInfo();
939
940 int last_validated_ledger_age = -1;
941 if (info.isMember(jss::validated_ledger))
942 last_validated_ledger_age =
943 info[jss::validated_ledger][jss::age].asInt();
944 bool amendment_blocked = false;
945 if (info.isMember(jss::amendment_blocked))
946 amendment_blocked = true;
947 int number_peers = info[jss::peers].asInt();
948 std::string server_state = info[jss::server_state].asString();
949 auto load_factor = info[jss::load_factor_server].asDouble() /
950 info[jss::load_base].asDouble();
951
952 enum { healthy, warning, critical };
953 int health = healthy;
954 auto set_health = [&health](int state) {
955 if (health < state)
956 health = state;
957 };
958
959 msg.body()[jss::info] = Json::objectValue;
960 if (last_validated_ledger_age >= 7 || last_validated_ledger_age < 0)
961 {
962 msg.body()[jss::info][jss::validated_ledger] =
963 last_validated_ledger_age;
964 if (last_validated_ledger_age < 20)
965 set_health(warning);
966 else
967 set_health(critical);
968 }
969
970 if (amendment_blocked)
971 {
972 msg.body()[jss::info][jss::amendment_blocked] = true;
973 set_health(critical);
974 }
975
976 if (number_peers <= 7)
977 {
978 msg.body()[jss::info][jss::peers] = number_peers;
979 if (number_peers != 0)
980 set_health(warning);
981 else
982 set_health(critical);
983 }
984
985 if (!(server_state == "full" || server_state == "validating" ||
986 server_state == "proposing"))
987 {
988 msg.body()[jss::info][jss::server_state] = server_state;
989 if (server_state == "syncing" || server_state == "tracking" ||
990 server_state == "connected")
991 {
992 set_health(warning);
993 }
994 else
995 set_health(critical);
996 }
997
998 if (load_factor > 100)
999 {
1000 msg.body()[jss::info][jss::load_factor] = load_factor;
1001 if (load_factor < 1000)
1002 set_health(warning);
1003 else
1004 set_health(critical);
1005 }
1006
1007 switch (health)
1008 {
1009 case healthy:
1010 msg.result(boost::beast::http::status::ok);
1011 break;
1012 case warning:
1013 msg.result(boost::beast::http::status::service_unavailable);
1014 break;
1015 case critical:
1016 msg.result(boost::beast::http::status::internal_server_error);
1017 break;
1018 }
1019
1020 msg.prepare_payload();
1022 return true;
1023}
1024
1025bool
1027{
1028 // Take advantage of || short-circuiting
1029 return processCrawl(req, handoff) || processValidatorList(req, handoff) ||
1030 processHealth(req, handoff);
1031}
1032
1035{
1037 ret.reserve(size());
1038
1039 for_each([&ret](std::shared_ptr<PeerImp>&& sp) {
1040 ret.emplace_back(std::move(sp));
1041 });
1042
1043 return ret;
1044}
1045
1048 std::set<Peer::id_t> const& toSkip,
1049 std::size_t& active,
1050 std::size_t& disabled,
1051 std::size_t& enabledInSkip) const
1052{
1054 std::lock_guard lock(mutex_);
1055
1056 active = ids_.size();
1057 disabled = enabledInSkip = 0;
1058 ret.reserve(ids_.size());
1059
1060 // NOTE The purpose of p is to delay the destruction of PeerImp
1062 for (auto& [id, w] : ids_)
1063 {
1064 if (p = w.lock(); p != nullptr)
1065 {
1066 bool const reduceRelayEnabled = p->txReduceRelayEnabled();
1067 // tx reduced relay feature disabled
1068 if (!reduceRelayEnabled)
1069 ++disabled;
1070
1071 if (toSkip.count(id) == 0)
1072 ret.emplace_back(std::move(p));
1073 else if (reduceRelayEnabled)
1074 ++enabledInSkip;
1075 }
1076 }
1077
1078 return ret;
1079}
1080
1081void
1083{
1084 for_each(
1085 [index](std::shared_ptr<PeerImp>&& sp) { sp->checkTracking(index); });
1086}
1087
1090{
1091 std::lock_guard lock(mutex_);
1092 auto const iter = ids_.find(id);
1093 if (iter != ids_.end())
1094 return iter->second.lock();
1095 return {};
1096}
1097
1098// A public key hash map was not used due to the peer connect/disconnect
1099// update overhead outweighing the performance of a small set linear search.
1102{
1103 std::lock_guard lock(mutex_);
1104 // NOTE The purpose of peer is to delay the destruction of PeerImp
1106 for (auto const& e : ids_)
1107 {
1108 if (peer = e.second.lock(); peer != nullptr)
1109 {
1110 if (peer->getNodePublic() == pubKey)
1111 return peer;
1112 }
1113 }
1114 return {};
1115}
1116
1117void
1118OverlayImpl::broadcast(protocol::TMProposeSet& m)
1119{
1120 auto const sm = std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER);
1121 for_each([&](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
1122}
1123
1126 protocol::TMProposeSet& m,
1127 uint256 const& uid,
1128 PublicKey const& validator)
1129{
1130 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1131 {
1132 auto const sm =
1133 std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER, validator);
1135 if (toSkip->find(p->id()) == toSkip->end())
1136 p->send(sm);
1137 });
1138 return *toSkip;
1139 }
1140 return {};
1141}
1142
1143void
1144OverlayImpl::broadcast(protocol::TMValidation& m)
1145{
1146 auto const sm = std::make_shared<Message>(m, protocol::mtVALIDATION);
1147 for_each([sm](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
1148}
1149
1152 protocol::TMValidation& m,
1153 uint256 const& uid,
1154 PublicKey const& validator)
1155{
1156 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1157 {
1158 auto const sm =
1159 std::make_shared<Message>(m, protocol::mtVALIDATION, validator);
1161 if (toSkip->find(p->id()) == toSkip->end())
1162 p->send(sm);
1163 });
1164 return *toSkip;
1165 }
1166 return {};
1167}
1168
1171{
1173
1174 if (auto seq = app_.validatorManifests().sequence();
1175 seq != manifestListSeq_)
1176 {
1177 protocol::TMManifests tm;
1178
1180 [&tm](std::size_t s) { tm.mutable_list()->Reserve(s); },
1181 [&tm, &hr = app_.getHashRouter()](Manifest const& manifest) {
1182 tm.add_list()->set_stobject(
1183 manifest.serialized.data(), manifest.serialized.size());
1184 hr.addSuppression(manifest.hash());
1185 });
1186
1188
1189 if (tm.list_size() != 0)
1191 std::make_shared<Message>(tm, protocol::mtMANIFESTS);
1192
1193 manifestListSeq_ = seq;
1194 }
1195
1196 return manifestMessage_;
1197}
1198
1199void
1201 uint256 const& hash,
1203 std::set<Peer::id_t> const& toSkip)
1204{
1205 bool relay = tx.has_value();
1206 if (relay)
1207 {
1208 auto& txn = tx->get();
1209 SerialIter sit(makeSlice(txn.rawtransaction()));
1210 try
1211 {
1212 relay = !isPseudoTx(STTx{sit});
1213 }
1214 catch (std::exception const&)
1215 {
1216 // Could not construct STTx, not relaying
1217 JLOG(journal_.debug()) << "Could not construct STTx: " << hash;
1218 return;
1219 }
1220 }
1221
1222 Overlay::PeerSequence peers = {};
1223 std::size_t total = 0;
1224 std::size_t disabled = 0;
1225 std::size_t enabledInSkip = 0;
1226
1227 if (!relay)
1228 {
1230 return;
1231
1232 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1233 JLOG(journal_.trace())
1234 << "not relaying tx, total peers " << peers.size();
1235 for (auto const& p : peers)
1236 p->addTxQueue(hash);
1237 return;
1238 }
1239
1240 auto& txn = tx->get();
1241 auto const sm = std::make_shared<Message>(txn, protocol::mtTRANSACTION);
1242 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1243 auto const minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled;
1244
1245 if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay)
1246 {
1247 for (auto const& p : peers)
1248 p->send(sm);
1251 txMetrics_.addMetrics(total, toSkip.size(), 0);
1252 return;
1253 }
1254
1255 // We have more peers than the minimum (disabled + minimum enabled),
1256 // relay to all disabled and some randomly selected enabled that
1257 // do not have the transaction.
1258 auto const enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS +
1259 (total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100;
1260
1261 txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);
1262
1263 if (enabledTarget > enabledInSkip)
1264 std::shuffle(peers.begin(), peers.end(), default_prng());
1265
1266 JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size()
1267 << " selected " << enabledTarget << " skip "
1268 << toSkip.size() << " disabled " << disabled;
1269
1270 // count skipped peers with the enabled feature towards the quota
1271 std::uint16_t enabledAndRelayed = enabledInSkip;
1272 for (auto const& p : peers)
1273 {
1274 // always relay to a peer with the disabled feature
1275 if (!p->txReduceRelayEnabled())
1276 {
1277 p->send(sm);
1278 }
1279 else if (enabledAndRelayed < enabledTarget)
1280 {
1281 enabledAndRelayed++;
1282 p->send(sm);
1283 }
1284 else
1285 {
1286 p->addTxQueue(hash);
1287 }
1288 }
1289}
1290
1291//------------------------------------------------------------------------------
1292
1293void
1295{
1296 std::lock_guard lock(mutex_);
1297 list_.erase(&child);
1298 if (list_.empty())
1299 cond_.notify_all();
1300}
1301
1302void
1304{
1305 // Calling list_[].second->stop() may cause list_ to be modified
1306 // (OverlayImpl::remove() may be called on this same thread). So
1307 // iterating directly over list_ to call child->stop() could lead to
1308 // undefined behavior.
1309 //
1310 // Therefore we copy all of the weak/shared ptrs out of list_ before we
1311 // start calling stop() on them. That guarantees OverlayImpl::remove()
1312 // won't be called until vector<> children leaves scope.
1314 {
1315 std::lock_guard lock(mutex_);
1316 if (!work_)
1317 return;
1319
1320 children.reserve(list_.size());
1321 for (auto const& element : list_)
1322 {
1323 children.emplace_back(element.second.lock());
1324 }
1325 } // lock released
1326
1327 for (auto const& child : children)
1328 {
1329 if (child != nullptr)
1330 child->stop();
1331 }
1332}
1333
1334void
1336{
1337 auto const result = m_peerFinder->autoconnect();
1338 for (auto addr : result)
1339 connect(addr);
1340}
1341
// For each entry returned by m_peerFinder->buildEndpointsForPeers(), looks up
// the peer registered in m_peers under e.first and, if it is still alive,
// forwards the endpoint range e.second to it.
// NOTE(review): listing lines 1343 (signature) and 1348 are missing from this
// rendering; `peer` has no visible declaration, so 1348 presumably declares
// it - confirm against the real source.
1342void
1344{
1345 auto const result = m_peerFinder->buildEndpointsForPeers();
1346 for (auto const& e : result)
1347 {
// mutex_ is held only for the m_peers lookup; the send below runs after the
// lock has been released.
1349 {
1350 std::lock_guard lock(mutex_);
1351 auto const iter = m_peers.find(e.first);
1352 if (iter != m_peers.end())
1353 peer = iter->second.lock();
1354 }
1355 if (peer)
1356 peer->sendEndpoints(e.second.begin(), e.second.end());
1357 }
1358}
1359
// Visits every peer via for_each() and calls sendTxQueue() on those for which
// txReduceRelayEnabled() is true; other peers are left untouched.
// NOTE(review): listing line 1361 (the signature) is missing from this
// rendering; presumably `OverlayImpl::sendTxQueue()` - confirm.
1360void
1362{
1363 for_each([](auto const& p) {
1364 if (p->txReduceRelayEnabled())
1365 p->sendTxQueue();
1366 });
1367}
1368
// Builds a protocol::TMSquelch for `validator` and wraps it in a Message of
// type protocol::mtSQUELCH.
//   validator       - key whose raw bytes are stored in validatorpubkey
//   squelch         - value stored in the message's squelch flag
//   squelchDuration - stored only when `squelch` is true
// NOTE(review): listing lines 1369-1370 (return type and function name) are
// missing from this rendering; presumably
// `std::shared_ptr<Message> makeSquelchMessage(` - confirm.
1371 PublicKey const& validator,
1372 bool squelch,
1373 uint32_t squelchDuration)
1374{
1375 protocol::TMSquelch m;
1376 m.set_squelch(squelch);
1377 m.set_validatorpubkey(validator.data(), validator.size());
// The duration field is left unset for an unsquelch message.
1378 if (squelch)
1379 m.set_squelchduration(squelchDuration);
1380 return std::make_shared<Message>(m, protocol::mtSQUELCH);
1381}
1382
// Sends an unsquelch message (squelch = false, duration 0) for `validator` to
// the peer whose short id is `id`; does nothing if no such peer is found.
// NOTE(review): listing line 1384 (the signature) is missing from this
// rendering; presumably
// `OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const` -
// confirm.
1383void
1385{
1386 if (auto peer = findPeerByShortID(id); peer)
1387 {
1388 // optimize - multiple message with different
1389 // validator might be sent to the same peer
1390 peer->send(makeSquelchMessage(validator, false, 0));
1391 }
1392}
1393
// Sends a squelch message for `validator`, carrying `squelchDuration`, to the
// peer whose short id is `id`; does nothing if no such peer is found.
// NOTE(review): listing line 1395 (the function name) is missing from this
// rendering; presumably `OverlayImpl::squelch(` - confirm.
1394void
1396 PublicKey const& validator,
1397 Peer::id_t id,
1398 uint32_t squelchDuration) const
1399{
1400 if (auto peer = findPeerByShortID(id); peer)
1401 {
1402 peer->send(makeSquelchMessage(validator, true, squelchDuration));
1403 }
1404}
1405
// Peer-set overload: forwards (key, validator, type) to
// slots_.updateSlotAndSquelch once for every id in `peers`.
// Returns immediately when slots_.baseSquelchReady() is false.
// NOTE(review): listing lines 1407 (the function name) and 1430 (the body of
// the callback passed to slots_) are missing from this rendering; presumably
// the name is `OverlayImpl::updateSlotAndSquelch(` - confirm. What the
// callback does cannot be told from here.
1406void
1408 uint256 const& key,
1409 PublicKey const& validator,
1410 std::set<Peer::id_t>&& peers,
1411 protocol::MessageType type)
1412{
1413 if (!slots_.baseSquelchReady())
1414 return;
1415
// When called off strand_, re-post this same call onto strand_ and return;
// the slots_ update below therefore only runs on strand_.
1416 if (!strand_.running_in_this_thread())
1417 return post(
1418 strand_,
1419 // Must capture copies of reference parameters (i.e. key, validator)
1420 [this,
1421 key = key,
1422 validator = validator,
1423 peers = std::move(peers),
1424 type]() mutable {
1425 updateSlotAndSquelch(key, validator, std::move(peers), type);
1426 });
1427
1428 for (auto id : peers)
1429 slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
1431 });
1432}
1433
// Single-peer overload: forwards (key, validator, peer, type) to
// slots_.updateSlotAndSquelch.
// Returns immediately when slots_.baseSquelchReady() is false.
// NOTE(review): listing lines 1435 (the function name) and 1453 (the body of
// the callback passed to slots_) are missing from this rendering; presumably
// the name is `OverlayImpl::updateSlotAndSquelch(` - confirm. What the
// callback does cannot be told from here.
1434void
1436 uint256 const& key,
1437 PublicKey const& validator,
1438 Peer::id_t peer,
1439 protocol::MessageType type)
1440{
1441 if (!slots_.baseSquelchReady())
1442 return;
1443
// When called off strand_, re-post this same call onto strand_ and return;
// the slots_ update below therefore only runs on strand_.
1444 if (!strand_.running_in_this_thread())
1445 return post(
1446 strand_,
1447 // Must capture copies of reference parameters (i.e. key, validator)
1448 [this, key = key, validator = validator, peer, type]() {
1449 updateSlotAndSquelch(key, validator, peer, type);
1450 });
1451
1452 slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
1454 });
1455}
1456
// Forwards to slots_.deletePeer(id, true). When called off strand_, the call
// is re-posted onto strand_ instead of running inline.
// NOTE(review): listing line 1458 (the signature) is missing from this
// rendering; presumably `OverlayImpl::deletePeer(Peer::id_t id)` - confirm.
1457void
1459{
1460 if (!strand_.running_in_this_thread())
1461 return post(strand_, std::bind(&OverlayImpl::deletePeer, this, id));
1462
1463 slots_.deletePeer(id, true);
1464}
1465
// Forwards to slots_.deleteIdlePeers(). When called off strand_, the call is
// re-posted onto strand_ instead of running inline.
// NOTE(review): listing line 1467 (the signature) is missing from this
// rendering; presumably `OverlayImpl::deleteIdlePeers()` - confirm.
1466void
1468{
1469 if (!strand_.running_in_this_thread())
1470 return post(strand_, std::bind(&OverlayImpl::deleteIdlePeers, this));
1471
1472 slots_.deleteIdlePeers();
1473}
1474
1475//------------------------------------------------------------------------------
1476
// Builds an Overlay::Setup from `config`: the [overlay], [crawl] and [vl]
// sections plus the legacy "network_id" value.
// Throws std::runtime_error when ip_limit is negative, public_ip does not
// parse or is a private address, [crawl] has more than one bare value or a
// value lexical_cast<bool> rejects, or network_id cannot be converted.
// NOTE(review): listing lines 1477-1478 (return type and signature) are
// missing from this rendering; presumably
// `Overlay::Setup setup_Overlay(BasicConfig const& config)` - confirm.
1479{
1480 Overlay::Setup setup;
1481
1482 {
1483 auto const& section = config.section("overlay");
1484 setup.context = make_SSLContext("");
1485
1486 set(setup.ipLimit, "ip_limit", section);
1487 if (setup.ipLimit < 0)
1488 Throw<std::runtime_error>("Configured IP limit is invalid");
1489
1490 std::string ip;
1491 set(ip, "public_ip", section);
1492 if (!ip.empty())
1493 {
// A configured public_ip must both parse and not be a private address.
1494 boost::system::error_code ec;
1495 setup.public_ip = boost::asio::ip::make_address(ip, ec);
1496 if (ec || beast::IP::is_private(setup.public_ip))
1497 Throw<std::runtime_error>("Configured public IP is invalid");
1498 }
1499 }
1500
1501 {
1502 auto const& section = config.section("crawl");
1503 auto const& values = section.values();
1504
1505 if (values.size() > 1)
1506 {
1507 Throw<std::runtime_error>(
1508 "Configured [crawl] section is invalid, too many values");
1509 }
1510
// Crawling stays enabled when the section carries no bare value.
1511 bool crawlEnabled = true;
1512
1513 // Only allow "0|1" as a value
1514 if (values.size() == 1)
1515 {
1516 try
1517 {
1518 crawlEnabled = boost::lexical_cast<bool>(values.front());
1519 }
1520 catch (boost::bad_lexical_cast const&)
1521 {
1522 Throw<std::runtime_error>(
1523 "Configured [crawl] section has invalid value: " +
1524 values.front());
1525 }
1526 }
1527
// NOTE(review): the bodies of the four ifs below (listing lines 1532, 1536,
// 1540, 1544) are missing from this rendering; presumably each sets the
// matching CrawlOptions flag in setup.crawlOptions - confirm. Only "counts"
// defaults to false; the other three default to true.
1528 if (crawlEnabled)
1529 {
1530 if (get<bool>(section, "overlay", true))
1531 {
1533 }
1534 if (get<bool>(section, "server", true))
1535 {
1537 }
1538 if (get<bool>(section, "counts", false))
1539 {
1541 }
1542 if (get<bool>(section, "unl", true))
1543 {
1545 }
1546 }
1547 }
1548 {
1549 auto const& section = config.section("vl");
1550
1551 set(setup.vlEnabled, "enabled", section);
1552 }
1553
// Any exception raised inside this try block is replaced by the single
// runtime_error thrown from the catch-all handler below.
1554 try
1555 {
1556 auto id = config.legacy("network_id");
1557
// An empty value leaves setup.networkID untouched.
1558 if (!id.empty())
1559 {
// Map the named networks to their numeric ids before converting.
1560 if (id == "main")
1561 id = "0";
1562
1563 if (id == "testnet")
1564 id = "1";
1565
1566 if (id == "devnet")
1567 id = "2";
1568
1569 setup.networkID = beast::lexicalCastThrow<std::uint32_t>(id);
1570 }
1571 }
1572 catch (...)
1573 {
1574 Throw<std::runtime_error>(
1575 "Configured [network_id] section is invalid: must be a number "
1576 "or one of the strings 'main', 'testnet' or 'devnet'.");
1577 }
1578
1579 return setup;
1580}
1581
// Factory that passes all eight arguments through, unchanged and in the same
// order, to the object it constructs.
// NOTE(review): listing lines 1582-1583 (return type and function name) and
// 1593 (the start of the return expression) are missing from this rendering;
// presumably `std::unique_ptr<Overlay> make_Overlay(` and
// `return std::make_unique<OverlayImpl>(` - confirm against the real source.
1584 Application& app,
1585 Overlay::Setup const& setup,
1586 ServerHandler& serverHandler,
1587 Resource::Manager& resourceManager,
1588 Resolver& resolver,
1589 boost::asio::io_context& io_context,
1590 BasicConfig const& config,
1591 beast::insight::Collector::ptr const& collector)
1592{
1594 app,
1595 setup,
1596 serverHandler,
1597 resourceManager,
1598 resolver,
1599 io_context,
1600 config,
1601 collector);
1602}
1603
1604} // namespace xrpl
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:131
Value & append(Value const &value)
Append value to array at the end.
Value removeMember(char const *key)
Remove and return the named member.
bool isMember(char const *key) const
Return true if the object has a member named key.
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
A generic endpoint for log messages.
Definition Journal.h:41
Stream debug() const
Definition Journal.h:309
Sink & sink() const
Returns the Sink associated with this Journal.
Definition Journal.h:278
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
std::string const & name() const
Returns the name of this source.
void add(Source &source)
Add a child source.
Wraps a Journal::Sink to prefix its output with a string.
Definition WrappedSink.h:15
virtual HashRouter & getHashRouter()=0
virtual Cluster & cluster()=0
virtual Config & config()=0
virtual ValidatorSite & validatorSites()=0
virtual beast::Journal journal(std::string const &name)=0
virtual DatabaseCon & getWalletDB()=0
Retrieve the "wallet database".
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual Logs & logs()=0
virtual PeerReservationTable & peerReservations()=0
virtual ManifestCache & validatorManifests()=0
virtual ValidatorList & validators()=0
virtual NetworkOPs & getOPs()=0
Holds unparsed configuration information.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
Section & section(std::string const &name)
Returns the section with the given name.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:19
bool TX_REDUCE_RELAY_ENABLE
Definition Config.h:240
std::vector< std::string > IPS
Definition Config.h:123
bool standalone() const
Definition Config.h:318
std::size_t TX_RELAY_PERCENTAGE
Definition Config.h:253
bool TX_REDUCE_RELAY_METRICS
Definition Config.h:247
std::vector< std::string > IPS_FIXED
Definition Config.h:126
std::size_t TX_REDUCE_RELAY_MIN_PEERS
Definition Config.h:250
LockedSociSession checkoutDb()
std::optional< std::set< PeerShortID > > shouldRelay(uint256 const &key)
Determines whether the hashed item should be relayed.
virtual void pubManifest(Manifest const &)=0
void for_each_manifest(Function &&f) const
Invokes the callback once for every populated manifest.
Definition Manifest.h:406
ManifestDisposition applyManifest(Manifest m)
Add manifest to cache.
Definition Manifest.cpp:363
std::uint32_t sequence() const
A monotonically increasing number used to detect new manifests.
Definition Manifest.h:259
virtual Json::Value getServerInfo(bool human, bool admin, bool counters)=0
Child(OverlayImpl &overlay)
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
std::weak_ptr< Timer > timer_
Definition OverlayImpl.h:92
boost::asio::io_context & io_context_
Definition OverlayImpl.h:85
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
OverlayImpl(Application &app, Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &io_context, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
boost::asio::ip::address address_type
Definition OverlayImpl.h:63
static bool isPeerUpgrade(http_request_type const &request)
Resource::Manager & m_resourceManager
Definition OverlayImpl.h:97
boost::system::error_code error_code
Definition OverlayImpl.h:65
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
Json::Value getServerCounts()
Returns information about the local server's performance counters.
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully. This is called after the peer handshake has been comple...
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remote_endpoint) override
Conditionally accept an incoming HTTP request.
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
void stop() override
void connect(beast::IP::Endpoint const &remote_endpoint) override
Establish a peer connection to the specified endpoint.
std::size_t size() const override
The number of active peers on the network. Active peers are only those peers that have completed the h...
ServerHandler & serverHandler_
Definition OverlayImpl.h:96
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
void broadcast(protocol::TMProposeSet &m) override
Broadcast a proposal.
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
Definition OverlayImpl.h:88
std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address, std::string msg)
reduce_relay::Slots< UptimeClock > slots_
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
TrafficCount m_traffic
Definition OverlayImpl.h:99
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
void sendTxQueue()
Send once a second transactions' hashes aggregated by peers.
std::shared_ptr< Message > manifestMessage_
std::unique_ptr< PeerFinder::Manager > m_peerFinder
Definition OverlayImpl.h:98
std::optional< std::uint32_t > manifestListSeq_
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
void add_active(std::shared_ptr< PeerImp > const &peer)
PeerFinder::Manager & peerFinder()
Application & app_
Definition OverlayImpl.h:84
std::recursive_mutex mutex_
Definition OverlayImpl.h:90
Resource::Manager & resourceManager()
beast::Journal const journal_
Definition OverlayImpl.h:95
boost::asio::ip::tcp::endpoint endpoint_type
Definition OverlayImpl.h:64
void onPeerDeactivate(Peer::id_t id)
std::mutex manifestLock_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition OverlayImpl.h:89
Json::Value json() override
Return diagnostics on the status of all peers.
static std::string makePrefix(std::uint32_t id)
Setup const & setup() const
std::set< Peer::id_t > relay(protocol::TMProposeSet &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
static bool is_upgrade(boost::beast::http::header< true, Fields > const &req)
metrics::TxMetrics txMetrics_
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
Definition OverlayImpl.h:93
int limit() override
Returns the maximum number of peers we are configured to allow.
std::condition_variable_any cond_
Definition OverlayImpl.h:91
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > m_peers
std::shared_ptr< Message > getManifestsMessage()
Json::Value getUnlInfo()
Returns information about the local server's UNL.
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address)
std::atomic< Peer::id_t > next_id_
void reportInboundTraffic(TrafficCount::category cat, int bytes)
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
Resolver & m_resolver
Json::Value getServerInfo()
Returns information about the local server.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
void start() override
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
Json::Value getOverlayInfo()
Returns information about peers on the overlay network.
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
Manages the set of connected peers.
Definition Overlay.h:30
virtual std::pair< std::shared_ptr< Slot >, Result > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)=0
Create a new outbound slot with the specified remote endpoint.
bool contains(PublicKey const &nodeId)
A public key.
Definition PublicKey.h:43
void resolve(std::vector< std::string > const &names, Handler handler)
resolve all hostnames on the list
Definition Resolver.h:38
Tracks load and resource consumption.
virtual Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by outbound IP address and port.
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:60
void setup(Setup const &setup, beast::Journal journal)
void addCount(category cat, bool inbound, int bytes)
Account for traffic associated with the given category.
auto const & getCounts() const
An up-to-date copy of all the counters.
Json::Value getJson() const
Return a JSON representation of the state of the validator list.
std::optional< Json::Value > getAvailable(std::string_view pubKey, std::optional< std::uint32_t > forceVersion={})
Returns the current valid list for the given publisher key, if available, as a Json object.
bool listed(PublicKey const &identity) const
Returns true if public key is included on any lists.
Json::Value getJson() const
Return JSON representation of configured validator sites.
T count(T... args)
T data(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T find_if(T... args)
T get(T... args)
T is_same_v
T make_tuple(T... args)
@ nullValue
'null' value
Definition json_value.h:20
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
bool is_private(Address const &addr)
Returns true if the address is a private unroutable address.
Definition IPAddress.h:52
Result split_commas(FwdIt first, FwdIt last)
Definition rfc2616.h:180
bool is_keep_alive(boost::beast::http::message< isRequest, Body, Fields > const &m)
Definition rfc2616.h:367
STL namespace.
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:62
@ checkIdlePeers
How often we check for idle peers (seconds)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:100
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:11
std::optional< ProtocolVersion > negotiateProtocolVersion(std::vector< ProtocolVersion > const &versions)
Given a list of supported protocol versions, choose the one we prefer.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:14
std::shared_ptr< boost::asio::ssl::context > make_SSLContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
void addValidatorManifest(soci::session &session, std::string const &serialized)
addValidatorManifest Saves the manifest of a validator to the database.
Definition Wallet.cpp:100
std::string base64_encode(std::uint8_t const *data, std::size_t len)
Json::Value getCountsJson(Application &app, int minObjectCount)
Definition GetCounts.cpp:43
std::optional< Manifest > deserializeManifest(Slice s, beast::Journal journal)
Constructs Manifest from serialized string.
Definition Manifest.cpp:35
beast::xor_shift_engine & default_prng()
Return the default random engine.
@ manifest
Manifest.
constexpr Number squelch(Number const &x, Number const &limit) noexcept
Definition Number.h:360
std::shared_ptr< Message > makeSquelchMessage(PublicKey const &validator, bool squelch, uint32_t squelchDuration)
Overlay::Setup setup_Overlay(BasicConfig const &config)
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:225
PublicKey verifyHandshake(boost::beast::http::fields const &headers, xrpl::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:810
std::unique_ptr< Overlay > make_Overlay(Application &app, Overlay::Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &io_context, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
Creates the implementation of Overlay.
@ accepted
Manifest is valid.
T piecewise_construct
T push_back(T... args)
T shuffle(T... args)
T reserve(T... args)
T reset(T... args)
T setfill(T... args)
T setw(T... args)
T size(T... args)
T str(T... args)
static boost::asio::ip::tcp::endpoint to_asio_endpoint(IP::Endpoint const &address)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Used to indicate the result of a server connection handoff.
Definition Handoff.h:21
std::shared_ptr< Writer > response
Definition Handoff.h:30
bool keep_alive
Definition Handoff.h:27
void on_timer(error_code ec)
Timer(OverlayImpl &overlay)
std::uint32_t crawlOptions
Definition Overlay.h:52
std::optional< std::uint32_t > networkID
Definition Overlay.h:53
std::shared_ptr< boost::asio::ssl::context > context
Definition Overlay.h:49
beast::IP::Address public_ip
Definition Overlay.h:50
PeerFinder configuration settings.
static Config makeConfig(xrpl::Config const &config, std::uint16_t port, bool validationPublicKey, int ipLimit)
Make PeerFinder::Config from configuration parameters.
void addMetrics(protocol::MessageType type, std::uint32_t val)
Add protocol message metrics.
Definition TxMetrics.cpp:12
T substr(T... args)
T to_string(T... args)
T what(T... args)