rippled
Loading...
Searching...
No Matches
OverlayImpl.cpp
1#include <xrpld/app/misc/ValidatorList.h>
2#include <xrpld/app/misc/ValidatorSite.h>
3#include <xrpld/overlay/Cluster.h>
4#include <xrpld/overlay/detail/ConnectAttempt.h>
5#include <xrpld/overlay/detail/PeerImp.h>
6#include <xrpld/overlay/detail/TrafficCount.h>
7#include <xrpld/overlay/detail/Tuning.h>
8#include <xrpld/overlay/predicates.h>
9#include <xrpld/peerfinder/make_Manager.h>
10#include <xrpld/rpc/handlers/admin/status/GetCounts.h>
11#include <xrpld/rpc/json_body.h>
12
13#include <xrpl/basics/base64.h>
14#include <xrpl/basics/make_SSLContext.h>
15#include <xrpl/basics/random.h>
16#include <xrpl/beast/core/LexicalCast.h>
17#include <xrpl/core/HashRouter.h>
18#include <xrpl/protocol/STTx.h>
19#include <xrpl/rdb/RelationalDatabase.h>
20#include <xrpl/server/NetworkOPs.h>
21#include <xrpl/server/SimpleWriter.h>
22#include <xrpl/server/Wallet.h>
23
24#include <boost/algorithm/string/predicate.hpp>
25#include <boost/asio/executor_work_guard.hpp>
26
27#include <algorithm>
28
29namespace xrpl {
30
31namespace CrawlOptions {
32enum {
34 Overlay = (1 << 0),
35 ServerInfo = (1 << 1),
36 ServerCounts = (1 << 2),
37 Unl = (1 << 3)
38};
39} // namespace CrawlOptions
40
41//------------------------------------------------------------------------------
42
43OverlayImpl::Child::Child(OverlayImpl& overlay) : overlay_(overlay)
44{
45}
46
48{
49 overlay_.remove(*this);
50}
51
52//------------------------------------------------------------------------------
53
55{
56}
57
58void
60{
61 // This method is only ever called from the same strand that calls
62 // Timer::on_timer, ensuring they never execute concurrently.
63 stopping_ = true;
64 timer_.cancel();
65}
66
67void
69{
70 timer_.expires_after(std::chrono::seconds(1));
71 timer_.async_wait(
72 boost::asio::bind_executor(
73 overlay_.strand_,
74 std::bind(&Timer::on_timer, shared_from_this(), std::placeholders::_1)));
75}
76
77void
79{
80 if (ec || stopping_)
81 {
82 if (ec && ec != boost::asio::error::operation_aborted)
83 {
84 JLOG(overlay_.journal_.error()) << "on_timer: " << ec.message();
85 }
86 return;
87 }
88
89 overlay_.m_peerFinder->once_per_second();
90 overlay_.sendEndpoints();
91 overlay_.autoConnect();
92 if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE)
93 overlay_.sendTxQueue();
94
95 if ((++overlay_.timer_count_ % Tuning::checkIdlePeers) == 0)
96 overlay_.deleteIdlePeers();
97
98 async_wait();
99}
100
101//------------------------------------------------------------------------------
102
104 Application& app,
105 Setup const& setup,
106 ServerHandler& serverHandler,
108 Resolver& resolver,
109 boost::asio::io_context& io_context,
110 BasicConfig const& config,
111 beast::insight::Collector::ptr const& collector)
112 : app_(app)
113 , io_context_(io_context)
114 , work_(std::in_place, boost::asio::make_work_guard(io_context_))
115 , strand_(boost::asio::make_strand(io_context_))
116 , setup_(setup)
117 , journal_(app_.getJournal("Overlay"))
118 , serverHandler_(serverHandler)
120 , m_peerFinder(
121 PeerFinder::make_Manager(
122 io_context,
123 stopwatch(),
124 app_.getJournal("PeerFinder"),
125 config,
126 collector))
127 , m_resolver(resolver)
128 , next_id_(1)
129 , slots_(app, *this, app.config())
130 , m_stats(
131 std::bind(&OverlayImpl::collect_metrics, this),
132 collector,
133 [counts = m_traffic.getCounts(), collector]() {
135
136 for (auto const& pair : counts)
137 ret.emplace(pair.first, TrafficGauges(pair.second.name, collector));
138
139 return ret;
140 }())
141{
143}
144
145Handoff
147 std::unique_ptr<stream_type>&& stream_ptr,
148 http_request_type&& request,
149 endpoint_type remote_endpoint)
150{
151 auto const id = next_id_++;
152 auto peerJournal = app_.getJournal("Peer");
153 beast::WrappedSink sink(peerJournal.sink(), makePrefix(id));
154 beast::Journal const journal(sink);
155
156 Handoff handoff;
157 if (processRequest(request, handoff))
158 return handoff;
159 if (!isPeerUpgrade(request))
160 return handoff;
161
162 handoff.moved = true;
163
164 JLOG(journal.debug()) << "Peer connection upgrade from " << remote_endpoint;
165
166 error_code ec;
167 auto const local_endpoint(stream_ptr->next_layer().socket().local_endpoint(ec));
168 if (ec)
169 {
170 JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
171 return handoff;
172 }
173
176 if (consumer.disconnect(journal))
177 return handoff;
178
179 auto const [slot, result] = m_peerFinder->new_inbound_slot(
182
183 if (slot == nullptr)
184 {
185 // connection refused either IP limit exceeded or self-connect
186 handoff.moved = false;
187 JLOG(journal.debug()) << "Peer " << remote_endpoint << " refused, " << to_string(result);
188 return handoff;
189 }
190
191 // Validate HTTP request
192
193 {
194 auto const types = beast::rfc2616::split_commas(request["Connect-As"]);
195 if (std::find_if(types.begin(), types.end(), [](std::string const& s) {
196 return boost::iequals(s, "peer");
197 }) == types.end())
198 {
199 handoff.moved = false;
200 handoff.response = makeRedirectResponse(slot, request, remote_endpoint.address());
201 handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
202 return handoff;
203 }
204 }
205
206 auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
207 if (!negotiatedVersion)
208 {
209 m_peerFinder->on_closed(slot);
210 handoff.moved = false;
211 handoff.response = makeErrorResponse(
212 slot, request, remote_endpoint.address(), "Unable to agree on a protocol version");
213 handoff.keep_alive = false;
214 return handoff;
215 }
216
217 auto const sharedValue = makeSharedValue(*stream_ptr, journal);
218 if (!sharedValue)
219 {
220 m_peerFinder->on_closed(slot);
221 handoff.moved = false;
222 handoff.response = makeErrorResponse(
223 slot, request, remote_endpoint.address(), "Incorrect security cookie");
224 handoff.keep_alive = false;
225 return handoff;
226 }
227
228 try
229 {
230 auto publicKey = verifyHandshake(
231 request,
232 *sharedValue,
235 remote_endpoint.address(),
236 app_);
237
238 consumer.setPublicKey(publicKey);
239
240 {
241 // The node gets a reserved slot if it is in our cluster
242 // or if it has a reservation.
243 bool const reserved = static_cast<bool>(app_.getCluster().member(publicKey)) ||
244 app_.getPeerReservations().contains(publicKey);
245 auto const result = m_peerFinder->activate(slot, publicKey, reserved);
246 if (result != PeerFinder::Result::success)
247 {
248 m_peerFinder->on_closed(slot);
249 JLOG(journal.debug())
250 << "Peer " << remote_endpoint << " redirected, " << to_string(result);
251 handoff.moved = false;
252 handoff.response = makeRedirectResponse(slot, request, remote_endpoint.address());
253 handoff.keep_alive = false;
254 return handoff;
255 }
256 }
257
258 auto const peer = std::make_shared<PeerImp>(
259 app_,
260 id,
261 slot,
262 std::move(request),
263 publicKey,
264 *negotiatedVersion,
265 consumer,
266 std::move(stream_ptr),
267 *this);
268 {
269 // As we are not on the strand, run() must be called
270 // while holding the lock, otherwise new I/O can be
271 // queued after a call to stop().
272 std::lock_guard<decltype(mutex_)> const lock(mutex_);
273 {
274 auto const result = m_peers.emplace(peer->slot(), peer);
275 XRPL_ASSERT(result.second, "xrpl::OverlayImpl::onHandoff : peer is inserted");
276 (void)result.second;
277 }
278 list_.emplace(peer.get(), peer);
279
280 peer->run();
281 }
282 handoff.moved = true;
283 return handoff;
284 }
285 catch (std::exception const& e)
286 {
287 JLOG(journal.debug()) << "Peer " << remote_endpoint << " fails handshake (" << e.what()
288 << ")";
289
290 m_peerFinder->on_closed(slot);
291 handoff.moved = false;
292 handoff.response = makeErrorResponse(slot, request, remote_endpoint.address(), e.what());
293 handoff.keep_alive = false;
294 return handoff;
295 }
296}
297
298//------------------------------------------------------------------------------
299
300bool
302{
303 if (!is_upgrade(request))
304 return false;
305 auto const versions = parseProtocolVersions(request["Upgrade"]);
306 return !versions.empty();
307}
308
311{
313 ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
314 return ss.str();
315}
316
320 http_request_type const& request,
321 address_type remote_address)
322{
323 boost::beast::http::response<json_body> msg;
324 msg.version(request.version());
325 msg.result(boost::beast::http::status::service_unavailable);
326 msg.insert("Server", BuildInfo::getFullVersionString());
327 {
329 ostr << remote_address;
330 msg.insert("Remote-Address", ostr.str());
331 }
332 msg.insert("Content-Type", "application/json");
333 msg.insert(boost::beast::http::field::connection, "close");
334 msg.body() = Json::objectValue;
335 {
336 Json::Value& ips = (msg.body()["peer-ips"] = Json::arrayValue);
337 for (auto const& _ : m_peerFinder->redirect(slot))
338 ips.append(_.address.to_string());
339 }
340 msg.prepare_payload();
342}
343
347 http_request_type const& request,
348 address_type remote_address,
349 std::string text)
350{
351 boost::beast::http::response<boost::beast::http::empty_body> msg;
352 msg.version(request.version());
353 msg.result(boost::beast::http::status::bad_request);
354 msg.reason("Bad Request (" + text + ")");
355 msg.insert("Server", BuildInfo::getFullVersionString());
356 msg.insert("Remote-Address", remote_address.to_string());
357 msg.insert(boost::beast::http::field::connection, "close");
358 msg.prepare_payload();
360}
361
362//------------------------------------------------------------------------------
363
364void
366{
367 XRPL_ASSERT(work_, "xrpl::OverlayImpl::connect : work is set");
368
369 auto usage = resourceManager().newOutboundEndpoint(remote_endpoint);
370 if (usage.disconnect(journal_))
371 {
372 JLOG(journal_.info()) << "Over resource limit: " << remote_endpoint;
373 return;
374 }
375
376 auto const [slot, result] = peerFinder().new_outbound_slot(remote_endpoint);
377 if (slot == nullptr)
378 {
379 JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint << ": "
380 << to_string(result);
381 return;
382 }
383
385 app_,
388 usage,
390 next_id_++,
391 slot,
392 app_.getJournal("Peer"),
393 *this);
394
395 std::lock_guard const lock(mutex_);
396 list_.emplace(p.get(), p);
397 p->run();
398}
399
400//------------------------------------------------------------------------------
401
402// Adds a peer that is already handshaked and active
403void
405{
406 beast::WrappedSink sink{journal_.sink(), peer->prefix()};
407 beast::Journal const journal{sink};
408
409 std::lock_guard const lock(mutex_);
410
411 {
412 auto const result = m_peers.emplace(peer->slot(), peer);
413 XRPL_ASSERT(result.second, "xrpl::OverlayImpl::add_active : peer is inserted");
414 (void)result.second;
415 }
416
417 {
418 auto const result = ids_.emplace(
420 XRPL_ASSERT(result.second, "xrpl::OverlayImpl::add_active : peer ID is inserted");
421 (void)result.second;
422 }
423
424 list_.emplace(peer.get(), peer);
425
426 JLOG(journal.debug()) << "activated";
427
428 // As we are not on the strand, run() must be called
429 // while holding the lock, otherwise new I/O can be
430 // queued after a call to stop().
431 peer->run();
432}
433
434void
436{
437 std::lock_guard const lock(mutex_);
438 auto const iter = m_peers.find(slot);
439 XRPL_ASSERT(iter != m_peers.end(), "xrpl::OverlayImpl::remove : valid input");
440 m_peers.erase(iter);
441}
442
443void
445{
447 app_.config(),
448 serverHandler_.setup().overlay.port(),
449 app_.getValidationPublicKey().has_value(),
451
452 m_peerFinder->setConfig(config);
453 m_peerFinder->start();
454
455 // Populate our boot cache: if there are no entries in [ips] then we use
456 // the entries in [ips_fixed].
457 auto bootstrapIps = app_.config().IPS.empty() ? app_.config().IPS_FIXED : app_.config().IPS;
458
459 // If nothing is specified, default to several well-known high-capacity
460 // servers to serve as bootstrap:
461 if (bootstrapIps.empty())
462 {
463 // Pool of servers operated by Ripple Labs Inc. - https://ripple.com
464 bootstrapIps.push_back("r.ripple.com 51235");
465
466 // Pool of servers operated by ISRDC - https://isrdc.in
467 bootstrapIps.push_back("sahyadri.isrdc.in 51235");
468
469 // Pool of servers operated by @Xrpkuwait - https://xrpkuwait.com
470 bootstrapIps.push_back("hubs.xrpkuwait.com 51235");
471
472 // Pool of servers operated by XRPL Commons - https://xrpl-commons.org
473 bootstrapIps.push_back("hub.xrpl-commons.org 51235");
474 }
475
477 bootstrapIps,
478 [this](std::string const& name, std::vector<beast::IP::Endpoint> const& addresses) {
480 ips.reserve(addresses.size());
481 for (auto const& addr : addresses)
482 {
483 if (addr.port() == 0)
484 {
485 ips.push_back(to_string(addr.at_port(DEFAULT_PEER_PORT)));
486 }
487 else
488 {
489 ips.push_back(to_string(addr));
490 }
491 }
492
493 std::string const base("config: ");
494 if (!ips.empty())
495 m_peerFinder->addFallbackStrings(base + name, ips);
496 });
497
498 // Add the ips_fixed from the xrpld.cfg file
500 {
503 [this](std::string const& name, std::vector<beast::IP::Endpoint> const& addresses) {
505 ips.reserve(addresses.size());
506
507 for (auto& addr : addresses)
508 {
509 if (addr.port() == 0)
510 {
511 ips.emplace_back(addr.address(), DEFAULT_PEER_PORT);
512 }
513 else
514 {
515 ips.emplace_back(addr);
516 }
517 }
518
519 if (!ips.empty())
520 m_peerFinder->addFixedPeer(name, ips);
521 });
522 }
523 auto const timer = std::make_shared<Timer>(*this);
524 std::lock_guard const lock(mutex_);
525 list_.emplace(timer.get(), timer);
526 timer_ = timer;
527 timer->async_wait();
528}
529
530void
532{
533 boost::asio::dispatch(strand_, std::bind(&OverlayImpl::stopChildren, this));
534 {
535 std::unique_lock<decltype(mutex_)> lock(mutex_);
536 cond_.wait(lock, [this] { return list_.empty(); });
537 }
538 m_peerFinder->stop();
539}
540
541//------------------------------------------------------------------------------
542//
543// PropertyStream
544//
545//------------------------------------------------------------------------------
546
547void
549{
550 beast::PropertyStream::Set set("traffic", stream);
551 auto const stats = m_traffic.getCounts();
552 for (auto const& pair : stats)
553 {
555 item["category"] = pair.second.name;
556 item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
557 item["messages_in"] = std::to_string(pair.second.messagesIn.load());
558 item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
559 item["messages_out"] = std::to_string(pair.second.messagesOut.load());
560 }
561}
562
563//------------------------------------------------------------------------------
569void
571{
572 beast::WrappedSink sink{journal_.sink(), peer->prefix()};
573 beast::Journal const journal{sink};
574
575 // Now track this peer
576 {
577 std::lock_guard const lock(mutex_);
578 auto const result(ids_.emplace(
580 XRPL_ASSERT(result.second, "xrpl::OverlayImpl::activate : peer ID is inserted");
581 (void)result.second;
582 }
583
584 JLOG(journal.debug()) << "activated";
585
586 // We just accepted this peer so we have non-zero active peers
587 XRPL_ASSERT(size(), "xrpl::OverlayImpl::activate : nonzero peers");
588}
589
590void
592{
593 std::lock_guard const lock(mutex_);
594 ids_.erase(id);
595}
596
597void
600 std::shared_ptr<PeerImp> const& from)
601{
602 auto const n = m->list_size();
603 auto const& journal = from->pJournal();
604
605 protocol::TMManifests relay;
606
607 for (std::size_t i = 0; i < n; ++i)
608 {
609 auto& s = m->list().Get(i).stobject();
610
611 if (auto mo = deserializeManifest(s))
612 {
613 auto const serialized = mo->serialized;
614
615 auto const result = app_.getValidatorManifests().applyManifest(std::move(*mo));
616
617 if (result == ManifestDisposition::accepted)
618 {
619 relay.add_list()->set_stobject(s);
620
621 // N.B.: this is important; the applyManifest call above moves
622 // the loaded Manifest out of the optional so we need to
623 // reload it here.
624 mo = deserializeManifest(serialized);
625 XRPL_ASSERT(
626 mo,
627 "xrpl::OverlayImpl::onManifests : manifest "
628 "deserialization succeeded");
629
630 app_.getOPs().pubManifest(*mo);
631
632 if (app_.getValidators().listed(mo->masterKey))
633 {
634 auto db = app_.getWalletDB().checkoutDb();
635 addValidatorManifest(*db, serialized);
636 }
637 }
638 }
639 else
640 {
641 JLOG(journal.debug()) << "Malformed manifest #" << i + 1 << ": " << strHex(s);
642 continue;
643 }
644 }
645
646 if (!relay.list().empty())
647 {
648 for_each([m2 = std::make_shared<Message>(relay, protocol::mtMANIFESTS)](
649 std::shared_ptr<PeerImp> const& p) { p->send(m2); });
650 }
651}
652
653void
658
659void
670{
671 std::lock_guard const lock(mutex_);
672 return ids_.size();
673}
674
675int
677{
678 return m_peerFinder->config().maxPeers;
679}
680
683{
684 using namespace std::chrono;
685 Json::Value jv;
686 auto& av = jv[jss::active] = Json::Value(Json::arrayValue);
687
688 for_each([&](std::shared_ptr<PeerImp> const& sp) {
689 auto& pv = av.append(Json::Value(Json::objectValue));
690 pv[jss::public_key] = base64_encode(sp->getNodePublic().data(), sp->getNodePublic().size());
691 pv[jss::type] = sp->slot()->inbound() ? jss::in : jss::out;
692 pv[jss::uptime] = static_cast<std::uint32_t>(duration_cast<seconds>(sp->uptime()).count());
693 if (sp->crawl())
694 {
695 pv[jss::ip] = sp->getRemoteAddress().address().to_string();
696 if (sp->slot()->inbound())
697 {
698 if (auto port = sp->slot()->listening_port())
699 pv[jss::port] = *port;
700 }
701 else
702 {
703 pv[jss::port] = sp->getRemoteAddress().port();
704 }
705 }
706
707 {
708 auto version{sp->getVersion()};
709 if (!version.empty())
710 {
711 // Could move here if Json::value supported moving from strings
712 pv[jss::version] = std::string{version};
713 }
714 }
715
716 std::uint32_t minSeq = 0, maxSeq = 0;
717 sp->ledgerRange(minSeq, maxSeq);
718 if (minSeq != 0 || maxSeq != 0)
719 pv[jss::complete_ledgers] = std::to_string(minSeq) + "-" + std::to_string(maxSeq);
720 });
721
722 return jv;
723}
724
727{
728 bool const humanReadable = false;
729 bool const admin = false;
730 bool const counters = false;
731
732 Json::Value server_info = app_.getOPs().getServerInfo(humanReadable, admin, counters);
733
734 // Filter out some information
735 server_info.removeMember(jss::hostid);
736 server_info.removeMember(jss::load_factor_fee_escalation);
737 server_info.removeMember(jss::load_factor_fee_queue);
738 server_info.removeMember(jss::validation_quorum);
739
740 if (server_info.isMember(jss::validated_ledger))
741 {
742 Json::Value& validated_ledger = server_info[jss::validated_ledger];
743
744 validated_ledger.removeMember(jss::base_fee);
745 validated_ledger.removeMember(jss::reserve_base_xrp);
746 validated_ledger.removeMember(jss::reserve_inc_xrp);
747 }
748
749 return server_info;
750}
751
757
760{
761 Json::Value validators = app_.getValidators().getJson();
762
763 if (validators.isMember(jss::publisher_lists))
764 {
765 Json::Value& publisher_lists = validators[jss::publisher_lists];
766
767 for (auto& publisher : publisher_lists)
768 {
769 publisher.removeMember(jss::list);
770 }
771 }
772
773 validators.removeMember(jss::signing_keys);
774 validators.removeMember(jss::trusted_validator_keys);
775 validators.removeMember(jss::validation_quorum);
776
777 Json::Value validatorSites = app_.getValidatorSites().getJson();
778
779 if (validatorSites.isMember(jss::validator_sites))
780 {
781 validators[jss::validator_sites] = std::move(validatorSites[jss::validator_sites]);
782 }
783
784 return validators;
785}
786
787// Returns information on verified peers.
790{
792 for (auto const& peer : getActivePeers())
793 {
794 json.append(peer->json());
795 }
796 return json;
797}
798
799bool
801{
802 if (req.target() != "/crawl" || setup_.crawlOptions == CrawlOptions::Disabled)
803 return false;
804
805 boost::beast::http::response<json_body> msg;
806 msg.version(req.version());
807 msg.result(boost::beast::http::status::ok);
808 msg.insert("Server", BuildInfo::getFullVersionString());
809 msg.insert("Content-Type", "application/json");
810 msg.insert("Connection", "close");
811 msg.body()["version"] = Json::Value(2u);
812
814 {
815 msg.body()["overlay"] = getOverlayInfo();
816 }
818 {
819 msg.body()["server"] = getServerInfo();
820 }
822 {
823 msg.body()["counts"] = getServerCounts();
824 }
826 {
827 msg.body()["unl"] = getUnlInfo();
828 }
829
830 msg.prepare_payload();
832 return true;
833}
834
835bool
837{
838 // If the target is in the form "/vl/<validator_list_public_key>",
839 // return the most recent validator list for that key.
840 constexpr std::string_view prefix("/vl/");
841
842 if (!req.target().starts_with(prefix) || !setup_.vlEnabled)
843 return false;
844
845 std::uint32_t version = 1;
846
847 boost::beast::http::response<json_body> msg;
848 msg.version(req.version());
849 msg.insert("Server", BuildInfo::getFullVersionString());
850 msg.insert("Content-Type", "application/json");
851 msg.insert("Connection", "close");
852
853 auto fail = [&msg, &handoff](auto status) {
854 msg.result(status);
855 msg.insert("Content-Length", "0");
856
857 msg.body() = Json::nullValue;
858
859 msg.prepare_payload();
861 return true;
862 };
863
864 std::string_view key = req.target().substr(prefix.size());
865
866 if (auto slash = key.find('/'); slash != std::string_view::npos)
867 {
868 auto verString = key.substr(0, slash);
869 if (!boost::conversion::try_lexical_convert(verString, version))
870 return fail(boost::beast::http::status::bad_request);
871 key = key.substr(slash + 1);
872 }
873
874 if (key.empty())
875 return fail(boost::beast::http::status::bad_request);
876
877 // find the list
878 auto vl = app_.getValidators().getAvailable(key, version);
879
880 if (!vl)
881 {
882 // 404 not found
883 return fail(boost::beast::http::status::not_found);
884 }
885 if (!*vl)
886 {
887 return fail(boost::beast::http::status::bad_request);
888 }
889
890 msg.result(boost::beast::http::status::ok);
891
892 msg.body() = *vl;
893
894 msg.prepare_payload();
896 return true;
897}
898
899bool
901{
902 if (req.target() != "/health")
903 return false;
904 boost::beast::http::response<json_body> msg;
905 msg.version(req.version());
906 msg.insert("Server", BuildInfo::getFullVersionString());
907 msg.insert("Content-Type", "application/json");
908 msg.insert("Connection", "close");
909
910 auto info = getServerInfo();
911
912 int last_validated_ledger_age = -1;
913 if (info.isMember(jss::validated_ledger))
914 last_validated_ledger_age = info[jss::validated_ledger][jss::age].asInt();
915 bool amendment_blocked = false;
916 if (info.isMember(jss::amendment_blocked))
917 amendment_blocked = true;
918 int const number_peers = info[jss::peers].asInt();
919 std::string const server_state = info[jss::server_state].asString();
920 auto load_factor = info[jss::load_factor_server].asDouble() / info[jss::load_base].asDouble();
921
922 enum class HealthState { healthy, warning, critical };
923 auto health = HealthState::healthy;
924 auto set_health = [&health](HealthState state) { health = std::max(health, state); };
925
926 msg.body()[jss::info] = Json::objectValue;
927 if (last_validated_ledger_age >= 7 || last_validated_ledger_age < 0)
928 {
929 msg.body()[jss::info][jss::validated_ledger] = last_validated_ledger_age;
930 if (last_validated_ledger_age < 20)
931 {
932 set_health(HealthState::warning);
933 }
934 else
935 {
936 set_health(HealthState::critical);
937 }
938 }
939
940 if (amendment_blocked)
941 {
942 msg.body()[jss::info][jss::amendment_blocked] = true;
943 set_health(HealthState::critical);
944 }
945
946 if (number_peers <= 7)
947 {
948 msg.body()[jss::info][jss::peers] = number_peers;
949 if (number_peers != 0)
950 {
951 set_health(HealthState::warning);
952 }
953 else
954 {
955 set_health(HealthState::critical);
956 }
957 }
958
959 if (!(server_state == "full" || server_state == "validating" || server_state == "proposing"))
960 {
961 msg.body()[jss::info][jss::server_state] = server_state;
962 if (server_state == "syncing" || server_state == "tracking" || server_state == "connected")
963 {
964 set_health(HealthState::warning);
965 }
966 else
967 {
968 set_health(HealthState::critical);
969 }
970 }
971
972 if (load_factor > 100)
973 {
974 msg.body()[jss::info][jss::load_factor] = load_factor;
975 if (load_factor < 1000)
976 {
977 set_health(HealthState::warning);
978 }
979 else
980 {
981 set_health(HealthState::critical);
982 }
983 }
984
985 switch (health)
986 {
987 case HealthState::healthy:
988 msg.result(boost::beast::http::status::ok);
989 break;
990 case HealthState::warning:
991 msg.result(boost::beast::http::status::service_unavailable);
992 break;
993 case HealthState::critical:
994 msg.result(boost::beast::http::status::internal_server_error);
995 break;
996 }
997
998 msg.prepare_payload();
1000 return true;
1001}
1002
1003bool
1005{
1006 // Take advantage of || short-circuiting
1007 return processCrawl(req, handoff) || processValidatorList(req, handoff) ||
1008 processHealth(req, handoff);
1009}
1010
1013{
1015 ret.reserve(size());
1016
1017 for_each([&ret](std::shared_ptr<PeerImp> const& sp) { ret.emplace_back(sp); });
1018
1019 return ret;
1020}
1021
1024 std::set<Peer::id_t> const& toSkip,
1025 std::size_t& active,
1026 std::size_t& disabled,
1027 std::size_t& enabledInSkip) const
1028{
1030 std::lock_guard const lock(mutex_);
1031
1032 active = ids_.size();
1033 disabled = enabledInSkip = 0;
1034 ret.reserve(ids_.size());
1035
1036 // NOTE The purpose of p is to delay the destruction of PeerImp
1038 for (auto& [id, w] : ids_)
1039 {
1040 if (p = w.lock(); p != nullptr)
1041 {
1042 bool const reduceRelayEnabled = p->txReduceRelayEnabled();
1043 // tx reduced relay feature disabled
1044 if (!reduceRelayEnabled)
1045 ++disabled;
1046
1047 if (!toSkip.contains(id))
1048 {
1049 ret.emplace_back(std::move(p));
1050 }
1051 else if (reduceRelayEnabled)
1052 {
1053 ++enabledInSkip;
1054 }
1055 }
1056 }
1057
1058 return ret;
1059}
1060
1061void
1063{
1064 for_each([index](std::shared_ptr<PeerImp> const& sp) { sp->checkTracking(index); });
1065}
1066
1069{
1070 std::lock_guard const lock(mutex_);
1071 auto const iter = ids_.find(id);
1072 if (iter != ids_.end())
1073 return iter->second.lock();
1074 return {};
1075}
1076
1077// A public key hash map was not used due to the peer connect/disconnect
1078// update overhead outweighing the performance of a small set linear search.
1081{
1082 std::lock_guard const lock(mutex_);
1083 // NOTE The purpose of peer is to delay the destruction of PeerImp
1085 for (auto const& e : ids_)
1086 {
1087 if (peer = e.second.lock(); peer != nullptr)
1088 {
1089 if (peer->getNodePublic() == pubKey)
1090 return peer;
1091 }
1092 }
1093 return {};
1094}
1095
1096void
1097OverlayImpl::broadcast(protocol::TMProposeSet& m)
1098{
1099 auto const sm = std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER);
1100 for_each([&](std::shared_ptr<PeerImp> const& p) { p->send(sm); });
1101}
1102
1104OverlayImpl::relay(protocol::TMProposeSet& m, uint256 const& uid, PublicKey const& validator)
1105{
1106 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1107 {
1108 auto const sm = std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER, validator);
1109 for_each([&](std::shared_ptr<PeerImp> const& p) {
1110 if (!toSkip->contains(p->id()))
1111 p->send(sm);
1112 });
1113 return *toSkip;
1114 }
1115 return {};
1116}
1117
1118void
1119OverlayImpl::broadcast(protocol::TMValidation& m)
1120{
1121 auto const sm = std::make_shared<Message>(m, protocol::mtVALIDATION);
1122 for_each([sm](std::shared_ptr<PeerImp> const& p) { p->send(sm); });
1123}
1124
1126OverlayImpl::relay(protocol::TMValidation& m, uint256 const& uid, PublicKey const& validator)
1127{
1128 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1129 {
1130 auto const sm = std::make_shared<Message>(m, protocol::mtVALIDATION, validator);
1131 for_each([&](std::shared_ptr<PeerImp> const& p) {
1132 if (!toSkip->contains(p->id()))
1133 p->send(sm);
1134 });
1135 return *toSkip;
1136 }
1137 return {};
1138}
1139
1142{
1144
1145 if (auto seq = app_.getValidatorManifests().sequence(); seq != manifestListSeq_)
1146 {
1147 protocol::TMManifests tm;
1148
1150 [&tm](std::size_t s) { tm.mutable_list()->Reserve(s); },
1151 [&tm, &hr = app_.getHashRouter()](Manifest const& manifest) {
1152 tm.add_list()->set_stobject(manifest.serialized.data(), manifest.serialized.size());
1153 hr.addSuppression(manifest.hash());
1154 });
1155
1157
1158 if (tm.list_size() != 0)
1159 manifestMessage_ = std::make_shared<Message>(tm, protocol::mtMANIFESTS);
1160
1161 manifestListSeq_ = seq;
1162 }
1163
1164 return manifestMessage_;
1165}
1166
1167void
1169 uint256 const& hash,
1171 std::set<Peer::id_t> const& toSkip)
1172{
1173 bool relay = tx.has_value();
1174 if (relay)
1175 {
1176 auto& txn = tx->get();
1177 SerialIter sit(makeSlice(txn.rawtransaction()));
1178 try
1179 {
1180 relay = !isPseudoTx(STTx{sit});
1181 }
1182 catch (std::exception const&)
1183 {
1184 // Could not construct STTx, not relaying
1185 JLOG(journal_.debug()) << "Could not construct STTx: " << hash;
1186 return;
1187 }
1188 }
1189
1190 Overlay::PeerSequence peers = {};
1191 std::size_t total = 0;
1192 std::size_t disabled = 0;
1193 std::size_t enabledInSkip = 0;
1194
1195 if (!relay)
1196 {
1198 return;
1199
1200 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1201 JLOG(journal_.trace()) << "not relaying tx, total peers " << peers.size();
1202 for (auto const& p : peers)
1203 p->addTxQueue(hash);
1204 return;
1205 }
1206
1207 auto& txn = tx->get();
1208 auto const sm = std::make_shared<Message>(txn, protocol::mtTRANSACTION);
1209 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1210 auto const minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled;
1211
1212 if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay)
1213 {
1214 for (auto const& p : peers)
1215 p->send(sm);
1217 txMetrics_.addMetrics(total, toSkip.size(), 0);
1218 return;
1219 }
1220
1221 // We have more peers than the minimum (disabled + minimum enabled),
1222 // relay to all disabled and some randomly selected enabled that
1223 // do not have the transaction.
1224 auto const enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS +
1225 ((total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100);
1226
1227 txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);
1228
1229 if (enabledTarget > enabledInSkip)
1230 std::shuffle(peers.begin(), peers.end(), default_prng());
1231
1232 JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size() << " selected "
1233 << enabledTarget << " skip " << toSkip.size() << " disabled "
1234 << disabled;
1235
1236 // count skipped peers with the enabled feature towards the quota
1237 std::uint16_t enabledAndRelayed = enabledInSkip;
1238 for (auto const& p : peers)
1239 {
1240 // always relay to a peer with the disabled feature
1241 if (!p->txReduceRelayEnabled())
1242 {
1243 p->send(sm);
1244 }
1245 else if (enabledAndRelayed < enabledTarget)
1246 {
1247 enabledAndRelayed++;
1248 p->send(sm);
1249 }
1250 else
1251 {
1252 p->addTxQueue(hash);
1253 }
1254 }
1255}
1256
1257//------------------------------------------------------------------------------
1258
1259void
1261{
1262 std::lock_guard const lock(mutex_);
1263 list_.erase(&child);
1264 if (list_.empty())
1265 cond_.notify_all();
1266}
1267
1268void
1270{
1271 // Calling list_[].second->stop() may cause list_ to be modified
1272 // (OverlayImpl::remove() may be called on this same thread). So
1273 // iterating directly over list_ to call child->stop() could lead to
1274 // undefined behavior.
1275 //
1276 // Therefore we copy all of the weak/shared ptrs out of list_ before we
1277 // start calling stop() on them. That guarantees OverlayImpl::remove()
1278 // won't be called until vector<> children leaves scope.
1280 {
1281 std::lock_guard const lock(mutex_);
1282 if (!work_)
1283 return;
1285
1286 children.reserve(list_.size());
1287 for (auto const& element : list_)
1288 {
1289 children.emplace_back(element.second.lock());
1290 }
1291 } // lock released
1292
1293 for (auto const& child : children)
1294 {
1295 if (child != nullptr)
1296 child->stop();
1297 }
1298}
1299
1300void
1302{
1303 auto const result = m_peerFinder->autoconnect();
1304 for (auto const& addr : result)
1305 connect(addr);
1306}
1307
1308void
1310{
1311 auto const result = m_peerFinder->buildEndpointsForPeers();
1312 for (auto const& e : result)
1313 {
1315 {
1316 std::lock_guard const lock(mutex_);
1317 auto const iter = m_peers.find(e.first);
1318 if (iter != m_peers.end())
1319 peer = iter->second.lock();
1320 }
1321 if (peer)
1322 peer->sendEndpoints(e.second.begin(), e.second.end());
1323 }
1324}
1325
1326void
1328{
1329 for_each([](auto const& p) {
1330 if (p->txReduceRelayEnabled())
1331 p->sendTxQueue();
1332 });
1333}
1334
1336makeSquelchMessage(PublicKey const& validator, bool squelch, uint32_t squelchDuration)
1337{
1338 protocol::TMSquelch m;
1339 m.set_squelch(squelch);
1340 m.set_validatorpubkey(validator.data(), validator.size());
1341 if (squelch)
1342 m.set_squelchduration(squelchDuration);
1343 return std::make_shared<Message>(m, protocol::mtSQUELCH);
1344}
1345
1346void
1348{
1349 if (auto peer = findPeerByShortID(id); peer)
1350 {
1351 // optimize - multiple message with different
1352 // validator might be sent to the same peer
1353 peer->send(makeSquelchMessage(validator, false, 0));
1354 }
1355}
1356
1357void
1358OverlayImpl::squelch(PublicKey const& validator, Peer::id_t id, uint32_t squelchDuration) const
1359{
1360 if (auto peer = findPeerByShortID(id); peer)
1361 {
1362 peer->send(makeSquelchMessage(validator, true, squelchDuration));
1363 }
1364}
1365
1366void
1368 uint256 const& key,
1369 PublicKey const& validator,
1370 std::set<Peer::id_t>&& peers,
1371 protocol::MessageType type)
1372{
1373 if (!slots_.baseSquelchReady())
1374 return;
1375
1376 if (!strand_.running_in_this_thread())
1377 {
1378 post(
1379 strand_,
1380 // Must capture copies of reference parameters (i.e. key, validator)
1381 [this, key = key, validator = validator, peers = std::move(peers), type]() mutable {
1382 updateSlotAndSquelch(key, validator, std::move(peers), type);
1383 });
1384
1385 return;
1386 }
1387
1388 for (auto id : peers)
1389 {
1390 slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
1392 });
1393 }
1394}
1395
1396void
1398 uint256 const& key,
1399 PublicKey const& validator,
1400 Peer::id_t peer,
1401 protocol::MessageType type)
1402{
1403 if (!slots_.baseSquelchReady())
1404 return;
1405
1406 if (!strand_.running_in_this_thread())
1407 {
1408 {
1409 post(
1410 strand_,
1411 // Must capture copies of reference parameters (i.e. key, validator)
1412 [this, key = key, validator = validator, peer, type]() {
1413 updateSlotAndSquelch(key, validator, peer, type);
1414 });
1415 }
1416 return;
1417 }
1418
1419 slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
1421 });
1422}
1423
1424void
1426{
1427 if (!strand_.running_in_this_thread())
1428 {
1429 post(strand_, std::bind(&OverlayImpl::deletePeer, this, id));
1430 return;
1431 }
1432
1433 slots_.deletePeer(id, true);
1434}
1435
1436void
1438{
1439 if (!strand_.running_in_this_thread())
1440 {
1442 return;
1443 }
1444
1445 slots_.deleteIdlePeers();
1446}
1447
1448//------------------------------------------------------------------------------
1449
1452{
1453 Overlay::Setup setup;
1454
1455 {
1456 auto const& section = config.section("overlay");
1457 setup.context = make_SSLContext("");
1458
1459 set(setup.ipLimit, "ip_limit", section);
1460 if (setup.ipLimit < 0)
1461 Throw<std::runtime_error>("Configured IP limit is invalid");
1462
1463 std::string ip;
1464 set(ip, "public_ip", section);
1465 if (!ip.empty())
1466 {
1467 boost::system::error_code ec;
1468 setup.public_ip = boost::asio::ip::make_address(ip, ec);
1469 if (ec || beast::IP::is_private(setup.public_ip))
1470 Throw<std::runtime_error>("Configured public IP is invalid");
1471 }
1472 }
1473
1474 {
1475 auto const& section = config.section("crawl");
1476 auto const& values = section.values();
1477
1478 if (values.size() > 1)
1479 {
1480 Throw<std::runtime_error>("Configured [crawl] section is invalid, too many values");
1481 }
1482
1483 bool crawlEnabled = true;
1484
1485 // Only allow "0|1" as a value
1486 if (values.size() == 1)
1487 {
1488 try
1489 {
1490 crawlEnabled = boost::lexical_cast<bool>(values.front());
1491 }
1492 catch (boost::bad_lexical_cast const&)
1493 {
1494 Throw<std::runtime_error>(
1495 "Configured [crawl] section has invalid value: " + values.front());
1496 }
1497 }
1498
1499 if (crawlEnabled)
1500 {
1501 if (get<bool>(section, "overlay", true))
1502 {
1504 }
1505 if (get<bool>(section, "server", true))
1506 {
1508 }
1509 if (get<bool>(section, "counts", false))
1510 {
1512 }
1513 if (get<bool>(section, "unl", true))
1514 {
1516 }
1517 }
1518 }
1519 {
1520 auto const& section = config.section("vl");
1521
1522 set(setup.vlEnabled, "enabled", section);
1523 }
1524
1525 try
1526 {
1527 auto id = config.legacy("network_id");
1528
1529 if (!id.empty())
1530 {
1531 if (id == "main")
1532 id = "0";
1533
1534 if (id == "testnet")
1535 id = "1";
1536
1537 if (id == "devnet")
1538 id = "2";
1539
1540 setup.networkID = beast::lexicalCastThrow<std::uint32_t>(id);
1541 }
1542 }
1543 catch (...)
1544 {
1545 Throw<std::runtime_error>(
1546 "Configured [network_id] section is invalid: must be a number "
1547 "or one of the strings 'main', 'testnet' or 'devnet'.");
1548 }
1549
1550 return setup;
1551}
1552
1555 Application& app,
1556 Overlay::Setup const& setup,
1557 ServerHandler& serverHandler,
1558 Resource::Manager& resourceManager,
1559 Resolver& resolver,
1560 boost::asio::io_context& io_context,
1561 BasicConfig const& config,
1562 beast::insight::Collector::ptr const& collector)
1563{
1565 app, setup, serverHandler, resourceManager, resolver, io_context, config, collector);
1566}
1567
1568} // namespace xrpl
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
Value removeMember(char const *key)
Remove and return the named member.
bool isMember(char const *key) const
Return true if the object has a member named key.
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
A generic endpoint for log messages.
Definition Journal.h:40
Stream debug() const
Definition Journal.h:301
Sink & sink() const
Returns the Sink associated with this Journal.
Definition Journal.h:270
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
std::string const & name() const
Returns the name of this source.
void add(Source &source)
Add a child source.
Wraps a Journal::Sink to prefix its output with a string.
Definition WrappedSink.h:14
virtual Config & config()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
Holds unparsed configuration information.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
Section & section(std::string const &name)
Returns the section with the given name.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:19
bool TX_REDUCE_RELAY_ENABLE
Definition Config.h:243
std::vector< std::string > IPS
Definition Config.h:132
bool standalone() const
Definition Config.h:316
std::size_t TX_RELAY_PERCENTAGE
Definition Config.h:256
bool TX_REDUCE_RELAY_METRICS
Definition Config.h:250
std::vector< std::string > IPS_FIXED
Definition Config.h:135
std::size_t TX_REDUCE_RELAY_MIN_PEERS
Definition Config.h:253
LockedSociSession checkoutDb()
std::optional< std::set< PeerShortID > > shouldRelay(uint256 const &key)
Determines whether the hashed item should be relayed.
virtual void pubManifest(Manifest const &)=0
void for_each_manifest(Function &&f) const
Invokes the callback once for every populated manifest.
Definition Manifest.h:402
ManifestDisposition applyManifest(Manifest m)
Add manifest to cache.
std::uint32_t sequence() const
A monotonically increasing number used to detect new manifests.
Definition Manifest.h:255
virtual Json::Value getServerInfo(bool human, bool admin, bool counters)=0
Child(OverlayImpl &overlay)
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
std::weak_ptr< Timer > timer_
Definition OverlayImpl.h:88
boost::asio::io_context & io_context_
Definition OverlayImpl.h:83
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
OverlayImpl(Application &app, Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &io_context, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
boost::asio::ip::address address_type
Definition OverlayImpl.h:61
static bool isPeerUpgrade(http_request_type const &request)
Resource::Manager & m_resourceManager
Definition OverlayImpl.h:93
boost::system::error_code error_code
Definition OverlayImpl.h:63
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
Json::Value getServerCounts()
Returns information about the local server's performance counters.
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remote_endpoint) override
Conditionally accept an incoming HTTP request.
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
Definition OverlayImpl.h:84
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
void stop() override
void connect(beast::IP::Endpoint const &remote_endpoint) override
Establish a peer connection to the specified endpoint.
std::size_t size() const override
The number of active peers on the network Active peers are only those peers that have completed the h...
ServerHandler & serverHandler_
Definition OverlayImpl.h:92
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
void broadcast(protocol::TMProposeSet &m) override
Broadcast a proposal.
static std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address, std::string msg)
reduce_relay::Slots< UptimeClock > slots_
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
Definition OverlayImpl.h:97
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
TrafficCount m_traffic
Definition OverlayImpl.h:95
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
std::shared_ptr< Message > manifestMessage_
void sendTxQueue() const
Send once a second transactions' hashes aggregated by peers.
std::unique_ptr< PeerFinder::Manager > m_peerFinder
Definition OverlayImpl.h:94
std::optional< std::uint32_t > manifestListSeq_
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
void add_active(std::shared_ptr< PeerImp > const &peer)
PeerFinder::Manager & peerFinder()
Application & app_
Definition OverlayImpl.h:82
std::recursive_mutex mutex_
Definition OverlayImpl.h:86
Resource::Manager & resourceManager()
beast::Journal const journal_
Definition OverlayImpl.h:91
boost::asio::ip::tcp::endpoint endpoint_type
Definition OverlayImpl.h:62
void onPeerDeactivate(Peer::id_t id)
std::mutex manifestLock_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition OverlayImpl.h:85
Json::Value json() override
Return diagnostics on the status of all peers.
static std::string makePrefix(std::uint32_t id)
Setup const & setup() const
std::set< Peer::id_t > relay(protocol::TMProposeSet &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
static bool is_upgrade(boost::beast::http::header< true, Fields > const &req)
metrics::TxMetrics txMetrics_
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
Definition OverlayImpl.h:89
int limit() override
Returns the maximum number of peers we are configured to allow.
std::condition_variable_any cond_
Definition OverlayImpl.h:87
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > m_peers
Definition OverlayImpl.h:96
std::shared_ptr< Message > getManifestsMessage()
Json::Value getUnlInfo()
Returns information about the local server's UNL.
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address)
std::atomic< Peer::id_t > next_id_
Definition OverlayImpl.h:99
void reportInboundTraffic(TrafficCount::category cat, int bytes)
Json::Value getOverlayInfo() const
Returns information about peers on the overlay network.
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
Resolver & m_resolver
Definition OverlayImpl.h:98
Json::Value getServerInfo()
Returns information about the local server.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
void start() override
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
Manages the set of connected peers.
Definition Overlay.h:29
virtual std::pair< std::shared_ptr< Slot >, Result > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)=0
Create a new outbound slot with the specified remote endpoint.
bool contains(PublicKey const &nodeId)
A public key.
Definition PublicKey.h:42
void resolve(std::vector< std::string > const &names, Handler handler)
resolve all hostnames on the list
Definition Resolver.h:36
Tracks load and resource consumption.
virtual Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by outbound IP address and port.
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:58
void setup(Setup const &setup, beast::Journal journal)
virtual PeerReservationTable & getPeerReservations()=0
virtual beast::Journal getJournal(std::string const &name)=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & getValidators()=0
virtual ValidatorSite & getValidatorSites()=0
virtual HashRouter & getHashRouter()=0
virtual Cluster & getCluster()=0
virtual DatabaseCon & getWalletDB()=0
Retrieve the "wallet database".
virtual ManifestCache & getValidatorManifests()=0
void addCount(category cat, bool inbound, int bytes)
Account for traffic associated with the given category.
auto const & getCounts() const
An up-to-date copy of all the counters.
Json::Value getJson() const
Return a JSON representation of the state of the validator list.
std::optional< Json::Value > getAvailable(std::string_view pubKey, std::optional< std::uint32_t > forceVersion={})
Returns the current valid list for the given publisher key, if available, as a Json object.
bool listed(PublicKey const &identity) const
Returns true if public key is included on any lists.
Json::Value getJson() const
Return JSON representation of configured validator sites.
T contains(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T find_if(T... args)
T get(T... args)
T is_same_v
T make_tuple(T... args)
T max(T... args)
@ nullValue
'null' value
Definition json_value.h:19
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
bool is_private(Address const &addr)
Returns true if the address is a private unroutable address.
Definition IPAddress.h:51
Result split_commas(FwdIt first, FwdIt last)
Definition rfc2616.h:177
bool is_keep_alive(boost::beast::http::message< isRequest, Body, Fields > const &m)
Definition rfc2616.h:359
STL namespace.
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:82
@ checkIdlePeers
How often we check for idle peers (seconds)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:94
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:10
std::optional< ProtocolVersion > negotiateProtocolVersion(std::vector< ProtocolVersion > const &versions)
Given a list of supported protocol versions, choose the one we prefer.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
std::shared_ptr< boost::asio::ssl::context > make_SSLContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
std::string base64_encode(std::uint8_t const *data, std::size_t len)
Json::Value getCountsJson(Application &app, int minObjectCount)
Definition GetCounts.cpp:43
void addValidatorManifest(soci::session &session, std::string const &serialized)
addValidatorManifest Saves the manifest of a validator to the database.
Definition Wallet.cpp:94
std::optional< Manifest > deserializeManifest(Slice s, beast::Journal journal)
Constructs Manifest from serialized string.
beast::xor_shift_engine & default_prng()
Return the default random engine.
@ manifest
Manifest.
constexpr Number squelch(Number const &x, Number const &limit) noexcept
Definition Number.h:750
std::shared_ptr< Message > makeSquelchMessage(PublicKey const &validator, bool squelch, uint32_t squelchDuration)
Overlay::Setup setup_Overlay(BasicConfig const &config)
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:215
PublicKey verifyHandshake(boost::beast::http::fields const &headers, xrpl::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:809
std::unique_ptr< Overlay > make_Overlay(Application &app, Overlay::Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &io_context, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
Creates the implementation of Overlay.
@ accepted
Manifest is valid.
T piecewise_construct
T push_back(T... args)
T shuffle(T... args)
T reserve(T... args)
T reset(T... args)
T setfill(T... args)
T setw(T... args)
T size(T... args)
T str(T... args)
static boost::asio::ip::tcp::endpoint to_asio_endpoint(IP::Endpoint const &address)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Used to indicate the result of a server connection handoff.
Definition Handoff.h:18
std::shared_ptr< Writer > response
Definition Handoff.h:27
bool keep_alive
Definition Handoff.h:24
void on_timer(error_code ec)
Timer(OverlayImpl &overlay)
std::uint32_t crawlOptions
Definition Overlay.h:51
std::optional< std::uint32_t > networkID
Definition Overlay.h:52
std::shared_ptr< boost::asio::ssl::context > context
Definition Overlay.h:48
beast::IP::Address public_ip
Definition Overlay.h:49
PeerFinder configuration settings.
static Config makeConfig(xrpl::Config const &config, std::uint16_t port, bool validationPublicKey, int ipLimit)
Make PeerFinder::Config from configuration parameters.
void addMetrics(protocol::MessageType type, std::uint32_t val)
Add protocol message metrics.
Definition TxMetrics.cpp:12
T substr(T... args)
T to_string(T... args)
T what(T... args)