xrpld
Loading...
Searching...
No Matches
OverlayImpl.cpp
1#include <xrpld/overlay/detail/OverlayImpl.h>
2
3#include <xrpld/app/misc/ValidatorList.h>
4#include <xrpld/app/misc/ValidatorSite.h>
5#include <xrpld/overlay/Cluster.h>
6#include <xrpld/overlay/detail/ConnectAttempt.h>
7#include <xrpld/overlay/detail/Handshake.h>
8#include <xrpld/overlay/detail/PeerImp.h>
9#include <xrpld/overlay/detail/ProtocolVersion.h>
10#include <xrpld/overlay/detail/TrafficCount.h>
11#include <xrpld/overlay/detail/Tuning.h>
12#include <xrpld/peerfinder/PeerfinderManager.h>
13#include <xrpld/peerfinder/Slot.h>
14#include <xrpld/peerfinder/make_Manager.h>
15#include <xrpld/rpc/ServerHandler.h>
16#include <xrpld/rpc/handlers/admin/status/GetCounts.h>
17#include <xrpld/rpc/json_body.h>
18
19#include <xrpl/basics/Log.h>
20#include <xrpl/basics/Resolver.h>
21#include <xrpl/basics/Slice.h>
22#include <xrpl/basics/base64.h>
23#include <xrpl/basics/base_uint.h>
24#include <xrpl/basics/chrono.h>
25#include <xrpl/basics/contract.h>
26#include <xrpl/basics/make_SSLContext.h>
27#include <xrpl/basics/random.h>
28#include <xrpl/basics/strHex.h>
29#include <xrpl/beast/core/LexicalCast.h>
30#include <xrpl/beast/insight/Collector.h>
31#include <xrpl/beast/net/IPAddress.h>
32#include <xrpl/beast/net/IPAddressConversion.h>
33#include <xrpl/beast/net/IPEndpoint.h>
34#include <xrpl/beast/rfc2616.h>
35#include <xrpl/beast/utility/PropertyStream.h>
36#include <xrpl/beast/utility/WrappedSink.h>
37#include <xrpl/beast/utility/instrumentation.h>
38#include <xrpl/config/BasicConfig.h>
39#include <xrpl/config/Constants.h>
40#include <xrpl/core/HashRouter.h>
41#include <xrpl/json/json_value.h>
42#include <xrpl/protocol/BuildInfo.h>
43#include <xrpl/protocol/STTx.h>
44#include <xrpl/protocol/Serializer.h>
45#include <xrpl/protocol/SystemParameters.h>
46#include <xrpl/protocol/jss.h>
47#include <xrpl/resource/ResourceManager.h>
48#include <xrpl/server/Handoff.h>
49#include <xrpl/server/Manifest.h>
50#include <xrpl/server/NetworkOPs.h>
51#include <xrpl/server/SimpleWriter.h>
52#include <xrpl/server/Wallet.h>
53#include <xrpl/server/Writer.h>
54
55#include <boost/algorithm/string/predicate.hpp>
56#include <boost/asio/bind_executor.hpp>
57#include <boost/asio/dispatch.hpp>
58#include <boost/asio/error.hpp>
59#include <boost/asio/executor_work_guard.hpp>
60#include <boost/asio/io_context.hpp>
61#include <boost/asio/ip/address.hpp>
62#include <boost/asio/post.hpp>
63#include <boost/asio/strand.hpp>
64#include <boost/beast/http/empty_body.hpp>
65#include <boost/beast/http/field.hpp>
66#include <boost/beast/http/status.hpp>
67#include <boost/lexical_cast.hpp>
68#include <boost/lexical_cast/bad_lexical_cast.hpp>
69#include <boost/lexical_cast/try_lexical_convert.hpp>
70
71#include <xrpl.pb.h>
72
73#include <algorithm>
74#include <chrono>
75#include <cstddef>
76#include <cstdint>
77#include <exception>
78#include <functional>
79#include <iomanip>
80#include <memory>
81#include <mutex>
82#include <optional>
83#include <set>
84#include <sstream>
85#include <stdexcept>
86#include <string>
87#include <string_view>
88#include <tuple>
89#include <unordered_map>
90#include <utility>
91#include <vector>
92
93namespace xrpl {
94
95namespace CrawlOptions {
96static constexpr auto kDisabled = 0;
97static constexpr auto kOverlay = (1 << 0);
98static constexpr auto kServerInfo = (1 << 1);
99static constexpr auto kServerCounts = (1 << 2);
100static constexpr auto kUnl = (1 << 3);
101} // namespace CrawlOptions
102
103//------------------------------------------------------------------------------
104
106{
107}
108
110{
111 overlay_.remove(*this);
112}
113
114//------------------------------------------------------------------------------
115
119
120void
122{
123 // This method is only ever called from the same strand that calls
124 // Timer::on_timer, ensuring they never execute concurrently.
125 stopping = true;
126 timer.cancel();
127}
128
129void
131{
132 timer.expires_after(std::chrono::seconds(1));
133 timer.async_wait(
134 boost::asio::bind_executor(
135 overlay_.strand_,
136 std::bind(&Timer::onTimer, shared_from_this(), std::placeholders::_1)));
137}
138
139void
141{
142 if (ec || stopping)
143 {
144 if (ec && ec != boost::asio::error::operation_aborted)
145 {
146 JLOG(overlay_.journal_.error()) << "on_timer: " << ec.message();
147 }
148 return;
149 }
150
151 overlay_.peerFinder_->oncePerSecond();
152 overlay_.sendEndpoints();
153 overlay_.autoConnect();
154 if (overlay_.app_.config().txReduceRelayEnable)
155 overlay_.sendTxQueue();
156
157 if ((++overlay_.timerCount_ % Tuning::kCheckIdlePeers) == 0)
158 overlay_.deleteIdlePeers();
159
160 asyncWait();
161}
162
163//------------------------------------------------------------------------------
164
166 Application& app,
167 Setup setup,
168 ServerHandler& serverHandler,
170 Resolver& resolver,
171 boost::asio::io_context& ioContext,
172 BasicConfig const& config,
173 beast::insight::Collector::ptr const& collector)
174 : app_(app)
175 , ioContext_(ioContext)
176 , work_(std::in_place, boost::asio::make_work_guard(ioContext_))
177 , strand_(boost::asio::make_strand(ioContext_))
178 , setup_(std::move(setup))
179 , journal_(app_.getJournal("Overlay"))
180 , serverHandler_(serverHandler)
182 , peerFinder_(
183 PeerFinder::makeManager(
184 ioContext,
185 stopwatch(),
186 app_.getJournal("PeerFinder"),
187 config,
188 collector))
189 , resolver_(resolver)
190 , nextId_(1)
191 , slots_(app, *this, app.config())
192 , stats_(
193 std::bind(&OverlayImpl::collectMetrics, this),
194 collector,
195 [counts = traffic_.getCounts(), collector]() {
197
198 for (auto const& pair : counts)
199 ret.emplace(pair.first, TrafficGauges(pair.second.name, collector));
200
201 return ret;
202 }())
203{
204 beast::PropertyStream::Source::add(peerFinder_.get());
205}
206
207Handoff
210 http_request_type&& request,
211 endpoint_type remoteEndpoint)
212{
213 auto const id = nextId_++;
214 auto peerJournal = app_.getJournal("Peer");
215 beast::WrappedSink sink(peerJournal.sink(), makePrefix(id));
216 beast::Journal const journal(sink);
217
218 Handoff handoff;
219 if (processRequest(request, handoff))
220 return handoff;
221 if (!isPeerUpgrade(request))
222 return handoff;
223
224 handoff.moved = true;
225
226 JLOG(journal.debug()) << "Peer connection upgrade from " << remoteEndpoint;
227
228 error_code ec;
229 auto const localEndpoint(streamPtr->next_layer().socket().local_endpoint(ec));
230 if (ec)
231 {
232 JLOG(journal.debug()) << remoteEndpoint << " failed: " << ec.message();
233 return handoff;
234 }
235
236 auto consumer =
237 resourceManager_.newInboundEndpoint(beast::IPAddressConversion::fromAsio(remoteEndpoint));
238 if (consumer.disconnect(journal))
239 return handoff;
240
241 auto const [slot, result] = peerFinder_->newInboundSlot(
244
245 if (slot == nullptr)
246 {
247 // connection refused either IP limit exceeded or self-connect
248 handoff.moved = false;
249 JLOG(journal.debug()) << "Peer " << remoteEndpoint << " refused, " << to_string(result);
250 return handoff;
251 }
252
253 // Validate HTTP request
254
255 {
256 auto const types = beast::rfc2616::splitCommas(request["Connect-As"]);
257 if (std::ranges::find_if(types, [](std::string const& s) {
258 return boost::iequals(s, "peer");
259 }) == types.end())
260 {
261 handoff.moved = false;
262 handoff.response = makeRedirectResponse(slot, request, remoteEndpoint.address());
263 handoff.keepAlive = beast::rfc2616::isKeepAlive(request);
264 return handoff;
265 }
266 }
267
268 auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
269 if (!negotiatedVersion)
270 {
271 peerFinder_->onClosed(slot);
272 handoff.moved = false;
273 handoff.response = makeErrorResponse(
274 slot, request, remoteEndpoint.address(), "Unable to agree on a protocol version");
275 handoff.keepAlive = false;
276 return handoff;
277 }
278
279 auto const sharedValue = makeSharedValue(*streamPtr, journal);
280 if (!sharedValue)
281 {
282 peerFinder_->onClosed(slot);
283 handoff.moved = false;
284 handoff.response =
285 makeErrorResponse(slot, request, remoteEndpoint.address(), "Incorrect security cookie");
286 handoff.keepAlive = false;
287 return handoff;
288 }
289
290 try
291 {
292 auto publicKey = verifyHandshake(
293 request,
294 *sharedValue,
295 setup_.networkID,
296 setup_.publicIp,
297 remoteEndpoint.address(),
298 app_);
299
300 consumer.setPublicKey(publicKey);
301
302 {
303 // The node gets a reserved slot if it is in our cluster
304 // or if it has a reservation.
305 bool const reserved = static_cast<bool>(app_.getCluster().member(publicKey)) ||
306 app_.getPeerReservations().contains(publicKey);
307 auto const result = peerFinder_->activate(slot, publicKey, reserved);
308 if (result != PeerFinder::Result::Success)
309 {
310 peerFinder_->onClosed(slot);
311 JLOG(journal.debug())
312 << "Peer " << remoteEndpoint << " redirected, " << to_string(result);
313 handoff.moved = false;
314 handoff.response = makeRedirectResponse(slot, request, remoteEndpoint.address());
315 handoff.keepAlive = false;
316 return handoff;
317 }
318 }
319
320 auto const peer = std::make_shared<PeerImp>(
321 app_,
322 id,
323 slot,
324 std::move(request),
325 publicKey,
326 *negotiatedVersion,
327 consumer,
328 std::move(streamPtr),
329 *this);
330 {
331 // As we are not on the strand, run() must be called
332 // while holding the lock, otherwise new I/O can be
333 // queued after a call to stop().
334 std::scoped_lock const lock(mutex_);
335 {
336 auto const result = peers_.emplace(peer->slot(), peer);
337 XRPL_ASSERT(result.second, "xrpl::OverlayImpl::onHandoff : peer is inserted");
338 (void)result.second;
339 }
340 list_.emplace(peer.get(), peer);
341
342 peer->run();
343 }
344 handoff.moved = true;
345 return handoff;
346 }
347 catch (std::exception const& e)
348 {
349 JLOG(journal.debug()) << "Peer " << remoteEndpoint << " fails handshake (" << e.what()
350 << ")";
351
352 peerFinder_->onClosed(slot);
353 handoff.moved = false;
354 handoff.response = makeErrorResponse(slot, request, remoteEndpoint.address(), e.what());
355 handoff.keepAlive = false;
356 return handoff;
357 }
358}
359
360//------------------------------------------------------------------------------
361
362bool
364{
365 if (!isUpgrade(request))
366 return false;
367 auto const versions = parseProtocolVersions(request["Upgrade"]);
368 return !versions.empty();
369}
370
373{
375 ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
376 return ss.str();
377}
378
382 http_request_type const& request,
383 address_type remoteAddress)
384{
385 boost::beast::http::response<JsonBody> msg;
386 msg.version(request.version());
387 msg.result(boost::beast::http::status::service_unavailable);
388 msg.insert("Server", BuildInfo::getFullVersionString());
389 {
391 ostr << remoteAddress;
392 msg.insert("Remote-Address", ostr.str());
393 }
394 msg.insert("Content-Type", "application/json");
395 msg.insert(boost::beast::http::field::connection, "close");
396 msg.body() = json::ValueType::Object;
397 {
398 json::Value& ips = (msg.body()["peer-ips"] = json::ValueType::Array);
399 for (auto const& _ : peerFinder_->redirect(slot))
400 ips.append(_.address.toString());
401 }
402 msg.prepare_payload();
404}
405
409 http_request_type const& request,
410 address_type remoteAddress,
411 std::string const& text)
412{
413 boost::beast::http::response<boost::beast::http::empty_body> msg;
414 msg.version(request.version());
415 msg.result(boost::beast::http::status::bad_request);
416 msg.reason("Bad Request (" + text + ")");
417 msg.insert("Server", BuildInfo::getFullVersionString());
418 msg.insert("Remote-Address", remoteAddress.to_string());
419 msg.insert(boost::beast::http::field::connection, "close");
420 msg.prepare_payload();
422}
423
424//------------------------------------------------------------------------------
425
426void
428{
429 XRPL_ASSERT(work_, "xrpl::OverlayImpl::connect : work is set");
430
431 auto usage = resourceManager().newOutboundEndpoint(remoteEndpoint);
432 if (usage.disconnect(journal_))
433 {
434 JLOG(journal_.info()) << "Over resource limit: " << remoteEndpoint;
435 return;
436 }
437
438 auto const [slot, result] = peerFinder().newOutboundSlot(remoteEndpoint);
439 if (slot == nullptr)
440 {
441 JLOG(journal_.debug()) << "Connect: No slot for " << remoteEndpoint << ": "
442 << to_string(result);
443 return;
444 }
445
447 app_,
450 usage,
451 setup_.context,
452 nextId_++,
453 slot,
454 app_.getJournal("Peer"),
455 *this);
456
457 std::scoped_lock const lock(mutex_);
458 list_.emplace(p.get(), p);
459 p->run();
460}
461
462//------------------------------------------------------------------------------
463
464// Adds a peer that is already handshaked and active
465void
467{
468 beast::WrappedSink sink{journal_.sink(), peer->prefix()};
469 beast::Journal const journal{sink};
470
471 std::scoped_lock const lock(mutex_);
472
473 {
474 auto const result = peers_.emplace(peer->slot(), peer);
475 XRPL_ASSERT(result.second, "xrpl::OverlayImpl::addActive : peer is inserted");
476 (void)result.second;
477 }
478
479 {
480 auto const result = ids_.emplace(
482 XRPL_ASSERT(result.second, "xrpl::OverlayImpl::addActive : peer ID is inserted");
483 (void)result.second;
484 }
485
486 list_.emplace(peer.get(), peer);
487
488 JLOG(journal.debug()) << "activated";
489
490 // As we are not on the strand, run() must be called
491 // while holding the lock, otherwise new I/O can be
492 // queued after a call to stop().
493 peer->run();
494}
495
496void
498{
499 std::scoped_lock const lock(mutex_);
500 auto const iter = peers_.find(slot);
501 XRPL_ASSERT(iter != peers_.end(), "xrpl::OverlayImpl::remove : valid input");
502 peers_.erase(iter);
503}
504
505void
507{
509 app_.config(),
510 serverHandler_.setup().overlay.port(),
511 app_.getValidationPublicKey().has_value(),
512 setup_.ipLimit,
513 setup_.verifyEndpoints);
514
515 peerFinder_->setConfig(config);
516 peerFinder_->start();
517
518 // Populate our boot cache: if there are no entries in [ips] then we use
519 // the entries in [ips_fixed].
520 auto bootstrapIps = app_.config().ips.empty() ? app_.config().ipsFixed : app_.config().ips;
521
522 // If nothing is specified, default to several well-known high-capacity
523 // servers to serve as bootstrap:
524 if (bootstrapIps.empty())
525 {
526 // Pool of servers operated by Ripple Labs Inc. - https://ripple.com
527 bootstrapIps.emplace_back("r.ripple.com 51235");
528
529 // Pool of servers operated by ISRDC - https://isrdc.in
530 bootstrapIps.emplace_back("sahyadri.isrdc.in 51235");
531
532 // Pool of servers operated by @Xrpkuwait - https://xrpkuwait.com
533 bootstrapIps.emplace_back("hubs.xrpkuwait.com 51235");
534
535 // Pool of servers operated by XRPL Commons - https://xrpl-commons.org
536 bootstrapIps.emplace_back("hub.xrpl-commons.org 51235");
537 }
538
539 resolver_.resolve(
540 bootstrapIps,
541 [this](std::string const& name, std::vector<beast::IP::Endpoint> const& addresses) {
543 ips.reserve(addresses.size());
544 for (auto const& addr : addresses)
545 {
546 if (addr.port() == 0)
547 {
548 ips.push_back(to_string(addr.atPort(kDefaultPeerPort)));
549 }
550 else
551 {
552 ips.push_back(to_string(addr));
553 }
554 }
555
556 std::string const base("config: ");
557 if (!ips.empty())
558 peerFinder_->addFallbackStrings(base + name, ips);
559 });
560
561 // Add the ips_fixed from the xrpld.cfg file
562 if (!app_.config().standalone() && !app_.config().ipsFixed.empty())
563 {
564 resolver_.resolve(
565 app_.config().ipsFixed,
566 [this](std::string const& name, std::vector<beast::IP::Endpoint> const& addresses) {
567 std::vector<beast::IP::Endpoint> ips;
568 ips.reserve(addresses.size());
569
570 for (auto& addr : addresses)
571 {
572 if (addr.port() == 0)
573 {
574 ips.emplace_back(addr.address(), kDefaultPeerPort);
575 }
576 else
577 {
578 ips.emplace_back(addr);
579 }
580 }
581
582 if (!ips.empty())
583 peerFinder_->addFixedPeer(name, ips);
584 });
585 }
586 auto const timer = std::make_shared<Timer>(*this);
587 std::scoped_lock const lock(mutex_);
588 list_.emplace(timer.get(), timer);
589 timer_ = timer;
590 timer->asyncWait();
591}
592
593void
595{
596 boost::asio::dispatch(strand_, std::bind(&OverlayImpl::stopChildren, this));
597 {
598 std::unique_lock<decltype(mutex_)> lock(mutex_);
599 cond_.wait(lock, [this] { return list_.empty(); });
600 }
601 peerFinder_->stop();
602}
603
604//------------------------------------------------------------------------------
605//
606// PropertyStream
607//
608//------------------------------------------------------------------------------
609
610void
612{
613 beast::PropertyStream::Set set("traffic", stream);
614 auto const stats = traffic_.getCounts();
615 for (auto const& pair : stats)
616 {
618 item["category"] = pair.second.name;
619 item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
620 item["messages_in"] = std::to_string(pair.second.messagesIn.load());
621 item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
622 item["messages_out"] = std::to_string(pair.second.messagesOut.load());
623 }
624}
625
626//------------------------------------------------------------------------------
632void
634{
635 beast::WrappedSink sink{journal_.sink(), peer->prefix()};
636 beast::Journal const journal{sink};
637
638 // Now track this peer
639 {
640 std::scoped_lock const lock(mutex_);
641 auto const result(ids_.emplace(
643 XRPL_ASSERT(result.second, "xrpl::OverlayImpl::activate : peer ID is inserted");
644 (void)result.second;
645 }
646
647 JLOG(journal.debug()) << "activated";
648
649 // We just accepted this peer so we have non-zero active peers
650 XRPL_ASSERT(size(), "xrpl::OverlayImpl::activate : nonzero peers");
651}
652
653void
655{
656 std::scoped_lock const lock(mutex_);
657 ids_.erase(id);
658}
659
660void
663 std::shared_ptr<PeerImp> const& from)
664{
665 auto const n = m->list_size();
666 auto const& journal = from->pJournal();
667
668 protocol::TMManifests relay;
669
670 for (std::size_t i = 0; i < n; ++i)
671 {
672 auto& s = m->list().Get(i).stobject();
673
674 if (auto mo = deserializeManifest(s))
675 {
676 auto const serialized = mo->serialized;
677
678 auto const result = app_.getValidatorManifests().applyManifest(std::move(*mo));
679
680 if (result == ManifestDisposition::Accepted)
681 {
682 relay.add_list()->set_stobject(s);
683
684 // N.B.: this is important; the applyManifest call above moves
685 // the loaded Manifest out of the optional so we need to
686 // reload it here.
687 mo = deserializeManifest(serialized);
688 XRPL_ASSERT(
689 mo,
690 "xrpl::OverlayImpl::onManifests : manifest "
691 "deserialization succeeded");
692 // NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
693 app_.getOPs().pubManifest(*mo);
694
695 if (app_.getValidators().listed(mo->masterKey))
696 {
697 auto db = app_.getWalletDB().checkoutDb();
698 addValidatorManifest(*db, serialized);
699 }
700 // NOLINTEND(bugprone-unchecked-optional-access)
701 }
702 }
703 else
704 {
705 JLOG(journal.debug()) << "Malformed manifest #" << i + 1 << ": " << strHex(s);
706 continue;
707 }
708 }
709
710 if (!relay.list().empty())
711 {
712 forEach([m2 = std::make_shared<Message>(relay, protocol::mtMANIFESTS)](
713 std::shared_ptr<PeerImp> const& p) { p->send(m2); });
714 }
715}
716
717void
719{
720 traffic_.addCount(cat, true, size);
721}
722
723void
725{
726 traffic_.addCount(cat, false, size);
727}
728
734{
735 std::scoped_lock const lock(mutex_);
736 return ids_.size();
737}
738
739int
741{
742 return peerFinder_->config().maxPeers;
743}
744
747{
748 using namespace std::chrono;
749 json::Value jv;
750 auto& av = jv[jss::active] = json::Value(json::ValueType::Array);
751
752 forEach([&](std::shared_ptr<PeerImp> const& sp) {
753 auto& pv = av.append(json::Value(json::ValueType::Object));
754 pv[jss::public_key] = base64Encode(sp->getNodePublic().data(), sp->getNodePublic().size());
755 pv[jss::type] = sp->slot()->inbound() ? jss::in : jss::out;
756 pv[jss::uptime] = static_cast<std::uint32_t>(duration_cast<seconds>(sp->uptime()).count());
757 if (sp->crawl())
758 {
759 pv[jss::ip] = sp->getRemoteAddress().address().to_string();
760 if (sp->slot()->inbound())
761 {
762 if (auto port = sp->slot()->listeningPort())
763 pv[jss::port] = *port;
764 }
765 else
766 {
767 pv[jss::port] = sp->getRemoteAddress().port();
768 }
769 }
770
771 {
772 auto version{sp->getVersion()};
773 if (!version.empty())
774 {
775 // Could move here if json::value supported moving from strings
776 pv[jss::version] = std::string{version};
777 }
778 }
779
780 std::uint32_t minSeq = 0, maxSeq = 0;
781 sp->ledgerRange(minSeq, maxSeq);
782 if (minSeq != 0 || maxSeq != 0)
783 pv[jss::complete_ledgers] = std::to_string(minSeq) + "-" + std::to_string(maxSeq);
784 });
785
786 return jv;
787}
788
791{
792 bool const humanReadable = false;
793 bool const admin = false;
794 bool const counters = false;
795
796 json::Value serverInfo = app_.getOPs().getServerInfo(humanReadable, admin, counters);
797
798 // Filter out some information
799 serverInfo.removeMember(jss::hostid);
800 serverInfo.removeMember(jss::load_factor_fee_escalation);
801 serverInfo.removeMember(jss::load_factor_fee_queue);
802 serverInfo.removeMember(jss::validation_quorum);
803
804 if (serverInfo.isMember(jss::validated_ledger))
805 {
806 json::Value& validatedLedger = serverInfo[jss::validated_ledger];
807
808 validatedLedger.removeMember(jss::base_fee);
809 validatedLedger.removeMember(jss::reserve_base_xrp);
810 validatedLedger.removeMember(jss::reserve_inc_xrp);
811 }
812
813 return serverInfo;
814}
815
821
824{
825 json::Value validators = app_.getValidators().getJson();
826
827 if (validators.isMember(jss::publisher_lists))
828 {
829 json::Value& publisherLists = validators[jss::publisher_lists];
830
831 for (auto& publisher : publisherLists)
832 {
833 publisher.removeMember(jss::list);
834 }
835 }
836
837 validators.removeMember(jss::signing_keys);
838 validators.removeMember(jss::trusted_validator_keys);
839 validators.removeMember(jss::validation_quorum);
840
841 json::Value validatorSites = app_.getValidatorSites().getJson();
842
843 if (validatorSites.isMember(jss::validator_sites))
844 {
845 validators[jss::validator_sites] = std::move(validatorSites[jss::validator_sites]);
846 }
847
848 return validators;
849}
850
851// Returns information on verified peers.
854{
856 for (auto const& peer : getActivePeers())
857 {
858 json.append(peer->json());
859 }
860 return json;
861}
862
863bool
865{
866 if (req.target() != "/crawl" || setup_.crawlOptions == CrawlOptions::kDisabled)
867 return false;
868
869 boost::beast::http::response<JsonBody> msg;
870 msg.version(req.version());
871 msg.result(boost::beast::http::status::ok);
872 msg.insert("Server", BuildInfo::getFullVersionString());
873 msg.insert("Content-Type", "application/json");
874 msg.insert("Connection", "close");
875 msg.body()["version"] = json::Value(2u);
876
877 if ((setup_.crawlOptions & CrawlOptions::kOverlay) != 0u)
878 {
879 msg.body()["overlay"] = getOverlayInfo();
880 }
881 if ((setup_.crawlOptions & CrawlOptions::kServerInfo) != 0u)
882 {
883 msg.body()["server"] = getServerInfo();
884 }
885 if ((setup_.crawlOptions & CrawlOptions::kServerCounts) != 0u)
886 {
887 msg.body()["counts"] = getServerCounts();
888 }
889 if ((setup_.crawlOptions & CrawlOptions::kUnl) != 0u)
890 {
891 msg.body()["unl"] = getUnlInfo();
892 }
893
894 msg.prepare_payload();
896 return true;
897}
898
899bool
901{
902 // If the target is in the form "/vl/<validator_list_public_key>",
903 // return the most recent validator list for that key.
904 constexpr std::string_view kPrefix("/vl/");
905
906 if (!req.target().starts_with(kPrefix) || !setup_.vlEnabled)
907 return false;
908
909 std::uint32_t version = 1;
910
911 boost::beast::http::response<JsonBody> msg;
912 msg.version(req.version());
913 msg.insert("Server", BuildInfo::getFullVersionString());
914 msg.insert("Content-Type", "application/json");
915 msg.insert("Connection", "close");
916
917 auto fail = [&msg, &handoff](auto status) {
918 msg.result(status);
919 msg.insert("Content-Length", "0");
920
921 msg.body() = json::ValueType::Null;
922
923 msg.prepare_payload();
925 return true;
926 };
927
928 std::string_view key = req.target().substr(kPrefix.size());
929
930 if (auto slash = key.find('/'); slash != std::string_view::npos)
931 {
932 auto verString = key.substr(0, slash);
933 if (!boost::conversion::try_lexical_convert(verString, version))
934 return fail(boost::beast::http::status::bad_request);
935 key = key.substr(slash + 1);
936 }
937
938 if (key.empty())
939 return fail(boost::beast::http::status::bad_request);
940
941 // find the list
942 auto vl = app_.getValidators().getAvailable(key, version);
943
944 if (!vl)
945 {
946 // 404 not found
947 return fail(boost::beast::http::status::not_found);
948 }
949 if (!*vl)
950 {
951 return fail(boost::beast::http::status::bad_request);
952 }
953
954 msg.result(boost::beast::http::status::ok);
955
956 msg.body() = *vl;
957
958 msg.prepare_payload();
960 return true;
961}
962
963bool
965{
966 if (req.target() != "/health")
967 return false;
968 boost::beast::http::response<JsonBody> msg;
969 msg.version(req.version());
970 msg.insert("Server", BuildInfo::getFullVersionString());
971 msg.insert("Content-Type", "application/json");
972 msg.insert("Connection", "close");
973
974 auto info = getServerInfo();
975
976 int lastValidatedLedgerAge = -1;
977 if (info.isMember(jss::validated_ledger))
978 lastValidatedLedgerAge = info[jss::validated_ledger][jss::age].asInt();
979 bool amendmentBlocked = false;
980 if (info.isMember(jss::amendment_blocked))
981 amendmentBlocked = true;
982 int const numberPeers = info[jss::peers].asInt();
983 std::string const serverState = info[jss::server_state].asString();
984 auto loadFactor = info[jss::load_factor_server].asDouble() / info[jss::load_base].asDouble();
985
986 enum class HealthState { Healthy, Warning, Critical };
987 auto health = HealthState::Healthy;
988 auto setHealth = [&health](HealthState state) { health = std::max(health, state); };
989
990 msg.body()[jss::info] = json::ValueType::Object;
991 if (lastValidatedLedgerAge >= 7 || lastValidatedLedgerAge < 0)
992 {
993 msg.body()[jss::info][jss::validated_ledger] = lastValidatedLedgerAge;
994 if (lastValidatedLedgerAge < 20)
995 {
996 setHealth(HealthState::Warning);
997 }
998 else
999 {
1000 setHealth(HealthState::Critical);
1001 }
1002 }
1003
1004 if (amendmentBlocked)
1005 {
1006 msg.body()[jss::info][jss::amendment_blocked] = true;
1007 setHealth(HealthState::Critical);
1008 }
1009
1010 if (numberPeers <= 7)
1011 {
1012 msg.body()[jss::info][jss::peers] = numberPeers;
1013 if (numberPeers != 0)
1014 {
1015 setHealth(HealthState::Warning);
1016 }
1017 else
1018 {
1019 setHealth(HealthState::Critical);
1020 }
1021 }
1022
1023 if (!(serverState == "full" || serverState == "validating" || serverState == "proposing"))
1024 {
1025 msg.body()[jss::info][jss::server_state] = serverState;
1026 if (serverState == "syncing" || serverState == "tracking" || serverState == "connected")
1027 {
1028 setHealth(HealthState::Warning);
1029 }
1030 else
1031 {
1032 setHealth(HealthState::Critical);
1033 }
1034 }
1035
1036 if (loadFactor > 100)
1037 {
1038 msg.body()[jss::info][jss::load_factor] = loadFactor;
1039 if (loadFactor < 1000)
1040 {
1041 setHealth(HealthState::Warning);
1042 }
1043 else
1044 {
1045 setHealth(HealthState::Critical);
1046 }
1047 }
1048
1049 switch (health)
1050 {
1051 case HealthState::Healthy:
1052 msg.result(boost::beast::http::status::ok);
1053 break;
1054 case HealthState::Warning:
1055 msg.result(boost::beast::http::status::service_unavailable);
1056 break;
1057 case HealthState::Critical:
1058 msg.result(boost::beast::http::status::internal_server_error);
1059 break;
1060 }
1061
1062 msg.prepare_payload();
1064 return true;
1065}
1066
1067bool
1069{
1070 // Take advantage of || short-circuiting
1071 return processCrawl(req, handoff) || processValidatorList(req, handoff) ||
1072 processHealth(req, handoff);
1073}
1074
1077{
1079 ret.reserve(size());
1080
1081 forEach([&ret](std::shared_ptr<PeerImp> const& sp) { ret.emplace_back(sp); });
1082
1083 return ret;
1084}
1085
1088 std::set<Peer::id_t> const& toSkip,
1089 std::size_t& active,
1090 std::size_t& disabled,
1091 std::size_t& enabledInSkip) const
1092{
1095
1096 active = ids_.size();
1097 disabled = enabledInSkip = 0;
1098 ret.reserve(ids_.size());
1099
1100 // NOTE The purpose of p is to delay the destruction of PeerImp
1102 for (auto& [id, w] : ids_)
1103 {
1104 if (p = w.lock(); p != nullptr)
1105 {
1106 bool const reduceRelayEnabled = p->txReduceRelayEnabled();
1107 // tx reduced relay feature disabled
1108 if (!reduceRelayEnabled)
1109 ++disabled;
1110
1111 if (!toSkip.contains(id))
1112 {
1113 ret.emplace_back(std::move(p));
1114 }
1115 else if (reduceRelayEnabled)
1116 {
1117 ++enabledInSkip;
1118 }
1119 }
1120 }
1121
1122 return ret;
1123}
1124
1125void
1127{
1128 forEach([index](std::shared_ptr<PeerImp> const& sp) { sp->checkTracking(index); });
1129}
1130
1133{
1135 auto const iter = ids_.find(id);
1136 if (iter != ids_.end())
1137 return iter->second.lock();
1138 return {};
1139}
1140
1141// A public key hash map was not used due to the peer connect/disconnect
1142// update overhead outweighing the performance of a small set linear search.
1145{
1147 // NOTE The purpose of peer is to delay the destruction of PeerImp
1149 for (auto const& e : ids_)
1150 {
1151 if (peer = e.second.lock(); peer != nullptr)
1152 {
1153 if (peer->getNodePublic() == pubKey)
1154 return peer;
1155 }
1156 }
1157 return {};
1158}
1159
1160void
1161OverlayImpl::broadcast(protocol::TMProposeSet const& m)
1162{
1163 auto const sm = std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER);
1164 forEach([&](std::shared_ptr<PeerImp> const& p) { p->send(sm); });
1165}
1166
1168OverlayImpl::relay(protocol::TMProposeSet const& m, uint256 const& uid, PublicKey const& validator)
1169{
1170 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1171 {
1172 auto const sm = std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER, validator);
1173 forEach([&](std::shared_ptr<PeerImp> const& p) {
1174 if (!toSkip->contains(p->id()))
1175 p->send(sm);
1176 });
1177 return *toSkip;
1178 }
1179 return {};
1180}
1181
1182void
1183OverlayImpl::broadcast(protocol::TMValidation const& m)
1184{
1185 auto const sm = std::make_shared<Message>(m, protocol::mtVALIDATION);
1186 forEach([sm](std::shared_ptr<PeerImp> const& p) { p->send(sm); });
1187}
1188
1190OverlayImpl::relay(protocol::TMValidation const& m, uint256 const& uid, PublicKey const& validator)
1191{
1192 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1193 {
1194 auto const sm = std::make_shared<Message>(m, protocol::mtVALIDATION, validator);
1195 forEach([&](std::shared_ptr<PeerImp> const& p) {
1196 if (!toSkip->contains(p->id()))
1197 p->send(sm);
1198 });
1199 return *toSkip;
1200 }
1201 return {};
1202}
1203
1206{
1208
1209 if (auto seq = app_.getValidatorManifests().sequence(); seq != manifestListSeq_)
1210 {
1211 protocol::TMManifests tm;
1212
1213 app_.getValidatorManifests().forEachManifest(
1214 [&tm](std::size_t s) { tm.mutable_list()->Reserve(s); },
1215 [&tm, &hr = app_.getHashRouter()](Manifest const& manifest) {
1216 tm.add_list()->set_stobject(manifest.serialized.data(), manifest.serialized.size());
1217 hr.addSuppression(manifest.hash());
1218 });
1219
1220 manifestMessage_.reset();
1221
1222 if (tm.list_size() != 0)
1223 manifestMessage_ = std::make_shared<Message>(tm, protocol::mtMANIFESTS);
1224
1225 manifestListSeq_ = seq;
1226 }
1227
1228 return manifestMessage_;
1229}
1230
1231void
1233 uint256 const& hash,
1235 std::set<Peer::id_t> const& toSkip)
1236{
1237 bool relay = tx.has_value();
1238 if (relay)
1239 {
1240 auto& txn = tx->get();
1241 SerialIter sit(makeSlice(txn.rawtransaction()));
1242 try
1243 {
1244 relay = !isPseudoTx(STTx{sit});
1245 }
1246 catch (std::exception const&)
1247 {
1248 // Could not construct STTx, not relaying
1249 JLOG(journal_.debug()) << "Could not construct STTx: " << hash;
1250 return;
1251 }
1252 }
1253
1254 Overlay::PeerSequence peers = {};
1255 std::size_t total = 0;
1256 std::size_t disabled = 0;
1257 std::size_t enabledInSkip = 0;
1258
1259 if (!relay)
1260 {
1261 if (!app_.config().txReduceRelayEnable)
1262 return;
1263
1264 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1265 JLOG(journal_.trace()) << "not relaying tx, total peers " << peers.size();
1266 for (auto const& p : peers)
1267 p->addTxQueue(hash);
1268 return;
1269 }
1270
1271 auto& txn = tx->get();
1272 auto const sm = std::make_shared<Message>(txn, protocol::mtTRANSACTION);
1273 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1274 auto const minRelay = app_.config().txReduceRelayMinPeers + disabled;
1275
1276 if (!app_.config().txReduceRelayEnable || total <= minRelay)
1277 {
1278 for (auto const& p : peers)
1279 p->send(sm);
1280 if (app_.config().txReduceRelayEnable || app_.config().txReduceRelayMetrics)
1281 txMetrics_.addMetrics(total, toSkip.size(), 0);
1282 return;
1283 }
1284
1285 // We have more peers than the minimum (disabled + minimum enabled),
1286 // relay to all disabled and some randomly selected enabled that
1287 // do not have the transaction.
1288 auto const enabledTarget = app_.config().txReduceRelayMinPeers +
1289 ((total - minRelay) * app_.config().txRelayPercentage / 100);
1290
1291 txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);
1292
1293 if (enabledTarget > enabledInSkip)
1294 std::shuffle(peers.begin(), peers.end(), defaultPrng());
1295
1296 JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size() << " selected "
1297 << enabledTarget << " skip " << toSkip.size() << " disabled "
1298 << disabled;
1299
1300 // count skipped peers with the enabled feature towards the quota
1301 std::uint16_t enabledAndRelayed = enabledInSkip;
1302 for (auto const& p : peers)
1303 {
1304 // always relay to a peer with the disabled feature
1305 if (!p->txReduceRelayEnabled())
1306 {
1307 p->send(sm);
1308 }
1309 else if (enabledAndRelayed < enabledTarget)
1310 {
1311 enabledAndRelayed++;
1312 p->send(sm);
1313 }
1314 else
1315 {
1316 p->addTxQueue(hash);
1317 }
1318 }
1319}
1320
1321//------------------------------------------------------------------------------
1322
1323void
1325{
1327 list_.erase(&child);
1328 if (list_.empty())
1329 cond_.notify_all();
1330}
1331
1332void
1334{
1335 // Calling list_[].second->stop() may cause list_ to be modified
1336 // (OverlayImpl::remove() may be called on this same thread). So
1337 // iterating directly over list_ to call child->stop() could lead to
1338 // undefined behavior.
1339 //
1340 // Therefore we copy all of the weak/shared ptrs out of list_ before we
1341 // start calling stop() on them. That guarantees OverlayImpl::remove()
1342 // won't be called until vector<> children leaves scope.
1344 {
1346 if (!work_)
1347 return;
1348 work_ = std::nullopt;
1349
1350 children.reserve(list_.size());
1351 for (auto const& element : list_)
1352 {
1353 children.emplace_back(element.second.lock());
1354 }
1355 } // lock released
1356
1357 for (auto const& child : children)
1358 {
1359 if (child != nullptr)
1360 child->stop();
1361 }
1362}
1363
1364void
1366{
1367 auto const result = peerFinder_->autoconnect();
1368 for (auto const& addr : result)
1369 connect(addr);
1370}
1371
1372void
1374{
1375 auto const result = peerFinder_->buildEndpointsForPeers();
1376 for (auto const& e : result)
1377 {
1379 {
1381 auto const iter = peers_.find(e.first);
1382 if (iter != peers_.end())
1383 peer = iter->second.lock();
1384 }
1385 if (peer)
1386 peer->sendEndpoints(e.second.begin(), e.second.end());
1387 }
1388}
1389
1390void
1392{
1393 forEach([](auto const& p) {
1394 if (p->txReduceRelayEnabled())
1395 p->sendTxQueue();
1396 });
1397}
1398
1400makeSquelchMessage(PublicKey const& validator, bool squelch, uint32_t squelchDuration)
1401{
1402 protocol::TMSquelch m;
1403 m.set_squelch(squelch);
1404 m.set_validatorpubkey(validator.data(), validator.size());
1405 if (squelch)
1406 m.set_squelchduration(squelchDuration);
1407 return std::make_shared<Message>(m, protocol::mtSQUELCH);
1408}
1409
1410void
1412{
1413 if (auto peer = findPeerByShortID(id); peer)
1414 {
1415 // optimize - multiple message with different
1416 // validator might be sent to the same peer
1417 peer->send(makeSquelchMessage(validator, false, 0));
1418 }
1419}
1420
1421void
1422OverlayImpl::squelch(PublicKey const& validator, Peer::id_t id, uint32_t squelchDuration) const
1423{
1424 if (auto peer = findPeerByShortID(id); peer)
1425 {
1426 peer->send(makeSquelchMessage(validator, true, squelchDuration));
1427 }
1428}
1429
1430void
1432 uint256 const& key,
1433 PublicKey const& validator,
1434 std::set<Peer::id_t>&& peers,
1435 protocol::MessageType type)
1436{
1437 if (!slots_.baseSquelchReady())
1438 return;
1439
1440 if (!strand_.running_in_this_thread())
1441 {
1442 post(
1443 strand_,
1444 // Must capture copies of reference parameters (i.e. key, validator)
1445 [this, key = key, validator = validator, peers = std::move(peers), type]() mutable {
1446 updateSlotAndSquelch(key, validator, std::move(peers), type);
1447 });
1448
1449 return;
1450 }
1451
1452 for (auto id : peers)
1453 {
1454 slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
1456 });
1457 }
1458}
1459
1460void
1462 uint256 const& key,
1463 PublicKey const& validator,
1464 Peer::id_t peer,
1465 protocol::MessageType type)
1466{
1467 if (!slots_.baseSquelchReady())
1468 return;
1469
1470 if (!strand_.running_in_this_thread())
1471 {
1472 {
1473 post(
1474 strand_,
1475 // Must capture copies of reference parameters (i.e. key, validator)
1476 [this, key = key, validator = validator, peer, type]() {
1477 updateSlotAndSquelch(key, validator, peer, type);
1478 });
1479 }
1480 return;
1481 }
1482
1483 slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
1485 });
1486}
1487
1488void
1490{
1491 if (!strand_.running_in_this_thread())
1492 {
1493 post(strand_, std::bind(&OverlayImpl::deletePeer, this, id));
1494 return;
1495 }
1496
1497 slots_.deletePeer(id, true);
1498}
1499
1500void
1502{
1503 if (!strand_.running_in_this_thread())
1504 {
1506 return;
1507 }
1508
1509 slots_.deleteIdlePeers();
1510}
1511
1512//------------------------------------------------------------------------------
1513
1516{
1517 Overlay::Setup setup;
1518
1519 {
1520 auto const& section = config.section(Sections::kOverlay);
1521 setup.context = makeSslContext("");
1522
1523 set(setup.ipLimit, "ip_limit", section);
1524 if (setup.ipLimit < 0)
1525 Throw<std::runtime_error>("Configured IP limit is invalid");
1526
1527 std::string ip;
1528 set(ip, "public_ip", section);
1529 if (!ip.empty())
1530 {
1531 boost::system::error_code ec;
1532 setup.publicIp = boost::asio::ip::make_address(ip, ec);
1533 if (ec || !beast::IP::isPublic(setup.publicIp))
1534 Throw<std::runtime_error>("Configured public IP is invalid");
1535 }
1536
1537 set(setup.verifyEndpoints, true, "verify_endpoints", section);
1538 if (!setup.verifyEndpoints)
1539 {
1540 JLOG(j.warn()) << "Endpoint verification is disabled. This is a "
1541 "security risk and should only be used for "
1542 "testing.";
1543 }
1544 }
1545
1546 {
1547 auto const& section = config.section(Sections::kCrawl);
1548 auto const& values = section.values();
1549
1550 if (values.size() > 1)
1551 {
1552 Throw<std::runtime_error>("Configured [crawl] section is invalid, too many values");
1553 }
1554
1555 bool crawlEnabled = true;
1556
1557 // Only allow "0|1" as a value
1558 if (values.size() == 1)
1559 {
1560 try
1561 {
1562 crawlEnabled = boost::lexical_cast<bool>(values.front());
1563 }
1564 catch (boost::bad_lexical_cast const&)
1565 {
1567 "Configured [crawl] section has invalid value: " + values.front());
1568 }
1569 }
1570
1571 if (crawlEnabled)
1572 {
1573 if (get<bool>(section, Keys::kOverlay, true))
1574 {
1576 }
1577 if (get<bool>(section, Keys::kServer, true))
1578 {
1580 }
1581 if (get<bool>(section, Keys::kCounts, false))
1582 {
1584 }
1585 if (get<bool>(section, Keys::kUnl, true))
1586 {
1588 }
1589 }
1590 }
1591 {
1592 auto const& section = config.section(Sections::kVl);
1593
1594 set(setup.vlEnabled, "enabled", section);
1595 }
1596
1597 try
1598 {
1599 auto id = config.legacy(Sections::kNetworkId);
1600
1601 if (!id.empty())
1602 {
1603 if (id == "main")
1604 id = "0";
1605
1606 if (id == "testnet")
1607 id = "1";
1608
1609 if (id == "devnet")
1610 id = "2";
1611
1613 }
1614 }
1615 catch (...)
1616 {
1618 "Configured [network_id] section is invalid: must be a number "
1619 "or one of the strings 'main', 'testnet' or 'devnet'.");
1620 }
1621
1622 return setup;
1623}
1624
1627 Application& app,
1628 Overlay::Setup const& setup,
1629 ServerHandler& serverHandler,
1630 Resource::Manager& resourceManager,
1631 Resolver& resolver,
1632 boost::asio::io_context& ioContext,
1633 BasicConfig const& config,
1634 beast::insight::Collector::ptr const& collector)
1635{
1637 app, setup, serverHandler, resourceManager, resolver, ioContext, config, collector);
1638}
1639
1640} // namespace xrpl
T begin(T... args)
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:17
A generic endpoint for log messages.
Definition Journal.h:38
Stream debug() const
Definition Journal.h:297
Stream warn() const
Definition Journal.h:309
std::string const & name() const
Returns the name of this source.
void add(Source &source)
Add a child source.
Wraps a Journal::Sink to prefix its output with a string.
Definition WrappedSink.h:16
std::shared_ptr< Collector > ptr
Definition Collector.h:26
Represents a JSON value.
Definition json_value.h:130
Value removeMember(char const *key)
Remove and return the named member.
Value & append(Value const &value)
Append value to array at the end.
bool isMember(char const *key) const
Return true if the object has a member named key.
Holds unparsed configuration information.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
Section & section(std::string const &name)
Returns the section with the given name.
Child(OverlayImpl &overlay)
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
boost::asio::io_context & ioContext_
Definition OverlayImpl.h:83
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
Resource::Manager & resourceManager_
Definition OverlayImpl.h:93
json::Value getOverlayInfo() const
Returns information about peers on the overlay network.
boost::asio::ip::address address_type
Definition OverlayImpl.h:61
static bool isPeerUpgrade(http_request_type const &request)
Resolver & resolver_
Definition OverlayImpl.h:98
void addActive(std::shared_ptr< PeerImp > const &peer)
boost::system::error_code error_code
Definition OverlayImpl.h:63
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
void broadcast(protocol::TMProposeSet const &m) override
Broadcast a proposal.
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
Definition OverlayImpl.h:84
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remoteAddress)
static std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remoteAddress, std::string const &msg)
std::set< Peer::id_t > relay(protocol::TMProposeSet const &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
void connect(beast::IP::Endpoint const &remoteEndpoint) override
Establish a peer connection to the specified endpoint.
void stop() override
std::size_t size() const override
The number of active peers on the network Active peers are only those peers that have completed the h...
static bool isUpgrade(boost::beast::http::header< true, Fields > const &req)
ServerHandler & serverHandler_
Definition OverlayImpl.h:92
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remoteEndpoint) override
Conditionally accept an incoming HTTP request.
reduce_relay::Slots< UptimeClock > slots_
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
Definition OverlayImpl.h:97
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
OverlayImpl(Application &app, Setup setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &ioContext, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
void reportInboundTraffic(TrafficCount::Category cat, int bytes)
std::shared_ptr< Message > manifestMessage_
void sendTxQueue() const
Send once a second transactions' hashes aggregated by peers.
std::optional< std::uint32_t > manifestListSeq_
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
PeerFinder::Manager & peerFinder()
Application & app_
Definition OverlayImpl.h:82
std::atomic< Peer::id_t > nextId_
Definition OverlayImpl.h:99
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > peers_
Definition OverlayImpl.h:96
json::Value getServerCounts()
Returns information about the local server's performance counters.
std::recursive_mutex mutex_
Definition OverlayImpl.h:86
Resource::Manager & resourceManager()
beast::Journal const journal_
Definition OverlayImpl.h:91
json::Value json() override
Return diagnostics on the status of all peers.
boost::asio::ip::tcp::endpoint endpoint_type
Definition OverlayImpl.h:62
void forEach(UnaryFunc &&f) const
void onPeerDeactivate(Peer::id_t id)
std::mutex manifestLock_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition OverlayImpl.h:85
static std::string makePrefix(std::uint32_t id)
Setup const & setup() const
std::unique_ptr< PeerFinder::Manager > peerFinder_
Definition OverlayImpl.h:94
metrics::TxMetrics txMetrics_
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
Definition OverlayImpl.h:89
int limit() override
Returns the maximum number of peers we are configured to allow.
std::condition_variable_any cond_
Definition OverlayImpl.h:87
json::Value getUnlInfo()
Returns information about the local server's UNL.
std::shared_ptr< Message > getManifestsMessage()
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
TrafficCount traffic_
Definition OverlayImpl.h:95
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
json::Value getServerInfo()
Returns information about the local server.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
void reportOutboundTraffic(TrafficCount::Category cat, int bytes)
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
void start() override
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
std::vector< std::shared_ptr< Peer > > PeerSequence
Definition Overlay.h:53
virtual std::pair< std::shared_ptr< Slot >, Result > newOutboundSlot(beast::IP::Endpoint const &remoteEndpoint)=0
Create a new outbound slot with the specified remote endpoint.
std::uint32_t id_t
Uniquely identifies a peer.
A public key.
Definition PublicKey.h:42
Tracks load and resource consumption.
virtual Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by outbound IP address and port.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:58
T contains(T... args)
T data(T... args)
T duration_cast(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T find_if(T... args)
T get(T... args)
T lock(T... args)
T make_shared(T... args)
T make_tuple(T... args)
T make_unique(T... args)
T max(T... args)
bool isPublic(Address const &addr)
Returns true if the address is a public routable address.
Definition IPAddress.h:58
bool isKeepAlive(boost::beast::http::message< IsRequest, Body, Fields > const &m)
Definition rfc2616.h:361
Result splitCommas(FwdIt first, FwdIt last)
Definition rfc2616.h:177
Out lexicalCastThrow(In in)
Convert from one type to another, throw on error.
JSON (JavaScript Object Notation).
Definition json_errors.h:5
@ Array
array value (ordered list)
Definition json_value.h:25
@ Object
object value (collection of name/value pairs).
Definition json_value.h:26
@ Null
'null' value
Definition json_value.h:19
STL namespace.
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:82
static constexpr auto kUnl
static constexpr auto kServerCounts
static constexpr auto kDisabled
static constexpr auto kOverlay
static constexpr auto kServerInfo
static constexpr auto kCheckIdlePeers
How often we check for idle peers (seconds).
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
PublicKey verifyHandshake(boost::beast::http::fields const &headers, xrpl::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address publicIp, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:94
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:10
std::optional< ProtocolVersion > negotiateProtocolVersion(std::vector< ProtocolVersion > const &versions)
Given a list of supported protocol versions, choose the one we prefer.
std::string to_string(BaseUInt< Bits, Tag > const &a)
Definition base_uint.h:633
void addValidatorManifest(soci::session &session, std::string const &serialized)
addValidatorManifest Saves the manifest of a validator to the database.
Definition Wallet.cpp:124
std::optional< Manifest > deserializeManifest(Slice s, beast::Journal journal)
Constructs Manifest from serialized string.
std::unique_ptr< Overlay > makeOverlay(Application &app, Overlay::Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &ioContext, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
Creates the implementation of Overlay.
std::shared_ptr< boost::asio::ssl::context > makeSslContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
std::string base64Encode(std::uint8_t const *data, std::size_t len)
constexpr Number squelch(Number const &x, Number const &limit) noexcept
Definition Number.h:854
beast::xor_shift_engine & defaultPrng()
Return the default random engine.
std::shared_ptr< Message > makeSquelchMessage(PublicKey const &validator, bool squelch, uint32_t squelchDuration)
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
Overlay::Setup setupOverlay(BasicConfig const &config, beast::Journal j)
json::Value getCountsJson(Application &app, int minObjectCount)
Definition GetCounts.cpp:46
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:810
BaseUInt< 256 > uint256
Definition base_uint.h:562
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Definition contract.h:49
std::enable_if_t< std::is_same_v< T, char >||std::is_same_v< T, unsigned char >, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:215
@ Accepted
Manifest is valid.
Definition Manifest.h:197
T piecewise_construct
T push_back(T... args)
T shuffle(T... args)
T reserve(T... args)
T setfill(T... args)
T setw(T... args)
T size(T... args)
T str(T... args)
static IP::Endpoint fromAsio(boost::asio::ip::address const &address)
static boost::asio::ip::tcp::endpoint toAsioEndpoint(IP::Endpoint const &address)
Used to indicate the result of a server connection handoff.
Definition Handoff.h:18
bool keepAlive
Definition Handoff.h:24
std::shared_ptr< Writer > response
Definition Handoff.h:27
static constexpr auto kUnl
Definition Constants.h:172
static constexpr auto kCounts
Definition Constants.h:102
static constexpr auto kOverlay
Definition Constants.h:136
static constexpr auto kServer
Definition Constants.h:153
std::string serialized
The manifest in serialized form.
Definition Manifest.h:64
uint256 hash() const
Returns hash of serialized manifest data.
void onTimer(error_code ec)
boost::asio::basic_waitable_timer< clock_type > timer
Definition OverlayImpl.h:67
Timer(OverlayImpl &overlay)
std::uint32_t crawlOptions
Definition Overlay.h:47
std::optional< std::uint32_t > networkID
Definition Overlay.h:48
beast::IP::Address publicIp
Definition Overlay.h:45
std::shared_ptr< boost::asio::ssl::context > context
Definition Overlay.h:44
PeerFinder configuration settings.
static Config makeConfig(xrpl::Config const &config, std::uint16_t port, bool validationPublicKey, int ipLimit, bool verifyEndpoints)
Make PeerFinder::Config from configuration parameters.
static constexpr auto kOverlay
Definition Constants.h:34
static constexpr auto kVl
Definition Constants.h:76
static constexpr auto kCrawl
Definition Constants.h:12
static constexpr auto kNetworkId
Definition Constants.h:29
T substr(T... args)
T to_string(T... args)
T what(T... args)