rippled
Loading...
Searching...
No Matches
OverlayImpl.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/misc/HashRouter.h>
21#include <xrpld/app/misc/NetworkOPs.h>
22#include <xrpld/app/misc/ValidatorList.h>
23#include <xrpld/app/misc/ValidatorSite.h>
24#include <xrpld/app/rdb/RelationalDatabase.h>
25#include <xrpld/app/rdb/Wallet.h>
26#include <xrpld/overlay/Cluster.h>
27#include <xrpld/overlay/detail/ConnectAttempt.h>
28#include <xrpld/overlay/detail/PeerImp.h>
29#include <xrpld/overlay/detail/TrafficCount.h>
30#include <xrpld/overlay/detail/Tuning.h>
31#include <xrpld/overlay/predicates.h>
32#include <xrpld/peerfinder/make_Manager.h>
33#include <xrpld/rpc/handlers/GetCounts.h>
34#include <xrpld/rpc/json_body.h>
35
36#include <xrpl/basics/base64.h>
37#include <xrpl/basics/make_SSLContext.h>
38#include <xrpl/basics/random.h>
39#include <xrpl/beast/core/LexicalCast.h>
40#include <xrpl/protocol/STTx.h>
41#include <xrpl/server/SimpleWriter.h>
42
43#include <boost/algorithm/string/predicate.hpp>
44#include <boost/asio/executor_work_guard.hpp>
45
46namespace ripple {
47
// Single-bit flag values (bits 0 through 3), so several options can be
// combined into one mask.
// NOTE(review): listing line 50 is absent from this extract, so an enumerator
// may be missing here. Presumably these flags gate the "overlay", "server",
// "counts" and "unl" sections of the /crawl response -- confirm.
48namespace CrawlOptions {
49enum {
51 Overlay = (1 << 0),
52 ServerInfo = (1 << 1),
53 ServerCounts = (1 << 2),
54 Unl = (1 << 3)
55};
56}
57
58//------------------------------------------------------------------------------
59
60OverlayImpl::Child::Child(OverlayImpl& overlay) : overlay_(overlay)
61{
62}
63
// NOTE(review): the signature line (listing line 64) is missing from this
// extract; this body presumably belongs to the Child destructor -- confirm.
// Unregisters this child from the overlay by calling overlay_.remove().
65{
66 overlay_.remove(*this);
67}
68
69//------------------------------------------------------------------------------
70
// NOTE(review): the constructor's signature line (listing line 71) is missing
// from this extract.
// Initializes the Child base with the overlay and constructs timer_ from the
// overlay's io_context_.
72 : Child(overlay), timer_(overlay_.io_context_)
73{
74}
75
// NOTE(review): the signature line (listing line 77) is missing from this
// extract; presumably Timer::stop() -- confirm.
// Sets stopping_ and cancels any pending wait on timer_. The timer handler
// checks stopping_ and returns without re-arming when it is set.
76void
78{
79 // This method is only ever called from the same strand that calls
80 // Timer::on_timer, ensuring they never execute concurrently.
81 stopping_ = true;
82 timer_.cancel();
83}
84
// NOTE(review): listing lines 86 (signature) and 91 are missing from this
// extract; presumably Timer::async_wait() -- confirm.
// Arms timer_ to expire one second from now and registers Timer::on_timer as
// the completion handler, bound to the overlay's strand_. A shared_ptr to
// this object (shared_from_this()) is passed along with the handler.
85void
87{
88 timer_.expires_after(std::chrono::seconds(1));
89 timer_.async_wait(boost::asio::bind_executor(
90 overlay_.strand_,
92 &Timer::on_timer, shared_from_this(), std::placeholders::_1)));
93}
94
// NOTE(review): the signature line (listing line 96) is missing from this
// extract; presumably Timer::on_timer(error_code ec) -- confirm.
// Timer completion handler. On error or when stopping_ is set it returns
// without re-arming the timer. Otherwise it runs the periodic overlay work
// and calls async_wait() again.
95void
97{
98 if (ec || stopping_)
99 {
// operation_aborted is not logged; any other error is.
100 if (ec && ec != boost::asio::error::operation_aborted)
101 {
102 JLOG(overlay_.journal_.error()) << "on_timer: " << ec.message();
103 }
104 return;
105 }
106
107 overlay_.m_peerFinder->once_per_second();
108 overlay_.sendEndpoints();
109 overlay_.autoConnect();
110 if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE)
111 overlay_.sendTxQueue();
112
// deleteIdlePeers() only runs on every Tuning::checkIdlePeers-th tick.
113 if ((++overlay_.timer_count_ % Tuning::checkIdlePeers) == 0)
114 overlay_.deleteIdlePeers();
115
116 async_wait();
117}
118
119//------------------------------------------------------------------------------
120
// NOTE(review): listing lines 121 (constructor name), 125, 137, 152 and 161
// are missing from this extract, so one parameter, one member initializer,
// the declaration of `ret` and the constructor body statement are not
// visible here.
// Constructs the overlay: stores the collaborators passed in, creates a work
// guard and a strand on io_context_, builds the PeerFinder manager, and
// starts the peer id counter at 1 and the timer tick counter at 0.
122 Application& app,
123 Setup const& setup,
124 ServerHandler& serverHandler,
126 Resolver& resolver,
127 boost::asio::io_context& io_context,
128 BasicConfig const& config,
129 beast::insight::Collector::ptr const& collector)
130 : app_(app)
131 , io_context_(io_context)
132 , work_(std::in_place, boost::asio::make_work_guard(io_context_))
133 , strand_(boost::asio::make_strand(io_context_))
134 , setup_(setup)
135 , journal_(app_.journal("Overlay"))
136 , serverHandler_(serverHandler)
138 , m_peerFinder(PeerFinder::make_Manager(
139 io_context,
140 stopwatch(),
141 app_.journal("PeerFinder"),
142 config,
143 collector))
144 , m_resolver(resolver)
145 , next_id_(1)
146 , timer_count_(0)
147 , slots_(app.logs(), *this, app.config())
148 , m_stats(
149 std::bind(&OverlayImpl::collect_metrics, this),
150 collector,
// Immediately-invoked lambda: builds one TrafficGauges entry per traffic
// category returned by m_traffic.getCounts(), keyed by the category key.
151 [counts = m_traffic.getCounts(), collector]() {
153
154 for (auto const& pair : counts)
155 ret.emplace(
156 pair.first, TrafficGauges(pair.second.name, collector));
157
158 return ret;
159 }())
160{
162}
163
// NOTE(review): listing lines 165 (signature), 193-194 (creation of
// `consumer`), 199-200, 260-261 and 281 are missing from this extract. The
// function name is taken from the assertion message below.
//
// Handles an incoming HTTP request handed off by the server. Visible flow:
//  1. Allocate a peer id and a per-peer journal.
//  2. Let processRequest() answer the request if it can; otherwise return
//     an untouched Handoff unless this is a peer upgrade request.
//  3. Obtain the local endpoint, check resource limits, and ask the
//     PeerFinder for an inbound slot.
//  4. Validate "Connect-As", negotiate the protocol version, derive the
//     shared value and verify the handshake.
//  5. Activate the slot, create the PeerImp, register it and run it.
// On the failure paths below handoff.moved is reset to false and, where
// shown, an HTTP response is attached.
164Handoff
166 std::unique_ptr<stream_type>&& stream_ptr,
167 http_request_type&& request,
168 endpoint_type remote_endpoint)
169{
170 auto const id = next_id_++;
171 beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id));
172 beast::Journal journal(sink);
173
174 Handoff handoff;
175 if (processRequest(request, handoff))
176 return handoff;
177 if (!isPeerUpgrade(request))
178 return handoff;
179
180 handoff.moved = true;
181
182 JLOG(journal.debug()) << "Peer connection upgrade from " << remote_endpoint;
183
184 error_code ec;
185 auto const local_endpoint(
186 stream_ptr->next_layer().socket().local_endpoint(ec));
187 if (ec)
188 {
189 JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
190 return handoff;
191 }
192
195 if (consumer.disconnect(journal))
196 return handoff;
197
198 auto const [slot, result] = m_peerFinder->new_inbound_slot(
201
202 if (slot == nullptr)
203 {
204 // connection refused either IP limit exceeded or self-connect
205 handoff.moved = false;
206 JLOG(journal.debug())
207 << "Peer " << remote_endpoint << " refused, " << to_string(result);
208 return handoff;
209 }
210
211 // Validate HTTP request
212
213 {
// The "Connect-As" header must list "peer" (case-insensitive); otherwise a
// redirect response is returned.
// NOTE(review): unlike the later failure paths, this branch does not call
// m_peerFinder->on_closed(slot) -- confirm whether the slot is released
// elsewhere.
214 auto const types = beast::rfc2616::split_commas(request["Connect-As"]);
215 if (std::find_if(types.begin(), types.end(), [](std::string const& s) {
216 return boost::iequals(s, "peer");
217 }) == types.end())
218 {
219 handoff.moved = false;
220 handoff.response =
221 makeRedirectResponse(slot, request, remote_endpoint.address());
222 handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
223 return handoff;
224 }
225 }
226
227 auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
228 if (!negotiatedVersion)
229 {
230 m_peerFinder->on_closed(slot);
231 handoff.moved = false;
232 handoff.response = makeErrorResponse(
233 slot,
234 request,
235 remote_endpoint.address(),
236 "Unable to agree on a protocol version");
237 handoff.keep_alive = false;
238 return handoff;
239 }
240
241 auto const sharedValue = makeSharedValue(*stream_ptr, journal);
242 if (!sharedValue)
243 {
244 m_peerFinder->on_closed(slot);
245 handoff.moved = false;
246 handoff.response = makeErrorResponse(
247 slot,
248 request,
249 remote_endpoint.address(),
250 "Incorrect security cookie");
251 handoff.keep_alive = false;
252 return handoff;
253 }
254
// Any std::exception thrown inside this try block is reported to the remote
// as an error response by the catch handler at the end of the function.
255 try
256 {
257 auto publicKey = verifyHandshake(
258 request,
259 *sharedValue,
262 remote_endpoint.address(),
263 app_);
264
265 consumer.setPublicKey(publicKey);
266
267 {
268 // The node gets a reserved slot if it is in our cluster
269 // or if it has a reservation.
270 bool const reserved =
271 static_cast<bool>(app_.cluster().member(publicKey)) ||
272 app_.peerReservations().contains(publicKey);
273 auto const result =
274 m_peerFinder->activate(slot, publicKey, reserved);
275 if (result != PeerFinder::Result::success)
276 {
277 m_peerFinder->on_closed(slot);
278 JLOG(journal.debug()) << "Peer " << remote_endpoint
279 << " redirected, " << to_string(result);
280 handoff.moved = false;
282 slot, request, remote_endpoint.address());
283 handoff.keep_alive = false;
284 return handoff;
285 }
286 }
287
288 auto const peer = std::make_shared<PeerImp>(
289 app_,
290 id,
291 slot,
292 std::move(request),
293 publicKey,
294 *negotiatedVersion,
295 consumer,
296 std::move(stream_ptr),
297 *this);
298 {
299 // As we are not on the strand, run() must be called
300 // while holding the lock, otherwise new I/O can be
301 // queued after a call to stop().
302 std::lock_guard<decltype(mutex_)> lock(mutex_);
303 {
304 auto const result = m_peers.emplace(peer->slot(), peer);
305 XRPL_ASSERT(
306 result.second,
307 "ripple::OverlayImpl::onHandoff : peer is inserted");
308 (void)result.second;
309 }
310 list_.emplace(peer.get(), peer);
311
312 peer->run();
313 }
314 handoff.moved = true;
315 return handoff;
316 }
317 catch (std::exception const& e)
318 {
319 JLOG(journal.debug()) << "Peer " << remote_endpoint
320 << " fails handshake (" << e.what() << ")";
321
322 m_peerFinder->on_closed(slot);
323 handoff.moved = false;
324 handoff.response = makeErrorResponse(
325 slot, request, remote_endpoint.address(), e.what());
326 handoff.keep_alive = false;
327 return handoff;
328 }
329}
330
331//------------------------------------------------------------------------------
332
// NOTE(review): the signature line (listing line 334) is missing from this
// extract; presumably isPeerUpgrade(request) -- confirm.
// Returns true only when the request is an HTTP upgrade request AND its
// "Upgrade" header parses to at least one protocol version.
333bool
335{
336 if (!is_upgrade(request))
337 return false;
338 auto const versions = parseProtocolVersions(request["Upgrade"]);
339 return !versions.empty();
340}
341
// NOTE(review): listing lines 342-343 (return type and signature) and 345
// (declaration of `ss`) are missing from this extract; presumably
// makePrefix(id) -- confirm.
// Formats `id` zero-padded to a minimum width of 3, wrapped in square
// brackets and followed by one space, and returns the resulting string.
344{
346 ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
347 return ss.str();
348}
349
// NOTE(review): listing lines 350-352 (return type, name, first parameter),
// 361 (declaration of `ostr`) and 374 (presumably the return statement) are
// missing from this extract.
// Builds an HTTP 503 (service_unavailable) response with a JSON body. The
// body holds a "peer-ips" array filled from m_peerFinder->redirect(slot).
// The response carries "Server", "Remote-Address", "Content-Type" and
// "Connection: close" headers and echoes the request's HTTP version.
353 http_request_type const& request,
354 address_type remote_address)
355{
356 boost::beast::http::response<json_body> msg;
357 msg.version(request.version());
358 msg.result(boost::beast::http::status::service_unavailable);
359 msg.insert("Server", BuildInfo::getFullVersionString());
360 {
362 ostr << remote_address;
363 msg.insert("Remote-Address", ostr.str());
364 }
365 msg.insert("Content-Type", "application/json");
366 msg.insert(boost::beast::http::field::connection, "close");
367 msg.body() = Json::objectValue;
368 {
369 Json::Value& ips = (msg.body()["peer-ips"] = Json::arrayValue);
370 for (auto const& _ : m_peerFinder->redirect(slot))
371 ips.append(_.address.to_string());
372 }
373 msg.prepare_payload();
375}
376
// NOTE(review): listing lines 377-379 (return type, name, first parameter)
// and 392 (presumably the return statement) are missing from this extract.
// Builds a body-less HTTP 400 (bad_request) response whose reason phrase is
// "Bad Request (<text>)". It carries "Server", "Remote-Address" and
// "Connection: close" headers and echoes the request's HTTP version.
// NOTE(review): `text` is placed into the reason phrase unmodified; one
// caller passes an exception's what() string -- confirm it cannot contain
// characters that are invalid in a status line.
380 http_request_type const& request,
381 address_type remote_address,
382 std::string text)
383{
384 boost::beast::http::response<boost::beast::http::empty_body> msg;
385 msg.version(request.version());
386 msg.result(boost::beast::http::status::bad_request);
387 msg.reason("Bad Request (" + text + ")");
388 msg.insert("Server", BuildInfo::getFullVersionString());
389 msg.insert("Remote-Address", remote_address.to_string());
390 msg.insert(boost::beast::http::field::connection, "close");
391 msg.prepare_payload();
393}
394
395//------------------------------------------------------------------------------
396
// NOTE(review): listing lines 398 (signature), 417, 419-420, 422 and 428 are
// missing from this extract, including the declaration of `p` and whatever
// precedes list_.emplace(). The function name is taken from the assertion
// message below.
// Starts an outbound connection attempt to remote_endpoint. It returns
// early, after logging, when the resource manager reports the endpoint
// should be disconnected or when the PeerFinder grants no outbound slot.
// Otherwise the new child object `p` is recorded in list_ and run.
397void
399{
400 XRPL_ASSERT(work_, "ripple::OverlayImpl::connect : work is set");
401
402 auto usage = resourceManager().newOutboundEndpoint(remote_endpoint);
403 if (usage.disconnect(journal_))
404 {
405 JLOG(journal_.info()) << "Over resource limit: " << remote_endpoint;
406 return;
407 }
408
409 auto const [slot, result] = peerFinder().new_outbound_slot(remote_endpoint);
410 if (slot == nullptr)
411 {
412 JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint
413 << ": " << to_string(result);
414 return;
415 }
416
418 app_,
421 usage,
423 next_id_++,
424 slot,
425 app_.journal("Peer"),
426 *this);
427
429 list_.emplace(p.get(), p);
430 p->run();
431}
432
433//------------------------------------------------------------------------------
434
// NOTE(review): listing lines 437 (signature), 442 and 454 are missing from
// this extract. The comment before run() refers to holding the lock, so line
// 442 presumably acquires mutex_ -- confirm. The function name is taken from
// the assertion messages below.
// Registers `peer` in three containers -- m_peers (keyed by slot), ids_
// (keyed by peer id) and list_ (keyed by raw pointer) -- then runs the peer.
435// Adds a peer that is already handshaked and active
436void
438{
439 beast::WrappedSink sink{journal_.sink(), peer->prefix()};
440 beast::Journal journal{sink};
441
443
444 {
445 auto const result = m_peers.emplace(peer->slot(), peer);
446 XRPL_ASSERT(
447 result.second,
448 "ripple::OverlayImpl::add_active : peer is inserted");
449 (void)result.second;
450 }
451
452 {
453 auto const result = ids_.emplace(
455 std::make_tuple(peer->id()),
456 std::make_tuple(peer));
457 XRPL_ASSERT(
458 result.second,
459 "ripple::OverlayImpl::add_active : peer ID is inserted");
460 (void)result.second;
461 }
462
463 list_.emplace(peer.get(), peer);
464
465 JLOG(journal.debug()) << "activated";
466
467 // As we are not on the strand, run() must be called
468 // while holding the lock, otherwise new I/O can be
469 // queued after a call to stop().
470 peer->run();
471}
472
// NOTE(review): listing lines 474 (signature) and 476 are missing from this
// extract; line 476 presumably acquires mutex_ -- confirm. The function name
// is taken from the assertion message below.
// Erases the m_peers entry for `slot`.
// NOTE(review): if XRPL_ASSERT does not stop execution (for example when
// assertions are compiled out) and the slot is absent, erase() receives
// end(), which is undefined behavior -- confirm the slot is always present.
473void
475{
477 auto const iter = m_peers.find(slot);
478 XRPL_ASSERT(
479 iter != m_peers.end(), "ripple::OverlayImpl::remove : valid input");
480 m_peers.erase(iter);
481}
482
// NOTE(review): listing lines 484 (signature), 486, 490, 498, 517, 522, 538,
// 540-541, 545 and 561 are missing from this extract. Missing pieces include
// the construction of `config`, the initializer of `bootstrapIps`, the
// resolver calls that take the lambdas below, the declarations of `ips`, and
// the statement before list_.emplace(). Presumably this is
// OverlayImpl::start() -- confirm.
// Visible flow: configure and start the PeerFinder, choose the bootstrap
// host list (falling back to the built-in hosts when none are configured),
// hand resolved addresses to the PeerFinder as fallback and fixed peers, and
// finally create and arm the periodic Timer.
483void
485{
487 app_.config(),
488 serverHandler_.setup().overlay.port(),
489 app_.getValidationPublicKey().has_value(),
491
492 m_peerFinder->setConfig(config);
493 m_peerFinder->start();
494
495 // Populate our boot cache: if there are no entries in [ips] then we use
496 // the entries in [ips_fixed].
497 auto bootstrapIps =
499
500 // If nothing is specified, default to several well-known high-capacity
501 // servers to serve as bootstrap:
502 if (bootstrapIps.empty())
503 {
504 // Pool of servers operated by Ripple Labs Inc. - https://ripple.com
505 bootstrapIps.push_back("r.ripple.com 51235");
506
507 // Pool of servers operated by ISRDC - https://isrdc.in
508 bootstrapIps.push_back("sahyadri.isrdc.in 51235");
509
510 // Pool of servers operated by @Xrpkuwait - https://xrpkuwait.com
511 bootstrapIps.push_back("hubs.xrpkuwait.com 51235");
512
513 // Pool of servers operated by XRPL Commons - https://xrpl-commons.org
514 bootstrapIps.push_back("hub.xrpl-commons.org 51235");
515 }
516
518 bootstrapIps,
// Callback for the bootstrap hosts: addresses with port 0 get
// DEFAULT_PEER_PORT, then the list is stored as fallback strings under the
// source name "config: <name>".
519 [this](
520 std::string const& name,
521 std::vector<beast::IP::Endpoint> const& addresses) {
523 ips.reserve(addresses.size());
524 for (auto const& addr : addresses)
525 {
526 if (addr.port() == 0)
527 ips.push_back(to_string(addr.at_port(DEFAULT_PEER_PORT)));
528 else
529 ips.push_back(to_string(addr));
530 }
531
532 std::string const base("config: ");
533 if (!ips.empty())
534 m_peerFinder->addFallbackStrings(base + name, ips);
535 });
536
537 // Add the ips_fixed from the rippled.cfg file
539 {
// Callback for the fixed peers: same port defaulting as above, but the
// endpoints are registered with addFixedPeer().
542 [this](
543 std::string const& name,
544 std::vector<beast::IP::Endpoint> const& addresses) {
546 ips.reserve(addresses.size());
547
548 for (auto& addr : addresses)
549 {
550 if (addr.port() == 0)
551 ips.emplace_back(addr.address(), DEFAULT_PEER_PORT);
552 else
553 ips.emplace_back(addr);
554 }
555
556 if (!ips.empty())
557 m_peerFinder->addFixedPeer(name, ips);
558 });
559 }
560 auto const timer = std::make_shared<Timer>(*this);
562 list_.emplace(timer.get(), timer);
563 timer_ = timer;
564 timer->async_wait();
565}
566
// NOTE(review): the signature line (listing line 568) is missing from this
// extract; presumably OverlayImpl::stop() -- confirm.
// Dispatches stopChildren() onto strand_, then blocks the calling thread on
// cond_ until list_ is empty, and finally stops the PeerFinder.
567void
569{
570 boost::asio::dispatch(strand_, std::bind(&OverlayImpl::stopChildren, this));
571 {
572 std::unique_lock<decltype(mutex_)> lock(mutex_);
573 cond_.wait(lock, [this] { return list_.empty(); });
574 }
575 m_peerFinder->stop();
576}
577
578//------------------------------------------------------------------------------
579//
580// PropertyStream
581//
582//------------------------------------------------------------------------------
583
// NOTE(review): listing lines 585 (signature) and 591 (declaration of
// `item`) are missing from this extract; presumably
// onWrite(beast::PropertyStream::Map& stream) -- confirm.
// Writes a "traffic" set to the property stream with one item per traffic
// category from m_traffic.getCounts(). Each item holds the category name and
// the byte and message counters for both directions, rendered as strings.
584void
586{
587 beast::PropertyStream::Set set("traffic", stream);
588 auto const stats = m_traffic.getCounts();
589 for (auto const& pair : stats)
590 {
592 item["category"] = pair.second.name;
593 item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
594 item["messages_in"] = std::to_string(pair.second.messagesIn.load());
595 item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
596 item["messages_out"] = std::to_string(pair.second.messagesOut.load());
597 }
598}
599
600//------------------------------------------------------------------------------
// NOTE(review): listing lines 601-605, 607 (signature), 614 and 616 are
// missing from this extract; line 614 presumably acquires mutex_ -- confirm.
// The function name is taken from the assertion messages below.
// Records `peer` in ids_ under its peer id, logs "activated", and asserts
// that size() is now non-zero.
606void
608{
609 beast::WrappedSink sink{journal_.sink(), peer->prefix()};
610 beast::Journal journal{sink};
611
612 // Now track this peer
613 {
615 auto const result(ids_.emplace(
617 std::make_tuple(peer->id()),
618 std::make_tuple(peer)));
619 XRPL_ASSERT(
620 result.second,
621 "ripple::OverlayImpl::activate : peer ID is inserted");
622 (void)result.second;
623 }
624
625 JLOG(journal.debug()) << "activated";
626
627 // We just accepted this peer so we have non-zero active peers
628 XRPL_ASSERT(size(), "ripple::OverlayImpl::activate : nonzero peers");
629}
630
631void
637
// NOTE(review): listing lines 639-640 (function name and first parameter,
// the message `m`) are missing from this extract. The function name is taken
// from the assertion message below.
// Processes every manifest in message `m` received from peer `from`:
//  - entries that fail to deserialize are logged and skipped;
//  - each parsed manifest is applied via validatorManifests();
//  - accepted manifests are added to `relay`, published through
//    getOPs().pubManifest(), and, when the master key is listed by
//    validators(), also stored via addValidatorManifest().
// If anything was accepted, the collected manifests are sent as one
// mtMANIFESTS message to every peer visited by for_each().
638void
641 std::shared_ptr<PeerImp> const& from)
642{
643 auto const n = m->list_size();
644 auto const& journal = from->pjournal();
645
646 protocol::TMManifests relay;
647
648 for (std::size_t i = 0; i < n; ++i)
649 {
650 auto& s = m->list().Get(i).stobject();
651
652 if (auto mo = deserializeManifest(s))
653 {
// Copy the serialized form first; *mo is moved from in the call below.
654 auto const serialized = mo->serialized;
655
656 auto const result =
657 app_.validatorManifests().applyManifest(std::move(*mo));
658
659 if (result == ManifestDisposition::accepted)
660 {
661 relay.add_list()->set_stobject(s);
662
663 // N.B.: this is important; the applyManifest call above moves
664 // the loaded Manifest out of the optional so we need to
665 // reload it here.
666 mo = deserializeManifest(serialized);
667 XRPL_ASSERT(
668 mo,
669 "ripple::OverlayImpl::onManifests : manifest "
670 "deserialization succeeded");
671
672 app_.getOPs().pubManifest(*mo);
673
674 if (app_.validators().listed(mo->masterKey))
675 {
676 auto db = app_.getWalletDB().checkoutDb();
677 addValidatorManifest(*db, serialized);
678 }
679 }
680 }
681 else
682 {
683 JLOG(journal.debug())
684 << "Malformed manifest #" << i + 1 << ": " << strHex(s);
685 continue;
686 }
687 }
688
689 if (!relay.list().empty())
690 for_each([m2 = std::make_shared<Message>(relay, protocol::mtMANIFESTS)](
691 std::shared_ptr<PeerImp>&& p) { p->send(m2); });
692}
693
694void
699
700void
// NOTE(review): listing lines 701-710 and 712 are missing from this extract,
// so the return type and signature are not visible; line 712 presumably
// acquires mutex_. Other code calls this as size() -- confirm.
// Returns the number of entries in ids_.
711{
713 return ids_.size();
714}
715
// NOTE(review): the signature line (listing line 717) is missing from this
// extract; presumably OverlayImpl::limit() -- confirm.
// Returns the maxPeers value from the PeerFinder's current configuration.
716int
718{
719 return m_peerFinder->config().maxPeers;
720}
721
// NOTE(review): listing lines 722-723 (return type and signature) and 729
// are missing from this extract. Line 729 presumably opens the per-peer
// lambda, whose parameter is `sp` and which is closed by "});" below.
// Presumably getOverlayInfo() -- confirm.
// Builds a JSON object with an "active" array containing one object per
// peer: base64 public key, connection direction ("in" or "out"), uptime in
// seconds, optionally IP and port when sp->crawl() is true, version if
// non-empty, and the complete ledger range if either bound is non-zero.
724{
725 using namespace std::chrono;
726 Json::Value jv;
727 auto& av = jv["active"] = Json::Value(Json::arrayValue);
728
730 auto& pv = av.append(Json::Value(Json::objectValue));
731 pv[jss::public_key] = base64_encode(
732 sp->getNodePublic().data(), sp->getNodePublic().size());
733 pv[jss::type] = sp->slot()->inbound() ? "in" : "out";
734 pv[jss::uptime] = static_cast<std::uint32_t>(
735 duration_cast<seconds>(sp->uptime()).count());
736 if (sp->crawl())
737 {
738 pv[jss::ip] = sp->getRemoteAddress().address().to_string();
// NOTE(review): the inbound branch stores the port value as returned by
// listening_port(), while the outbound branch stores a string -- confirm
// that consumers expect both forms.
739 if (sp->slot()->inbound())
740 {
741 if (auto port = sp->slot()->listening_port())
742 pv[jss::port] = *port;
743 }
744 else
745 {
746 pv[jss::port] = std::to_string(sp->getRemoteAddress().port());
747 }
748 }
749
750 {
751 auto version{sp->getVersion()};
752 if (!version.empty())
753 // Could move here if Json::value supported moving from strings
754 pv[jss::version] = std::string{version};
755 }
756
757 std::uint32_t minSeq, maxSeq;
758 sp->ledgerRange(minSeq, maxSeq);
759 if (minSeq != 0 || maxSeq != 0)
760 pv[jss::complete_ledgers] =
761 std::to_string(minSeq) + "-" + std::to_string(maxSeq);
762 });
763
764 return jv;
765}
766
// NOTE(review): listing lines 767-768 (return type and signature) are
// missing from this extract. Other code calls this as getServerInfo().
// Fetches server info from getOPs() with humanReadable, admin and counters
// all false, then removes the members listed below before returning it.
769{
770 bool const humanReadable = false;
771 bool const admin = false;
772 bool const counters = false;
773
774 Json::Value server_info =
775 app_.getOPs().getServerInfo(humanReadable, admin, counters);
776
777 // Filter out some information
778 server_info.removeMember(jss::hostid);
779 server_info.removeMember(jss::load_factor_fee_escalation);
780 server_info.removeMember(jss::load_factor_fee_queue);
781 server_info.removeMember(jss::validation_quorum);
782
783 if (server_info.isMember(jss::validated_ledger))
784 {
785 Json::Value& validated_ledger = server_info[jss::validated_ledger];
786
787 validated_ledger.removeMember(jss::base_fee);
788 validated_ledger.removeMember(jss::reserve_base_xrp);
789 validated_ledger.removeMember(jss::reserve_inc_xrp);
790 }
791
792 return server_info;
793}
794
800
// NOTE(review): listing lines 801-802 (return type and signature) are
// missing from this extract. Other code calls this as getUnlInfo().
// Returns the validators JSON with the per-publisher "list" member and the
// signing_keys, trusted_validator_keys and validation_quorum members
// removed. When validatorSites() reports validator_sites, that value is
// moved into the result.
803{
804 Json::Value validators = app_.validators().getJson();
805
806 if (validators.isMember(jss::publisher_lists))
807 {
808 Json::Value& publisher_lists = validators[jss::publisher_lists];
809
810 for (auto& publisher : publisher_lists)
811 {
812 publisher.removeMember(jss::list);
813 }
814 }
815
816 validators.removeMember(jss::signing_keys);
817 validators.removeMember(jss::trusted_validator_keys);
818 validators.removeMember(jss::validation_quorum);
819
820 Json::Value validatorSites = app_.validatorSites().getJson();
821
822 if (validatorSites.isMember(jss::validator_sites))
823 {
824 validators[jss::validator_sites] =
825 std::move(validatorSites[jss::validator_sites]);
826 }
827
828 return validators;
829}
830
// NOTE(review): listing lines 832-833 (return type and signature) and 835
// (declaration of `json`) are missing from this extract.
// Appends peer->json() for every peer returned by getActivePeers() and
// returns the accumulated value.
831// Returns information on verified peers.
834{
836 for (auto const& peer : getActivePeers())
837 {
838 json.append(peer->json());
839 }
840 return json;
841}
842
// NOTE(review): listing lines 844 (signature), 847 (second half of the guard
// condition), 858, 862, 866, 870 (the conditions in front of each braced
// section) and 876 are missing from this extract. Other code calls this as
// processCrawl(req, handoff).
// Handles requests whose target is "/crawl". Returns false when the visible
// guard rejects the request. Otherwise builds a 200 JSON response with
// "version" set to 2 and the "overlay", "server", "counts" and "unl"
// sections (each presumably conditional on a crawl option -- confirm), and
// returns true.
843bool
845{
846 if (req.target() != "/crawl" ||
848 return false;
849
850 boost::beast::http::response<json_body> msg;
851 msg.version(req.version());
852 msg.result(boost::beast::http::status::ok);
853 msg.insert("Server", BuildInfo::getFullVersionString());
854 msg.insert("Content-Type", "application/json");
855 msg.insert("Connection", "close");
856 msg.body()["version"] = Json::Value(2u);
857
859 {
860 msg.body()["overlay"] = getOverlayInfo();
861 }
863 {
864 msg.body()["server"] = getServerInfo();
865 }
867 {
868 msg.body()["counts"] = getServerCounts();
869 }
871 {
872 msg.body()["unl"] = getUnlInfo();
873 }
874
875 msg.prepare_payload();
877 return true;
878}
879
// NOTE(review): listing lines 881 (function name), 907 and 943 are missing
// from this extract; the latter two presumably attach `msg` to `handoff` --
// confirm. Other code calls this as processValidatorList(req, handoff).
// Serves validator lists for targets of the form "/vl/<key>" or
// "/vl/<version>/<key>". Returns false, leaving the request unhandled, when
// the target does not start with "/vl/" or setup_.vlEnabled is false. Every
// other path returns true after preparing a response:
//  - 400 when the version prefix is not numeric, the key is empty, or the
//    lookup result is present but empty;
//  - 404 when the lookup returns nothing;
//  - 200 with the list as the body otherwise.
880bool
882 http_request_type const& req,
883 Handoff& handoff)
884{
885 // If the target is in the form "/vl/<validator_list_public_key>",
886 // return the most recent validator list for that key.
887 constexpr std::string_view prefix("/vl/");
888
889 if (!req.target().starts_with(prefix.data()) || !setup_.vlEnabled)
890 return false;
891
// Default list version, used when the target carries no version segment.
892 std::uint32_t version = 1;
893
894 boost::beast::http::response<json_body> msg;
895 msg.version(req.version());
896 msg.insert("Server", BuildInfo::getFullVersionString());
897 msg.insert("Content-Type", "application/json");
898 msg.insert("Connection", "close");
899
// Shared failure path: sets the status, a null body and Content-Length 0.
900 auto fail = [&msg, &handoff](auto status) {
901 msg.result(status);
902 msg.insert("Content-Length", "0");
903
904 msg.body() = Json::nullValue;
905
906 msg.prepare_payload();
908 return true;
909 };
910
911 std::string_view key = req.target().substr(prefix.size());
912
// If a '/' follows, the text before it is the version and the rest the key.
913 if (auto slash = key.find('/'); slash != std::string_view::npos)
914 {
915 auto verString = key.substr(0, slash);
916 if (!boost::conversion::try_lexical_convert(verString, version))
917 return fail(boost::beast::http::status::bad_request);
918 key = key.substr(slash + 1);
919 }
920
921 if (key.empty())
922 return fail(boost::beast::http::status::bad_request);
923
924 // find the list
925 auto vl = app_.validators().getAvailable(key, version);
926
927 if (!vl)
928 {
929 // 404 not found
930 return fail(boost::beast::http::status::not_found);
931 }
932 else if (!*vl)
933 {
934 return fail(boost::beast::http::status::bad_request);
935 }
936 else
937 {
938 msg.result(boost::beast::http::status::ok);
939
940 msg.body() = *vl;
941
942 msg.prepare_payload();
944 return true;
945 }
946}
947
// NOTE(review): listing lines 949 (signature) and 1042 are missing from this
// extract; line 1042 presumably attaches `msg` to `handoff` -- confirm.
// Other code calls this as processHealth(req, handoff).
// Handles requests whose target is "/health". It derives an overall health
// level from getServerInfo() and maps it to the HTTP status: healthy -> 200,
// warning -> 503, critical -> 500. Each metric outside its healthy range is
// also reported under "info" in the JSON body. The level only ever
// increases: set_health keeps the worst state seen.
948bool
950{
951 if (req.target() != "/health")
952 return false;
953 boost::beast::http::response<json_body> msg;
954 msg.version(req.version());
955 msg.insert("Server", BuildInfo::getFullVersionString());
956 msg.insert("Content-Type", "application/json");
957 msg.insert("Connection", "close");
958
959 auto info = getServerInfo();
960
// -1 means the server info has no validated_ledger member.
961 int last_validated_ledger_age = -1;
962 if (info.isMember(jss::validated_ledger))
963 last_validated_ledger_age =
964 info[jss::validated_ledger][jss::age].asInt();
965 bool amendment_blocked = false;
966 if (info.isMember(jss::amendment_blocked))
967 amendment_blocked = true;
968 int number_peers = info[jss::peers].asInt();
969 std::string server_state = info[jss::server_state].asString();
// NOTE(review): no guard against load_base being zero or absent; the
// floating-point division would then not produce a finite ratio -- confirm
// the member is always present and non-zero.
970 auto load_factor = info[jss::load_factor_server].asDouble() /
971 info[jss::load_base].asDouble();
972
973 enum { healthy, warning, critical };
974 int health = healthy;
975 auto set_health = [&health](int state) {
976 if (health < state)
977 health = state;
978 };
979
980 msg.body()[jss::info] = Json::objectValue;
// Ledger age: below 7 is healthy, 7 through 19 is a warning, 20 or more is
// critical.
// NOTE(review): the "no validated ledger" sentinel (-1) is below 20, so it
// is classified as a warning rather than critical -- confirm this is
// intended.
981 if (last_validated_ledger_age >= 7 || last_validated_ledger_age < 0)
982 {
983 msg.body()[jss::info][jss::validated_ledger] =
984 last_validated_ledger_age;
985 if (last_validated_ledger_age < 20)
986 set_health(warning);
987 else
988 set_health(critical);
989 }
990
991 if (amendment_blocked)
992 {
993 msg.body()[jss::info][jss::amendment_blocked] = true;
994 set_health(critical);
995 }
996
// Peers: more than 7 is healthy, 1 through 7 is a warning, 0 is critical.
997 if (number_peers <= 7)
998 {
999 msg.body()[jss::info][jss::peers] = number_peers;
1000 if (number_peers != 0)
1001 set_health(warning);
1002 else
1003 set_health(critical);
1004 }
1005
// State: full/validating/proposing are healthy, syncing/tracking/connected
// are warnings, anything else is critical.
1006 if (!(server_state == "full" || server_state == "validating" ||
1007 server_state == "proposing"))
1008 {
1009 msg.body()[jss::info][jss::server_state] = server_state;
1010 if (server_state == "syncing" || server_state == "tracking" ||
1011 server_state == "connected")
1012 {
1013 set_health(warning);
1014 }
1015 else
1016 set_health(critical);
1017 }
1018
// Load factor: above 100 is a warning, 1000 or more is critical.
1019 if (load_factor > 100)
1020 {
1021 msg.body()[jss::info][jss::load_factor] = load_factor;
1022 if (load_factor < 1000)
1023 set_health(warning);
1024 else
1025 set_health(critical);
1026 }
1027
1028 switch (health)
1029 {
1030 case healthy:
1031 msg.result(boost::beast::http::status::ok);
1032 break;
1033 case warning:
1034 msg.result(boost::beast::http::status::service_unavailable);
1035 break;
1036 case critical:
1037 msg.result(boost::beast::http::status::internal_server_error);
1038 break;
1039 }
1040
1041 msg.prepare_payload();
1043 return true;
1044}
1045
// NOTE(review): the signature line (listing line 1047) is missing from this
// extract. Other code calls this as processRequest(request, handoff).
// Tries the built-in HTTP endpoints in order -- crawl, validator list,
// health -- and returns true as soon as one of them handles the request.
1046bool
1048{
1049 // Take advantage of || short-circuiting
1050 return processCrawl(req, handoff) || processValidatorList(req, handoff) ||
1051 processHealth(req, handoff);
1052}
1053
// NOTE(review): listing lines 1054-1055 (return type and signature) and 1057
// (declaration of `ret`) are missing from this extract. Other code calls
// this as getActivePeers().
// Collects every peer visited by for_each() into `ret` and returns it.
// Capacity is reserved up front from size().
1056{
1058 ret.reserve(size());
1059
1060 for_each([&ret](std::shared_ptr<PeerImp>&& sp) {
1061 ret.emplace_back(std::move(sp));
1062 });
1063
1064 return ret;
1065}
1066
// NOTE(review): listing lines 1067-1068 (return type and function name),
// 1074 (declaration of `ret`) and 1082 (declaration of `p`, plus the rest of
// the NOTE comment) are missing from this extract. Other code calls this as
// getActivePeers(toSkip, total, disabled, enabledInSkip).
// Under mutex_, walks ids_ and returns the live peers whose id is NOT in
// `toSkip`. Output parameters:
//   active        - set to ids_.size(), which counts expired entries too;
//   disabled      - live peers with tx reduce-relay disabled, counted
//                   whether or not they are in toSkip;
//   enabledInSkip - live peers in toSkip with tx reduce-relay enabled.
1069 std::set<Peer::id_t> const& toSkip,
1070 std::size_t& active,
1071 std::size_t& disabled,
1072 std::size_t& enabledInSkip) const
1073{
1075 std::lock_guard lock(mutex_);
1076
1077 active = ids_.size();
1078 disabled = enabledInSkip = 0;
1079 ret.reserve(ids_.size());
1080
1081 // NOTE The purpose of p is to delay the destruction of PeerImp
1083 for (auto& [id, w] : ids_)
1084 {
1085 if (p = w.lock(); p != nullptr)
1086 {
1087 bool const reduceRelayEnabled = p->txReduceRelayEnabled();
1088 // tx reduced relay feature disabled
1089 if (!reduceRelayEnabled)
1090 ++disabled;
1091
1092 if (toSkip.count(id) == 0)
1093 ret.emplace_back(std::move(p));
1094 else if (reduceRelayEnabled)
1095 ++enabledInSkip;
1096 }
1097 }
1098
1099 return ret;
1100}
1101
// NOTE(review): the signature line (listing line 1103) is missing from this
// extract; presumably checkTracking(std::uint32_t index) -- confirm.
// Calls checkTracking(index) on every peer visited by for_each().
1102void
1104{
1105 for_each(
1106 [index](std::shared_ptr<PeerImp>&& sp) { sp->checkTracking(index); });
1107}
1108
// NOTE(review): listing lines 1109-1110 (return type and signature) are
// missing from this extract. Other code calls this as findPeerByShortID(id).
// Under mutex_, looks `id` up in ids_ and returns the locked pointer for
// that entry. The result is empty when the id is unknown or when the stored
// weak reference has expired.
1111{
1112 std::lock_guard lock(mutex_);
1113 auto const iter = ids_.find(id);
1114 if (iter != ids_.end())
1115 return iter->second.lock();
1116 return {};
1117}
1118
// NOTE(review): listing lines 1121-1122 (return type and signature) and 1126
// (declaration of `peer`, plus the rest of the NOTE comment) are missing
// from this extract; presumably findPeerByPublicKey(pubKey) -- confirm.
// Under mutex_, scans ids_ linearly and returns the first live peer whose
// node public key equals `pubKey`, or an empty result when none matches.
1119// A public key hash map was not used due to the peer connect/disconnect
1120// update overhead outweighing the performance of a small set linear search.
1123{
1124 std::lock_guard lock(mutex_);
1125 // NOTE The purpose of peer is to delay the destruction of PeerImp
1127 for (auto const& e : ids_)
1128 {
1129 if (peer = e.second.lock(); peer != nullptr)
1130 {
1131 if (peer->getNodePublic() == pubKey)
1132 return peer;
1133 }
1134 }
1135 return {};
1136}
1137
1138void
1139OverlayImpl::broadcast(protocol::TMProposeSet& m)
1140{
1141 auto const sm = std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER);
1142 for_each([&](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
1143}
1144
// NOTE(review): listing lines 1145-1146 (return type and function name) and
// 1155 are missing from this extract; line 1155 presumably opens the
// for_each lambda whose parameter is `p` -- confirm.
// Relays a proposal. If the hash router says message `uid` should be
// relayed, it is sent as mtPROPOSE_LEDGER (tagged with `validator`) to every
// peer whose id is not in the returned skip set, and that set is returned.
// Otherwise an empty result is returned.
1147 protocol::TMProposeSet& m,
1148 uint256 const& uid,
1149 PublicKey const& validator)
1150{
1151 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1152 {
1153 auto const sm =
1154 std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER, validator);
1156 if (toSkip->find(p->id()) == toSkip->end())
1157 p->send(sm);
1158 });
1159 return *toSkip;
1160 }
1161 return {};
1162}
1163
1164void
1165OverlayImpl::broadcast(protocol::TMValidation& m)
1166{
1167 auto const sm = std::make_shared<Message>(m, protocol::mtVALIDATION);
1168 for_each([sm](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
1169}
1170
// NOTE(review): listing lines 1171-1172 (return type and function name) and
// 1181 are missing from this extract; line 1181 presumably opens the
// for_each lambda whose parameter is `p` -- confirm.
// Relays a validation. If the hash router says message `uid` should be
// relayed, it is sent as mtVALIDATION (tagged with `validator`) to every
// peer whose id is not in the returned skip set, and that set is returned.
// Otherwise an empty result is returned.
1173 protocol::TMValidation& m,
1174 uint256 const& uid,
1175 PublicKey const& validator)
1176{
1177 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1178 {
1179 auto const sm =
1180 std::make_shared<Message>(m, protocol::mtVALIDATION, validator);
1182 if (toSkip->find(p->id()) == toSkip->end())
1183 p->send(sm);
1184 });
1185 return *toSkip;
1186 }
1187 return {};
1188}
1189
// NOTE(review): listing lines 1190-1191 (return type and signature), 1193,
// 1200, 1208 and 1211 are missing from this extract. Missing pieces include
// the call that takes the two lambdas below and the left-hand side of the
// make_shared assignment, which presumably assigns manifestMessage_ --
// confirm.
// Returns the cached manifests message, rebuilding it first when the
// manifest cache's sequence differs from manifestListSeq_. While rebuilding,
// every manifest is serialized into a TMManifests message and its hash is
// added to the hash router's suppression set.
1192{
1194
1195 if (auto seq = app_.validatorManifests().sequence();
1196 seq != manifestListSeq_)
1197 {
1198 protocol::TMManifests tm;
1199
1201 [&tm](std::size_t s) { tm.mutable_list()->Reserve(s); },
1202 [&tm, &hr = app_.getHashRouter()](Manifest const& manifest) {
1203 tm.add_list()->set_stobject(
1204 manifest.serialized.data(), manifest.serialized.size());
1205 hr.addSuppression(manifest.hash());
1206 });
1207
1209
1210 if (tm.list_size() != 0)
1212 std::make_shared<Message>(tm, protocol::mtMANIFESTS);
1213
1214 manifestListSeq_ = seq;
1215 }
1216
1217 return manifestMessage_;
1218}
1219
// NOTE(review): listing lines 1221 (function name), 1223 (the `tx`
// parameter), 1250 (the condition guarding the early return) and 1270-1271
// are missing from this extract.
// Relays a transaction, or only its hash, to peers not in `toSkip`.
//  - With no transaction, or a pseudo-transaction, nothing is relayed;
//    after the guard on missing line 1250, the hash is queued on each
//    selected peer via addTxQueue().
//  - A transaction that cannot be parsed as an STTx is logged and dropped.
//  - With reduce-relay disabled, or too few peers, the transaction is sent
//    to all selected peers.
//  - Otherwise it is sent to every peer with reduce-relay disabled and to
//    enabled peers until the target count is met; the remaining enabled
//    peers only get the hash queued.
1220void
1222 uint256 const& hash,
1224 std::set<Peer::id_t> const& toSkip)
1225{
1226 bool relay = tx.has_value();
1227 if (relay)
1228 {
1229 auto& txn = tx->get();
1230 SerialIter sit(makeSlice(txn.rawtransaction()));
1231 try
1232 {
1233 relay = !isPseudoTx(STTx{sit});
1234 }
1235 catch (std::exception const&)
1236 {
1237 // Could not construct STTx, not relaying
1238 JLOG(journal_.debug()) << "Could not construct STTx: " << hash;
1239 return;
1240 }
1241 }
1242
1243 Overlay::PeerSequence peers = {};
1244 std::size_t total = 0;
1245 std::size_t disabled = 0;
1246 std::size_t enabledInSkip = 0;
1247
1248 if (!relay)
1249 {
1251 return;
1252
1253 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1254 JLOG(journal_.trace())
1255 << "not relaying tx, total peers " << peers.size();
1256 for (auto const& p : peers)
1257 p->addTxQueue(hash);
1258 return;
1259 }
1260
1261 auto& txn = tx->get();
1262 auto const sm = std::make_shared<Message>(txn, protocol::mtTRANSACTION);
1263 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1264 auto const minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled;
1265
1266 if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay)
1267 {
1268 for (auto const& p : peers)
1269 p->send(sm);
1272 txMetrics_.addMetrics(total, toSkip.size(), 0);
1273 return;
1274 }
1275
1276 // We have more peers than the minimum (disabled + minimum enabled),
1277 // relay to all disabled and some randomly selected enabled that
1278 // do not have the transaction.
1279 auto const enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS +
1280 (total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100;
1281
1282 txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);
1283
// The shuffle is skipped when peers in the skip set already meet the
// target; in that case no enabled peer in `peers` is sent the transaction.
1284 if (enabledTarget > enabledInSkip)
1285 std::shuffle(peers.begin(), peers.end(), default_prng());
1286
1287 JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size()
1288 << " selected " << enabledTarget << " skip "
1289 << toSkip.size() << " disabled " << disabled;
1290
1291 // count skipped peers with the enabled feature towards the quota
// NOTE(review): enabledInSkip is a std::size_t narrowed to std::uint16_t
// here -- confirm the peer count can never exceed 65535.
1292 std::uint16_t enabledAndRelayed = enabledInSkip;
1293 for (auto const& p : peers)
1294 {
1295 // always relay to a peer with the disabled feature
1296 if (!p->txReduceRelayEnabled())
1297 {
1298 p->send(sm);
1299 }
1300 else if (enabledAndRelayed < enabledTarget)
1301 {
1302 enabledAndRelayed++;
1303 p->send(sm);
1304 }
1305 else
1306 {
1307 p->addTxQueue(hash);
1308 }
1309 }
1310}
1311
1312//------------------------------------------------------------------------------
1313
// NOTE(review): the signature line (listing line 1315) is missing from this
// extract; presumably remove(Child& child) -- confirm.
// Under mutex_, erases `child` from list_ (keyed by its address) and wakes
// all waiters on cond_ once list_ becomes empty. stop() waits on that same
// condition.
1314void
1316{
1317 std::lock_guard lock(mutex_);
1318 list_.erase(&child);
1319 if (list_.empty())
1320 cond_.notify_all();
1321}
1322
// NOTE(review): listing lines 1324 (signature), 1334 (declaration of
// `children`) and 1339 are missing from this extract. Line 1339 presumably
// resets work_, since a second call returns early on !work_ -- confirm.
// Another function dispatches this as OverlayImpl::stopChildren.
// Snapshots list_ under mutex_, then calls stop() on every child that is
// still alive, after the lock has been released.
1323void
1325{
1326 // Calling list_[].second->stop() may cause list_ to be modified
1327 // (OverlayImpl::remove() may be called on this same thread). So
1328 // iterating directly over list_ to call child->stop() could lead to
1329 // undefined behavior.
1330 //
1331 // Therefore we copy all of the weak/shared ptrs out of list_ before we
1332 // start calling stop() on them. That guarantees OverlayImpl::remove()
1333 // won't be called until vector<> children leaves scope.
1335 {
1336 std::lock_guard lock(mutex_);
1337 if (!work_)
1338 return;
1340
1341 children.reserve(list_.size());
1342 for (auto const& element : list_)
1343 {
1344 children.emplace_back(element.second.lock());
1345 }
1346 } // lock released
1347
1348 for (auto const& child : children)
1349 {
1350 if (child != nullptr)
1351 child->stop();
1352 }
1353}
1354
// Asks m_peerFinder for the addresses returned by autoconnect() and calls
// connect() on each of them in turn.
// NOTE(review): the function-name line (inner line 1356) is absent from this
// listing; presumably OverlayImpl::autoConnect() -- confirm.
1355void
1357{
1358 auto const result = m_peerFinder->autoconnect();
// The loop variable is declared by value, so each element is copied.
1359 for (auto addr : result)
1360 connect(addr);
1361}
1362
// For every entry produced by m_peerFinder->buildEndpointsForPeers(), looks
// up the peer registered under the entry's first member in m_peers and, when
// that peer is still alive, sends it the range held in the second member.
// NOTE(review): inner lines 1364 (function name) and 1369 are absent from
// this listing; `peer` is used below but its declaration is not visible --
// presumably a std::shared_ptr<PeerImp> declared per iteration, confirm.
1363void
1365{
1366 auto const result = m_peerFinder->buildEndpointsForPeers();
1367 for (auto const& e : result)
1368 {
// mutex_ is held only for the map lookup and the weak_ptr lock().
1370 {
1371 std::lock_guard lock(mutex_);
1372 auto const iter = m_peers.find(e.first);
1373 if (iter != m_peers.end())
1374 peer = iter->second.lock();
1375 }
// The send happens after the lock scope above has closed.
1376 if (peer)
1377 peer->sendEndpoints(e.second.begin(), e.second.end());
1378 }
1379}
1380
// Visits every peer through for_each() and calls sendTxQueue() on those for
// which txReduceRelayEnabled() is true; other peers are left untouched.
// NOTE(review): the function-name line (inner line 1382) is absent from this
// listing; presumably OverlayImpl::sendTxQueue() -- confirm.
1381void
1383{
1384 for_each([](auto const& p) {
1385 if (p->txReduceRelayEnabled())
1386 p->sendTxQueue();
1387 });
1388}
1389
// Builds a protocol::TMSquelch for `validator` and wraps it in a shared
// Message tagged protocol::mtSQUELCH.
//   validator       - key whose bytes are stored in the validatorpubkey field
//   squelch         - value stored in the squelch field
//   squelchDuration - stored only when `squelch` is true
// NOTE(review): the return-type and name lines (inner lines 1390-1391) are
// absent from this listing; presumably
// std::shared_ptr<Message> makeSquelchMessage( -- confirm.
1392 PublicKey const& validator,
1393 bool squelch,
1394 uint32_t squelchDuration)
1395{
1396 protocol::TMSquelch m;
1397 m.set_squelch(squelch);
1398 m.set_validatorpubkey(validator.data(), validator.size());
// The duration field is left unset when squelch is false.
1399 if (squelch)
1400 m.set_squelchduration(squelchDuration);
1401 return std::make_shared<Message>(m, protocol::mtSQUELCH);
1402}
1403
// Sends a squelch message with squelch=false and duration 0 for `validator`
// to the peer whose short id is `id`. Does nothing when
// findPeerByShortID(id) returns null.
// NOTE(review): the function-name line (inner line 1405) is absent from this
// listing; presumably
// OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const
// -- confirm.
1404void
1406{
1407 if (auto peer = findPeerByShortID(id); peer)
1408 {
1409 // optimize - multiple message with different
1410 // validator might be sent to the same peer
1411 peer->send(makeSquelchMessage(validator, false, 0));
1412 }
1413}
1414
// Sends a squelch message with squelch=true and the given duration for
// `validator` to the peer whose short id is `id`. Does nothing when
// findPeerByShortID(id) returns null.
//   validator       - key placed in the message
//   id              - short id used to look the peer up
//   squelchDuration - forwarded unchanged to makeSquelchMessage()
// NOTE(review): the function-name line (inner line 1416) is absent from this
// listing; presumably OverlayImpl::squelch( -- confirm.
1415void
1417 PublicKey const& validator,
1418 Peer::id_t id,
1419 uint32_t squelchDuration) const
1420{
1421 if (auto peer = findPeerByShortID(id); peer)
1422 {
1423 peer->send(makeSquelchMessage(validator, true, squelchDuration));
1424 }
1425}
1426
// Set-of-peers overload: forwards (key, validator, type) to
// slots_.updateSlotAndSquelch() once for every id in `peers`.
// Returns immediately when slots_.baseSquelchReady() is false. When called
// off strand_, it re-posts itself onto strand_ and returns.
// NOTE(review): inner lines 1428 (function name) and 1451 (the body of the
// callback handed to slots_) are absent from this listing, so what the
// callback does cannot be stated here -- confirm against the real file.
1427void
1429 uint256 const& key,
1430 PublicKey const& validator,
1431 std::set<Peer::id_t>&& peers,
1432 protocol::MessageType type)
1433{
1434 if (!slots_.baseSquelchReady())
1435 return;
1436
// The posted lambda is `mutable` so the captured set can be moved out again
// when it re-enters this function.
1437 if (!strand_.running_in_this_thread())
1438 return post(
1439 strand_,
1440 // Must capture copies of reference parameters (i.e. key, validator)
1441 [this,
1442 key = key,
1443 validator = validator,
1444 peers = std::move(peers),
1445 type]() mutable {
1446 updateSlotAndSquelch(key, validator, std::move(peers), type);
1447 });
1448
// Reached only when already running on strand_.
1449 for (auto id : peers)
1450 slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
1452 });
1453}
1454
// Single-peer overload: forwards (key, validator, peer, type) to
// slots_.updateSlotAndSquelch() exactly once.
// Returns immediately when slots_.baseSquelchReady() is false. When called
// off strand_, it re-posts itself onto strand_ and returns.
// NOTE(review): inner lines 1456 (function name) and 1474 (the body of the
// callback handed to slots_) are absent from this listing, so what the
// callback does cannot be stated here -- confirm against the real file.
1455void
1457 uint256 const& key,
1458 PublicKey const& validator,
1459 Peer::id_t peer,
1460 protocol::MessageType type)
1461{
1462 if (!slots_.baseSquelchReady())
1463 return;
1464
1465 if (!strand_.running_in_this_thread())
1466 return post(
1467 strand_,
1468 // Must capture copies of reference parameters (i.e. key, validator)
1469 [this, key = key, validator = validator, peer, type]() {
1470 updateSlotAndSquelch(key, validator, peer, type);
1471 });
1472
// Reached only when already running on strand_.
1473 slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
1475 });
1476}
1477
// Forwards the peer id to slots_.deletePeer(id, true). When called off
// strand_, it re-posts OverlayImpl::deletePeer(id) onto strand_ and returns.
// NOTE(review): the function-name line (inner line 1479) is absent from this
// listing; the std::bind below names OverlayImpl::deletePeer, and the
// parameter is presumably Peer::id_t id -- confirm.
1478void
1480{
1481 if (!strand_.running_in_this_thread())
1482 return post(strand_, std::bind(&OverlayImpl::deletePeer, this, id));
1483
// Reached only when already running on strand_.
1484 slots_.deletePeer(id, true);
1485}
1486
// Forwards to slots_.deleteIdlePeers(). When called off strand_, it re-posts
// OverlayImpl::deleteIdlePeers() onto strand_ and returns.
// NOTE(review): the function-name line (inner line 1488) is absent from this
// listing; the std::bind below names OverlayImpl::deleteIdlePeers.
1487void
1489{
1490 if (!strand_.running_in_this_thread())
1491 return post(strand_, std::bind(&OverlayImpl::deleteIdlePeers, this));
1492
// Reached only when already running on strand_.
1493 slots_.deleteIdlePeers();
1494}
1495
1496//------------------------------------------------------------------------------
1497
// Builds an Overlay::Setup from `config`, reading the "overlay", "crawl" and
// "vl" sections plus the legacy "network_id" value.
// Throws std::runtime_error (via Throw<>) when ip_limit is negative, when
// public_ip fails to parse or is a private address, when [crawl] has more
// than one value or a value that is not a bool, or when network_id cannot be
// converted.
// NOTE(review): the signature lines (inner lines 1498-1499) are absent from
// this listing; presumably
// Overlay::Setup setup_Overlay(BasicConfig const& config) -- confirm.
1500{
1501 Overlay::Setup setup;
1502
// --- [overlay]: SSL context, per-IP limit, advertised public address ---
1503 {
1504 auto const& section = config.section("overlay");
// The cipher-list argument is passed as an empty string.
1505 setup.context = make_SSLContext("");
1506
1507 set(setup.ipLimit, "ip_limit", section);
1508 if (setup.ipLimit < 0)
1509 Throw<std::runtime_error>("Configured IP limit is invalid");
1510
// public_ip is validated only when a non-empty value was read.
1511 std::string ip;
1512 set(ip, "public_ip", section);
1513 if (!ip.empty())
1514 {
1515 boost::system::error_code ec;
1516 setup.public_ip = boost::asio::ip::make_address(ip, ec);
1517 if (ec || beast::IP::is_private(setup.public_ip))
1518 Throw<std::runtime_error>("Configured public IP is invalid");
1519 }
1520 }
1521
// --- [crawl]: optional master switch plus one flag per report kind ---
1522 {
1523 auto const& section = config.section("crawl");
1524 auto const& values = section.values();
1525
1526 if (values.size() > 1)
1527 {
1528 Throw<std::runtime_error>(
1529 "Configured [crawl] section is invalid, too many values");
1530 }
1531
// Crawling stays enabled when the section carries no bare value.
1532 bool crawlEnabled = true;
1533
1534 // Only allow "0|1" as a value
1535 if (values.size() == 1)
1536 {
1537 try
1538 {
1539 crawlEnabled = boost::lexical_cast<bool>(values.front());
1540 }
1541 catch (boost::bad_lexical_cast const&)
1542 {
1543 Throw<std::runtime_error>(
1544 "Configured [crawl] section has invalid value: " +
1545 values.front());
1546 }
1547 }
1548
// Defaults when a key is missing: overlay, server and unl are on; counts is
// off.
// NOTE(review): the statement inside each `if` below (inner lines 1553,
// 1557, 1561, 1565) is absent from this listing; presumably each one sets a
// bit in setup.crawlOptions -- confirm.
1549 if (crawlEnabled)
1550 {
1551 if (get<bool>(section, "overlay", true))
1552 {
1554 }
1555 if (get<bool>(section, "server", true))
1556 {
1558 }
1559 if (get<bool>(section, "counts", false))
1560 {
1562 }
1563 if (get<bool>(section, "unl", true))
1564 {
1566 }
1567 }
1568 }
// --- [vl]: single "enabled" switch ---
1569 {
1570 auto const& section = config.section("vl");
1571
1572 set(setup.vlEnabled, "enabled", section);
1573 }
1574
// --- network_id: numeric value, or one of three well-known names ---
// setup.networkID is assigned only when the configured value is non-empty.
1575 try
1576 {
1577 auto id = config.legacy("network_id");
1578
1579 if (!id.empty())
1580 {
// The names are rewritten to their numeric form before the conversion.
1581 if (id == "main")
1582 id = "0";
1583
1584 if (id == "testnet")
1585 id = "1";
1586
1587 if (id == "devnet")
1588 id = "2";
1589
1590 setup.networkID = beast::lexicalCastThrow<std::uint32_t>(id);
1591 }
1592 }
// Any exception raised in the try block above is replaced by one
// runtime_error.
1593 catch (...)
1594 {
1595 Throw<std::runtime_error>(
1596 "Configured [network_id] section is invalid: must be a number "
1597 "or one of the strings 'main', 'testnet' or 'devnet'.");
1598 }
1599
1600 return setup;
1601}
1602
// Factory that passes all eight parameters, unchanged and in declaration
// order, to a single expression and returns the result.
// NOTE(review): the return-type and name lines (inner lines 1603-1604) and
// the head of the statement in the body (inner line 1614) are absent from
// this listing; presumably std::unique_ptr<Overlay> make_Overlay( and
// return std::make_unique<OverlayImpl>( -- confirm.
1605 Application& app,
1606 Overlay::Setup const& setup,
1607 ServerHandler& serverHandler,
1608 Resource::Manager& resourceManager,
1609 Resolver& resolver,
1610 boost::asio::io_context& io_context,
1611 BasicConfig const& config,
1612 beast::insight::Collector::ptr const& collector)
1613{
1615 app,
1616 setup,
1617 serverHandler,
1618 resourceManager,
1619 resolver,
1620 io_context,
1621 config,
1622 collector);
1623}
1624
1625} // namespace ripple
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:149
Value & append(Value const &value)
Append value to array at the end.
Value removeMember(char const *key)
Remove and return the named member.
bool isMember(char const *key) const
Return true if the object has a member named key.
A version-independent IP address and port combination.
Definition IPEndpoint.h:38
A generic endpoint for log messages.
Definition Journal.h:60
Stream debug() const
Definition Journal.h:328
Sink & sink() const
Returns the Sink associated with this Journal.
Definition Journal.h:297
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
std::string const & name() const
Returns the name of this source.
void add(Source &source)
Add a child source.
Wraps a Journal::Sink to prefix its output with a string.
Definition WrappedSink.h:34
virtual Config & config()=0
virtual beast::Journal journal(std::string const &name)=0
virtual ValidatorSite & validatorSites()=0
virtual DatabaseCon & getWalletDB()=0
Retrieve the "wallet database".
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual ManifestCache & validatorManifests()=0
virtual PeerReservationTable & peerReservations()=0
virtual Cluster & cluster()=0
virtual Logs & logs()=0
virtual HashRouter & getHashRouter()=0
Holds unparsed configuration information.
Section & section(std::string const &name)
Returns the section with the given name.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:38
std::vector< std::string > IPS_FIXED
Definition Config.h:144
std::vector< std::string > IPS
Definition Config.h:141
bool standalone() const
Definition Config.h:336
std::size_t TX_REDUCE_RELAY_MIN_PEERS
Definition Config.h:268
bool TX_REDUCE_RELAY_ENABLE
Definition Config.h:258
bool TX_REDUCE_RELAY_METRICS
Definition Config.h:265
std::size_t TX_RELAY_PERCENTAGE
Definition Config.h:271
LockedSociSession checkoutDb()
std::optional< std::set< PeerShortID > > shouldRelay(uint256 const &key)
Determines whether the hashed item should be relayed.
virtual void pubManifest(Manifest const &)=0
std::uint32_t sequence() const
A monotonically increasing number used to detect new manifests.
Definition Manifest.h:278
void for_each_manifest(Function &&f) const
Invokes the callback once for every populated manifest.
Definition Manifest.h:425
ManifestDisposition applyManifest(Manifest m)
Add manifest to cache.
Definition Manifest.cpp:383
virtual Json::Value getServerInfo(bool human, bool admin, bool counters)=0
Child(OverlayImpl &overlay)
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
boost::system::error_code error_code
Definition OverlayImpl.h:84
Json::Value getUnlInfo()
Returns information about the local server's UNL.
void stop() override
static std::string makePrefix(std::uint32_t id)
PeerFinder::Manager & peerFinder()
boost::asio::ip::tcp::endpoint endpoint_type
Definition OverlayImpl.h:83
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
boost::asio::ip::address address_type
Definition OverlayImpl.h:82
boost::asio::io_context & io_context_
static bool is_upgrade(boost::beast::http::header< true, Fields > const &req)
std::condition_variable_any cond_
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
Resolver & m_resolver
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully. This is called after the peer handshake has been comple...
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
void start() override
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > m_peers
void add_active(std::shared_ptr< PeerImp > const &peer)
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
Resource::Manager & m_resourceManager
std::shared_ptr< Message > manifestMessage_
std::optional< std::uint32_t > manifestListSeq_
TrafficCount m_traffic
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address, std::string msg)
reduce_relay::Slots< UptimeClock > slots_
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
std::atomic< Peer::id_t > next_id_
Application & app_
std::weak_ptr< Timer > timer_
metrics::TxMetrics txMetrics_
void broadcast(protocol::TMProposeSet &m) override
Broadcast a proposal.
void onPeerDeactivate(Peer::id_t id)
std::mutex manifestLock_
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
std::recursive_mutex mutex_
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
OverlayImpl(Application &app, Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &io_context, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
void sendTxQueue()
Send once a second transactions' hashes aggregated by peers.
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
std::set< Peer::id_t > relay(protocol::TMProposeSet &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
std::size_t size() const override
The number of active peers on the network. Active peers are only those peers that have completed the h...
boost::asio::strand< boost::asio::io_context::executor_type > strand_
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address)
void for_each(UnaryFunc &&f) const
Json::Value getOverlayInfo()
Returns information about peers on the overlay network.
Resource::Manager & resourceManager()
static bool isPeerUpgrade(http_request_type const &request)
Json::Value getServerCounts()
Returns information about the local server's performance counters.
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
std::unique_ptr< PeerFinder::Manager > m_peerFinder
void connect(beast::IP::Endpoint const &remote_endpoint) override
Establish a peer connection to the specified endpoint.
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remote_endpoint) override
Conditionally accept an incoming HTTP request.
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
Json::Value getServerInfo()
Returns information about the local server.
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
Json::Value json() override
Return diagnostics on the status of all peers.
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
ServerHandler & serverHandler_
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
int limit() override
Returns the maximum number of peers we are configured to allow.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
beast::Journal const journal_
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
Manages the set of connected peers.
Definition Overlay.h:49
virtual std::pair< std::shared_ptr< Slot >, Result > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)=0
Create a new outbound slot with the specified remote endpoint.
bool contains(PublicKey const &nodeId)
A public key.
Definition PublicKey.h:62
void resolve(std::vector< std::string > const &names, Handler handler)
resolve all hostnames on the list
Definition Resolver.h:57
Tracks load and resource consumption.
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
virtual Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by outbound IP address and port.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:79
void setup(Setup const &setup, beast::Journal journal)
auto const & getCounts() const
An up-to-date copy of all the counters.
void addCount(category cat, bool inbound, int bytes)
Account for traffic associated with the given category.
bool listed(PublicKey const &identity) const
Returns true if public key is included on any lists.
std::optional< Json::Value > getAvailable(std::string_view pubKey, std::optional< std::uint32_t > forceVersion={})
Returns the current valid list for the given publisher key, if available, as a Json object.
Json::Value getJson() const
Return a JSON representation of the state of the validator list.
Json::Value getJson() const
Return JSON representation of configured validator sites.
T count(T... args)
T data(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T find_if(T... args)
T get(T... args)
T is_same_v
T make_tuple(T... args)
@ nullValue
'null' value
Definition json_value.h:38
@ arrayValue
array value (ordered list)
Definition json_value.h:44
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:45
bool is_private(Address const &addr)
Returns true if the address is a private unroutable address.
Definition IPAddress.h:71
Result split_commas(FwdIt first, FwdIt last)
Definition rfc2616.h:199
bool is_keep_alive(boost::beast::http::message< isRequest, Body, Fields > const &m)
Definition rfc2616.h:386
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:81
@ checkIdlePeers
How often we check for idle peers (seconds)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
std::optional< ProtocolVersion > negotiateProtocolVersion(std::vector< ProtocolVersion > const &versions)
Given a list of supported protocol versions, choose the one we prefer.
std::optional< Manifest > deserializeManifest(Slice s, beast::Journal journal)
Constructs Manifest from serialized string.
Definition Manifest.cpp:54
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:870
void addValidatorManifest(soci::session &session, std::string const &serialized)
addValidatorManifest Saves the manifest of a validator to the database.
Definition Wallet.cpp:119
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::shared_ptr< boost::asio::ssl::context > make_SSLContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
std::shared_ptr< Message > makeSquelchMessage(PublicKey const &validator, bool squelch, uint32_t squelchDuration)
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:30
@ accepted
Manifest is valid.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:244
std::string base64_encode(std::uint8_t const *data, std::size_t len)
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:119
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:33
Json::Value getCountsJson(Application &app, int minObjectCount)
Definition GetCounts.cpp:62
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:630
PublicKey verifyHandshake(boost::beast::http::fields const &headers, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
std::unique_ptr< Overlay > make_Overlay(Application &app, Overlay::Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &io_context, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
Creates the implementation of Overlay.
@ manifest
Manifest.
Overlay::Setup setup_Overlay(BasicConfig const &config)
constexpr Number squelch(Number const &x, Number const &limit) noexcept
Definition Number.h:381
beast::xor_shift_engine & default_prng()
Return the default random engine.
STL namespace.
T piecewise_construct
T push_back(T... args)
T shuffle(T... args)
T reserve(T... args)
T reset(T... args)
T setfill(T... args)
T setw(T... args)
T size(T... args)
T str(T... args)
static boost::asio::ip::tcp::endpoint to_asio_endpoint(IP::Endpoint const &address)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Used to indicate the result of a server connection handoff.
Definition Handoff.h:40
bool keep_alive
Definition Handoff.h:46
std::shared_ptr< Writer > response
Definition Handoff.h:49
void on_timer(error_code ec)
Timer(OverlayImpl &overlay)
beast::IP::Address public_ip
Definition Overlay.h:69
std::uint32_t crawlOptions
Definition Overlay.h:71
std::shared_ptr< boost::asio::ssl::context > context
Definition Overlay.h:68
std::optional< std::uint32_t > networkID
Definition Overlay.h:72
PeerFinder configuration settings.
static Config makeConfig(ripple::Config const &config, std::uint16_t port, bool validationPublicKey, int ipLimit)
Make PeerFinder::Config from configuration parameters.
void addMetrics(protocol::MessageType type, std::uint32_t val)
Add protocol message metrics.
Definition TxMetrics.cpp:31
T substr(T... args)
T to_string(T... args)
T what(T... args)