rippled
Loading...
Searching...
No Matches
PeerImp.cpp
1#include <xrpld/app/consensus/RCLValidations.h>
2#include <xrpld/app/ledger/InboundLedgers.h>
3#include <xrpld/app/ledger/InboundTransactions.h>
4#include <xrpld/app/ledger/LedgerMaster.h>
5#include <xrpld/app/ledger/TransactionMaster.h>
6#include <xrpld/app/misc/Transaction.h>
7#include <xrpld/app/misc/ValidatorList.h>
8#include <xrpld/overlay/Cluster.h>
9#include <xrpld/overlay/detail/PeerImp.h>
10#include <xrpld/overlay/detail/Tuning.h>
11
12#include <xrpl/basics/UptimeClock.h>
13#include <xrpl/basics/base64.h>
14#include <xrpl/basics/random.h>
15#include <xrpl/basics/safe_cast.h>
16#include <xrpl/core/HashRouter.h>
17#include <xrpl/core/PerfLog.h>
18#include <xrpl/protocol/TxFlags.h>
19#include <xrpl/protocol/digest.h>
20#include <xrpl/server/LoadFeeTrack.h>
21#include <xrpl/server/NetworkOPs.h>
22#include <xrpl/tx/apply.h>
23
24#include <boost/algorithm/string/predicate.hpp>
25#include <boost/beast/core/ostream.hpp>
26
27#include <algorithm>
28#include <chrono>
29#include <memory>
30#include <mutex>
31#include <numeric>
32#include <sstream>
33
34using namespace std::chrono_literals;
35
36namespace xrpl {
37
38namespace {
// Latency threshold above which a peer is considered "high latency".
// NOTE(review): no use of this constant is visible in this chunk — confirm
// usage (peer scoring / sanity checks) against the rest of the file.
40std::chrono::milliseconds constexpr peerHighLatency{300};
41
// Period of the recurring peer maintenance timer; onTimer() re-arms with this
// interval after sending a ping (see setTimer(peerTimerInterval) below).
43std::chrono::seconds constexpr peerTimerInterval{60};
44
// Grace period for the graceful SSL shutdown handshake started in
// tryAsyncShutdown(); if it expires, onTimer() force-closes the socket.
46std::chrono::seconds constexpr shutdownTimerInterval{5};
47
48} // namespace
49
50// TODO: Remove this exclusion once unit tests are added after the hotfix
51// release.
52
// Inbound-peer constructor (signature line lost in this listing — the
// embedded numbering jumps 52 -> 54; several parameter lines, e.g. the
// stream pointer and protocol version, are also missing).
// Wires up the SSL stream/socket/strand, logging sinks keyed by a peer
// fingerprint, resource-usage accounting, and negotiated per-peer features
// (compression, tx reduce-relay, ledger replay) parsed from the handshake
// HTTP headers.
54 Application& app,
55 id_t id,
57 http_request_type&& request,
58 PublicKey const& publicKey,
60 Resource::Consumer consumer,
62 OverlayImpl& overlay)
63 : Child(overlay)
64 , app_(app)
65 , id_(id)
// Fingerprint combines remote endpoint, node public key, and peer id; it
// prefixes every log line from this peer (see makePrefix).
66 , fingerprint_(getFingerprint(slot->remote_endpoint(), publicKey, to_string(id)))
67 , prefix_(makePrefix(fingerprint_))
68 , sink_(app_.getJournal("Peer"), prefix_)
69 , p_sink_(app_.getJournal("Protocol"), prefix_)
70 , journal_(sink_)
71 , p_journal_(p_sink_)
72 , stream_ptr_(std::move(stream_ptr))
73 , socket_(stream_ptr_->next_layer().socket())
74 , stream_(*stream_ptr_)
// All handlers for this peer are serialized on one strand; every public
// entry point below reposts to it if called from another thread.
75 , strand_(boost::asio::make_strand(socket_.get_executor()))
76 , timer_(waitable_timer{socket_.get_executor()})
77 , remote_address_(slot->remote_endpoint())
78 , overlay_(overlay)
79 , inbound_(true)
80 , protocol_(protocol)
81 , tracking_(Tracking::unknown)
82 , trackingTime_(clock_type::now())
83 , publicKey_(publicKey)
84 , lastPingTime_(clock_type::now())
85 , creationTime_(clock_type::now())
86 , squelch_(app_.getJournal("Squelch"))
87 , usage_(consumer)
88 , fee_{Resource::feeTrivialPeer, ""}
89 , slot_(slot)
90 , request_(std::move(request))
91 , headers_(request_)
// Feature flags are negotiated via handshake headers AND local config.
92 , compressionEnabled_(
93 peerFeatureEnabled(headers_, FEATURE_COMPR, "lz4", app_.config().COMPRESSION)
94 ? Compressed::On
95 : Compressed::Off)
96 , txReduceRelayEnabled_(
97 peerFeatureEnabled(headers_, FEATURE_TXRR, app_.config().TX_REDUCE_RELAY_ENABLE))
98 , ledgerReplayEnabled_(
99 peerFeatureEnabled(headers_, FEATURE_LEDGER_REPLAY, app_.config().LEDGER_REPLAY))
100 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
101{
// NOTE(review): lines 104, 106-107 of the log statement are missing from
// this listing (numbering gaps) — the squelch-enabled operand is truncated.
102 JLOG(journal_.info()) << "compression enabled " << (compressionEnabled_ == Compressed::On)
103 << " vp reduce-relay base squelch enabled "
105 headers_,
108 << " tx reduce-relay enabled " << txReduceRelayEnabled_;
109}
110
// Destructor (signature line lost — numbering jumps 110 -> 112; body lines
// 115-118 are also missing, presumably overlay/slot cleanup — confirm
// against upstream). Warns when a cluster member disconnects.
112{
113 bool const inCluster{cluster()};
114
119
120 if (inCluster)
121 {
122 JLOG(journal_.warn()) << name() << " left cluster";
123 }
124}
125
126// Helper function to check for valid uint256 values in protobuf buffers
// Returns true iff the buffer length equals uint256::size() (32 bytes).
// NOTE(review): the signature line (128, with the parameter declaration)
// is missing from this listing.
127static bool
129{
130 return pBuffStr.size() == uint256::size();
131}
132
// Starts the peer session (signature line 134 missing — presumably
// PeerImp::run). Parses the optional Closed-Ledger / Previous-Ledger
// handshake headers (hex or base64 encoded hashes), records them under the
// recent-data lock, then begins the protocol (doAccept for inbound peers).
133void
135{
// Marshal onto the peer's strand; the repost call itself (line 138) is
// missing from this listing.
136 if (!strand_.running_in_this_thread())
137 {
139 return;
140 }
141
// Accept either a 64-char hex hash or a base64-encoded 32-byte hash.
142 auto parseLedgerHash = [](std::string_view value) -> std::optional<uint256> {
143 if (uint256 ret; ret.parseHex(value))
144 return ret;
145
146 if (auto const s = base64_decode(value); s.size() == uint256::size())
147 return uint256{s};
148
149 return std::nullopt;
150 };
151
// NOTE(review): the declaration of `closed` (line 152) is missing here.
153 std::optional<uint256> previous;
154
155 if (auto const iter = headers_.find("Closed-Ledger"); iter != headers_.end())
156 {
157 closed = parseLedgerHash(iter->value());
158
// fail() initiates shutdown; execution intentionally falls through below.
159 if (!closed)
160 fail("Malformed handshake data (1)");
161 }
162
163 if (auto const iter = headers_.find("Previous-Ledger"); iter != headers_.end())
164 {
165 previous = parseLedgerHash(iter->value());
166
167 if (!previous)
168 fail("Malformed handshake data (2)");
169 }
170
// A previous ledger without a closed ledger is inconsistent.
171 if (previous && !closed)
172 fail("Malformed handshake data (3)");
173
// NOTE(review): the lock acquisition (line 175, presumably recentLock_)
// guarding these members is missing from this listing.
174 {
176 if (closed)
177 closedLedgerHash_ = *closed;
178 if (previous)
179 previousLedgerHash_ = *previous;
180 }
181
182 if (inbound_)
183 {
184 doAccept();
185 }
186 else
187 {
// NOTE(review): the outbound start call (line 188) is missing here.
189 }
190
191 // Anything else that needs to be done with the connection should be
192 // done in doProtocolStart
193}
194
// Requests an orderly stop of this peer (signature line 196 missing).
// Marshals to the strand, then initiates a graceful shutdown if the socket
// is still open.
195void
197{
// Repost-to-strand call (line 200) missing from this listing.
198 if (!strand_.running_in_this_thread())
199 {
201 return;
202 }
203
204 if (!socket_.is_open())
205 return;
206
207 // The rationale for using different severity levels is that
208 // outbound connections are under our control and may be logged
209 // at a higher level, but inbound connections are more numerous and
210 // uncontrolled so to prevent log flooding the severity is reduced.
211 JLOG(journal_.debug()) << "stop: Stop";
212
213 shutdown();
214}
215
216//------------------------------------------------------------------------------
217
// Queues a protocol message for transmission (signature line 219 missing —
// takes a shared_ptr<Message> m). Drops messages from squelched validators,
// records traffic metrics, and kicks off an async_write when the queue was
// previously empty (one in-flight write at a time; onWriteMessage chains
// the rest).
218void
220{
// Repost-to-strand call (line 223) missing from this listing.
221 if (!strand_.running_in_this_thread())
222 {
224 return;
225 }
226
227 if (!socket_.is_open())
228 return;
229
230 // we are in progress of closing the connection
231 if (shutdown_)
232 {
234 return;
235 }
236
// Squelched validator traffic is counted (accounting call on the missing
// lines 240-241) and then dropped without sending.
237 auto validator = m->getValidatorKey();
238 if (validator && !squelch_.expireSquelch(*validator))
239 {
242 static_cast<int>(m->getBuffer(compressionEnabled_).size()));
243 return;
244 }
245
246 // report categorized outgoing traffic
248 safe_cast<TrafficCount::category>(m->getCategory()),
249 static_cast<int>(m->getBuffer(compressionEnabled_).size()));
250
251 // report total outgoing traffic
253 TrafficCount::category::total, static_cast<int>(m->getBuffer(compressionEnabled_).size()));
254
255 auto sendq_size = send_queue_.size();
256
257 if (sendq_size < Tuning::targetSendQueue)
258 {
259 // To detect a peer that does not read from their
260 // side of the connection, we expect a peer to have
261 // a small senq periodically
262 large_sendq_ = 0;
263 }
264 else if (auto sink = journal_.debug(); sink && (sendq_size % Tuning::sendQueueLogFreq) == 0)
265 {
266 std::string const n = name();
267 sink << n << " sendq: " << sendq_size;
268 }
269
270 send_queue_.push(m);
271
// A non-empty queue means a write is already in flight; onWriteMessage
// will pick this message up when the current write completes.
272 if (sendq_size != 0)
273 return;
274
275 writePending_ = true;
276 boost::asio::async_write(
277 stream_,
278 boost::asio::buffer(send_queue_.front()->getBuffer(compressionEnabled_)),
279 bind_executor(
280 strand_,
// NOTE(review): the bound member-function/shared_from_this lines
// (282-283, presumably &PeerImp::onWriteMessage) are missing here.
281 std::bind(
284 std::placeholders::_1,
285 std::placeholders::_2)));
286}
287
// Flushes the queued transaction hashes as a single TMHaveTransactions
// message and clears the queue (signature line 289 missing).
288void
290{
// Repost-to-strand call (line 293) missing from this listing.
291 if (!strand_.running_in_this_thread())
292 {
294 return;
295 }
296
297 if (!txQueue_.empty())
298 {
299 protocol::TMHaveTransactions ht;
300 std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) {
301 ht.add_hashes(hash.data(), hash.size());
302 });
303 JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size();
304 txQueue_.clear();
305 send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
306 }
307}
308
// Adds a transaction hash to the reduce-relay queue (signature line 310
// missing), flushing first if the queue exceeds its cap (the cap check on
// line 318 is missing from this listing).
309void
311{
312 if (!strand_.running_in_this_thread())
313 {
315 return;
316 }
317
319 {
320 JLOG(p_journal_.warn()) << "addTxQueue exceeds the cap";
321 sendTxQueue();
322 }
323
324 txQueue_.insert(hash);
325 JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size();
326}
327
// Removes a transaction hash from the reduce-relay queue (signature line
// 329 missing), e.g. once the peer is known to already have the tx.
328void
330{
// Repost-to-strand call (line 333) missing from this listing.
331 if (!strand_.running_in_this_thread())
332 {
334 return;
335 }
336
337 auto removed = txQueue_.erase(hash);
338 JLOG(p_journal_.trace()) << "removeTxQueue " << removed;
339}
340
// Charges this peer's resource-usage meter (signature line 342 missing).
// If the accumulated charge crosses the drop threshold and the consumer
// agrees to disconnect, the connection is failed — but only when already
// on the strand (off-strand calls just record the charge).
341void
343{
344 if ((usage_.charge(fee, context) == Resource::drop) && usage_.disconnect(p_journal_) &&
345 strand_.running_in_this_thread())
346 {
347 // Sever the connection
349 fail("charge: Resources");
350 }
351}
352
353//------------------------------------------------------------------------------
354
// True when the peer advertised "Crawl: public" in its handshake headers
// (case-insensitive). Signature line 356 missing from this listing.
355bool
357{
358 auto const iter = headers_.find("Crawl");
359 if (iter == headers_.end())
360 return false;
361 return boost::iequals(iter->value(), "public");
362}
363
// True when this peer's public key belongs to our configured cluster.
// Signature line 365 missing from this listing.
364bool
366{
367 return static_cast<bool>(app_.getCluster().member(publicKey_));
368}
369
// Returns the peer's advertised software version (signature lines 370-371
// missing). Inbound peers send it as User-Agent, outbound as Server.
372{
373 if (inbound_)
374 return headers_["User-Agent"];
375 return headers_["Server"];
376}
377
// Builds the JSON status object for this peer as reported by admin RPC
// (signature lines 378-379 and the `ret` declaration on 381 are missing
// from this listing).
380{
382
383 ret[jss::public_key] = toBase58(TokenType::NodePublic, publicKey_);
384 ret[jss::address] = remote_address_.to_string();
385
386 if (inbound_)
387 ret[jss::inbound] = true;
388
389 if (cluster())
390 {
391 ret[jss::cluster] = true;
392
393 if (auto const n = name(); !n.empty())
394 {
395 // Could move here if Json::Value supported moving from a string
396 ret[jss::name] = n;
397 }
398 }
399
400 if (auto const d = domain(); !d.empty())
401 ret[jss::server_domain] = std::string{d};
402
403 if (auto const nid = headers_["Network-ID"]; !nid.empty())
404 ret[jss::network_id] = std::string{nid};
405
406 ret[jss::load] = usage_.balance();
407
408 if (auto const version = getVersion(); !version.empty())
409 ret[jss::version] = std::string{version};
410
411 ret[jss::protocol] = to_string(protocol_);
412
// NOTE(review): the lock acquisition (line 414, presumably recentLock_)
// guarding latency_ is missing from this listing.
413 {
415 if (latency_)
416 ret[jss::latency] = static_cast<Json::UInt>(latency_->count());
417 }
418
419 ret[jss::uptime] =
420 static_cast<Json::UInt>(std::chrono::duration_cast<std::chrono::seconds>(uptime()).count());
421
422 std::uint32_t minSeq = 0, maxSeq = 0;
423 ledgerRange(minSeq, maxSeq);
424
425 if ((minSeq != 0) || (maxSeq != 0))
426 ret[jss::complete_ledgers] = std::to_string(minSeq) + " - " + std::to_string(maxSeq);
427
// NOTE(review): the case labels (lines 430, 434, 438 — diverged/unknown/
// converged) are missing from this listing.
428 switch (tracking_.load())
429 {
431 ret[jss::track] = "diverged";
432 break;
433
435 ret[jss::track] = "unknown";
436 break;
437
439 // Nothing to do here
440 break;
441 }
442
// Copy shared state out under the (missing, line 446) lock before use.
443 uint256 closedLedgerHash;
444 protocol::TMStatusChange last_status;
445 {
447 closedLedgerHash = closedLedgerHash_;
448 last_status = last_status_;
449 }
450
451 if (closedLedgerHash != beast::zero)
452 ret[jss::ledger] = to_string(closedLedgerHash);
453
454 if (last_status.has_newstatus())
455 {
456 switch (last_status.newstatus())
457 {
458 case protocol::nsCONNECTING:
459 ret[jss::status] = "connecting";
460 break;
461
462 case protocol::nsCONNECTED:
463 ret[jss::status] = "connected";
464 break;
465
466 case protocol::nsMONITORING:
467 ret[jss::status] = "monitoring";
468 break;
469
470 case protocol::nsVALIDATING:
471 ret[jss::status] = "validating";
472 break;
473
474 case protocol::nsSHUTTING:
475 ret[jss::status] = "shutting";
476 break;
477
478 default:
479 JLOG(p_journal_.warn()) << "Unknown status: " << last_status.newstatus();
480 }
481 }
482
483 ret[jss::metrics] = Json::Value(Json::objectValue);
484 ret[jss::metrics][jss::total_bytes_recv] = std::to_string(metrics_.recv.total_bytes());
485 ret[jss::metrics][jss::total_bytes_sent] = std::to_string(metrics_.sent.total_bytes());
486 ret[jss::metrics][jss::avg_bps_recv] = std::to_string(metrics_.recv.average_bytes());
487 ret[jss::metrics][jss::avg_bps_sent] = std::to_string(metrics_.sent.average_bytes());
488
489 return ret;
490}
491
// Maps a ProtocolFeature to the minimum peer protocol version that supports
// it (signature line 493 and the case labels on 497/499/501-502 are missing
// from this listing).
492bool
494{
495 switch (f)
496 {
498 return protocol_ >= make_protocol(2, 1);
500 return protocol_ >= make_protocol(2, 2);
503 }
504 return false;
505}
506
507//------------------------------------------------------------------------------
508
// True if this peer is believed to have the given ledger: either the
// sequence falls in its advertised [minLedger_, maxLedger_] range (extra
// condition on the missing line 515 — presumably a sanity/tracking check)
// or the hash is in its recent-ledgers list. Signature line 510 and the
// lock line 513 are missing from this listing.
509bool
511{
512 {
514 if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
516 return true;
517 if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) != recentLedgers_.end())
518 return true;
519 }
520 return false;
521}
522
// Copies the peer's advertised ledger sequence range into the out-params
// (signature line 524 and lock line 526 missing from this listing).
523void
525{
527
528 minSeq = minLedger_;
529 maxSeq = maxLedger_;
530}
531
// True if the given transaction-set hash is in this peer's recent tx sets.
// NOTE(review): line 535 (presumably the recentLock_ guard) is missing.
532bool
533PeerImp::hasTxSet(uint256 const& hash) const
534{
536 return std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) != recentTxSets_.end();
537}
538
// Updates the peer's closed/previous ledger hashes (signature line 540 and
// the entire body, lines 544-546, are missing from this listing — only the
// locking-contract comment survives).
539void
541{
542 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
543 // guarded by recentLock_.
547}
548
// True if the peer is not diverged and its advertised ledger range covers
// [uMin, uMax] (signature line 550 and lock line 552 missing).
549bool
551{
553 return (tracking_ != Tracking::diverged) && (uMin >= minLedger_) && (uMax <= maxLedger_);
554}
555
556//------------------------------------------------------------------------------
557
// Fails the connection with an error code, logging "<name>: <ec message>"
// and starting a graceful shutdown. Must run on the strand (asserted).
// Signature line 559 (name + error_code parameters) missing.
558void
560{
561 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::PeerImp::fail : strand in this thread");
562
563 if (!socket_.is_open())
564 return;
565
566 JLOG(journal_.warn()) << name << ": " << ec.message();
567
568 shutdown();
569}
570
// Fails the connection with a textual reason (signature line 572 missing).
// Off-strand calls are reposted via the overload-disambiguating bind below.
571void
573{
574 if (!strand_.running_in_this_thread())
575 {
576 post(
577 strand_,
// The cast selects this string overload of fail() for the repost.
578 std::bind(
579 (void (Peer::*)(std::string const&))&PeerImp::fail, shared_from_this(), reason));
580 return;
581 }
582
583 if (!socket_.is_open())
584 return;
585
586 // Call to name() locks, log only if the message will be outputted
// NOTE(review): the journal-active check (line 587) is missing here.
588 {
589 std::string const n = name();
590 JLOG(journal_.warn()) << n << " failed: " << reason;
591 }
592
593 shutdown();
594}
595
// Starts the graceful SSL shutdown handshake exactly once, guarded by a
// shutdown watchdog timer (signature line 597 and the two precondition
// checks on lines 603 and 606 — presumably pending read/write and
// already-started guards — are missing from this listing).
596void
598{
599 XRPL_ASSERT(
600 strand_.running_in_this_thread(),
601 "xrpl::PeerImp::tryAsyncShutdown : strand in this thread");
602
604 return;
605
607 return;
608
609 shutdownStarted_ = true;
610
// If the handshake stalls, onTimer force-closes after this interval.
611 setTimer(shutdownTimerInterval);
612
613 // gracefully shutdown the SSL socket, performing a shutdown handshake
614 stream_.async_shutdown(bind_executor(
615 strand_, std::bind(&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
616}
617
// Marks the connection as shutting down and cancels outstanding socket
// operations (signature line 619 missing; line 630 — presumably the
// tryAsyncShutdown() call — is also missing from this listing).
618void
620{
621 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::PeerImp::shutdown: strand in this thread");
622
623 if (!socket_.is_open() || shutdown_)
624 return;
625
626 shutdown_ = true;
627
// Cancel pending async reads/writes on the underlying TCP socket.
628 boost::beast::get_lowest_layer(stream_).cancel();
629
631}
632
// Completion handler for stream_.async_shutdown (signature line 634
// missing). Cancels the shutdown watchdog, logs unexpected errors, and
// closes the socket unconditionally.
633void
635{
636 cancelTimer();
637 if (ec)
638 {
639 // - eof: the stream was cleanly closed
640 // - operation_aborted: an expired timer (slow shutdown)
641 // - stream_truncated: the tcp connection closed (no handshake) it could
642 // occur if a peer does not perform a graceful disconnect
643 // - broken_pipe: the peer is gone
644 bool const shouldLog =
645 (ec != boost::asio::error::eof && ec != boost::asio::error::operation_aborted &&
646 ec.message().find("application data after close notify") == std::string::npos);
647
648 if (shouldLog)
649 {
650 JLOG(journal_.debug()) << "onShutdown: " << ec.message();
651 }
652 }
653
654 close();
655}
656
// Force-closes the underlying socket (signature line 658 missing; line 670
// — presumably overlay bookkeeping — is also missing from this listing).
657void
659{
660 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::PeerImp::close : strand in this thread");
661
662 if (!socket_.is_open())
663 return;
664
665 cancelTimer();
666
// Error is deliberately ignored; the socket is going away regardless.
667 error_code ec;
668 socket_.close(ec); // NOLINT(bugprone-unused-return-value)
669
671
672 // The rationale for using different severity levels is that
673 // outbound connections are under our control and may be logged
674 // at a higher level, but inbound connections are more numerous and
675 // uncontrolled so to prevent log flooding the severity is reduced.
676 JLOG((inbound_ ? journal_.debug() : journal_.info())) << "close: Closed";
677}
678
679//------------------------------------------------------------------------------
680
// (Re)arms the peer timer to fire after `interval`, routing expiry to
// onTimer on the strand (signature line 682 missing). A failure to arm the
// timer triggers a graceful shutdown.
681void
683{
684 try
685 {
686 timer_.expires_after(interval);
687 }
688 catch (std::exception const& ex)
689 {
690 JLOG(journal_.error()) << "setTimer: " << ex.what();
691 shutdown();
692 return;
693 }
694
695 timer_.async_wait(bind_executor(
696 strand_, std::bind(&PeerImp::onTimer, shared_from_this(), std::placeholders::_1)));
697}
698
699//------------------------------------------------------------------------------
700
// Formats the per-peer log prefix "[<fingerprint>] " (signature lines
// 701-702 and the stringstream declaration on 704 are missing from this
// listing).
703{
705 ss << "[" << fingerprint << "] ";
706 return ss.str();
707}
708
// Periodic peer maintenance (signature line 710 missing). Handles the
// shutdown watchdog, drops peers with persistently large send queues or
// that stayed diverged/unknown too long, enforces ping timeouts, and sends
// the next ping before re-arming the timer.
709void
711{
712 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::PeerImp::onTimer : strand in this thread");
713
714 if (!socket_.is_open())
715 return;
716
717 if (ec)
718 {
719 // do not initiate shutdown, timers are frequently cancelled
720 if (ec == boost::asio::error::operation_aborted)
721 return;
722
723 // This should never happen
724 JLOG(journal_.error()) << "onTimer: " << ec.message();
725 close();
726 return;
727 }
728
729 // the timer expired before the shutdown completed
730 // force close the connection
731 if (shutdown_)
732 {
733 JLOG(journal_.debug()) << "onTimer: shutdown timer expired";
734 close();
735 return;
736 }
737
// NOTE(review): the large-send-queue condition (line 738) is missing from
// this listing.
739 {
740 fail("Large send queue");
741 return;
742 }
743
// Outbound peers that never converged are dropped after a config-driven
// grace period (MAX_DIVERGED_TIME / MAX_UNKNOWN_TIME).
744 if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
745 {
746 clock_type::duration duration;
747
748 {
750 duration = clock_type::now() - trackingTime_;
751 }
752
753 if ((t == Tracking::diverged && (duration > app_.config().MAX_DIVERGED_TIME)) ||
754 (t == Tracking::unknown && (duration > app_.config().MAX_UNKNOWN_TIME)))
755 {
757 fail("Not useful");
758 return;
759 }
760 }
761
762 // Already waiting for PONG
763 if (lastPingSeq_)
764 {
765 fail("Ping Timeout");
766 return;
767 }
768
// Random cookie lets onMessage(TMPing) match the PONG to this PING.
770 lastPingSeq_ = rand_int<std::uint32_t>();
771
772 protocol::TMPing message;
773 message.set_type(protocol::TMPing::ptPING);
774 message.set_seq(*lastPingSeq_);
775
776 send(std::make_shared<Message>(message, protocol::mtPING));
777
778 setTimer(peerTimerInterval);
779}
780
// Cancels the peer timer, swallowing (but logging) any exception so that
// cancellation is safe on every path (signature line 782 missing).
781void
783{
784 try
785 {
786 timer_.cancel();
787 }
788 catch (std::exception const& ex)
789 {
790 JLOG(journal_.error()) << "cancelTimer: " << ex.what();
791 }
792}
793
794//------------------------------------------------------------------------------
795void
797{
798 XRPL_ASSERT(read_buffer_.size() == 0, "xrpl::PeerImp::doAccept : empty read buffer");
799
800 JLOG(journal_.debug()) << "doAccept";
801
802 // a shutdown was initiated before the handshake, there is nothing to do
803 if (shutdown_)
804 {
806 return;
807 }
808
809 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
810
811 // This shouldn't fail since we already computed
812 // the shared value successfully in OverlayImpl
813 if (!sharedValue)
814 {
815 fail("makeSharedValue: Unexpected failure");
816 return;
817 }
818
819 JLOG(journal_.debug()) << "Protocol: " << to_string(protocol_);
820
821 if (auto member = app_.getCluster().member(publicKey_))
822 {
823 {
824 std::unique_lock const lock{nameMutex_};
825 name_ = *member;
826 }
827 JLOG(journal_.info()) << "Cluster name: " << *member;
828 }
829
831
832 // XXX Set timer: connection is in grace period to be useful.
833 // XXX Set timer: connection idle (idle may vary depending on connection
834 // type.)
835
837
838 boost::beast::ostream(*write_buffer) << makeResponse(
840 request_,
843 *sharedValue,
845 protocol_,
846 app_);
847
848 // Write the whole buffer and only start protocol when that's done.
849 boost::asio::async_write(
850 stream_,
851 write_buffer->data(),
852 boost::asio::transfer_all(),
853 bind_executor(
854 strand_,
855 [this, write_buffer, self = shared_from_this()](
856 error_code ec, std::size_t bytes_transferred) {
857 if (!socket_.is_open())
858 return;
859 if (ec == boost::asio::error::operation_aborted)
860 {
862 return;
863 }
864 if (ec)
865 {
866 fail("onWriteResponse", ec);
867 return;
868 }
869 if (write_buffer->size() == bytes_transferred)
870 {
872 return;
873 }
874 fail("Failed to write header");
875 return;
876 }));
877}
878
// Returns the peer's cluster name under a shared lock (signature lines
// 879-880 missing). Empty for non-cluster peers.
881{
882 std::shared_lock const read_lock{nameMutex_};
883 return name_;
884}
885
// Returns the peer's advertised Server-Domain handshake header (signature
// lines 886-887 missing).
888{
889 return headers_["Server-Domain"];
890}
891
892//------------------------------------------------------------------------------
893
894// Protocol logic
895
// Begins protocol message exchange (signature line 897 missing): sends the
// loaded validator lists and cached manifests, then arms the peer timer.
896void
898{
899 // a shutdown was initiated before the handshare, there is nothing to do
900 if (shutdown_)
901 {
903 return;
904 }
905
// NOTE(review): line 906 (presumably the initial async read kickoff) is
// missing from this listing.
907
908 // Send all the validator lists that have been loaded
// NOTE(review): the guard on line 909 and the ValidatorList iteration
// call on line 911 are missing; only the callback body survives.
910 {
912 [&](std::string const& manifest,
913 std::uint32_t version,
915 PublicKey const& pubKey,
916 std::size_t maxSequence,
917 uint256 const& hash) {
919 *this,
920 0,
921 pubKey,
922 maxSequence,
923 version,
924 manifest,
925 blobInfos,
927 p_journal_);
928
929 // Don't send it next time.
931 });
932 }
933
934 if (auto m = overlay_.getManifestsMessage())
935 send(m);
936
937 setTimer(peerTimerInterval);
938}
939
940// Called repeatedly with protocol message data
// Read-completion handler (signature line 942 missing). Feeds buffered
// bytes through invokeProtocolMessage until a partial message remains,
// then re-arms the next async read. Honors shutdown_ between steps.
941void
943{
944 XRPL_ASSERT(
945 strand_.running_in_this_thread(), "xrpl::PeerImp::onReadMessage : strand in this thread");
946
947 readPending_ = false;
948
949 if (!socket_.is_open())
950 return;
951
952 if (ec)
953 {
// EOF is a normal peer disconnect: shut down gracefully, don't fail.
954 if (ec == boost::asio::error::eof)
955 {
956 JLOG(journal_.debug()) << "EOF";
957 shutdown();
958 return;
959 }
960
961 if (ec == boost::asio::error::operation_aborted)
962 {
964 return;
965 }
966
967 fail("onReadMessage", ec);
968 return;
969 }
970 // we started shutdown, no reason to process further data
971 if (shutdown_)
972 {
974 return;
975 }
976
977 if (auto stream = journal_.trace())
978 {
979 stream << "onReadMessage: "
980 << (bytes_transferred > 0 ? to_string(bytes_transferred) + " bytes" : "");
981 }
982
983 metrics_.recv.add_message(bytes_transferred);
984
985 read_buffer_.commit(bytes_transferred);
986
987 auto hint = Tuning::readBufferBytes;
988
989 while (read_buffer_.size() > 0)
990 {
991 std::size_t bytes_consumed = 0;
992
// Dispatch one message; log a warning if handling exceeds 350ms.
993 using namespace std::chrono_literals;
994 std::tie(bytes_consumed, ec) = perf::measureDurationAndLog(
995 [&]() { return invokeProtocolMessage(read_buffer_.data(), *this, hint); },
996 "invokeProtocolMessage",
997 350ms,
998 journal_);
999
1000 if (!socket_.is_open())
1001 return;
1002
1003 // the error_code is produced by invokeProtocolMessage
1004 // it could be due to a bad message
1005 if (ec)
1006 {
1007 fail("onReadMessage", ec);
1008 return;
1009 }
1010
// Zero consumed means a partial message: wait for more data.
1011 if (bytes_consumed == 0)
1012 break;
1013
1014 read_buffer_.consume(bytes_consumed);
1015 }
1016
1017 // check if a shutdown was initiated while processing messages
1018 if (shutdown_)
1019 {
1021 return;
1022 }
1023
1024 readPending_ = true;
1025
1026 XRPL_ASSERT(!shutdownStarted_, "xrpl::PeerImp::onReadMessage : shutdown started");
1027
1028 // Timeout on writes only
// NOTE(review): the prepared-buffer argument (line 1030) and the bound
// handler lines (1034-1035, presumably &PeerImp::onReadMessage) are
// missing from this listing.
1029 stream_.async_read_some(
1031 bind_executor(
1032 strand_,
1033 std::bind(
1036 std::placeholders::_1,
1037 std::placeholders::_2)));
1038}
1039
// Write-completion handler (signature line 1041 missing). Pops the message
// just sent and, if more are queued and no shutdown started, issues the
// next async_write — giving one in-flight write at a time.
1040void
1042{
1043 XRPL_ASSERT(
1044 strand_.running_in_this_thread(), "xrpl::PeerImp::onWriteMessage : strand in this thread");
1045
1046 writePending_ = false;
1047
1048 if (!socket_.is_open())
1049 return;
1050
1051 if (ec)
1052 {
1053 if (ec == boost::asio::error::operation_aborted)
1054 {
1056 return;
1057 }
1058
1059 fail("onWriteMessage", ec);
1060 return;
1061 }
1062
1063 if (auto stream = journal_.trace())
1064 {
1065 stream << "onWriteMessage: "
1066 << (bytes_transferred > 0 ? to_string(bytes_transferred) + " bytes" : "");
1067 }
1068
1069 metrics_.sent.add_message(bytes_transferred);
1070
1071 XRPL_ASSERT(!send_queue_.empty(), "xrpl::PeerImp::onWriteMessage : non-empty send buffer");
1072 send_queue_.pop();
1073
// Stop draining the queue once a shutdown is in progress (line 1076,
// presumably tryAsyncShutdown(), is missing from this listing).
1074 if (shutdown_)
1075 {
1077 return;
1078 }
1079
1080 if (!send_queue_.empty())
1081 {
1082 writePending_ = true;
1083 XRPL_ASSERT(!shutdownStarted_, "xrpl::PeerImp::onWriteMessage : shutdown started");
1084
1085 // Timeout on writes only
1086 boost::asio::async_write(
1087 stream_,
1088 boost::asio::buffer(send_queue_.front()->getBuffer(compressionEnabled_)),
1089 bind_executor(
1090 strand_,
// NOTE(review): the bound handler lines (1092-1093) are missing.
1091 std::bind(
1094 std::placeholders::_1,
1095 std::placeholders::_2)));
1096 return;
1097 }
1098}
1099
1100//------------------------------------------------------------------------------
1101//
1102// ProtocolHandler
1103//
1104//------------------------------------------------------------------------------
1105
// Handler for unrecognized protocol message types (signature line 1107
// missing). Currently a deliberate no-op.
1106void
1108{
1109 // TODO
1110}
1111
// Invoked before dispatching each received protocol message (first
// parameter lines 1113/1115 — the message name and shared_ptr — are
// missing from this listing). Records per-category and transaction-related
// traffic metrics.
1112void
1114 std::uint16_t type,
1116 std::size_t size,
1117 std::size_t uncompressed_size,
1118 bool isCompressed)
1119{
1120 auto const name = protocolMessageName(type);
1123
1124 auto const category =
1125 TrafficCount::categorize(*m, static_cast<protocol::MessageType>(type), true);
1126
1127 // report total incoming traffic
// NOTE(review): the total-traffic accounting call (line 1128) is missing.
1129
1130 // increase the traffic received for a specific category
1131 overlay_.reportInboundTraffic(category, static_cast<int>(size));
1132
// Transaction-relevant message types also feed the tx metrics counters.
// NOTE(review): the GET_OBJECTS/GET_LEDGER/LEDGER_DATA sub-conditions
// (lines 1137, 1139-1140, 1142-1144) are missing from this listing.
1133 using namespace protocol;
1134 if ((type == MessageType::mtTRANSACTION || type == MessageType::mtHAVE_TRANSACTIONS ||
1135 type == MessageType::mtTRANSACTIONS ||
1136 // GET_OBJECTS
1138 // GET_LEDGER
1141 // LEDGER_DATA
1145 {
1146 overlay_.addTxMetrics(static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
1147 }
1148 JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " " << uncompressed_size
1149 << " " << isCompressed;
1150}
1151
1152void
1158
// Handles TMManifests (signature line 1160 missing): rejects empty lists
// (fee update on the missing line 1166), penalizes oversized lists (>100;
// body of that if on the missing line 1171), and defers processing to the
// job queue.
1159void
1161{
1162 auto const s = m->list_size();
1163
1164 if (s == 0)
1165 {
1167 return;
1168 }
1169
1170 if (s > 100)
1172
// The job keeps both the peer and the message alive via shared_ptr.
1173 app_.getJobQueue().addJob(jtMANIFEST, "RcvManifests", [this, that = shared_from_this(), m]() {
1174 overlay_.onManifests(m, that);
1175 });
1176}
1177
// Handles TMPing (signature line 1179 missing): echoes PINGs back as
// PONGs, and on a matching PONG clears the outstanding ping and updates
// the smoothed latency estimate.
1178void
1180{
1181 if (m->type() == protocol::TMPing::ptPING)
1182 {
1183 // We have received a ping request, reply with a pong
// NOTE(review): line 1184 (presumably a fee/traffic update) is missing.
1185 m->set_type(protocol::TMPing::ptPONG);
1186 send(std::make_shared<Message>(*m, protocol::mtPING));
1187 return;
1188 }
1189
1190 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1191 {
1192 // Only reset the ping sequence if we actually received a
1193 // PONG with the correct cookie. That way, any peers which
1194 // respond with incorrect cookies will eventually time out.
1195 if (m->seq() == lastPingSeq_)
1196 {
// NOTE(review): line 1197 (presumably lastPingSeq_.reset()) is
// missing from this listing.
1198
1199 // Update latency estimate
1200 auto const rtt =
1201 std::chrono::round<std::chrono::milliseconds>(clock_type::now() - lastPingTime_);
1202
1203 std::lock_guard const sl(recentLock_);
1204
// Exponential moving average: 7/8 old estimate, 1/8 new sample.
1205 if (latency_)
1206 {
1207 latency_ = (*latency_ * 7 + rtt) / 8;
1208 }
1209 else
1210 {
1211 latency_ = rtt;
1212 }
1213 }
1214
1215 return;
1216 }
1217}
1218
// Handles TMCluster (signature line 1220 missing): only cluster members
// may send it. Updates per-node cluster status, forwards load-source
// gossip to resource management (call on the missing line 1263), and
// recomputes the cluster fee as the median of recent member fees.
1219void
1221{
1222 // VFALCO NOTE I think we should drop the peer immediately
1223 if (!cluster())
1224 {
1225 fee_.update(Resource::feeUselessData, "unknown cluster");
1226 return;
1227 }
1228
1229 for (int i = 0; i < m->clusternodes().size(); ++i)
1230 {
1231 protocol::TMClusterNode const& node = m->clusternodes(i);
1232
// NOTE(review): the declaration of `name` (line 1233) is missing.
1234 if (node.has_nodename())
1235 name = node.nodename();
1236
1237 auto const publicKey = parseBase58<PublicKey>(TokenType::NodePublic, node.publickey());
1238
1239 // NIKB NOTE We should drop the peer immediately if
1240 // they send us a public key we can't parse
1241 if (publicKey)
1242 {
1243 auto const reportTime = NetClock::time_point{NetClock::duration{node.reporttime()}};
1244
1245 app_.getCluster().update(*publicKey, name, node.nodeload(), reportTime);
1246 }
1247 }
1248
1249 int const loadSources = m->loadsources().size();
1250 if (loadSources != 0)
1251 {
1252 Resource::Gossip gossip;
1253 gossip.items.reserve(loadSources);
1254 for (int i = 0; i < m->loadsources().size(); ++i)
1255 {
1256 protocol::TMLoadSource const& node = m->loadsources(i);
// NOTE(review): the declaration of `item` (line 1257) is missing.
1258 item.address = beast::IP::Endpoint::from_string(node.name());
1259 item.balance = node.cost();
1260 if (item.address != beast::IP::Endpoint())
1261 gossip.items.push_back(item);
1262 }
1264 }
1265
1266 // Calculate the cluster fee:
// Only fees reported within the last 90 seconds count.
1267 auto const thresh = app_.getTimeKeeper().now() - 90s;
1268 std::uint32_t clusterFee = 0;
1269
// NOTE(review): the declaration of `fees` (line 1270, presumably
// std::vector<std::uint32_t>) is missing from this listing.
1271 fees.reserve(app_.getCluster().size());
1272
1273 app_.getCluster().for_each([&fees, thresh](ClusterNode const& status) {
1274 if (status.getReportTime() >= thresh)
1275 fees.push_back(status.getLoadFee());
1276 });
1277
// Median via nth_element — cheaper than a full sort.
1278 if (!fees.empty())
1279 {
1280 auto const index = fees.size() / 2;
1281 std::nth_element(fees.begin(), fees.begin() + index, fees.end());
1282 clusterFee = fees[index];
1283 }
1284
1285 app_.getFeeTrack().setClusterFee(clusterFee);
1286}
1287
// Handles TMEndpoints (signature line 1289 missing): accepts v2 endpoint
// gossip from converged peers only, parses and validates each endpoint,
// charges for malformed entries, and forwards the valid ones to PeerFinder.
1288void
1290{
1291 // Don't allow endpoints from peers that are not known tracking or are
1292 // not using a version of the message that we support:
1293 if (tracking_.load() != Tracking::converged || m->version() != 2)
1294 return;
1295
1296 // The number is arbitrary and doesn't have any real significance or
1297 // implication for the protocol.
1298 if (m->endpoints_v2().size() >= 1024)
1299 {
1300 fee_.update(Resource::feeUselessData, "endpoints too large");
1301 return;
1302 }
1303
// NOTE(review): the declaration of `endpoints` (line 1304, presumably
// std::vector<PeerFinder::Endpoint>) is missing from this listing.
1305 endpoints.reserve(m->endpoints_v2().size());
1306
1307 auto malformed = 0;
1308 for (auto const& tm : m->endpoints_v2())
1309 {
1310 auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
1311
1312 if (!result)
1313 {
1314 JLOG(p_journal_.error())
1315 << "failed to parse incoming endpoint: {" << tm.endpoint() << "}";
1316 malformed++;
1317 continue;
1318 }
1319
1320 // If hops == 0, this Endpoint describes the peer we are connected
1321 // to -- in that case, we take the remote address seen on the
1322 // socket and store that in the IP::Endpoint. If this is the first
1323 // time, then we'll verify that their listener can receive incoming
1324 // by performing a connectivity test. if hops > 0, then we just
1325 // take the address/port we were given
1326 if (tm.hops() == 0)
1327 result = remote_address_.at_port(result->port());
1328
1329 endpoints.emplace_back(*result, tm.hops());
1330 }
1331
1332 // Charge the peer for each malformed endpoint. As there still may be
1333 // multiple valid endpoints we don't return early.
1334 if (malformed > 0)
1335 {
1336 fee_.update(
1337 Resource::feeInvalidData * malformed,
1338 std::to_string(malformed) + " malformed endpoints");
1339 }
1340
1341 if (!endpoints.empty())
1342 overlay_.peerFinder().on_endpoints(slot_, endpoints);
1343}
1344
1345void
1350
// Core transaction-receipt path (signature lines 1352-1353 missing — takes
// the TMTransaction message plus the two flags below). Deserializes the
// transaction, applies relay/dedup policy via HashRouter, and queues
// signature checking on the job queue.
1351void
1354 bool eraseTxQueue,
1355 bool batch)
1356{
1357 XRPL_ASSERT(eraseTxQueue != batch, ("xrpl::PeerImp::handleTransaction : valid inputs"));
// NOTE(review): the condition guarding this early return (line 1358) is
// missing from this listing.
1359 return;
1360
// NOTE(review): the needNetworkLedger-style condition (line 1361) is
// missing; only its body survives below.
1362 {
1363 // If we've never been in synch, there's nothing we can do
1364 // with a transaction
1365 JLOG(p_journal_.debug()) << "Ignoring incoming transaction: Need network ledger";
1366 return;
1367 }
1368
1369 SerialIter sit(makeSlice(m->rawtransaction()));
1370
1371 try
1372 {
1373 auto stx = std::make_shared<STTx const>(sit);
1374 uint256 const txID = stx->getTransactionID();
1375
1376 // Charge strongly for attempting to relay a txn with tfInnerBatchTxn
1377 // LCOV_EXCL_START
1378 /*
1379 There is no need to check whether the featureBatch amendment is
1380 enabled.
1381
1382 * If the `tfInnerBatchTxn` flag is set, and the amendment is
1383 enabled, then it's an invalid transaction because inner batch
1384 transactions should not be relayed.
1385 * If the `tfInnerBatchTxn` flag is set, and the amendment is *not*
1386 enabled, then the transaction is malformed because it's using an
1387 "unknown" flag. There's no need to waste the resources to send it
1388 to the transaction engine.
1389
1390 We don't normally check transaction validity at this level, but
1391 since we _need_ to check it when the amendment is enabled, we may as
1392 well drop it if the flag is set regardless.
1393 */
1394 if (stx->isFlag(tfInnerBatchTxn))
1395 {
1396 JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
1397 "tfInnerBatchTxn (handleTransaction).";
1398 fee_.update(Resource::feeModerateBurdenPeer, "inner batch txn");
1399 return;
1400 }
1401 // LCOV_EXCL_STOP
1402
// NOTE(review): the declaration of `flags` (line 1403, presumably
// HashRouterFlags) is missing from this listing.
1404 constexpr std::chrono::seconds tx_interval = 10s;
1405
// HashRouter dedup: skip transactions seen within tx_interval.
1406 if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
1407 {
1408 // we have seen this transaction recently
1409 if (any(flags & HashRouterFlags::BAD))
1410 {
1411 fee_.update(Resource::feeUselessData, "known bad");
1412 JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
1413 }
1414
1415 // Erase only if the server has seen this tx. If the server has not
1416 // seen this tx then the tx could not has been queued for this peer.
1417 else if (eraseTxQueue && txReduceRelayEnabled())
1418 {
1419 removeTxQueue(txID);
1420 }
1421
// NOTE(review): lines 1422-1423 (presumably a fee update for the
// duplicate) are missing from this listing.
1424
1425 return;
1426 }
1427
1428 JLOG(p_journal_.debug()) << "Got tx " << txID;
1429
1430 bool checkSignature = true;
1431 if (cluster())
1432 {
1433 if (!m->has_deferred() || !m->deferred())
1434 {
1435 // Skip local checks if a server we trust
1436 // put the transaction in its open ledger
1437 flags |= HashRouterFlags::TRUSTED;
1438 }
1439
1440 // for non-validator nodes only -- localPublicKey is set for
1441 // validators only
// NOTE(review): the condition (line 1442) is missing here.
1443 {
1444 // For now, be paranoid and have each validator
1445 // check each transaction, regardless of source
1446 checkSignature = false;
1447 }
1448 }
1449
// NOTE(review): the conditions on lines 1450 and 1454-1456 (sync state
// and job-queue load checks) are missing; only their bodies survive.
1451 {
1452 JLOG(p_journal_.trace()) << "No new transactions until synchronized";
1453 }
1455 {
1457 JLOG(p_journal_.info()) << "Transaction queue is full";
1458 }
1459 else
1460 {
// Defer signature checking to the job queue; the peer is held via
// weak_ptr so a closed peer doesn't outlive its connection.
1463 "RcvCheckTx",
1465 flags,
1466 checkSignature,
1467 batch,
1468 stx]() {
1469 if (auto peer = weak.lock())
1470 peer->checkTransaction(flags, checkSignature, stx, batch);
1471 });
1472 }
1473 }
1474 catch (std::exception const& ex)
1475 {
1476 JLOG(p_journal_.warn()) << "Transaction invalid: " << strHex(m->rawtransaction())
1477 << ". Exception: " << ex.what();
1478 }
1479}
1480
// Handler for an inbound TMGetLedger request.  Validates every field of
// the request (ledger info type, optional ledger type, hash, sequence,
// SHAMap node IDs, query type and depth); any malformed field charges
// Resource::feeInvalidData via badData() and drops the request.  A
// well-formed request is queued as a jtLEDGER_REQ job and answered
// asynchronously by processLedgerRequest().
void
{
    // Charge the peer and log once for any malformed field.
    auto badData = [&](std::string const& msg) {
        fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
        JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
    };
    auto const itype{m->itype()};

    // Verify ledger info type
    if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
    {
        badData("Invalid ledger info type");
        return;
    }

    // Normalize the optional ltype field into std::optional for the
    // checks below.
    auto const ltype = [&m]() -> std::optional<::protocol::TMLedgerType> {
        if (m->has_ltype())
            return m->ltype();
        return std::nullopt;
    }();

    if (itype == protocol::liTS_CANDIDATE)
    {
        // A candidate TX set can only be identified by its hash.
        if (!m->has_ledgerhash())
        {
            badData("Invalid TX candidate set, missing TX set hash");
            return;
        }
    }
    else if (
        !m->has_ledgerhash() && !m->has_ledgerseq() && (!ltype || *ltype != protocol::ltCLOSED))
    {
        // Other requests must identify the ledger by hash, sequence, or
        // by explicitly asking for the closed ledger.
        badData("Invalid request");
        return;
    }

    // Verify ledger type
    if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
    {
        badData("Invalid ledger type");
        return;
    }

    // Verify ledger hash
    if (m->has_ledgerhash() && !stringIsUint256Sized(m->ledgerhash()))
    {
        badData("Invalid ledger hash");
        return;
    }

    // Verify ledger sequence
    if (m->has_ledgerseq())
    {
        auto const ledgerSeq{m->ledgerseq()};

        // Check if within a reasonable range
        using namespace std::chrono_literals;
            ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
        {
            badData("Invalid ledger sequence " + std::to_string(ledgerSeq));
            return;
        }
    }

    // Verify ledger node IDs
    if (itype != protocol::liBASE)
    {
        if (m->nodeids_size() <= 0)
        {
            badData("Invalid ledger node IDs");
            return;
        }

        for (auto const& nodeId : m->nodeids())
        {
            {
                badData("Invalid SHAMap node ID");
                return;
            }
        }
    }

    // Verify query type
    if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
    {
        badData("Invalid query type");
        return;
    }

    // Verify query depth
    if (m->has_querydepth())
    {
        // liBASE requests carry no nodes, so a depth is meaningless there.
        if (m->querydepth() > Tuning::maxQueryDepth || itype == protocol::liBASE)
        {
            badData("Invalid query depth");
            return;
        }
    }

    // Queue a job to process the request
    app_.getJobQueue().addJob(jtLEDGER_REQ, "RcvGetLedger", [weak, m]() {
        if (auto peer = weak.lock())
            peer->processLedgerRequest(m);
    });
}
1590
// Handler for an inbound TMProofPathRequest (ledger replay).  If replay
// is disabled the request is charged as malformed and dropped; otherwise
// a jtREPLAY_REQ job computes the proof path and either replies with
// mtPROOF_PATH_RESPONSE or charges the requester based on the error kind.
void
{
    JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
    {
        fee_.update(Resource::feeMalformedRequest, "proof_path_request disabled");
        return;
    }

    // Answering a proof path request costs real work; charge up front.
    fee_.update(Resource::feeModerateBurdenPeer, "received a proof path request");
    app_.getJobQueue().addJob(jtREPLAY_REQ, "RcvProofPReq", [weak, m]() {
        if (auto peer = weak.lock())
        {
            auto reply = peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
            if (reply.has_error())
            {
                // Distinguish a malformed request from one we simply
                // cannot answer, and charge accordingly.
                if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
                {
                    peer->charge(Resource::feeMalformedRequest, "proof_path_request");
                }
                else
                {
                    peer->charge(Resource::feeRequestNoReply, "proof_path_request");
                }
            }
            else
            {
                peer->send(std::make_shared<Message>(reply, protocol::mtPROOF_PATH_RESPONSE));
            }
        }
    });
}
1625
// Handler for an inbound TMProofPathResponse.  Responses are only
// expected when ledger replay is enabled; unexpected or unparseable
// responses charge the sending peer.
void
{
    if (!ledgerReplayEnabled_)
    {
        fee_.update(Resource::feeMalformedRequest, "proof_path_response disabled");
        return;
    }

    // processProofPathResponse returns false for bad data.
    if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
    {
        fee_.update(Resource::feeInvalidData, "proof_path_response");
    }
}
1640
// Handler for an inbound TMReplayDeltaRequest (ledger replay).  If replay
// is disabled the request is charged as malformed and dropped; otherwise
// a jtREPLAY_REQ job builds the delta and either replies with
// mtREPLAY_DELTA_RESPONSE or charges the requester based on the error.
void
{
    JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
    if (!ledgerReplayEnabled_)
    {
        fee_.update(Resource::feeMalformedRequest, "replay_delta_request disabled");
        return;
    }

    // NOTE(review): direct field assignment, while the sibling handlers
    // call fee_.update(...) with a context string — confirm this bypass
    // is intentional.
    fee_.fee = Resource::feeModerateBurdenPeer;
    std::weak_ptr<PeerImp> const weak = shared_from_this();
    app_.getJobQueue().addJob(jtREPLAY_REQ, "RcvReplDReq", [weak, m]() {
        if (auto peer = weak.lock())
        {
            auto reply = peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
            if (reply.has_error())
            {
                // Malformed requests are charged more harshly than ones
                // we simply cannot serve.
                if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
                {
                    peer->charge(Resource::feeMalformedRequest, "replay_delta_request");
                }
                else
                {
                    peer->charge(Resource::feeRequestNoReply, "replay_delta_request");
                }
            }
            else
            {
                peer->send(std::make_shared<Message>(reply, protocol::mtREPLAY_DELTA_RESPONSE));
            }
        }
    });
}
1675
// Handler for an inbound TMReplayDeltaResponse.  Responses are only
// expected when ledger replay is enabled; unexpected or unparseable
// responses charge the sending peer.
void
{
    if (!ledgerReplayEnabled_)
    {
        fee_.update(Resource::feeMalformedRequest, "replay_delta_response disabled");
        return;
    }

    // processReplayDeltaResponse returns false for bad data.
    if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
    {
        fee_.update(Resource::feeInvalidData, "replay_delta_response");
    }
}
1690
// Handler for an inbound TMLedgerData reply.  Validates the header
// fields (hash, sequence, info type, error code, node count), relays the
// reply to the originating peer when a request cookie is present, hands
// candidate TX-set data to InboundTransactions on a job-queue thread, and
// otherwise consumes the data through InboundLedgers.
void
{
    // Charge the peer and log once for any malformed field.
    auto badData = [&](std::string const& msg) {
        fee_.update(Resource::feeInvalidData, msg);
        JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
    };

    // Verify ledger hash
    if (!stringIsUint256Sized(m->ledgerhash()))
    {
        badData("Invalid ledger hash");
        return;
    }

    // Verify ledger sequence
    {
        auto const ledgerSeq{m->ledgerseq()};
        if (m->type() == protocol::liTS_CANDIDATE)
        {
            // TX candidate sets are not tied to a ledger sequence.
            if (ledgerSeq != 0)
            {
                badData("Invalid ledger sequence " + std::to_string(ledgerSeq));
                return;
            }
        }
        else
        {
            // Check if within a reasonable range
            using namespace std::chrono_literals;
            if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
                ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
            {
                badData("Invalid ledger sequence " + std::to_string(ledgerSeq));
                return;
            }
        }
    }

    // Verify ledger info type
    if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
    {
        badData("Invalid ledger info type");
        return;
    }

    // Verify reply error
    if (m->has_error() &&
        (m->error() < protocol::reNO_LEDGER || m->error() > protocol::reBAD_REQUEST))
    {
        badData("Invalid reply error");
        return;
    }

    // Verify ledger nodes.
    if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
    {
        badData("Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
        return;
    }

    // If there is a request cookie, attempt to relay the message
    if (m->has_requestcookie())
    {
        // The cookie is the short ID of the peer that asked us to fetch
        // this data on its behalf; forward the reply without the cookie.
        if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
        {
            m->clear_requestcookie();
            peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
        }
        else
        {
            JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
        }
        return;
    }

    uint256 const ledgerHash{m->ledgerhash()};

    // Otherwise check if received data for a candidate transaction set
    if (m->type() == protocol::liTS_CANDIDATE)
    {
        std::weak_ptr<PeerImp> const weak{shared_from_this()};
        app_.getJobQueue().addJob(jtTXN_DATA, "RcvPeerData", [weak, ledgerHash, m]() {
            if (auto peer = weak.lock())
            {
                peer->app_.getInboundTransactions().gotData(ledgerHash, peer, m);
            }
        });
        return;
    }

    // Consume the message
    app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
}
1785
// Handler for an inbound TMProposeSet (consensus proposal).  Performs
// cheap structural checks (signature length, key type, hash sizes) before
// any expensive work, suppresses duplicates via the HashRouter, applies
// the untrusted-proposal policy, and finally queues signature checking
// and processing on the job queue (jtPROPOSAL_t / jtPROPOSAL_ut).
void
{
    protocol::TMProposeSet const& set = *m;

    auto const sig = makeSlice(set.signature());

    // Preliminary check for the validity of the signature: A DER encoded
    // signature can't be longer than 72 bytes.
    if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
        (publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
    {
        JLOG(p_journal_.warn()) << "Proposal: malformed";
        fee_.update(Resource::feeInvalidSignature, " signature can't be longer than 72 bytes");
        return;
    }

    if (!stringIsUint256Sized(set.currenttxhash()) || !stringIsUint256Sized(set.previousledger()))
    {
        JLOG(p_journal_.warn()) << "Proposal: malformed";
        fee_.update(Resource::feeMalformedRequest, "bad hashes");
        return;
    }

    // RH TODO: when isTrusted = false we should probably also cache a key
    // suppression for 30 seconds to avoid doing a relatively expensive lookup
    // every time a spam packet is received
    PublicKey const publicKey{makeSlice(set.nodepubkey())};
    auto const isTrusted = app_.getValidators().trusted(publicKey);

    // If the operator has specified that untrusted proposals be dropped then
    // this happens here I.e. before further wasting CPU verifying the signature
    // of an untrusted key
    if (!isTrusted)
    {
        // report untrusted proposal messages
        overlay_.reportInboundTraffic(
            TrafficCount::category::proposal_untrusted, Message::messageSize(*m));

        // -1 means "never relay untrusted proposals".
        if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
            return;
    }

    uint256 const proposeHash{set.currenttxhash()};
    uint256 const prevLedger{set.previousledger()};

    NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};

    // Unique ID used for duplicate suppression across all peers.
    uint256 const suppression = proposalUniqueId(
        proposeHash, prevLedger, set.proposeseq(), closeTime, publicKey.slice(), sig);

    if (auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
        !added)
    {
        // Count unique messages (Slots has it's own 'HashRouter'), which a peer
        // receives within IDLED seconds since the message has been relayed.
        if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
            overlay_.updateSlotAndSquelch(suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);

        // report duplicate proposal messages
        overlay_.reportInboundTraffic(
            TrafficCount::category::proposal_duplicate, Message::messageSize(*m));

        JLOG(p_journal_.trace()) << "Proposal: duplicate";

        return;
    }

    if (!isTrusted)
    {
        // Untrusted proposals are dropped when this peer has diverged
        // from the network or when we are under local load.
        if (tracking_.load() == Tracking::diverged)
        {
            JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (peer divergence)";
            return;
        }

        if (!cluster() && app_.getFeeTrack().isLoadedLocal())
        {
            JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
            return;
        }
    }

    JLOG(p_journal_.trace()) << "Proposal: " << (isTrusted ? "trusted" : "untrusted");

    auto proposal = RCLCxPeerPos(
        publicKey,
        sig,
        suppression,
        prevLedger,
        set.proposeseq(),
        proposeHash,
        closeTime,
        app_.getTimeKeeper().closeTime(),
        calcNodeID(app_.getValidatorManifests().getMasterKey(publicKey))});

    // Signature verification happens off the network thread.
    std::weak_ptr<PeerImp> const weak = shared_from_this();
    app_.getJobQueue().addJob(
        isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "checkPropose", [weak, isTrusted, m, proposal]() {
            if (auto peer = weak.lock())
                peer->checkPropose(isTrusted, m, proposal);
        });
}
1890
// Handler for an inbound TMStatusChange.  Updates this peer's recorded
// status and ledger range (guarded by recentLock_), tracks the peer's
// closed/previous ledger hashes, re-checks sync tracking against the
// locally validated ledger, and publishes the status change to any
// peer-status stream subscribers.
void
{
    JLOG(p_journal_.trace()) << "Status: Change";

    // Fill in a network time if the peer omitted one, so downstream
    // consumers always see a timestamp.
    if (!m->has_networktime())
        m->set_networktime(app_.getTimeKeeper().now().time_since_epoch().count());

    {
        std::lock_guard const sl(recentLock_);
        if (!last_status_.has_newstatus() || m->has_newstatus())
        {
            last_status_ = *m;
        }
        else
        {
            // preserve old status
            protocol::NodeStatus const status = last_status_.newstatus();
            last_status_ = *m;
            m->set_newstatus(status);
        }
    }

    if (m->newevent() == protocol::neLOST_SYNC)
    {
        bool outOfSync{false};
        {
            // Operations on closedLedgerHash_ and previousLedgerHash_ must be
            // guarded by recentLock_.
            std::lock_guard const sl(recentLock_);
            if (!closedLedgerHash_.isZero())
            {
                outOfSync = true;
                closedLedgerHash_.zero();
            }
            previousLedgerHash_.zero();
        }
        if (outOfSync)
        {
            JLOG(p_journal_.debug()) << "Status: Out of sync";
        }
        return;
    }

    {
        uint256 closedLedgerHash{};
        bool const peerChangedLedgers{m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};

        {
            // Operations on closedLedgerHash_ and previousLedgerHash_ must be
            // guarded by recentLock_.
            std::lock_guard const sl(recentLock_);
            if (peerChangedLedgers)
            {
                closedLedgerHash_ = m->ledgerhash();
                closedLedgerHash = closedLedgerHash_;
                addLedger(closedLedgerHash, sl);
            }
            else
            {
                closedLedgerHash_.zero();
            }

            if (m->has_ledgerhashprevious() && stringIsUint256Sized(m->ledgerhashprevious()))
            {
                previousLedgerHash_ = m->ledgerhashprevious();
                addLedger(previousLedgerHash_, sl);
            }
            else
            {
                previousLedgerHash_.zero();
            }
        }
        if (peerChangedLedgers)
        {
            JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;
        }
        else
        {
            JLOG(p_journal_.debug()) << "Status: No ledger";
        }
    }

    if (m->has_firstseq() && m->has_lastseq())
    {
        std::lock_guard const sl(recentLock_);

        minLedger_ = m->firstseq();
        maxLedger_ = m->lastseq();

        // An inverted or zero-valued range is treated as "no range".
        if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
            minLedger_ = maxLedger_ = 0;
    }

    // Only re-evaluate tracking when our own validated ledger is fresh.
    if (m->has_ledgerseq() && app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
    {
        checkTracking(m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
    }

    // Build the JSON status report lazily; the lambda only runs if there
    // are peer-status subscribers.
    app_.getOPs().pubPeerStatus([m, this]() -> Json::Value {

        if (m->has_newstatus())
        {
            switch (m->newstatus())
            {
                case protocol::nsCONNECTING:
                    j[jss::status] = "CONNECTING";
                    break;
                case protocol::nsCONNECTED:
                    j[jss::status] = "CONNECTED";
                    break;
                case protocol::nsMONITORING:
                    j[jss::status] = "MONITORING";
                    break;
                case protocol::nsVALIDATING:
                    j[jss::status] = "VALIDATING";
                    break;
                case protocol::nsSHUTTING:
                    j[jss::status] = "SHUTTING";
                    break;
            }
        }

        if (m->has_newevent())
        {
            switch (m->newevent())
            {
                case protocol::neCLOSING_LEDGER:
                    j[jss::action] = "CLOSING_LEDGER";
                    break;
                case protocol::neACCEPTED_LEDGER:
                    j[jss::action] = "ACCEPTED_LEDGER";
                    break;
                case protocol::neSWITCHED_LEDGER:
                    j[jss::action] = "SWITCHED_LEDGER";
                    break;
                case protocol::neLOST_SYNC:
                    j[jss::action] = "LOST_SYNC";
                    break;
            }
        }

        if (m->has_ledgerseq())
        {
            j[jss::ledger_index] = m->ledgerseq();
        }

        if (m->has_ledgerhash())
        {
            uint256 closedLedgerHash{};
            {
                std::lock_guard const sl(recentLock_);
                closedLedgerHash = closedLedgerHash_;
            }
            j[jss::ledger_hash] = to_string(closedLedgerHash);
        }

        if (m->has_networktime())
        {
            j[jss::date] = Json::UInt(m->networktime());
        }

        if (m->has_firstseq() && m->has_lastseq())
        {
            j[jss::ledger_index_min] = Json::UInt(m->firstseq());
            j[jss::ledger_index_max] = Json::UInt(m->lastseq());
        }

        return j;
    });
}
2063
2064void
2065PeerImp::checkTracking(std::uint32_t validationSeq)
2066{
2067 std::uint32_t serverSeq = 0;
2068 {
2069 // Extract the sequence number of the highest
2070 // ledger this peer has
2071 std::lock_guard const sl(recentLock_);
2072
2073 serverSeq = maxLedger_;
2074 }
2075 if (serverSeq != 0)
2076 {
2077 // Compare the peer's ledger sequence to the
2078 // sequence of a recently-validated ledger
2079 checkTracking(serverSeq, validationSeq);
2080 }
2081}
2082
2083void
2084PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
2085{
2086 int const diff = std::max(seq1, seq2) - std::min(seq1, seq2);
2087
2088 if (diff < Tuning::convergedLedgerLimit)
2089 {
2090 // The peer's ledger sequence is close to the validation's
2091 tracking_ = Tracking::converged;
2092 }
2093
2094 if ((diff > Tuning::divergedLedgerLimit) && (tracking_.load() != Tracking::diverged))
2095 {
2096 // The peer's ledger sequence is way off the validation's
2097 std::lock_guard const sl(recentLock_);
2098
2099 tracking_ = Tracking::diverged;
2100 trackingTime_ = clock_type::now();
2101 }
2102}
2103
// Handler for a peer announcing a transaction set it holds.  Validates
// the hash, and for tsHAVE records the set in recentTxSets_ (guarded by
// recentLock_), charging for duplicate announcements.
void
{
    if (!stringIsUint256Sized(m->hash()))
    {
        fee_.update(Resource::feeMalformedRequest, "bad hash");
        return;
    }

    uint256 const hash{m->hash()};

    if (m->status() == protocol::tsHAVE)
    {
        std::lock_guard const sl(recentLock_);

        // Re-announcing a set we already recorded is useless data.
        if (std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) != recentTxSets_.end())
        {
            fee_.update(Resource::feeUselessData, "duplicate (tsHAVE)");
            return;
        }

        recentTxSets_.push_back(hash);
    }
}
2128
2129void
2130PeerImp::onValidatorListMessage(
2131 std::string const& messageType,
2132 std::string const& manifest,
2133 std::uint32_t version,
2134 std::vector<ValidatorBlobInfo> const& blobs)
2135{
2136 // If there are no blobs, the message is malformed (possibly because of
2137 // ValidatorList class rules), so charge accordingly and skip processing.
2138 if (blobs.empty())
2139 {
2140 JLOG(p_journal_.warn()) << "Ignored malformed " << messageType;
2141 // This shouldn't ever happen with a well-behaved peer
2142 fee_.update(Resource::feeHeavyBurdenPeer, "no blobs");
2143 return;
2144 }
2145
2146 auto const hash = sha512Half(manifest, blobs, version);
2147
2148 JLOG(p_journal_.debug()) << "Received " << messageType;
2149
2150 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2151 {
2152 JLOG(p_journal_.debug()) << messageType << ": received duplicate " << messageType;
2153 // Charging this fee here won't hurt the peer in the normal
2154 // course of operation (ie. refresh every 5 minutes), but
2155 // will add up if the peer is misbehaving.
2156 fee_.update(Resource::feeUselessData, "duplicate");
2157 return;
2158 }
2159
2160 auto const applyResult = app_.getValidators().applyListsAndBroadcast(
2161 manifest,
2162 version,
2163 blobs,
2164 remote_address_.to_string(),
2165 hash,
2166 app_.getOverlay(),
2167 app_.getHashRouter(),
2168 app_.getOPs());
2169
2170 JLOG(p_journal_.debug()) << "Processed " << messageType << " version " << version << " from "
2171 << (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
2172 : "unknown or invalid publisher")
2173 << " with best result " << to_string(applyResult.bestDisposition());
2174
2175 // Act based on the best result
2176 switch (applyResult.bestDisposition())
2177 {
2178 // New list
2179 case ListDisposition::accepted:
2180 // Newest list is expired, and that needs to be broadcast, too
2181 case ListDisposition::expired:
2182 // Future list
2183 case ListDisposition::pending: {
2184 std::lock_guard<std::mutex> const sl(recentLock_);
2185
2186 XRPL_ASSERT(
2187 applyResult.publisherKey,
2188 "xrpl::PeerImp::onValidatorListMessage : publisher key is "
2189 "set");
2190 auto const& pubKey = *applyResult.publisherKey;
2191#ifndef NDEBUG
2192 if (auto const iter = publisherListSequences_.find(pubKey);
2193 iter != publisherListSequences_.end())
2194 {
2195 XRPL_ASSERT(
2196 iter->second < applyResult.sequence,
2197 "xrpl::PeerImp::onValidatorListMessage : lower sequence");
2198 }
2199#endif
2200 publisherListSequences_[pubKey] = applyResult.sequence;
2201 }
2202 break;
2203 case ListDisposition::same_sequence:
2204 case ListDisposition::known_sequence:
2205#ifndef NDEBUG
2206 {
2207 std::lock_guard<std::mutex> const sl(recentLock_);
2208 XRPL_ASSERT(
2209 applyResult.sequence && applyResult.publisherKey,
2210 "xrpl::PeerImp::onValidatorListMessage : nonzero sequence "
2211 "and set publisher key");
2212 XRPL_ASSERT(
2213 publisherListSequences_[*applyResult.publisherKey] <= applyResult.sequence,
2214 "xrpl::PeerImp::onValidatorListMessage : maximum sequence");
2215 }
2216#endif // !NDEBUG
2217
2218 break;
2219 case ListDisposition::stale:
2220 case ListDisposition::untrusted:
2221 case ListDisposition::invalid:
2222 case ListDisposition::unsupported_version:
2223 break;
2224 // LCOV_EXCL_START
2225 default:
2226 UNREACHABLE(
2227 "xrpl::PeerImp::onValidatorListMessage : invalid best list "
2228 "disposition");
2229 // LCOV_EXCL_STOP
2230 }
2231
2232 // Charge based on the worst result
2233 switch (applyResult.worstDisposition())
2234 {
2235 case ListDisposition::accepted:
2236 case ListDisposition::expired:
2237 case ListDisposition::pending:
2238 // No charges for good data
2239 break;
2240 case ListDisposition::same_sequence:
2241 case ListDisposition::known_sequence:
2242 // Charging this fee here won't hurt the peer in the normal
2243 // course of operation (ie. refresh every 5 minutes), but
2244 // will add up if the peer is misbehaving.
2245 fee_.update(Resource::feeUselessData, " duplicate (same_sequence or known_sequence)");
2246 break;
2247 case ListDisposition::stale:
2248 // There are very few good reasons for a peer to send an
2249 // old list, particularly more than once.
2250 fee_.update(Resource::feeInvalidData, "expired");
2251 break;
2252 case ListDisposition::untrusted:
2253 // Charging this fee here won't hurt the peer in the normal
2254 // course of operation (ie. refresh every 5 minutes), but
2255 // will add up if the peer is misbehaving.
2256 fee_.update(Resource::feeUselessData, "untrusted");
2257 break;
2258 case ListDisposition::invalid:
2259 // This shouldn't ever happen with a well-behaved peer
2260 fee_.update(Resource::feeInvalidSignature, "invalid list disposition");
2261 break;
2262 case ListDisposition::unsupported_version:
2263 // During a version transition, this may be legitimate.
2264 // If it happens frequently, that's probably bad.
2265 fee_.update(Resource::feeInvalidData, "version");
2266 break;
2267 // LCOV_EXCL_START
2268 default:
2269 UNREACHABLE(
2270 "xrpl::PeerImp::onValidatorListMessage : invalid worst list "
2271 "disposition");
2272 // LCOV_EXCL_STOP
2273 }
2274
2275 // Log based on all the results.
2276 for (auto const& [disp, count] : applyResult.dispositions)
2277 {
2278 switch (disp)
2279 {
2280 // New list
2281 case ListDisposition::accepted:
2282 JLOG(p_journal_.debug()) << "Applied " << count << " new " << messageType;
2283 break;
2284 // Newest list is expired, and that needs to be broadcast, too
2285 case ListDisposition::expired:
2286 JLOG(p_journal_.debug()) << "Applied " << count << " expired " << messageType;
2287 break;
2288 // Future list
2289 case ListDisposition::pending:
2290 JLOG(p_journal_.debug()) << "Processed " << count << " future " << messageType;
2291 break;
2292 case ListDisposition::same_sequence:
2293 JLOG(p_journal_.warn())
2294 << "Ignored " << count << " " << messageType << "(s) with current sequence";
2295 break;
2296 case ListDisposition::known_sequence:
2297 JLOG(p_journal_.warn())
2298 << "Ignored " << count << " " << messageType << "(s) with future sequence";
2299 break;
2300 case ListDisposition::stale:
2301 JLOG(p_journal_.warn()) << "Ignored " << count << "stale " << messageType;
2302 break;
2303 case ListDisposition::untrusted:
2304 JLOG(p_journal_.warn()) << "Ignored " << count << " untrusted " << messageType;
2305 break;
2306 case ListDisposition::unsupported_version:
2307 JLOG(p_journal_.warn())
2308 << "Ignored " << count << "unsupported version " << messageType;
2309 break;
2310 case ListDisposition::invalid:
2311 JLOG(p_journal_.warn()) << "Ignored " << count << "invalid " << messageType;
2312 break;
2313 // LCOV_EXCL_START
2314 default:
2315 UNREACHABLE(
2316 "xrpl::PeerImp::onValidatorListMessage : invalid list "
2317 "disposition");
2318 // LCOV_EXCL_STOP
2319 }
2320 }
2321}
2322
// Handler for an inbound TMValidatorList.  Rejects the message if the
// peer's protocol version should not support the feature, otherwise
// parses the blobs and defers to onValidatorListMessage.  Any parse
// exception charges the peer feeInvalidData.
void
{
    try
    {
        if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
        {
            JLOG(p_journal_.debug()) << "ValidatorList: received validator list from peer using "
                                     << "protocol version " << to_string(protocol_)
                                     << " which shouldn't support this feature.";
            fee_.update(Resource::feeUselessData, "unsupported peer");
            return;
        }
        onValidatorListMessage(
            "ValidatorList", m->manifest(), m->version(), ValidatorList::parseBlobs(*m));
    }
    catch (std::exception const& e)
    {
        JLOG(p_journal_.warn()) << "ValidatorList: Exception, " << e.what();
        using namespace std::string_literals;
        fee_.update(Resource::feeInvalidData, e.what());
    }
}
2346
// Handler for an inbound TMValidatorListCollection.  Rejects the message
// if the peer's protocol version should not support the feature or if
// the list version predates collections (< 2), otherwise parses the
// blobs and defers to onValidatorListMessage.  Any parse exception
// charges the peer feeInvalidData.
void
{
    try
    {
        if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
        {
            JLOG(p_journal_.debug())
                << "ValidatorListCollection: received validator list from peer "
                << "using protocol version " << to_string(protocol_)
                << " which shouldn't support this feature.";
            fee_.update(Resource::feeUselessData, "unsupported peer");
            return;
        }
        if (m->version() < 2)
        {
            JLOG(p_journal_.debug())
                << "ValidatorListCollection: received invalid validator list "
                   "version "
                << m->version() << " from peer using protocol version " << to_string(protocol_);
            fee_.update(Resource::feeInvalidData, "wrong version");
            return;
        }
        onValidatorListMessage(
            "ValidatorListCollection", m->manifest(), m->version(), ValidatorList::parseBlobs(*m));
    }
    catch (std::exception const& e)
    {
        JLOG(p_journal_.warn()) << "ValidatorListCollection: Exception, " << e.what();
        using namespace std::string_literals;
        fee_.update(Resource::feeInvalidData, e.what());
    }
}
2380
// Handler for an inbound TMValidation.  Performs cheap size/timeliness
// checks, applies the untrusted-validation policy, suppresses duplicates
// via the HashRouter, and queues signature checking on the job queue
// (jtVALIDATION_t / jtVALIDATION_ut).  Deserialization errors charge the
// peer feeMalformedRequest.
void
{
    // A validation smaller than this cannot be well formed.
    if (m->validation().size() < 50)
    {
        JLOG(p_journal_.warn()) << "Validation: Too small";
        fee_.update(Resource::feeMalformedRequest, "too small");
        return;
    }

    try
    {
        auto const closeTime = app_.getTimeKeeper().closeTime();

        {
            SerialIter sit(makeSlice(m->validation()));
            std::ref(sit),
                [this](PublicKey const& pk) {
                    return calcNodeID(app_.getValidatorManifests().getMasterKey(pk));
                },
                false);
            val->setSeen(closeTime);
        }

        // Discard validations whose sign/seen times fall outside the
        // window considered current.
        if (!isCurrent(
                app_.getValidations().parms(),
                app_.getTimeKeeper().closeTime(),
                val->getSignTime(),
                val->getSeenTime()))
        {
            JLOG(p_journal_.trace()) << "Validation: Not current";
            fee_.update(Resource::feeUselessData, "not current");
            return;
        }

        // RH TODO: when isTrusted = false we should probably also cache a key
        // suppression for 30 seconds to avoid doing a relatively expensive
        // lookup every time a spam packet is received
        auto const isTrusted = app_.getValidators().trusted(val->getSignerPublic());

        // If the operator has specified that untrusted validations be
        // dropped then this happens here I.e. before further wasting CPU
        // verifying the signature of an untrusted key
        if (!isTrusted)
        {
            // increase untrusted validations received
            overlay_.reportInboundTraffic(
                TrafficCount::category::validation_untrusted, Message::messageSize(*m));

            if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
                return;
        }

        auto key = sha512Half(makeSlice(m->validation()));

        auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);

        if (!added)
        {
            // Count unique messages (Slots has it's own 'HashRouter'), which a
            // peer receives within IDLED seconds since the message has been
            // relayed.
            if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
            {
                overlay_.updateSlotAndSquelch(
                    key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
            }

            // increase duplicate validations received
            overlay_.reportInboundTraffic(
                TrafficCount::category::validation_duplicate, Message::messageSize(*m));

            JLOG(p_journal_.trace()) << "Validation: duplicate";
            return;
        }

        if (!isTrusted && (tracking_.load() == Tracking::diverged))
        {
            JLOG(p_journal_.debug()) << "Dropping untrusted validation from diverged peer";
        }
        else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
        {
            std::string const name = isTrusted ? "ChkTrust" : "ChkUntrust";

            // Signature verification happens off the network thread.
            std::weak_ptr<PeerImp> const weak = shared_from_this();
            app_.getJobQueue().addJob(
                isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, name, [weak, val, m, key]() {
                    if (auto peer = weak.lock())
                        peer->checkValidation(val, key, m);
                });
        }
        else
        {
            JLOG(p_journal_.debug()) << "Dropping untrusted validation for load";
        }
    }
    catch (std::exception const& e)
    {
        JLOG(p_journal_.warn()) << "Exception processing validation: " << e.what();
        using namespace std::string_literals;
        fee_.update(Resource::feeMalformedRequest, e.what());
    }
}
2486
2487void
2489{
2490 protocol::TMGetObjectByHash const& packet = *m;
2491
2492 JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type() << " "
2493 << packet.objects_size();
2494
2495 if (packet.query())
2496 {
2497 // this is a query
2498 if (send_queue_.size() >= Tuning::dropSendQueue)
2499 {
2500 JLOG(p_journal_.debug()) << "GetObject: Large send queue";
2501 return;
2502 }
2503
2504 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2505 {
2506 doFetchPack(m);
2507 return;
2508 }
2509
2510 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2511 {
2512 if (!txReduceRelayEnabled())
2513 {
2514 JLOG(p_journal_.error()) << "TMGetObjectByHash: tx reduce-relay is disabled";
2515 fee_.update(Resource::feeMalformedRequest, "disabled");
2516 return;
2517 }
2518
2519 std::weak_ptr<PeerImp> const weak = shared_from_this();
2520 app_.getJobQueue().addJob(jtREQUESTED_TXN, "DoTxs", [weak, m]() {
2521 if (auto peer = weak.lock())
2522 peer->doTransactions(m);
2523 });
2524 return;
2525 }
2526
2527 protocol::TMGetObjectByHash reply;
2528
2529 reply.set_query(false);
2530
2531 if (packet.has_seq())
2532 reply.set_seq(packet.seq());
2533
2534 reply.set_type(packet.type());
2535
2536 if (packet.has_ledgerhash())
2537 {
2538 if (!stringIsUint256Sized(packet.ledgerhash()))
2539 {
2540 fee_.update(Resource::feeMalformedRequest, "ledger hash");
2541 return;
2542 }
2543
2544 reply.set_ledgerhash(packet.ledgerhash());
2545 }
2546
2547 fee_.update(Resource::feeModerateBurdenPeer, " received a get object by hash request");
2548
2549 // This is a very minimal implementation
2550 for (int i = 0; i < packet.objects_size(); ++i)
2551 {
2552 auto const& obj = packet.objects(i);
2553 if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
2554 {
2555 uint256 const hash{obj.hash()};
2556 // VFALCO TODO Move this someplace more sensible so we dont
2557 // need to inject the NodeStore interfaces.
2558 std::uint32_t const seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2559 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2560 if (nodeObject)
2561 {
2562 protocol::TMIndexedObject& newObj = *reply.add_objects();
2563 newObj.set_hash(hash.begin(), hash.size());
2564 newObj.set_data(&nodeObject->getData().front(), nodeObject->getData().size());
2565
2566 if (obj.has_nodeid())
2567 newObj.set_index(obj.nodeid());
2568 if (obj.has_ledgerseq())
2569 newObj.set_ledgerseq(obj.ledgerseq());
2570
2571 // VFALCO NOTE "seq" in the message is obsolete
2572
2573 // Check if by adding this object, reply has reached its
2574 // limit
2575 if (reply.objects_size() >= Tuning::hardMaxReplyNodes)
2576 {
2577 fee_.update(
2578 Resource::feeModerateBurdenPeer,
2579 " Reply limit reached. Truncating reply.");
2580 break;
2581 }
2582 }
2583 }
2584 }
2585
2586 JLOG(p_journal_.trace()) << "GetObj: " << reply.objects_size() << " of "
2587 << packet.objects_size();
2588 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2589 }
2590 else
2591 {
2592 // this is a reply
2593 std::uint32_t pLSeq = 0;
2594 bool pLDo = true;
2595 bool progress = false;
2596
2597 for (int i = 0; i < packet.objects_size(); ++i)
2598 {
2599 protocol::TMIndexedObject const& obj = packet.objects(i);
2600
2601 if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
2602 {
2603 if (obj.has_ledgerseq())
2604 {
2605 if (obj.ledgerseq() != pLSeq)
2606 {
2607 if (pLDo && (pLSeq != 0))
2608 {
2609 JLOG(p_journal_.debug()) << "GetObj: Full fetch pack for " << pLSeq;
2610 }
2611 pLSeq = obj.ledgerseq();
2612 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2613
2614 if (!pLDo)
2615 {
2616 JLOG(p_journal_.debug()) << "GetObj: Late fetch pack for " << pLSeq;
2617 }
2618 else
2619 {
2620 progress = true;
2621 }
2622 }
2623 }
2624
2625 if (pLDo)
2626 {
2627 uint256 const hash{obj.hash()};
2628
2629 app_.getLedgerMaster().addFetchPack(
2630 hash, std::make_shared<Blob>(obj.data().begin(), obj.data().end()));
2631 }
2632 }
2633 }
2634
2635 if (pLDo && (pLSeq != 0))
2636 {
2637 JLOG(p_journal_.debug()) << "GetObj: Partial fetch pack for " << pLSeq;
2638 }
2639 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2640 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2641 }
2642}
2643
// Handler for a TMHaveTransactions announcement (tx reduce-relay): queues a
// job that requests any announced transactions we don't already have.
// NOTE(review): the extraction dropped original line 2645 here, which carried
// the signature `PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions>
// const& m)` — confirm against the repository source.
2644void
2646{
    // The peer must have negotiated tx reduce-relay; otherwise this message
    // is a protocol misuse and is billed as a malformed request.
2647 if (!txReduceRelayEnabled())
2648 {
2649 JLOG(p_journal_.error()) << "TMHaveTransactions: tx reduce-relay is disabled";
2650 fee_.update(Resource::feeMalformedRequest, "disabled");
2651 return;
2652 }
2653
    // Defer the real work to the job queue; hold only a weak_ptr so a queued
    // job cannot keep a disconnected peer alive.
2654 std::weak_ptr<PeerImp> const weak = shared_from_this();
2655 app_.getJobQueue().addJob(jtMISSING_TXN, "HandleHaveTxs", [weak, m]() {
2656 if (auto peer = weak.lock())
2657 peer->handleHaveTransactions(m);
2658 });
2659}
2660
2661void
2662PeerImp::handleHaveTransactions(std::shared_ptr<protocol::TMHaveTransactions> const& m)
2663{
2664 protocol::TMGetObjectByHash tmBH;
2665 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2666 tmBH.set_query(true);
2667
2668 JLOG(p_journal_.trace()) << "received TMHaveTransactions " << m->hashes_size();
2669
2670 for (std::uint32_t i = 0; i < m->hashes_size(); i++)
2671 {
2672 if (!stringIsUint256Sized(m->hashes(i)))
2673 {
2674 JLOG(p_journal_.error()) << "TMHaveTransactions with invalid hash size";
2675 fee_.update(Resource::feeMalformedRequest, "hash size");
2676 return;
2677 }
2678
2679 uint256 hash(m->hashes(i));
2680
2681 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2682
2683 JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn;
2684
2685 if (!txn)
2686 {
2687 JLOG(p_journal_.debug()) << "adding transaction to request";
2688
2689 auto obj = tmBH.add_objects();
2690 obj->set_hash(hash.data(), hash.size());
2691 }
2692 else
2693 {
2694 // Erase only if a peer has seen this tx. If the peer has not
2695 // seen this tx then the tx could not has been queued for this
2696 // peer.
2697 removeTxQueue(hash);
2698 }
2699 }
2700
2701 JLOG(p_journal_.trace()) << "transaction request object is " << tmBH.objects_size();
2702
2703 if (tmBH.objects_size() > 0)
2704 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2705}
2706
// Handler for TMTransactions: the reply to our tx reduce-relay request.
// Records metrics and feeds each received transaction through
// handleTransaction (eraseTxQueue=false, batch=true).
// NOTE(review): the extraction dropped original line 2708 here, which carried
// the signature `PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions>
// const& m)` — confirm against the repository source.
2707void
2709{
2710 if (!txReduceRelayEnabled())
2711 {
2712 JLOG(p_journal_.error()) << "TMTransactions: tx reduce-relay is disabled";
2713 fee_.update(Resource::feeMalformedRequest, "disabled");
2714 return;
2715 }
2716
2717 JLOG(p_journal_.trace()) << "received TMTransactions " << m->transactions_size();
2718
2719 overlay_.addTxMetrics(m->transactions_size());
2720
2721 for (std::uint32_t i = 0; i < m->transactions_size(); ++i)
2722 {
        // NOTE(review): original line 2724 was dropped by the extraction;
        // judging by the no-op deleter below it wrapped the mutable message
        // pointer in a non-owning std::shared_ptr<protocol::TMTransaction>
        // — confirm against the repository source.
2723 handleTransaction(
2725 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2726 false,
2727 true);
2728 }
2729}
2730
2731void
2732PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
2733{
2734 using on_message_fn = void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
2735 if (!strand_.running_in_this_thread())
2736 {
2737 post(strand_, std::bind((on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2738 return;
2739 }
2740
2741 if (!m->has_validatorpubkey())
2742 {
2743 fee_.update(Resource::feeInvalidData, "squelch no pubkey");
2744 return;
2745 }
2746 auto validator = m->validatorpubkey();
2747 auto const slice{makeSlice(validator)};
2748 if (!publicKeyType(slice))
2749 {
2750 fee_.update(Resource::feeInvalidData, "squelch bad pubkey");
2751 return;
2752 }
2753 PublicKey const key(slice);
2754
2755 // Ignore the squelch for validator's own messages.
2756 if (key == app_.getValidationPublicKey())
2757 {
2758 JLOG(p_journal_.debug()) << "onMessage: TMSquelch discarding validator's squelch " << slice;
2759 return;
2760 }
2761
2762 std::uint32_t const duration = m->has_squelchduration() ? m->squelchduration() : 0;
2763 if (!m->squelch())
2764 {
2765 squelch_.removeSquelch(key);
2766 }
2767 else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
2768 {
2769 fee_.update(Resource::feeInvalidData, "squelch duration");
2770 }
2771
2772 JLOG(p_journal_.debug()) << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
2773}
2774
2775//--------------------------------------------------------------------------
2776
2777void
2778PeerImp::addLedger(uint256 const& hash, std::lock_guard<std::mutex> const& lockedRecentLock)
2779{
2780 // lockedRecentLock is passed as a reminder that recentLock_ must be
2781 // locked by the caller.
2782 (void)lockedRecentLock;
2783
2784 if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) != recentLedgers_.end())
2785 return;
2786
2787 recentLedgers_.push_back(hash);
2788}
2789
2790void
2791PeerImp::doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
2792{
2793 // VFALCO TODO Invert this dependency using an observer and shared state
2794 // object. Don't queue fetch pack jobs if we're under load or we already
2795 // have some queued.
2796 if (app_.getFeeTrack().isLoadedLocal() ||
2797 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2798 (app_.getJobQueue().getJobCount(jtPACK) > 10))
2799 {
2800 JLOG(p_journal_.info()) << "Too busy to make fetch pack";
2801 return;
2802 }
2803
2804 if (!stringIsUint256Sized(packet->ledgerhash()))
2805 {
2806 JLOG(p_journal_.warn()) << "FetchPack hash size malformed";
2807 fee_.update(Resource::feeMalformedRequest, "hash size");
2808 return;
2809 }
2810
2811 fee_.fee = Resource::feeHeavyBurdenPeer;
2812
2813 uint256 const hash{packet->ledgerhash()};
2814
2815 std::weak_ptr<PeerImp> const weak = shared_from_this();
2816 auto elapsed = UptimeClock::now();
2817 auto const pap = &app_;
2818 app_.getJobQueue().addJob(jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2819 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2820 });
2821}
2822
2823void
2824PeerImp::doTransactions(std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
2825{
2826 protocol::TMTransactions reply;
2827
2828 JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx "
2829 << packet->objects_size();
2830
2831 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2832 {
2833 JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes";
2834 fee_.update(Resource::feeMalformedRequest, "too big");
2835 return;
2836 }
2837
2838 for (std::uint32_t i = 0; i < packet->objects_size(); ++i)
2839 {
2840 auto const& obj = packet->objects(i);
2841
2842 if (!stringIsUint256Sized(obj.hash()))
2843 {
2844 fee_.update(Resource::feeMalformedRequest, "hash size");
2845 return;
2846 }
2847
2848 uint256 hash(obj.hash());
2849
2850 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2851
2852 if (!txn)
2853 {
2854 JLOG(p_journal_.error())
2855 << "doTransactions, transaction not found " << Slice(hash.data(), hash.size());
2856 fee_.update(Resource::feeMalformedRequest, "tx not found");
2857 return;
2858 }
2859
2860 Serializer s;
2861 auto tx = reply.add_transactions();
2862 auto sttx = txn->getSTransaction();
2863 sttx->add(s);
2864 tx->set_rawtransaction(s.data(), s.size());
2865 tx->set_status(txn->getStatus() == INCLUDED ? protocol::tsCURRENT : protocol::tsNEW);
2866 tx->set_receivetimestamp(app_.getTimeKeeper().now().time_since_epoch().count());
2867 tx->set_deferred(txn->getSubmitResult().queued);
2868 }
2869
2870 if (reply.transactions_size() > 0)
2871 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
2872}
2873
// Validate a transaction relayed by this peer and hand it to NetworkOPs.
//
//   flags          - HashRouter flags already recorded for this tx; only the
//                    TRUSTED bit is consulted here.
//   checkSignature - when true the signature is verified before processing;
//                    when false validity is forced (caller vouches for it).
//   stx            - the parsed transaction.
//   batch          - true when invoked for a requested batch of txs
//                    (affects logging and fee charging only).
//
// Side effects: may charge the peer, mark the tx BAD in the HashRouter,
// relay pseudo-transactions, or submit the tx to NetworkOPs.
2874void
2875PeerImp::checkTransaction(
2876 HashRouterFlags flags,
2877 bool checkSignature,
2878 std::shared_ptr<STTx const> const& stx,
2879 bool batch)
2880{
2881 // VFALCO TODO Rewrite to not use exceptions
2882 try
2883 {
2884 // charge strongly for relaying batch txns
2885 // LCOV_EXCL_START
2886 /*
2887 There is no need to check whether the featureBatch amendment is
2888 enabled.
2889
2890 * If the `tfInnerBatchTxn` flag is set, and the amendment is
2891 enabled, then it's an invalid transaction because inner batch
2892 transactions should not be relayed.
2893 * If the `tfInnerBatchTxn` flag is set, and the amendment is *not*
2894 enabled, then the transaction is malformed because it's using an
2895 "unknown" flag. There's no need to waste the resources to send it
2896 to the transaction engine.
2897
2898 We don't normally check transaction validity at this level, but
2899 since we _need_ to check it when the amendment is enabled, we may as
2900 well drop it if the flag is set regardless.
2901 */
2902 if (stx->isFlag(tfInnerBatchTxn))
2903 {
2904 JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
2905 "tfInnerBatchTxn (checkSignature).";
2906 charge(Resource::feeModerateBurdenPeer, "inner batch txn");
2907 return;
2908 }
2909 // LCOV_EXCL_STOP
2910
2911 // Expired?
        // A tx whose LastLedgerSequence is below the current validated ledger
        // index can never be applied; mark it BAD and charge for the noise.
2912 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2913 (stx->getFieldU32(sfLastLedgerSequence) < app_.getLedgerMaster().getValidLedgerIndex()))
2914 {
2915 JLOG(p_journal_.info()) << "Marking transaction " << stx->getTransactionID()
2916 << "as BAD because it's expired";
2917 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2918 charge(Resource::feeUselessData, "expired tx");
2919 return;
2920 }
2921
2922 if (isPseudoTx(*stx))
2923 {
2924 // Don't do anything with pseudo transactions except put them in the
2925 // TransactionMaster cache
2926 std::string reason;
2927 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2928 XRPL_ASSERT(
2929 tx->getStatus() == NEW,
2930 "xrpl::PeerImp::checkTransaction Transaction created "
2931 "correctly");
2932 if (tx->getStatus() == NEW)
2933 {
2934 JLOG(p_journal_.debug()) << "Processing " << (batch ? "batch" : "unsolicited")
2935 << " pseudo-transaction tx " << tx->getID();
2936
2937 app_.getMasterTransaction().canonicalize(&tx);
2938 // Tell the overlay about it, but don't relay it.
2939 auto const toSkip = app_.getHashRouter().shouldRelay(tx->getID());
2940 if (toSkip)
2941 {
2942 JLOG(p_journal_.debug())
2943 << "Passing skipped pseudo pseudo-transaction tx " << tx->getID();
2944 app_.getOverlay().relay(tx->getID(), {}, *toSkip);
2945 }
2946 if (!batch)
2947 {
2948 JLOG(p_journal_.debug())
2949 << "Charging for pseudo-transaction tx " << tx->getID();
2950 charge(Resource::feeUselessData, "pseudo tx");
2951 }
2952
2953 return;
2954 }
2955 }
2956
2957 if (checkSignature)
2958 {
2959 // Check the signature before handing off to the job queue.
2960 if (auto [valid, validReason] = checkValidity(
2961 app_.getHashRouter(), *stx, app_.getLedgerMaster().getValidatedRules());
2962 valid != Validity::Valid)
2963 {
2964 if (!validReason.empty())
2965 {
2966 JLOG(p_journal_.debug()) << "Exception checking transaction: " << validReason;
2967 }
2968
2969 // Probably not necessary to set HashRouterFlags::BAD, but
2970 // doesn't hurt.
2971 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2972 charge(Resource::feeInvalidSignature, "check transaction signature failure");
2973 return;
2974 }
2975 }
2976 else
2977 {
            // Caller vouches for this tx; record it as Valid without
            // re-verifying the signature.
2978 forceValidity(app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
2979 }
2980
2981 std::string reason;
2982 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2983
2984 if (tx->getStatus() == INVALID)
2985 {
2986 if (!reason.empty())
2987 {
2988 JLOG(p_journal_.debug()) << "Exception checking transaction: " << reason;
2989 }
2990 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2991 charge(Resource::feeInvalidSignature, "tx (impossible)");
2992 return;
2993 }
2994
2995 bool const trusted = any(flags & HashRouterFlags::TRUSTED);
2996 app_.getOPs().processTransaction(tx, trusted, false, NetworkOPs::FailHard::no);
2997 }
2998 catch (std::exception const& ex)
2999 {
        // Any parsing/processing exception: mark the tx BAD and bill the
        // peer for sending data we could not process.
3000 JLOG(p_journal_.warn()) << "Exception in " << __func__ << ": " << ex.what();
3001 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
3002 using namespace std::string_literals;
3003 charge(Resource::feeInvalidData, "tx "s + ex.what());
3004 }
3005}
3006
3007// Called from our JobQueue
// Verify a relayed consensus proposal's signature (cluster peers are
// exempt), hand trusted proposals to NetworkOPs, and relay when allowed.
// NOTE(review): the extraction dropped original line 3011 here, which
// carried the second parameter — the proposal message shared_ptr named
// `packet` (used below in the relay call) — confirm against the repository.
3008void
3009PeerImp::checkPropose(
3010 bool isTrusted,
3012 RCLCxPeerPos peerPos)
3013{
3014 JLOG(p_journal_.trace()) << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal";
3015
3016 XRPL_ASSERT(packet, "xrpl::PeerImp::checkPropose : non-null packet");
3017
    // Cluster peers are trusted to have verified the signature already.
3018 if (!cluster() && !peerPos.checkSign())
3019 {
3020 std::string const desc{"Proposal fails sig check"};
3021 JLOG(p_journal_.warn()) << desc;
3022 charge(Resource::feeInvalidSignature, desc);
3023 return;
3024 }
3025
3026 bool relay = false;
3027
3028 if (isTrusted)
3029 {
3030 relay = app_.getOPs().processTrustedProposal(peerPos);
3031 }
3032 else
3033 {
        // Untrusted proposals are relayed only when configured to, or when
        // they come from a cluster member.
3034 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3035 }
3036
3037 if (relay)
3038 {
3039 // haveMessage contains peers, which are suppressed; i.e. the peers
3040 // are the source of the message, consequently the message should
3041 // not be relayed to these peers. But the message must be counted
3042 // as part of the squelch logic.
3043 auto haveMessage =
3044 app_.getOverlay().relay(*packet, peerPos.suppressionID(), peerPos.publicKey());
3045 if (!haveMessage.empty())
3046 {
3047 overlay_.updateSlotAndSquelch(
3048 peerPos.suppressionID(),
3049 peerPos.publicKey(),
3050 std::move(haveMessage),
3051 protocol::mtPROPOSE_LEDGER);
3052 }
3053 }
3054}
3055
// Process a relayed validation: charge and drop invalid ones, feed the rest
// to NetworkOPs, and relay (feeding the squelch logic) when it was new.
// NOTE(review): the extraction dropped original lines 3058 and 3060 here,
// which carried the `val` (validation object) and `packet` (validation
// message) parameters used below — confirm against the repository source.
3056void
3057PeerImp::checkValidation(
3059 uint256 const& key,
3061{
3062 if (!val->isValid())
3063 {
3064 std::string const desc{"Validation forwarded by peer is invalid"};
3065 JLOG(p_journal_.debug()) << desc;
3066 charge(Resource::feeInvalidSignature, desc);
3067 return;
3068 }
3069
3070 // FIXME it should be safe to remove this try/catch. Investigate codepaths.
3071 try
3072 {
        // Relay if the validation was new to us, or unconditionally for
        // cluster peers.
3073 if (app_.getOPs().recvValidation(val, std::to_string(id())) || cluster())
3074 {
3075 // haveMessage contains peers, which are suppressed; i.e. the peers
3076 // are the source of the message, consequently the message should
3077 // not be relayed to these peers. But the message must be counted
3078 // as part of the squelch logic.
3079 auto haveMessage = overlay_.relay(*packet, key, val->getSignerPublic());
3080 if (!haveMessage.empty())
3081 {
3082 overlay_.updateSlotAndSquelch(
3083 key, val->getSignerPublic(), std::move(haveMessage), protocol::mtVALIDATION);
3084 }
3085 }
3086 }
3087 catch (std::exception const& ex)
3088 {
3089 JLOG(p_journal_.trace()) << "Exception processing validation: " << ex.what();
3090 using namespace std::string_literals;
3091 charge(Resource::feeMalformedRequest, "validation "s + ex.what());
3092 }
3093}
3094
3095// Returns the set of peers that can help us get
3096// the TX tree with the specified root hash.
3097//
// Scans every connected peer and keeps the highest-scoring one (ties broken
// randomly inside getScore) that advertises the tx set, excluding `skip`.
// NOTE(review): the extraction dropped original lines 3098, 3101 and 3104
// here — presumably the `static std::shared_ptr<PeerImp>` return type, the
// `ret` declaration, and the `ov.for_each(...)` lambda opener implied by the
// trailing `});` — confirm against the repository source.
3099getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
3100{
3102 int retScore = 0;
3103
3105 if (p->hasTxSet(rootHash) && p.get() != skip)
3106 {
3107 auto score = p->getScore(true);
3108 if (!ret || (score > retScore))
3109 {
3110 ret = std::move(p);
3111 retScore = score;
3112 }
3113 }
3114 });
3115
3116 return ret;
3117}
3118
3119// Returns a random peer weighted by how likely to
3120// have the ledger and how responsive it is.
3121//
// Same selection scheme as getPeerWithTree, but keyed on ledger possession.
// NOTE(review): the extraction dropped original lines 3122-3123, 3129 and
// 3132 here — presumably the return type plus `getPeerWithLedger(` opener,
// the `ret` declaration, and the `ov.for_each(...)` lambda opener implied by
// the trailing `});` — confirm against the repository source.
3124 OverlayImpl& ov,
3125 uint256 const& ledgerHash,
3126 LedgerIndex ledger,
3127 PeerImp const* skip)
3128{
3130 int retScore = 0;
3131
3133 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3134 {
3135 auto score = p->getScore(true);
3136 if (!ret || (score > retScore))
3137 {
3138 ret = std::move(p);
3139 retScore = score;
3140 }
3141 }
3142 });
3143
3144 return ret;
3145}
3146
3147void
3148PeerImp::sendLedgerBase(
3149 std::shared_ptr<Ledger const> const& ledger,
3150 protocol::TMLedgerData& ledgerData)
3151{
3152 JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";
3153
3154 Serializer s(sizeof(LedgerHeader));
3155 addRaw(ledger->header(), s);
3156 ledgerData.add_nodes()->set_nodedata(s.getDataPtr(), s.getLength());
3157
3158 auto const& stateMap{ledger->stateMap()};
3159 if (stateMap.getHash() != beast::zero)
3160 {
3161 // Return account state root node if possible
3162 Serializer root(768);
3163
3164 stateMap.serializeRoot(root);
3165 ledgerData.add_nodes()->set_nodedata(root.getDataPtr(), root.getLength());
3166
3167 if (ledger->header().txHash != beast::zero)
3168 {
3169 auto const& txMap{ledger->txMap()};
3170 if (txMap.getHash() != beast::zero)
3171 {
3172 // Return TX root node if possible
3173 root.erase();
3174 txMap.serializeRoot(root);
3175 ledgerData.add_nodes()->set_nodedata(root.getDataPtr(), root.getLength());
3176 }
3177 }
3178 }
3179
3180 auto message{std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3181 send(message);
3182}
3183
// Resolve a TMGetLedger request to a local ledger: by hash, by sequence, or
// the closed ledger. May instead relay the request to a better-placed peer
// (returning empty), and validates/clears mismatched or too-early results.
// NOTE(review): the extraction dropped original lines 3184 and 3189 here —
// presumably the `std::shared_ptr<Ledger const>` return type and the
// matching local `ledger` declaration — confirm against the repository.
3185PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
3186{
3187 JLOG(p_journal_.trace()) << "getLedger: Ledger";
3188
3190
3191 if (m->has_ledgerhash())
3192 {
3193 // Attempt to find ledger by hash
3194 uint256 const ledgerHash{m->ledgerhash()};
3195 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3196 if (!ledger)
3197 {
3198 JLOG(p_journal_.trace()) << "getLedger: Don't have ledger with hash " << ledgerHash;
3199
            // Only relay original queries (no cookie yet) to avoid loops.
3200 if (m->has_querytype() && !m->has_requestcookie())
3201 {
3202 // Attempt to relay the request to a peer
3203 if (auto const peer = getPeerWithLedger(
3204 overlay_, ledgerHash, m->has_ledgerseq() ? m->ledgerseq() : 0, this))
3205 {
3206 m->set_requestcookie(id());
3207 peer->send(std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3208 JLOG(p_journal_.debug()) << "getLedger: Request relayed to peer";
3209 return ledger;
3210 }
3211
3212 JLOG(p_journal_.trace()) << "getLedger: Failed to find peer to relay request";
3213 }
3214 }
3215 }
3216 else if (m->has_ledgerseq())
3217 {
3218 // Attempt to find ledger by sequence
3219 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3220 {
3221 JLOG(p_journal_.debug()) << "getLedger: Early ledger sequence request";
3222 }
3223 else
3224 {
3225 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3226 if (!ledger)
3227 {
3228 JLOG(p_journal_.debug())
3229 << "getLedger: Don't have ledger with sequence " << m->ledgerseq();
3230 }
3231 }
3232 }
3233 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3234 {
3235 ledger = app_.getLedgerMaster().getClosedLedger();
3236 }
3237
3238 if (ledger)
3239 {
3240 // Validate retrieved ledger sequence
3241 auto const ledgerSeq{ledger->header().seq};
3242 if (m->has_ledgerseq())
3243 {
3244 if (ledgerSeq != m->ledgerseq())
3245 {
3246 // Do not resource charge a peer responding to a relay
3247 if (!m->has_requestcookie())
3248 charge(Resource::feeMalformedRequest, "get_ledger ledgerSeq");
3249
3250 ledger.reset();
3251 JLOG(p_journal_.warn()) << "getLedger: Invalid ledger sequence " << ledgerSeq;
3252 }
3253 }
3254 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3255 {
3256 ledger.reset();
3257 JLOG(p_journal_.debug()) << "getLedger: Early ledger sequence request " << ledgerSeq;
3258 }
3259 }
3260 else
3261 {
3262 JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
3263 }
3264
3265 return ledger;
3266}
3267
// Resolve a TMGetLedger request for a transaction set (liTS_CANDIDATE).
// Returns the SHAMap when locally available; otherwise may relay the query
// to another peer that has the set and returns empty.
// NOTE(review): the extraction dropped original line 3268 here — the return
// type, presumably `std::shared_ptr<SHAMap>` to match the local `shaMap`
// returned below — confirm against the repository source.
3269PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
3270{
3271 JLOG(p_journal_.trace()) << "getTxSet: TX set";
3272
3273 uint256 const txSetHash{m->ledgerhash()};
3274 std::shared_ptr<SHAMap> shaMap{app_.getInboundTransactions().getSet(txSetHash, false)};
3275 if (!shaMap)
3276 {
        // Only relay original queries (no cookie yet) to avoid loops.
3277 if (m->has_querytype() && !m->has_requestcookie())
3278 {
3279 // Attempt to relay the request to a peer
3280 if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
3281 {
3282 m->set_requestcookie(id());
3283 peer->send(std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3284 JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
3285 }
3286 else
3287 {
3288 JLOG(p_journal_.debug()) << "getTxSet: Failed to find relay peer";
3289 }
3290 }
3291 else
3292 {
3293 JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
3294 }
3295 }
3296
3297 return shaMap;
3298}
3299
// Service a TMGetLedger request end-to-end: locate the ledger or tx set,
// fill out a TMLedgerData reply (base data or the requested SHAMap nodes,
// bounded by soft/hard reply-node limits), and send it.
// NOTE(review): the extraction dropped original lines 3307-3308 (local
// declarations for `ledger` and `sharedMap`, assigned below) and line 3391
// (the container `data` filled by getNodeFat, apparently a vector of
// (node-id, node-bytes) pairs) — confirm against the repository source.
3300void
3301PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
3302{
3303 // Do not resource charge a peer responding to a relay
3304 if (!m->has_requestcookie())
3305 charge(Resource::feeModerateBurdenPeer, "received a get ledger request");
3306
3309 SHAMap const* map{nullptr};
3310 protocol::TMLedgerData ledgerData;
3311 bool fatLeaves{true};
3312 auto const itype{m->itype()};
3313
3314 if (itype == protocol::liTS_CANDIDATE)
3315 {
3316 if (sharedMap = getTxSet(m); !sharedMap)
3317 return;
3318 map = sharedMap.get();
3319
3320 // Fill out the reply
3321 ledgerData.set_ledgerseq(0);
3322 ledgerData.set_ledgerhash(m->ledgerhash());
3323 ledgerData.set_type(protocol::liTS_CANDIDATE);
3324 if (m->has_requestcookie())
3325 ledgerData.set_requestcookie(m->requestcookie());
3326
3327 // We'll already have most transactions
3328 fatLeaves = false;
3329 }
3330 else
3331 {
        // Ledger-based requests: refuse when our send queue is backed up or
        // the server is under load (cluster peers exempt from the latter).
3332 if (send_queue_.size() >= Tuning::dropSendQueue)
3333 {
3334 JLOG(p_journal_.debug()) << "processLedgerRequest: Large send queue";
3335 return;
3336 }
3337 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3338 {
3339 JLOG(p_journal_.debug()) << "processLedgerRequest: Too busy";
3340 return;
3341 }
3342
3343 if (ledger = getLedger(m); !ledger)
3344 return;
3345
3346 // Fill out the reply
3347 auto const ledgerHash{ledger->header().hash};
3348 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3349 ledgerData.set_ledgerseq(ledger->header().seq);
3350 ledgerData.set_type(itype);
3351 if (m->has_requestcookie())
3352 ledgerData.set_requestcookie(m->requestcookie());
3353
3354 switch (itype)
3355 {
3356 case protocol::liBASE:
3357 sendLedgerBase(ledger, ledgerData);
3358 return;
3359
3360 case protocol::liTX_NODE:
3361 map = &ledger->txMap();
3362 JLOG(p_journal_.trace())
3363 << "processLedgerRequest: TX map hash " << to_string(map->getHash());
3364 break;
3365
3366 case protocol::liAS_NODE:
3367 map = &ledger->stateMap();
3368 JLOG(p_journal_.trace())
3369 << "processLedgerRequest: Account state map hash " << to_string(map->getHash());
3370 break;
3371
3372 default:
3373 // This case should not be possible here
3374 JLOG(p_journal_.error()) << "processLedgerRequest: Invalid ledger info type";
3375 return;
3376 }
3377 }
3378
3379 if (map == nullptr)
3380 {
3381 JLOG(p_journal_.warn()) << "processLedgerRequest: Unable to find map";
3382 return;
3383 }
3384
3385 // Add requested node data to reply
3386 if (m->nodeids_size() > 0)
3387 {
        // High-latency peers get deeper replies to cut round trips.
3388 std::uint32_t const defaultDepth = isHighLatency() ? 2 : 1;
3389 auto const queryDepth{m->has_querydepth() ? m->querydepth() : defaultDepth};
3390
3392
3393 for (int i = 0;
3394 i < m->nodeids_size() && ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3395 ++i)
3396 {
3397 auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
3398
3399 data.clear();
3400 data.reserve(Tuning::softMaxReplyNodes);
3401
3402 try
3403 {
3404 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3405 {
3406 JLOG(p_journal_.trace())
3407 << "processLedgerRequest: getNodeFat got " << data.size() << " nodes";
3408
3409 for (auto const& d : data)
3410 {
                        // Enforce the absolute cap on reply size.
3411 if (ledgerData.nodes_size() >= Tuning::hardMaxReplyNodes)
3412 break;
3413 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3414 node->set_nodeid(d.first.getRawString());
3415 node->set_nodedata(d.second.data(), d.second.size());
3416 }
3417 }
3418 else
3419 {
3420 JLOG(p_journal_.warn()) << "processLedgerRequest: getNodeFat returns false";
3421 }
3422 }
3423 catch (std::exception const& e)
3424 {
                // Per-node failures are logged (with a human-readable request
                // type) and skipped; remaining node ids are still processed.
3425 std::string info;
3426 switch (itype)
3427 {
3428 case protocol::liBASE:
3429 // This case should not be possible here
3430 info = "Ledger base";
3431 break;
3432
3433 case protocol::liTX_NODE:
3434 info = "TX node";
3435 break;
3436
3437 case protocol::liAS_NODE:
3438 info = "AS node";
3439 break;
3440
3441 case protocol::liTS_CANDIDATE:
3442 info = "TS candidate";
3443 break;
3444
3445 default:
3446 info = "Invalid";
3447 break;
3448 }
3449
3450 if (!m->has_ledgerhash())
3451 info += ", no hash specified";
3452
3453 JLOG(p_journal_.warn())
3454 << "processLedgerRequest: getNodeFat with nodeId " << *shaMapNodeId
3455 << " and ledger info type " << info << " throws exception: " << e.what();
3456 }
3457 }
3458
3459 JLOG(p_journal_.info()) << "processLedgerRequest: Got request for " << m->nodeids_size()
3460 << " nodes at depth " << queryDepth << ", return "
3461 << ledgerData.nodes_size() << " nodes";
3462 }
3463
3464 if (ledgerData.nodes_size() == 0)
3465 return;
3466
3467 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3468}
3469
// Compute a selection score for this peer: a random tie-breaker, plus a big
// bonus when the peer is believed to have the sought item, minus a penalty
// proportional to measured latency (or a flat penalty when unknown).
// NOTE(review): the extraction dropped original line 3494 here — presumably
// the local `latency` declaration (an optional milliseconds value, to match
// `latency_` and the `latency->count()` use below) — confirm against the
// repository source.
3470int
3471PeerImp::getScore(bool haveItem) const
3472{
3473 // Random component of score, used to break ties and avoid
3474 // overloading the "best" peer
3475 static int const spRandomMax = 9999;
3476
3477 // Score for being very likely to have the thing we are
3478 // look for; should be roughly spRandomMax
3479 static int const spHaveItem = 10000;
3480
3481 // Score reduction for each millisecond of latency; should
3482 // be roughly spRandomMax divided by the maximum reasonable
3483 // latency
3484 static int const spLatency = 30;
3485
3486 // Penalty for unknown latency; should be roughly spRandomMax
3487 static int const spNoLatency = 8000;
3488
3489 int score = rand_int(spRandomMax);
3490
3491 if (haveItem)
3492 score += spHaveItem;
3493
    // Copy the latency under the lock, then score outside of it.
3495 {
3496 std::lock_guard const sl(recentLock_);
3497 latency = latency_;
3498 }
3499
3500 if (latency)
3501 {
3502 score -= latency->count() * spLatency;
3503 }
3504 else
3505 {
3506 score -= spNoLatency;
3507 }
3508
3509 return score;
3510}
3511
3512bool
3513PeerImp::isHighLatency() const
3514{
3515 std::lock_guard const sl(recentLock_);
3516 return latency_ >= peerHighLatency;
3517}
3518
3519void
3520PeerImp::Metrics::add_message(std::uint64_t bytes)
3521{
3522 using namespace std::chrono_literals;
3523 std::unique_lock const lock{mutex_};
3524
3525 totalBytes_ += bytes;
3526 accumBytes_ += bytes;
3527 auto const timeElapsed = clock_type::now() - intervalStart_;
3528 auto const timeElapsedInSecs = std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3529
3530 if (timeElapsedInSecs >= 1s)
3531 {
3532 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3533 rollingAvg_.push_back(avgBytes);
3534
3535 auto const totalBytes = std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
3536 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3537
3538 intervalStart_ = clock_type::now();
3539 accumBytes_ = 0;
3540 }
3541}
3542
// Thread-safe accessor for the rolling per-second byte average.
// NOTE(review): the extraction dropped original line 3543 (the return type,
// presumably std::uint64_t to match rollingAvgBytes_) — confirm against the
// repository source.
3544PeerImp::Metrics::average_bytes() const
3545{
3546 std::shared_lock const lock{mutex_};
3547 return rollingAvgBytes_;
3548}
3549
// Thread-safe accessor for the lifetime byte total.
// NOTE(review): the extraction dropped original line 3550 (the return type,
// presumably std::uint64_t to match totalBytes_) — confirm against the
// repository source.
3551PeerImp::Metrics::total_bytes() const
3552{
3553 std::shared_lock const lock{mutex_};
3554 return totalBytes_;
3555}
3556
3557} // namespace xrpl
T accumulate(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:130
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:55
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:48
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Definition Journal.h:287
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
virtual Config & config()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
Definition Cluster.cpp:38
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
Definition Cluster.cpp:64
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:19
std::size_t size() const
The number of nodes in the cluster list.
Definition Cluster.cpp:30
int MAX_TRANSACTIONS
Definition Config.h:211
std::chrono::seconds MAX_DIVERGED_TIME
Definition Config.h:269
bool TX_REDUCE_RELAY_METRICS
Definition Config.h:250
std::chrono::seconds MAX_UNKNOWN_TIME
Definition Config.h:266
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
Definition Config.h:233
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
bool shouldProcess(uint256 const &key, PeerShortID peer, HashRouterFlags &flags, std::chrono::seconds tx_interval)
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:147
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:105
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:142
std::chrono::seconds getValidatedLedgerAge()
LedgerIndex getValidLedgerIndex()
void setClusterFee(std::uint32_t fee)
static std::size_t messageSize(::google::protobuf::Message const &message)
Definition Message.cpp:34
virtual bool isNeedNetworkLedger()=0
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void incPeerDisconnectCharges() override
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
PeerFinder::Manager & peerFinder()
Resource::Manager & resourceManager()
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void onPeerDeactivate(Peer::id_t id)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
This class manages established peer-to-peer connections, handles message exchange,...
Definition PeerImp.h:95
std::optional< std::chrono::milliseconds > latency_
Definition PeerImp.h:156
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
Definition PeerImp.cpp:310
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
Definition PeerImp.cpp:371
std::unique_ptr< LoadEvent > load_event_
Definition PeerImp.h:232
beast::Journal const p_journal_
Definition PeerImp.h:118
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
Definition PeerImp.cpp:1160
ProtocolVersion protocol_
Definition PeerImp.h:137
bool txReduceRelayEnabled_
Definition PeerImp.h:244
void close()
Forcibly closes the underlying socket connection.
Definition PeerImp.cpp:658
bool readPending_
Definition PeerImp.h:226
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
Definition PeerImp.cpp:1352
http_request_type request_
Definition PeerImp.h:214
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
Definition PeerImp.cpp:329
std::string name() const
Definition PeerImp.cpp:880
bool txReduceRelayEnabled() const override
Definition PeerImp.h:486
std::shared_ptr< PeerFinder::Slot > const slot_
Definition PeerImp.h:212
boost::beast::http::fields const & headers_
Definition PeerImp.h:216
Compressed compressionEnabled_
Definition PeerImp.h:237
void onTimer(error_code const &ec)
Handles the expiration of the peer activity timer.
Definition PeerImp.cpp:710
boost::system::error_code error_code
Definition PeerImp.h:102
void tryAsyncShutdown()
Attempts to perform a graceful SSL shutdown if conditions are met.
Definition PeerImp.cpp:597
LedgerIndex minLedger_
Definition PeerImp.h:148
id_t const id_
Definition PeerImp.h:112
std::string const & fingerprint() const override
Definition PeerImp.h:667
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
Definition PeerImp.cpp:342
bool ledgerReplayEnabled_
Definition PeerImp.h:246
void sendTxQueue() override
Send aggregated transactions' hashes.
Definition PeerImp.cpp:289
uint256 closedLedgerHash_
Definition PeerImp.h:150
reduce_relay::Squelch< UptimeClock > squelch_
Definition PeerImp.h:161
stream_type & stream_
Definition PeerImp.h:121
PeerImp(PeerImp const &)=delete
void cycleStatus() override
Definition PeerImp.cpp:540
std::shared_mutex nameMutex_
Definition PeerImp.h:144
socket_type & socket_
Definition PeerImp.h:120
beast::IP::Endpoint const remote_address_
Definition PeerImp.h:129
int large_sendq_
Definition PeerImp.h:231
std::string domain() const
Definition PeerImp.cpp:887
std::atomic< Tracking > tracking_
Definition PeerImp.h:139
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
Definition PeerImp.cpp:524
clock_type::duration uptime() const
Definition PeerImp.h:417
LedgerIndex maxLedger_
Definition PeerImp.h:149
Application & app_
Definition PeerImp.h:111
PublicKey const publicKey_
Definition PeerImp.h:142
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
Definition PeerImp.cpp:1113
Json::Value json() override
Definition PeerImp.cpp:379
bool cluster() const override
Returns true if this connection is a member of the cluster.
Definition PeerImp.cpp:365
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
Definition PeerImp.cpp:1041
virtual void run()
Definition PeerImp.cpp:134
OverlayImpl & overlay_
Definition PeerImp.h:133
std::queue< std::shared_ptr< Message > > send_queue_
Definition PeerImp.h:217
virtual ~PeerImp()
Definition PeerImp.cpp:111
static std::string makePrefix(std::string const &fingerprint)
Definition PeerImp.cpp:702
ChargeWithContext fee_
Definition PeerImp.h:211
void onShutdown(error_code ec)
Handles the completion of the asynchronous SSL shutdown.
Definition PeerImp.cpp:634
void onMessageUnknown(std::uint16_t type)
Definition PeerImp.cpp:1107
protocol::TMStatusChange last_status_
Definition PeerImp.h:209
void onReadMessage(error_code ec, std::size_t bytes_transferred)
Definition PeerImp.cpp:942
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
Definition PeerImp.h:98
std::unique_ptr< stream_type > stream_ptr_
Definition PeerImp.h:119
struct xrpl::PeerImp::@22 metrics_
boost::beast::multi_buffer read_buffer_
Definition PeerImp.h:213
void send(std::shared_ptr< Message > const &m) override
Definition PeerImp.cpp:219
hash_set< uint256 > txQueue_
Definition PeerImp.h:242
bool supportsFeature(ProtocolFeature f) const override
Definition PeerImp.cpp:493
void doProtocolStart()
Definition PeerImp.cpp:897
void shutdown()
Initiates the peer disconnection sequence.
Definition PeerImp.cpp:619
bool const inbound_
Definition PeerImp.h:134
boost::asio::strand< boost::asio::executor > strand_
Definition PeerImp.h:122
waitable_timer timer_
Definition PeerImp.h:125
clock_type::time_point lastPingTime_
Definition PeerImp.h:158
boost::circular_buffer< uint256 > recentLedgers_
Definition PeerImp.h:153
std::string name_
Definition PeerImp.h:143
bool hasTxSet(uint256 const &hash) const override
Definition PeerImp.cpp:533
bool writePending_
Definition PeerImp.h:229
boost::circular_buffer< uint256 > recentTxSets_
Definition PeerImp.h:154
clock_type::time_point trackingTime_
Definition PeerImp.h:140
beast::Journal const journal_
Definition PeerImp.h:117
void cancelTimer() noexcept
Cancels any pending wait on the peer activity timer.
Definition PeerImp.cpp:782
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
Definition PeerImp.cpp:550
uint256 previousLedgerHash_
Definition PeerImp.h:151
Resource::Consumer usage_
Definition PeerImp.h:210
bool shutdownStarted_
Definition PeerImp.h:223
std::optional< std::uint32_t > lastPingSeq_
Definition PeerImp.h:157
bool crawl() const
Returns true if this connection will publicly share its IP address.
Definition PeerImp.cpp:356
void stop() override
Definition PeerImp.cpp:196
void setTimer(std::chrono::seconds interval)
Sets and starts the peer timer.
Definition PeerImp.cpp:682
bool shutdown_
Definition PeerImp.h:220
void doAccept()
Definition PeerImp.cpp:796
void fail(std::string const &name, error_code ec)
Handles a failure associated with a specific error code.
Definition PeerImp.cpp:559
std::mutex recentLock_
Definition PeerImp.h:208
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
Definition PeerImp.cpp:510
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
Definition PeerImp.h:108
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
Definition PeerImp.cpp:1153
Represents a peer connection in the overlay.
A public key.
Definition PublicKey.h:42
A peer's signed, proposed position for use in RCLConsensus.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
bool checkSign() const
Verify the signing hash of the proposal.
A consumption charge.
Definition Charge.h:10
An endpoint that consumes resources.
Definition Consumer.h:16
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Definition Consumer.cpp:106
int balance()
Returns the credit balance representing consumption.
Definition Consumer.cpp:118
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
Definition Consumer.cpp:88
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition SHAMap.h:77
void const * getDataPtr() const
Definition Serializer.h:197
int getLength() const
Definition Serializer.h:207
std::size_t size() const noexcept
Definition Serializer.h:50
void const * data() const noexcept
Definition Serializer.h:56
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & getValidators()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual HashRouter & getHashRouter()=0
virtual Cluster & getCluster()=0
virtual TimeKeeper & getTimeKeeper()=0
An immutable linear range of bytes.
Definition Slice.h:26
time_point now() const override
Returns the current time, using the server's clock.
Definition TimeKeeper.h:44
static category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
Definition base_uint.h:476
pointer data()
Definition base_uint.h:101
static constexpr std::size_t size()
Definition base_uint.h:499
T emplace_back(T... args)
T empty(T... args)
T find(T... args)
T for_each(T... args)
T get(T... args)
T is_same_v
T load(T... args)
T lock(T... args)
T max(T... args)
T min(T... args)
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
unsigned int UInt
STL namespace.
Charge const feeInvalidData
Charge const feeModerateBurdenPeer
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeTrivialPeer
Charge const feeUselessData
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ sendQueueLogFreq
How often to log send queue size.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ maxQueryDepth
The maximum number of levels to search.
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
Definition PerfLog.h:162
static constexpr std::size_t MAX_TX_QUEUE_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
constexpr FlagValue tfInnerBatchTxn
Definition TxFlags.h:41
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
Definition digest.h:204
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
static constexpr char FEATURE_COMPR[]
Definition Handshake.h:118
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:94
@ INVALID
Definition Transaction.h:29
@ INCLUDED
Definition Transaction.h:30
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:10
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules)
Checks transaction signature and local checks.
Definition apply.cpp:21
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
std::string base64_decode(std::string_view data)
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:92
Number root(Number f, unsigned d)
Definition Number.cpp:958
static bool stringIsUint256Sized(std::string const &pBuffStr)
Definition PeerImp.cpp:128
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
Definition PeerImp.cpp:3123
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
@ jtMISSING_TXN
Definition Job.h:42
@ jtLEDGER_REQ
Definition Job.h:38
@ jtTXN_DATA
Definition Job.h:48
@ jtPACK
Definition Job.h:22
@ jtPROPOSAL_ut
Definition Job.h:39
@ jtREPLAY_REQ
Definition Job.h:37
@ jtREQUESTED_TXN
Definition Job.h:43
@ jtVALIDATION_ut
Definition Job.h:33
@ jtPEER
Definition Job.h:59
@ jtTRANSACTION
Definition Job.h:41
@ jtVALIDATION_t
Definition Job.h:50
@ jtPROPOSAL_t
Definition Job.h:53
@ jtMANIFEST
Definition Job.h:34
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
Definition Handshake.h:171
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
std::string getFingerprint(beast::IP::Endpoint const &address, std::optional< PublicKey > const &publicKey=std::nullopt, std::optional< std::string > const &id=std::nullopt)
Definition PublicKey.h:242
HashRouterFlags
Definition HashRouter.h:14
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
@ manifest
Manifest.
@ proposal
proposal for signing
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
Definition apply.cpp:96
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:215
static constexpr char FEATURE_LEDGER_REPLAY[]
Definition Handshake.h:124
static constexpr char FEATURE_VPRR[]
Definition Handshake.h:120
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:809
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
Definition PeerImp.cpp:3099
static constexpr char FEATURE_TXRR[]
Definition Handshake.h:122
T nth_element(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)
Information about the notional ledger backing the view.
std::optional< std::uint32_t > networkID
Definition Overlay.h:52
beast::IP::Address public_ip
Definition Overlay.h:49
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Definition PeerImp.h:196
Describes a single consumer.
Definition Gossip.h:17
beast::IP::Endpoint address
Definition Gossip.h:21
Data format for exchanging consumption information across peers.
Definition Gossip.h:12
std::vector< Item > items
Definition Gossip.h:24
T tie(T... args)
T to_string(T... args)
T what(T... args)