// PeerImp.cpp
#include <xrpld/app/consensus/RCLValidations.h>
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/InboundTransactions.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/ledger/TransactionMaster.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/app/misc/LoadFeeTrack.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/app/misc/Transaction.h>
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/app/tx/apply.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h>

#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/base64.h>
#include <xrpl/basics/random.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/core/PerfLog.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/digest.h>

#include <boost/algorithm/string/predicate.hpp>
#include <boost/beast/core/ostream.hpp>

#include <algorithm>
#include <chrono>
#include <memory>
#include <mutex>
#include <numeric>
#include <sstream>

using namespace std::chrono_literals;

namespace xrpl {

namespace {

// Threshold above which we treat a peer connection as high latency.
std::chrono::milliseconds constexpr peerHighLatency{300};

// How often the peer timer fires to ping the peer and check its health.
std::chrono::seconds constexpr peerTimerInterval{60};

// How long a graceful shutdown may take before the connection is forcibly
// closed.
std::chrono::seconds constexpr shutdownTimerInterval{5};

} // namespace

// TODO: Remove this exclusion once unit tests are added after the hotfix
// release.

PeerImp::PeerImp(
    Application& app,
    id_t id,
    std::shared_ptr<PeerFinder::Slot> const& slot,
    http_request_type&& request,
    PublicKey const& publicKey,
    ProtocolVersion protocol,
    Resource::Consumer consumer,
    std::unique_ptr<stream_type>&& stream_ptr,
    OverlayImpl& overlay)
    : Child(overlay)
    , app_(app)
    , id_(id)
    , fingerprint_(
          getFingerprint(slot->remote_endpoint(), publicKey, to_string(id)))
    , prefix_(makePrefix(fingerprint_))
    , sink_(app_.journal("Peer"), prefix_)
    , p_sink_(app_.journal("Protocol"), prefix_)
    , journal_(sink_)
    , p_journal_(p_sink_)
    , stream_ptr_(std::move(stream_ptr))
    , socket_(stream_ptr_->next_layer().socket())
    , stream_(*stream_ptr_)
    , strand_(boost::asio::make_strand(socket_.get_executor()))
    , timer_(waitable_timer{socket_.get_executor()})
    , remote_address_(slot->remote_endpoint())
    , overlay_(overlay)
    , inbound_(true)
    , protocol_(protocol)
    , tracking_(Tracking::unknown)
    , trackingTime_(clock_type::now())
    , publicKey_(publicKey)
    , lastPingTime_(clock_type::now())
    , creationTime_(clock_type::now())
    , squelch_(app_.journal("Squelch"))
    , usage_(consumer)
    , fee_{Resource::feeTrivialPeer, ""}
    , slot_(slot)
    , request_(std::move(request))
    , headers_(request_)
    , compressionEnabled_(
          peerFeatureEnabled(
              headers_,
              FEATURE_COMPR,
              "lz4",
              app_.config().COMPRESSION)
              ? Compressed::On
              : Compressed::Off)
    , txReduceRelayEnabled_(peerFeatureEnabled(
          headers_,
          FEATURE_TXRR,
          app_.config().TX_REDUCE_RELAY_ENABLE))
    , ledgerReplayEnabled_(peerFeatureEnabled(
          headers_,
          FEATURE_LEDGER_REPLAY,
          app_.config().LEDGER_REPLAY))
    , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{
    JLOG(journal_.info())
        << "compression enabled " << (compressionEnabled_ == Compressed::On)
        << " vp reduce-relay base squelch enabled "
        << peerFeatureEnabled(
               headers_,
               FEATURE_VPRR,
               app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
        << " tx reduce-relay enabled " << txReduceRelayEnabled_;
}
120
122{
123 bool const inCluster{cluster()};
124
129
130 if (inCluster)
131 {
132 JLOG(journal_.warn()) << name() << " left cluster";
133 }
134}

// Helper function to check for valid uint256 values in protobuf buffers
static bool
stringIsUint256Sized(std::string const& pBuffStr)
{
    return pBuffStr.size() == uint256::size();
}

void
PeerImp::run()
{
    if (!strand_.running_in_this_thread())
        return post(strand_, std::bind(&PeerImp::run, shared_from_this()));

    auto parseLedgerHash =
        [](std::string_view value) -> std::optional<uint256> {
        if (uint256 ret; ret.parseHex(value))
            return ret;

        if (auto const s = base64_decode(value); s.size() == uint256::size())
            return uint256{s};

        return std::nullopt;
    };

    std::optional<uint256> closed;
    std::optional<uint256> previous;

    if (auto const iter = headers_.find("Closed-Ledger");
        iter != headers_.end())
    {
        closed = parseLedgerHash(iter->value());

        if (!closed)
            fail("Malformed handshake data (1)");
    }

    if (auto const iter = headers_.find("Previous-Ledger");
        iter != headers_.end())
    {
        previous = parseLedgerHash(iter->value());

        if (!previous)
            fail("Malformed handshake data (2)");
    }

    if (previous && !closed)
        fail("Malformed handshake data (3)");

    {
        std::lock_guard sl(recentLock_);
        if (closed)
            closedLedgerHash_ = *closed;
        if (previous)
            previousLedgerHash_ = *previous;
    }

    if (inbound_)
        doAccept();
    else
        doProtocolStart();

    // Anything else that needs to be done with the connection should be
    // done in doProtocolStart
}

void
PeerImp::stop()
{
    if (!strand_.running_in_this_thread())
        return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));

    if (!socket_.is_open())
        return;

    // The rationale for using different severity levels is that
    // outbound connections are under our control and may be logged
    // at a higher level, but inbound connections are more numerous and
    // uncontrolled so to prevent log flooding the severity is reduced.
    JLOG(journal_.debug()) << "stop: Stop";

    shutdown();
}

//------------------------------------------------------------------------------

void
PeerImp::send(std::shared_ptr<Message> const& m)
{
    if (!strand_.running_in_this_thread())
        return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));

    if (!socket_.is_open())
        return;

    // we are in the process of closing the connection
    if (shutdown_)
        return tryAsyncShutdown();

    auto validator = m->getValidatorKey();
    if (validator && !squelch_.expireSquelch(*validator))
    {
        overlay_.reportOutboundTraffic(
            TrafficCount::category::squelch_suppressed,
            static_cast<int>(m->getBuffer(compressionEnabled_).size()));
        return;
    }

    // report categorized outgoing traffic
    overlay_.reportOutboundTraffic(
        safe_cast<TrafficCount::category>(m->getCategory()),
        static_cast<int>(m->getBuffer(compressionEnabled_).size()));

    // report total outgoing traffic
    overlay_.reportOutboundTraffic(
        TrafficCount::category::total,
        static_cast<int>(m->getBuffer(compressionEnabled_).size()));

    auto sendq_size = send_queue_.size();

    if (sendq_size < Tuning::targetSendQueue)
    {
        // To detect a peer that does not read from their
        // side of the connection, we expect a peer to have
        // a small sendq periodically
        large_sendq_ = 0;
    }
    else if (auto sink = journal_.debug();
             sink && (sendq_size % Tuning::sendQueueLogFreq) == 0)
    {
        std::string const n = name();
        sink << n << " sendq: " << sendq_size;
    }

    send_queue_.push(m);

    if (sendq_size != 0)
        return;

    writePending_ = true;
    boost::asio::async_write(
        stream_,
        boost::asio::buffer(
            send_queue_.front()->getBuffer(compressionEnabled_)),
        bind_executor(
            strand_,
            std::bind(
                &PeerImp::onWriteMessage,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2)));
}

void
PeerImp::sendTxQueue()
{
    if (!strand_.running_in_this_thread())
        return post(
            strand_, std::bind(&PeerImp::sendTxQueue, shared_from_this()));

    if (!txQueue_.empty())
    {
        protocol::TMHaveTransactions ht;
        std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) {
            ht.add_hashes(hash.data(), hash.size());
        });
        JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size();
        txQueue_.clear();
        send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
    }
}

void
PeerImp::addTxQueue(uint256 const& hash)
{
    if (!strand_.running_in_this_thread())
        return post(
            strand_, std::bind(&PeerImp::addTxQueue, shared_from_this(), hash));

    if (txQueue_.size() >= reduce_relay::MAX_TX_QUEUE_SIZE)
    {
        JLOG(p_journal_.warn()) << "addTxQueue exceeds the cap";
        sendTxQueue();
    }

    txQueue_.insert(hash);
    JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size();
}

void
PeerImp::removeTxQueue(uint256 const& hash)
{
    if (!strand_.running_in_this_thread())
        return post(
            strand_,
            std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash));

    auto removed = txQueue_.erase(hash);
    JLOG(p_journal_.trace()) << "removeTxQueue " << removed;
}

void
PeerImp::charge(Resource::Charge const& fee, std::string const& context)
{
    if ((usage_.charge(fee, context) == Resource::drop) &&
        usage_.disconnect(p_journal_) && strand_.running_in_this_thread())
    {
        // Sever the connection
        overlay_.incPeerDisconnectCharges();
        fail("charge: Resources");
    }
}

//------------------------------------------------------------------------------

bool
PeerImp::crawl() const
{
    auto const iter = headers_.find("Crawl");
    if (iter == headers_.end())
        return false;
    return boost::iequals(iter->value(), "public");
}

bool
PeerImp::cluster() const
{
    return static_cast<bool>(app_.cluster().member(publicKey_));
}

std::string
PeerImp::getVersion() const
{
    if (inbound_)
        return headers_["User-Agent"];
    return headers_["Server"];
}

Json::Value
PeerImp::json()
{
    Json::Value ret(Json::objectValue);

378 ret[jss::public_key] = toBase58(TokenType::NodePublic, publicKey_);
379 ret[jss::address] = remote_address_.to_string();
380
381 if (inbound_)
382 ret[jss::inbound] = true;
383
384 if (cluster())
385 {
386 ret[jss::cluster] = true;
387
388 if (auto const n = name(); !n.empty())
389 // Could move here if Json::Value supported moving from a string
390 ret[jss::name] = n;
391 }
392
393 if (auto const d = domain(); !d.empty())
394 ret[jss::server_domain] = std::string{d};
395
396 if (auto const nid = headers_["Network-ID"]; !nid.empty())
397 ret[jss::network_id] = std::string{nid};
398
399 ret[jss::load] = usage_.balance();
400
401 if (auto const version = getVersion(); !version.empty())
402 ret[jss::version] = std::string{version};
403
404 ret[jss::protocol] = to_string(protocol_);
405
    {
        std::lock_guard sl(recentLock_);
        if (latency_)
409 ret[jss::latency] = static_cast<Json::UInt>(latency_->count());
410 }
411
412 ret[jss::uptime] = static_cast<Json::UInt>(
413 std::chrono::duration_cast<std::chrono::seconds>(uptime()).count());
414
415 std::uint32_t minSeq, maxSeq;
416 ledgerRange(minSeq, maxSeq);
417
418 if ((minSeq != 0) || (maxSeq != 0))
419 ret[jss::complete_ledgers] =
420 std::to_string(minSeq) + " - " + std::to_string(maxSeq);
421
    switch (tracking_.load())
    {
        case Tracking::diverged:
            ret[jss::track] = "diverged";
            break;

        case Tracking::unknown:
            ret[jss::track] = "unknown";
            break;

        case Tracking::converged:
            // Nothing to do here
            break;
    }
436
437 uint256 closedLedgerHash;
438 protocol::TMStatusChange last_status;
    {
        std::lock_guard sl(recentLock_);
        closedLedgerHash = closedLedgerHash_;
442 last_status = last_status_;
443 }
444
445 if (closedLedgerHash != beast::zero)
446 ret[jss::ledger] = to_string(closedLedgerHash);
447
448 if (last_status.has_newstatus())
449 {
450 switch (last_status.newstatus())
451 {
452 case protocol::nsCONNECTING:
453 ret[jss::status] = "connecting";
454 break;
455
456 case protocol::nsCONNECTED:
457 ret[jss::status] = "connected";
458 break;
459
460 case protocol::nsMONITORING:
461 ret[jss::status] = "monitoring";
462 break;
463
464 case protocol::nsVALIDATING:
465 ret[jss::status] = "validating";
466 break;
467
468 case protocol::nsSHUTTING:
469 ret[jss::status] = "shutting";
470 break;
471
472 default:
473 JLOG(p_journal_.warn())
474 << "Unknown status: " << last_status.newstatus();
475 }
476 }
477
478 ret[jss::metrics] = Json::Value(Json::objectValue);
479 ret[jss::metrics][jss::total_bytes_recv] =
480 std::to_string(metrics_.recv.total_bytes());
481 ret[jss::metrics][jss::total_bytes_sent] =
482 std::to_string(metrics_.sent.total_bytes());
483 ret[jss::metrics][jss::avg_bps_recv] =
484 std::to_string(metrics_.recv.average_bytes());
485 ret[jss::metrics][jss::avg_bps_sent] =
486 std::to_string(metrics_.sent.average_bytes());
487
488 return ret;
489}
490
bool
PeerImp::supportsFeature(ProtocolFeature f) const
{
    switch (f)
    {
        case ProtocolFeature::ValidatorListPropagation:
            return protocol_ >= make_protocol(2, 1);
        case ProtocolFeature::ValidatorList2Propagation:
            return protocol_ >= make_protocol(2, 2);
        case ProtocolFeature::LedgerReplay:
            return protocol_ >= make_protocol(2, 2);
    }
    return false;
}
505
506//------------------------------------------------------------------------------
507
bool
PeerImp::hasLedger(uint256 const& hash, std::uint32_t seq) const
{
    {
        std::lock_guard sl(recentLock_);
        if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
            (tracking_.load() == Tracking::converged))
            return true;
        if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
            recentLedgers_.end())
            return true;
    }
    return false;
}
522
void
PeerImp::ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const
{
    std::lock_guard sl(recentLock_);

    minSeq = minLedger_;
    maxSeq = maxLedger_;
}
531
bool
PeerImp::hasTxSet(uint256 const& hash) const
{
    std::lock_guard sl(recentLock_);
    return std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
        recentTxSets_.end();
}
539
540void
542{
543 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
544 // guarded by recentLock_.
548}
549
bool
PeerImp::hasRange(std::uint32_t uMin, std::uint32_t uMax)
{
    std::lock_guard sl(recentLock_);
    return (tracking_ != Tracking::diverged) && (uMin >= minLedger_) &&
        (uMax <= maxLedger_);
}
557
558//------------------------------------------------------------------------------
559
void
PeerImp::fail(std::string const& name, error_code ec)
{
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "xrpl::PeerImp::fail : strand in this thread");

    if (!socket_.is_open())
        return;

    JLOG(journal_.warn()) << name << ": " << ec.message();

    shutdown();
}
574
void
PeerImp::fail(std::string const& reason)
{
    if (!strand_.running_in_this_thread())
        return post(
            strand_,
            std::bind(
                (void(Peer::*)(std::string const&)) & PeerImp::fail,
                shared_from_this(),
                reason));

    if (!socket_.is_open())
        return;

    // Call to name() locks, log only if the message will be outputted
    if (journal_.active(beast::severities::kWarning))
    {
        std::string const n = name();
        JLOG(journal_.warn()) << n << " failed: " << reason;
    }

    shutdown();
}
598
599void
601{
602 XRPL_ASSERT(
603 strand_.running_in_this_thread(),
604 "xrpl::PeerImp::tryAsyncShutdown : strand in this thread");
605
607 return;
608
610 return;
611
612 shutdownStarted_ = true;
613
614 setTimer(shutdownTimerInterval);
615
616 // gracefully shutdown the SSL socket, performing a shutdown handshake
617 stream_.async_shutdown(bind_executor(
618 strand_,
619 std::bind(
620 &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
621}
622
void
PeerImp::shutdown()
{
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "xrpl::PeerImp::shutdown: strand in this thread");

    if (!socket_.is_open() || shutdown_)
        return;

    shutdown_ = true;

    boost::beast::get_lowest_layer(stream_).cancel();

    tryAsyncShutdown();
}
639
void
PeerImp::onShutdown(error_code ec)
{
    cancelTimer();
    if (ec)
    {
        // - eof: the stream was cleanly closed
        // - operation_aborted: an expired timer (slow shutdown)
        // - stream_truncated: the tcp connection closed without a shutdown
        //   handshake; this can occur if a peer does not disconnect gracefully
        // - broken_pipe: the peer is gone
651 bool shouldLog =
652 (ec != boost::asio::error::eof &&
653 ec != boost::asio::error::operation_aborted &&
654 ec.message().find("application data after close notify") ==
655 std::string::npos);
656
657 if (shouldLog)
658 {
659 JLOG(journal_.debug()) << "onShutdown: " << ec.message();
660 }
661 }
662
663 close();
664}
665
void
PeerImp::close()
{
669 XRPL_ASSERT(
670 strand_.running_in_this_thread(),
671 "xrpl::PeerImp::close : strand in this thread");
672
673 if (!socket_.is_open())
674 return;
675
676 cancelTimer();
677
678 error_code ec;
679 socket_.close(ec);
680
682
683 // The rationale for using different severity levels is that
684 // outbound connections are under our control and may be logged
685 // at a higher level, but inbound connections are more numerous and
686 // uncontrolled so to prevent log flooding the severity is reduced.
687 JLOG((inbound_ ? journal_.debug() : journal_.info())) << "close: Closed";
688}
689
690//------------------------------------------------------------------------------
691
692void
694{
695 try
696 {
697 timer_.expires_after(interval);
698 }
699 catch (std::exception const& ex)
700 {
701 JLOG(journal_.error()) << "setTimer: " << ex.what();
702 return shutdown();
703 }
704
705 timer_.async_wait(bind_executor(
706 strand_,
707 std::bind(
708 &PeerImp::onTimer, shared_from_this(), std::placeholders::_1)));
709}
710
711//------------------------------------------------------------------------------
712
715{
717 ss << "[" << fingerprint << "] ";
718 return ss.str();
719}
720
void
PeerImp::onTimer(error_code const& ec)
{
724 XRPL_ASSERT(
725 strand_.running_in_this_thread(),
726 "xrpl::PeerImp::onTimer : strand in this thread");
727
728 if (!socket_.is_open())
729 return;
730
731 if (ec)
732 {
733 // do not initiate shutdown, timers are frequently cancelled
734 if (ec == boost::asio::error::operation_aborted)
735 return;
736
737 // This should never happen
738 JLOG(journal_.error()) << "onTimer: " << ec.message();
739 return close();
740 }
741
742 // the timer expired before the shutdown completed
743 // force close the connection
744 if (shutdown_)
745 {
746 JLOG(journal_.debug()) << "onTimer: shutdown timer expired";
747 return close();
748 }
749
751 return fail("Large send queue");
752
753 if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
754 {
755 clock_type::duration duration;
756
757 {
759 duration = clock_type::now() - trackingTime_;
760 }
761
762 if ((t == Tracking::diverged &&
763 (duration > app_.config().MAX_DIVERGED_TIME)) ||
764 (t == Tracking::unknown &&
765 (duration > app_.config().MAX_UNKNOWN_TIME)))
766 {
768 return fail("Not useful");
769 }
770 }
771
772 // Already waiting for PONG
773 if (lastPingSeq_)
        return fail("Ping Timeout");

    lastPingTime_ = clock_type::now();
    lastPingSeq_ = rand_int<std::uint32_t>();
777 lastPingSeq_ = rand_int<std::uint32_t>();
778
779 protocol::TMPing message;
780 message.set_type(protocol::TMPing::ptPING);
781 message.set_seq(*lastPingSeq_);
782
783 send(std::make_shared<Message>(message, protocol::mtPING));
784
785 setTimer(peerTimerInterval);
786}
787
788void
790{
791 try
792 {
793 timer_.cancel();
794 }
795 catch (std::exception const& ex)
796 {
797 JLOG(journal_.error()) << "cancelTimer: " << ex.what();
798 }
799}
800
801//------------------------------------------------------------------------------
802void
804{
805 XRPL_ASSERT(
806 read_buffer_.size() == 0,
807 "xrpl::PeerImp::doAccept : empty read buffer");
808
809 JLOG(journal_.debug()) << "doAccept";
810
811 // a shutdown was initiated before the handshake, there is nothing to do
812 if (shutdown_)
813 return tryAsyncShutdown();
814
815 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
816
817 // This shouldn't fail since we already computed
818 // the shared value successfully in OverlayImpl
819 if (!sharedValue)
820 return fail("makeSharedValue: Unexpected failure");
821
822 JLOG(journal_.debug()) << "Protocol: " << to_string(protocol_);
823
824 if (auto member = app_.cluster().member(publicKey_))
825 {
826 {
828 name_ = *member;
829 }
830 JLOG(journal_.info()) << "Cluster name: " << *member;
831 }
832
834
835 // XXX Set timer: connection is in grace period to be useful.
836 // XXX Set timer: connection idle (idle may vary depending on connection
837 // type.)
838
840
841 boost::beast::ostream(*write_buffer) << makeResponse(
843 request_,
846 *sharedValue,
848 protocol_,
849 app_);
850
851 // Write the whole buffer and only start protocol when that's done.
852 boost::asio::async_write(
853 stream_,
854 write_buffer->data(),
855 boost::asio::transfer_all(),
856 bind_executor(
857 strand_,
858 [this, write_buffer, self = shared_from_this()](
859 error_code ec, std::size_t bytes_transferred) {
860 if (!socket_.is_open())
861 return;
862 if (ec == boost::asio::error::operation_aborted)
863 return tryAsyncShutdown();
864 if (ec)
865 return fail("onWriteResponse", ec);
866 if (write_buffer->size() == bytes_transferred)
867 return doProtocolStart();
868 return fail("Failed to write header");
869 }));
870}
871
874{
875 std::shared_lock read_lock{nameMutex_};
876 return name_;
877}
878
881{
882 return headers_["Server-Domain"];
883}
884
885//------------------------------------------------------------------------------
886
887// Protocol logic
888
889void
891{
    // a shutdown was initiated before the handshake, there is nothing to do
893 if (shutdown_)
894 return tryAsyncShutdown();
895
897
898 // Send all the validator lists that have been loaded
900 {
902 [&](std::string const& manifest,
903 std::uint32_t version,
905 PublicKey const& pubKey,
906 std::size_t maxSequence,
907 uint256 const& hash) {
909 *this,
910 0,
911 pubKey,
912 maxSequence,
913 version,
914 manifest,
915 blobInfos,
917 p_journal_);
918
919 // Don't send it next time.
921 });
922 }
923
924 if (auto m = overlay_.getManifestsMessage())
925 send(m);
926
927 setTimer(peerTimerInterval);
928}
929
930// Called repeatedly with protocol message data
931void
933{
934 XRPL_ASSERT(
935 strand_.running_in_this_thread(),
936 "xrpl::PeerImp::onReadMessage : strand in this thread");
937
938 readPending_ = false;
939
940 if (!socket_.is_open())
941 return;
942
943 if (ec)
944 {
945 if (ec == boost::asio::error::eof)
946 {
947 JLOG(journal_.debug()) << "EOF";
948 return shutdown();
949 }
950
951 if (ec == boost::asio::error::operation_aborted)
952 return tryAsyncShutdown();
953
954 return fail("onReadMessage", ec);
955 }
956 // we started shutdown, no reason to process further data
957 if (shutdown_)
958 return tryAsyncShutdown();
959
960 if (auto stream = journal_.trace())
961 {
962 stream << "onReadMessage: "
963 << (bytes_transferred > 0
964 ? to_string(bytes_transferred) + " bytes"
965 : "");
966 }
967
968 metrics_.recv.add_message(bytes_transferred);
969
970 read_buffer_.commit(bytes_transferred);
971
972 auto hint = Tuning::readBufferBytes;
973
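    // Drain the read buffer: dispatch as many complete protocol messages as it
    // holds and leave any trailing partial message in place for the next read.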
974 while (read_buffer_.size() > 0)
975 {
976 std::size_t bytes_consumed;
977
978 using namespace std::chrono_literals;
979 std::tie(bytes_consumed, ec) = perf::measureDurationAndLog(
980 [&]() {
981 return invokeProtocolMessage(read_buffer_.data(), *this, hint);
982 },
983 "invokeProtocolMessage",
984 350ms,
985 journal_);
986
987 if (!socket_.is_open())
988 return;
989
990 // the error_code is produced by invokeProtocolMessage
991 // it could be due to a bad message
992 if (ec)
993 return fail("onReadMessage", ec);
994
995 if (bytes_consumed == 0)
996 break;
997
998 read_buffer_.consume(bytes_consumed);
999 }
1000
1001 // check if a shutdown was initiated while processing messages
1002 if (shutdown_)
1003 return tryAsyncShutdown();
1004
1005 readPending_ = true;
1006
1007 XRPL_ASSERT(
1008 !shutdownStarted_, "xrpl::PeerImp::onReadMessage : shutdown started");
1009
1010 // Timeout on writes only
1011 stream_.async_read_some(
1013 bind_executor(
1014 strand_,
1015 std::bind(
1018 std::placeholders::_1,
1019 std::placeholders::_2)));
1020}
1021
1022void
1024{
1025 XRPL_ASSERT(
1026 strand_.running_in_this_thread(),
1027 "xrpl::PeerImp::onWriteMessage : strand in this thread");
1028
1029 writePending_ = false;
1030
1031 if (!socket_.is_open())
1032 return;
1033
1034 if (ec)
1035 {
1036 if (ec == boost::asio::error::operation_aborted)
1037 return tryAsyncShutdown();
1038
1039 return fail("onWriteMessage", ec);
1040 }
1041
1042 if (auto stream = journal_.trace())
1043 {
1044 stream << "onWriteMessage: "
1045 << (bytes_transferred > 0
1046 ? to_string(bytes_transferred) + " bytes"
1047 : "");
1048 }
1049
1050 metrics_.sent.add_message(bytes_transferred);
1051
1052 XRPL_ASSERT(
1053 !send_queue_.empty(),
1054 "xrpl::PeerImp::onWriteMessage : non-empty send buffer");
1055 send_queue_.pop();
1056
1057 if (shutdown_)
1058 return tryAsyncShutdown();
1059
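    // send() only starts a write when the queue transitions from empty to
    // non-empty, so any messages queued while this write was in flight are
    // written here, one at a time, until the queue drains.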
1060 if (!send_queue_.empty())
1061 {
1062 writePending_ = true;
1063 XRPL_ASSERT(
1065 "xrpl::PeerImp::onWriteMessage : shutdown started");
1066
1067 // Timeout on writes only
1068 return boost::asio::async_write(
1069 stream_,
1070 boost::asio::buffer(
1071 send_queue_.front()->getBuffer(compressionEnabled_)),
1072 bind_executor(
1073 strand_,
1074 std::bind(
1077 std::placeholders::_1,
1078 std::placeholders::_2)));
1079 }
1080}
1081
1082//------------------------------------------------------------------------------
1083//
1084// ProtocolHandler
1085//
1086//------------------------------------------------------------------------------
1087
1088void
1090{
1091 // TODO
1092}
1093
1094void
1096 std::uint16_t type,
1098 std::size_t size,
1099 std::size_t uncompressed_size,
1100 bool isCompressed)
1101{
1102 auto const name = protocolMessageName(type);
1105
1106 auto const category = TrafficCount::categorize(
1107 *m, static_cast<protocol::MessageType>(type), true);
1108
    // report total incoming traffic
    overlay_.reportInboundTraffic(
        TrafficCount::category::total, static_cast<int>(size));
1112
1113 // increase the traffic received for a specific category
1114 overlay_.reportInboundTraffic(category, static_cast<int>(size));
1115
1116 using namespace protocol;
1117 if ((type == MessageType::mtTRANSACTION ||
1118 type == MessageType::mtHAVE_TRANSACTIONS ||
1119 type == MessageType::mtTRANSACTIONS ||
1120 // GET_OBJECTS
1122 // GET_LEDGER
1125 // LEDGER_DATA
1129 {
1131 static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
1132 }
1133 JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
1134 << uncompressed_size << " " << isCompressed;
1135}
1136
1137void
1145
1146void
1148{
1149 auto const s = m->list_size();
1150
1151 if (s == 0)
1152 {
1154 return;
1155 }
1156
1157 if (s > 100)
1159
1161 jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() {
1162 overlay_.onManifests(m, that);
1163 });
1164}
1165
1166void
1168{
1169 if (m->type() == protocol::TMPing::ptPING)
1170 {
1171 // We have received a ping request, reply with a pong
1173 m->set_type(protocol::TMPing::ptPONG);
1174 send(std::make_shared<Message>(*m, protocol::mtPING));
1175 return;
1176 }
1177
1178 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1179 {
1180 // Only reset the ping sequence if we actually received a
1181 // PONG with the correct cookie. That way, any peers which
1182 // respond with incorrect cookies will eventually time out.
1183 if (m->seq() == lastPingSeq_)
1184 {
1186
1187 // Update latency estimate
1188 auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1190
1192
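            // Smooth the latency estimate with an exponential moving average:
            // seven parts previous estimate to one part new sample.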
1193 if (latency_)
1194 latency_ = (*latency_ * 7 + rtt) / 8;
1195 else
1196 latency_ = rtt;
1197 }
1198
1199 return;
1200 }
1201}
1202
1203void
1205{
1206 // VFALCO NOTE I think we should drop the peer immediately
1207 if (!cluster())
1208 {
1209 fee_.update(Resource::feeUselessData, "unknown cluster");
1210 return;
1211 }
1212
1213 for (int i = 0; i < m->clusternodes().size(); ++i)
1214 {
1215 protocol::TMClusterNode const& node = m->clusternodes(i);
1216
1218 if (node.has_nodename())
1219 name = node.nodename();
1220
1221 auto const publicKey =
1222 parseBase58<PublicKey>(TokenType::NodePublic, node.publickey());
1223
1224 // NIKB NOTE We should drop the peer immediately if
1225 // they send us a public key we can't parse
1226 if (publicKey)
1227 {
1228 auto const reportTime =
1229 NetClock::time_point{NetClock::duration{node.reporttime()}};
1230
1232 *publicKey, name, node.nodeload(), reportTime);
1233 }
1234 }
1235
1236 int loadSources = m->loadsources().size();
1237 if (loadSources != 0)
1238 {
1239 Resource::Gossip gossip;
1240 gossip.items.reserve(loadSources);
1241 for (int i = 0; i < m->loadsources().size(); ++i)
1242 {
1243 protocol::TMLoadSource const& node = m->loadsources(i);
1245 item.address = beast::IP::Endpoint::from_string(node.name());
1246 item.balance = node.cost();
1247 if (item.address != beast::IP::Endpoint())
1248 gossip.items.push_back(item);
1249 }
1251 }
1252
1253 // Calculate the cluster fee:
1254 auto const thresh = app_.timeKeeper().now() - 90s;
1255 std::uint32_t clusterFee = 0;
1256
1258 fees.reserve(app_.cluster().size());
1259
1260 app_.cluster().for_each([&fees, thresh](ClusterNode const& status) {
1261 if (status.getReportTime() >= thresh)
1262 fees.push_back(status.getLoadFee());
1263 });
1264
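    // Use the median of the recently reported cluster fees; nth_element only
    // partially sorts, which is enough to put the median at fees[index].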
1265 if (!fees.empty())
1266 {
1267 auto const index = fees.size() / 2;
1268 std::nth_element(fees.begin(), fees.begin() + index, fees.end());
1269 clusterFee = fees[index];
1270 }
1271
1272 app_.getFeeTrack().setClusterFee(clusterFee);
1273}
1274
1275void
1277{
1278 // Don't allow endpoints from peers that are not known tracking or are
1279 // not using a version of the message that we support:
1280 if (tracking_.load() != Tracking::converged || m->version() != 2)
1281 return;
1282
1283 // The number is arbitrary and doesn't have any real significance or
1284 // implication for the protocol.
1285 if (m->endpoints_v2().size() >= 1024)
1286 {
1287 fee_.update(Resource::feeUselessData, "endpoints too large");
1288 return;
1289 }
1290
1292 endpoints.reserve(m->endpoints_v2().size());
1293
1294 auto malformed = 0;
1295 for (auto const& tm : m->endpoints_v2())
1296 {
1297 auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
1298
1299 if (!result)
1300 {
1301 JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
1302 << tm.endpoint() << "}";
1303 malformed++;
1304 continue;
1305 }
1306
1307 // If hops == 0, this Endpoint describes the peer we are connected
1308 // to -- in that case, we take the remote address seen on the
1309 // socket and store that in the IP::Endpoint. If this is the first
1310 // time, then we'll verify that their listener can receive incoming
1311 // by performing a connectivity test. if hops > 0, then we just
1312 // take the address/port we were given
1313 if (tm.hops() == 0)
1314 result = remote_address_.at_port(result->port());
1315
1316 endpoints.emplace_back(*result, tm.hops());
1317 }
1318
1319 // Charge the peer for each malformed endpoint. As there still may be
1320 // multiple valid endpoints we don't return early.
1321 if (malformed > 0)
1322 {
1323 fee_.update(
1324 Resource::feeInvalidData * malformed,
1325 std::to_string(malformed) + " malformed endpoints");
1326 }
1327
1328 if (!endpoints.empty())
1329 overlay_.peerFinder().on_endpoints(slot_, endpoints);
1330}
1331
1332void
1337
1338void
1341 bool eraseTxQueue,
1342 bool batch)
1343{
1344 XRPL_ASSERT(
1345 eraseTxQueue != batch,
1346 ("xrpl::PeerImp::handleTransaction : valid inputs"));
1348 return;
1349
1351 {
1352 // If we've never been in synch, there's nothing we can do
1353 // with a transaction
1354 JLOG(p_journal_.debug()) << "Ignoring incoming transaction: "
1355 << "Need network ledger";
1356 return;
1357 }
1358
1359 SerialIter sit(makeSlice(m->rawtransaction()));
1360
1361 try
1362 {
1363 auto stx = std::make_shared<STTx const>(sit);
1364 uint256 txID = stx->getTransactionID();
1365
1366 // Charge strongly for attempting to relay a txn with tfInnerBatchTxn
1367 // LCOV_EXCL_START
1368 /*
1369 There is no need to check whether the featureBatch amendment is
1370 enabled.
1371
1372 * If the `tfInnerBatchTxn` flag is set, and the amendment is
1373 enabled, then it's an invalid transaction because inner batch
1374 transactions should not be relayed.
1375 * If the `tfInnerBatchTxn` flag is set, and the amendment is *not*
1376 enabled, then the transaction is malformed because it's using an
1377 "unknown" flag. There's no need to waste the resources to send it
1378 to the transaction engine.
1379
1380 We don't normally check transaction validity at this level, but
1381 since we _need_ to check it when the amendment is enabled, we may as
1382 well drop it if the flag is set regardless.
1383 */
1384 if (stx->isFlag(tfInnerBatchTxn))
1385 {
1386 JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
1387 "tfInnerBatchTxn (handleTransaction).";
1388 fee_.update(Resource::feeModerateBurdenPeer, "inner batch txn");
1389 return;
1390 }
1391 // LCOV_EXCL_STOP
1392
1393 HashRouterFlags flags;
1394 constexpr std::chrono::seconds tx_interval = 10s;
1395
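        // Ask the HashRouter whether this transaction was already handled
        // within the last tx_interval; if so, only adjust fees and the relay
        // queue below instead of reprocessing it.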
1396 if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
1397 {
1398 // we have seen this transaction recently
1399 if (any(flags & HashRouterFlags::BAD))
1400 {
1401 fee_.update(Resource::feeUselessData, "known bad");
1402 JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
1403 }
1404
            // Erase only if the server has seen this tx. If the server has
            // not seen this tx, then the tx could not have been queued for
            // this peer.
1407 else if (eraseTxQueue && txReduceRelayEnabled())
1408 removeTxQueue(txID);
1409
1413
1414 return;
1415 }
1416
1417 JLOG(p_journal_.debug()) << "Got tx " << txID;
1418
1419 bool checkSignature = true;
1420 if (cluster())
1421 {
1422 if (!m->has_deferred() || !m->deferred())
1423 {
1424 // Skip local checks if a server we trust
1425 // put the transaction in its open ledger
1426 flags |= HashRouterFlags::TRUSTED;
1427 }
1428
1429 // for non-validator nodes only -- localPublicKey is set for
1430 // validators only
1432 {
1433 // For now, be paranoid and have each validator
1434 // check each transaction, regardless of source
1435 checkSignature = false;
1436 }
1437 }
1438
1440 {
1441 JLOG(p_journal_.trace())
1442 << "No new transactions until synchronized";
1443 }
1444 else if (
1447 {
1449 JLOG(p_journal_.info()) << "Transaction queue is full";
1450 }
1451 else
1452 {
1455 "recvTransaction->checkTransaction",
1457 flags,
1458 checkSignature,
1459 batch,
1460 stx]() {
1461 if (auto peer = weak.lock())
1462 peer->checkTransaction(
1463 flags, checkSignature, stx, batch);
1464 });
1465 }
1466 }
1467 catch (std::exception const& ex)
1468 {
1469 JLOG(p_journal_.warn())
1470 << "Transaction invalid: " << strHex(m->rawtransaction())
1471 << ". Exception: " << ex.what();
1472 }
1473}
1474
1475void
1477{
1478 auto badData = [&](std::string const& msg) {
1479 fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
1480 JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
1481 };
1482 auto const itype{m->itype()};
1483
1484 // Verify ledger info type
1485 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1486 return badData("Invalid ledger info type");
1487
1488 auto const ltype = [&m]() -> std::optional<::protocol::TMLedgerType> {
1489 if (m->has_ltype())
1490 return m->ltype();
1491 return std::nullopt;
1492 }();
1493
1494 if (itype == protocol::liTS_CANDIDATE)
1495 {
1496 if (!m->has_ledgerhash())
1497 return badData("Invalid TX candidate set, missing TX set hash");
1498 }
1499 else if (
1500 !m->has_ledgerhash() && !m->has_ledgerseq() &&
1501 !(ltype && *ltype == protocol::ltCLOSED))
1502 {
1503 return badData("Invalid request");
1504 }
1505
1506 // Verify ledger type
1507 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1508 return badData("Invalid ledger type");
1509
1510 // Verify ledger hash
1511 if (m->has_ledgerhash() && !stringIsUint256Sized(m->ledgerhash()))
1512 return badData("Invalid ledger hash");
1513
1514 // Verify ledger sequence
1515 if (m->has_ledgerseq())
1516 {
1517 auto const ledgerSeq{m->ledgerseq()};
1518
        // Check if within a reasonable range
        using namespace std::chrono_literals;
        if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
            ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
        {
1524 return badData(
1525 "Invalid ledger sequence " + std::to_string(ledgerSeq));
1526 }
1527 }
1528
1529 // Verify ledger node IDs
1530 if (itype != protocol::liBASE)
1531 {
1532 if (m->nodeids_size() <= 0)
1533 return badData("Invalid ledger node IDs");
1534
1535 for (auto const& nodeId : m->nodeids())
1536 {
1538 return badData("Invalid SHAMap node ID");
1539 }
1540 }
1541
1542 // Verify query type
1543 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1544 return badData("Invalid query type");
1545
1546 // Verify query depth
1547 if (m->has_querydepth())
1548 {
1549 if (m->querydepth() > Tuning::maxQueryDepth ||
1550 itype == protocol::liBASE)
1551 {
1552 return badData("Invalid query depth");
1553 }
1554 }
1555
1556 // Queue a job to process the request
1558 app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() {
1559 if (auto peer = weak.lock())
1560 peer->processLedgerRequest(m);
1561 });
1562}
1563
1564void
1566{
1567 JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
1569 {
1570 fee_.update(
1571 Resource::feeMalformedRequest, "proof_path_request disabled");
1572 return;
1573 }
1574
1575 fee_.update(
1576 Resource::feeModerateBurdenPeer, "received a proof path request");
1579 jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() {
1580 if (auto peer = weak.lock())
1581 {
1582 auto reply =
1583 peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1584 if (reply.has_error())
1585 {
1586 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1587 peer->charge(
1588 Resource::feeMalformedRequest,
1589 "proof_path_request");
1590 else
1591 peer->charge(
1592 Resource::feeRequestNoReply, "proof_path_request");
1593 }
1594 else
1595 {
1596 peer->send(std::make_shared<Message>(
1597 reply, protocol::mtPROOF_PATH_RESPONSE));
1598 }
1599 }
1600 });
1601}
1602
1603void
1605{
1606 if (!ledgerReplayEnabled_)
1607 {
1608 fee_.update(
1609 Resource::feeMalformedRequest, "proof_path_response disabled");
1610 return;
1611 }
1612
1613 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1614 {
1615 fee_.update(Resource::feeInvalidData, "proof_path_response");
1616 }
1617}
1618
1619void
1621{
1622 JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
1623 if (!ledgerReplayEnabled_)
1624 {
1625 fee_.update(
1626 Resource::feeMalformedRequest, "replay_delta_request disabled");
1627 return;
1628 }
1629
1630 fee_.fee = Resource::feeModerateBurdenPeer;
1631 std::weak_ptr<PeerImp> weak = shared_from_this();
1632 app_.getJobQueue().addJob(
1633 jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() {
1634 if (auto peer = weak.lock())
1635 {
1636 auto reply =
1637 peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1638 if (reply.has_error())
1639 {
1640 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1641 peer->charge(
1642 Resource::feeMalformedRequest,
1643 "replay_delta_request");
1644 else
1645 peer->charge(
1646 Resource::feeRequestNoReply,
1647 "replay_delta_request");
1648 }
1649 else
1650 {
1651 peer->send(std::make_shared<Message>(
1652 reply, protocol::mtREPLAY_DELTA_RESPONSE));
1653 }
1654 }
1655 });
1656}
1657
1658void
1660{
1661 if (!ledgerReplayEnabled_)
1662 {
1663 fee_.update(
1664 Resource::feeMalformedRequest, "replay_delta_response disabled");
1665 return;
1666 }
1667
1668 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1669 {
1670 fee_.update(Resource::feeInvalidData, "replay_delta_response");
1671 }
1672}
1673
1674void
1676{
1677 auto badData = [&](std::string const& msg) {
1678 fee_.update(Resource::feeInvalidData, msg);
1679 JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
1680 };
1681
1682 // Verify ledger hash
1683 if (!stringIsUint256Sized(m->ledgerhash()))
1684 return badData("Invalid ledger hash");
1685
1686 // Verify ledger sequence
1687 {
1688 auto const ledgerSeq{m->ledgerseq()};
1689 if (m->type() == protocol::liTS_CANDIDATE)
1690 {
1691 if (ledgerSeq != 0)
1692 {
1693 return badData(
1694 "Invalid ledger sequence " + std::to_string(ledgerSeq));
1695 }
1696 }
1697 else
1698 {
1699 // Check if within a reasonable range
1700 using namespace std::chrono_literals;
1701 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1702 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1703 {
1704 return badData(
1705 "Invalid ledger sequence " + std::to_string(ledgerSeq));
1706 }
1707 }
1708 }
1709
1710 // Verify ledger info type
1711 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1712 return badData("Invalid ledger info type");
1713
1714 // Verify reply error
1715 if (m->has_error() &&
1716 (m->error() < protocol::reNO_LEDGER ||
1717 m->error() > protocol::reBAD_REQUEST))
1718 {
1719 return badData("Invalid reply error");
1720 }
1721
1722 // Verify ledger nodes.
1723 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1724 {
1725 return badData(
1726 "Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
1727 }
1728
1729 // If there is a request cookie, attempt to relay the message
1730 if (m->has_requestcookie())
1731 {
1732 if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1733 {
1734 m->clear_requestcookie();
1735 peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1736 }
1737 else
1738 {
1739 JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
1740 }
1741 return;
1742 }
1743
1744 uint256 const ledgerHash{m->ledgerhash()};
1745
1746 // Otherwise check if received data for a candidate transaction set
1747 if (m->type() == protocol::liTS_CANDIDATE)
1748 {
1749 std::weak_ptr<PeerImp> weak{shared_from_this()};
1750 app_.getJobQueue().addJob(
1751 jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() {
1752 if (auto peer = weak.lock())
1753 {
1754 peer->app_.getInboundTransactions().gotData(
1755 ledgerHash, peer, m);
1756 }
1757 });
1758 return;
1759 }
1760
1761 // Consume the message
1762 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1763}
1764
1765void
1767{
1768 protocol::TMProposeSet& set = *m;
1769
1770 auto const sig = makeSlice(set.signature());
1771
1772 // Preliminary check for the validity of the signature: A DER encoded
1773 // signature can't be longer than 72 bytes.
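    // The clamp below rejects any signature whose length falls outside the
    // [64, 72] byte range before doing further work.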
1774 if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
1775 (publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
1776 {
1777 JLOG(p_journal_.warn()) << "Proposal: malformed";
1778 fee_.update(
1779 Resource::feeInvalidSignature,
1780 " signature can't be longer than 72 bytes");
1781 return;
1782 }
1783
1784 if (!stringIsUint256Sized(set.currenttxhash()) ||
1785 !stringIsUint256Sized(set.previousledger()))
1786 {
1787 JLOG(p_journal_.warn()) << "Proposal: malformed";
1788 fee_.update(Resource::feeMalformedRequest, "bad hashes");
1789 return;
1790 }
1791
1792 // RH TODO: when isTrusted = false we should probably also cache a key
1793 // suppression for 30 seconds to avoid doing a relatively expensive lookup
1794 // every time a spam packet is received
1795 PublicKey const publicKey{makeSlice(set.nodepubkey())};
1796 auto const isTrusted = app_.validators().trusted(publicKey);
1797
    // If the operator has specified that untrusted proposals be dropped, that
    // happens here, i.e. before wasting further CPU verifying the signature of
    // an untrusted key
1801 if (!isTrusted)
1802 {
1803 // report untrusted proposal messages
1804 overlay_.reportInboundTraffic(
1805 TrafficCount::category::proposal_untrusted,
1806 Message::messageSize(*m));
1807
1808 if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1809 return;
1810 }
1811
1812 uint256 const proposeHash{set.currenttxhash()};
1813 uint256 const prevLedger{set.previousledger()};
1814
1815 NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
1816
1817 uint256 const suppression = proposalUniqueId(
1818 proposeHash,
1819 prevLedger,
1820 set.proposeseq(),
1821 closeTime,
1822 publicKey.slice(),
1823 sig);
1824
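    // addSuppressionPeerWithStatus tells us whether this proposal is new to us
    // and, if it is a duplicate, when it was last relayed.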
1825 if (auto [added, relayed] =
1826 app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
1827 !added)
1828 {
        // Count unique messages (Slots has its own 'HashRouter') which a peer
        // receives within IDLED seconds since the message was relayed.
1831 if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
1832 overlay_.updateSlotAndSquelch(
1833 suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1834
1835 // report duplicate proposal messages
1836 overlay_.reportInboundTraffic(
1837 TrafficCount::category::proposal_duplicate,
1838 Message::messageSize(*m));
1839
1840 JLOG(p_journal_.trace()) << "Proposal: duplicate";
1841
1842 return;
1843 }
1844
1845 if (!isTrusted)
1846 {
1847 if (tracking_.load() == Tracking::diverged)
1848 {
1849 JLOG(p_journal_.debug())
1850 << "Proposal: Dropping untrusted (peer divergence)";
1851 return;
1852 }
1853
1854 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1855 {
1856 JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
1857 return;
1858 }
1859 }
1860
1861 JLOG(p_journal_.trace())
1862 << "Proposal: " << (isTrusted ? "trusted" : "untrusted");
1863
1864 auto proposal = RCLCxPeerPos(
1865 publicKey,
1866 sig,
1867 suppression,
1869 prevLedger,
1870 set.proposeseq(),
1871 proposeHash,
1872 closeTime,
1873 app_.timeKeeper().closeTime(),
1874 calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
1875
1876 std::weak_ptr<PeerImp> weak = shared_from_this();
1877 app_.getJobQueue().addJob(
1878 isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
1879 "recvPropose->checkPropose",
1880 [weak, isTrusted, m, proposal]() {
1881 if (auto peer = weak.lock())
1882 peer->checkPropose(isTrusted, m, proposal);
1883 });
1884}
1885
1886void
1888{
1889 JLOG(p_journal_.trace()) << "Status: Change";
1890
1891 if (!m->has_networktime())
1892 m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());
1893
1894 {
1895 std::lock_guard sl(recentLock_);
1896 if (!last_status_.has_newstatus() || m->has_newstatus())
1897 last_status_ = *m;
1898 else
1899 {
1900 // preserve old status
1901 protocol::NodeStatus status = last_status_.newstatus();
1902 last_status_ = *m;
1903 m->set_newstatus(status);
1904 }
1905 }
1906
1907 if (m->newevent() == protocol::neLOST_SYNC)
1908 {
1909 bool outOfSync{false};
1910 {
1911 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
1912 // guarded by recentLock_.
1913 std::lock_guard sl(recentLock_);
1914 if (!closedLedgerHash_.isZero())
1915 {
1916 outOfSync = true;
1917 closedLedgerHash_.zero();
1918 }
1919 previousLedgerHash_.zero();
1920 }
1921 if (outOfSync)
1922 {
1923 JLOG(p_journal_.debug()) << "Status: Out of sync";
1924 }
1925 return;
1926 }
1927
1928 {
1929 uint256 closedLedgerHash{};
1930 bool const peerChangedLedgers{
1931 m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
1932
1933 {
1934 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
1935 // guarded by recentLock_.
1936 std::lock_guard sl(recentLock_);
1937 if (peerChangedLedgers)
1938 {
1939 closedLedgerHash_ = m->ledgerhash();
1940 closedLedgerHash = closedLedgerHash_;
1941 addLedger(closedLedgerHash, sl);
1942 }
1943 else
1944 {
1945 closedLedgerHash_.zero();
1946 }
1947
1948 if (m->has_ledgerhashprevious() &&
1949 stringIsUint256Sized(m->ledgerhashprevious()))
1950 {
1951 previousLedgerHash_ = m->ledgerhashprevious();
1952 addLedger(previousLedgerHash_, sl);
1953 }
1954 else
1955 {
1956 previousLedgerHash_.zero();
1957 }
1958 }
1959 if (peerChangedLedgers)
1960 {
1961 JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;
1962 }
1963 else
1964 {
1965 JLOG(p_journal_.debug()) << "Status: No ledger";
1966 }
1967 }
1968
1969 if (m->has_firstseq() && m->has_lastseq())
1970 {
1971 std::lock_guard sl(recentLock_);
1972
1973 minLedger_ = m->firstseq();
1974 maxLedger_ = m->lastseq();
1975
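        // Collapse nonsensical ranges (empty or inverted) to "no ledgers
        // available".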
1976 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1977 minLedger_ = maxLedger_ = 0;
1978 }
1979
1980 if (m->has_ledgerseq() &&
1981 app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1982 {
1983 checkTracking(
1984 m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1985 }
1986
1987 app_.getOPs().pubPeerStatus([=, this]() -> Json::Value {
1989
1990 if (m->has_newstatus())
1991 {
1992 switch (m->newstatus())
1993 {
1994 case protocol::nsCONNECTING:
1995 j[jss::status] = "CONNECTING";
1996 break;
1997 case protocol::nsCONNECTED:
1998 j[jss::status] = "CONNECTED";
1999 break;
2000 case protocol::nsMONITORING:
2001 j[jss::status] = "MONITORING";
2002 break;
2003 case protocol::nsVALIDATING:
2004 j[jss::status] = "VALIDATING";
2005 break;
2006 case protocol::nsSHUTTING:
2007 j[jss::status] = "SHUTTING";
2008 break;
2009 }
2010 }
2011
2012 if (m->has_newevent())
2013 {
2014 switch (m->newevent())
2015 {
2016 case protocol::neCLOSING_LEDGER:
2017 j[jss::action] = "CLOSING_LEDGER";
2018 break;
2019 case protocol::neACCEPTED_LEDGER:
2020 j[jss::action] = "ACCEPTED_LEDGER";
2021 break;
2022 case protocol::neSWITCHED_LEDGER:
2023 j[jss::action] = "SWITCHED_LEDGER";
2024 break;
2025 case protocol::neLOST_SYNC:
2026 j[jss::action] = "LOST_SYNC";
2027 break;
2028 }
2029 }
2030
2031 if (m->has_ledgerseq())
2032 {
2033 j[jss::ledger_index] = m->ledgerseq();
2034 }
2035
2036 if (m->has_ledgerhash())
2037 {
2038 uint256 closedLedgerHash{};
2039 {
2040 std::lock_guard sl(recentLock_);
2041 closedLedgerHash = closedLedgerHash_;
2042 }
2043 j[jss::ledger_hash] = to_string(closedLedgerHash);
2044 }
2045
2046 if (m->has_networktime())
2047 {
2048 j[jss::date] = Json::UInt(m->networktime());
2049 }
2050
2051 if (m->has_firstseq() && m->has_lastseq())
2052 {
2053 j[jss::ledger_index_min] = Json::UInt(m->firstseq());
2054 j[jss::ledger_index_max] = Json::UInt(m->lastseq());
2055 }
2056
2057 return j;
2058 });
2059}
2060
2061void
2062PeerImp::checkTracking(std::uint32_t validationSeq)
2063{
2064 std::uint32_t serverSeq;
2065 {
2066 // Extract the sequence number of the highest
2067 // ledger this peer has
2068 std::lock_guard sl(recentLock_);
2069
2070 serverSeq = maxLedger_;
2071 }
2072 if (serverSeq != 0)
2073 {
2074 // Compare the peer's ledger sequence to the
2075 // sequence of a recently-validated ledger
2076 checkTracking(serverSeq, validationSeq);
2077 }
2078}
2079
2080void
2081PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
2082{
2083 int diff = std::max(seq1, seq2) - std::min(seq1, seq2);
2084
2085 if (diff < Tuning::convergedLedgerLimit)
2086 {
2087 // The peer's ledger sequence is close to the validation's
2088 tracking_ = Tracking::converged;
2089 }
2090
2091 if ((diff > Tuning::divergedLedgerLimit) &&
2092 (tracking_.load() != Tracking::diverged))
2093 {
2094 // The peer's ledger sequence is way off the validation's
2095 std::lock_guard sl(recentLock_);
2096
2097 tracking_ = Tracking::diverged;
2098 trackingTime_ = clock_type::now();
2099 }
2100}
2101
2102void
2104{
2105 if (!stringIsUint256Sized(m->hash()))
2106 {
2107 fee_.update(Resource::feeMalformedRequest, "bad hash");
2108 return;
2109 }
2110
2111 uint256 const hash{m->hash()};
2112
2113 if (m->status() == protocol::tsHAVE)
2114 {
2115 std::lock_guard sl(recentLock_);
2116
2117 if (std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
2118 recentTxSets_.end())
2119 {
2120 fee_.update(Resource::feeUselessData, "duplicate (tsHAVE)");
2121 return;
2122 }
2123
2124 recentTxSets_.push_back(hash);
2125 }
2126}
2127
2128void
2129PeerImp::onValidatorListMessage(
2130 std::string const& messageType,
2131 std::string const& manifest,
2132 std::uint32_t version,
2133 std::vector<ValidatorBlobInfo> const& blobs)
2134{
2135 // If there are no blobs, the message is malformed (possibly because of
2136 // ValidatorList class rules), so charge accordingly and skip processing.
2137 if (blobs.empty())
2138 {
2139 JLOG(p_journal_.warn()) << "Ignored malformed " << messageType;
2140 // This shouldn't ever happen with a well-behaved peer
2141 fee_.update(Resource::feeHeavyBurdenPeer, "no blobs");
2142 return;
2143 }
2144
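    // The suppression key covers the manifest, every blob, and the version, so
    // any change to the published list yields a new hash.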
2145 auto const hash = sha512Half(manifest, blobs, version);
2146
2147 JLOG(p_journal_.debug()) << "Received " << messageType;
2148
2149 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2150 {
2151 JLOG(p_journal_.debug())
2152 << messageType << ": received duplicate " << messageType;
2153 // Charging this fee here won't hurt the peer in the normal
2154 // course of operation (ie. refresh every 5 minutes), but
2155 // will add up if the peer is misbehaving.
2156 fee_.update(Resource::feeUselessData, "duplicate");
2157 return;
2158 }
2159
2160 auto const applyResult = app_.validators().applyListsAndBroadcast(
2161 manifest,
2162 version,
2163 blobs,
2164 remote_address_.to_string(),
2165 hash,
2166 app_.overlay(),
2167 app_.getHashRouter(),
2168 app_.getOPs());
2169
2170 JLOG(p_journal_.debug())
2171 << "Processed " << messageType << " version " << version << " from "
2172 << (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
2173 : "unknown or invalid publisher")
2174 << " with best result " << to_string(applyResult.bestDisposition());
2175
2176 // Act based on the best result
2177 switch (applyResult.bestDisposition())
2178 {
2179 // New list
2180 case ListDisposition::accepted:
2181 // Newest list is expired, and that needs to be broadcast, too
2182 case ListDisposition::expired:
2183 // Future list
2184 case ListDisposition::pending: {
2185 std::lock_guard<std::mutex> sl(recentLock_);
2186
2187 XRPL_ASSERT(
2188 applyResult.publisherKey,
2189 "xrpl::PeerImp::onValidatorListMessage : publisher key is "
2190 "set");
2191 auto const& pubKey = *applyResult.publisherKey;
2192#ifndef NDEBUG
2193 if (auto const iter = publisherListSequences_.find(pubKey);
2194 iter != publisherListSequences_.end())
2195 {
2196 XRPL_ASSERT(
2197 iter->second < applyResult.sequence,
2198 "xrpl::PeerImp::onValidatorListMessage : lower sequence");
2199 }
2200#endif
2201 publisherListSequences_[pubKey] = applyResult.sequence;
2202 }
2203 break;
2204 case ListDisposition::same_sequence:
2205 case ListDisposition::known_sequence:
2206#ifndef NDEBUG
2207 {
2208 std::lock_guard<std::mutex> sl(recentLock_);
2209 XRPL_ASSERT(
2210 applyResult.sequence && applyResult.publisherKey,
2211 "xrpl::PeerImp::onValidatorListMessage : nonzero sequence "
2212 "and set publisher key");
2213 XRPL_ASSERT(
2214 publisherListSequences_[*applyResult.publisherKey] <=
2215 applyResult.sequence,
2216 "xrpl::PeerImp::onValidatorListMessage : maximum sequence");
2217 }
2218#endif // !NDEBUG
2219
2220 break;
2221 case ListDisposition::stale:
2222 case ListDisposition::untrusted:
2223 case ListDisposition::invalid:
2224 case ListDisposition::unsupported_version:
2225 break;
2226 // LCOV_EXCL_START
2227 default:
2228 UNREACHABLE(
2229 "xrpl::PeerImp::onValidatorListMessage : invalid best list "
2230 "disposition");
2231 // LCOV_EXCL_STOP
2232 }
2233
2234 // Charge based on the worst result
2235 switch (applyResult.worstDisposition())
2236 {
2237 case ListDisposition::accepted:
2238 case ListDisposition::expired:
2239 case ListDisposition::pending:
2240 // No charges for good data
2241 break;
2242 case ListDisposition::same_sequence:
2243 case ListDisposition::known_sequence:
2244 // Charging this fee here won't hurt the peer in the normal
2245 // course of operation (ie. refresh every 5 minutes), but
2246 // will add up if the peer is misbehaving.
2247 fee_.update(
2248 Resource::feeUselessData,
2249 " duplicate (same_sequence or known_sequence)");
2250 break;
2251 case ListDisposition::stale:
2252 // There are very few good reasons for a peer to send an
2253 // old list, particularly more than once.
2254 fee_.update(Resource::feeInvalidData, "expired");
2255 break;
2256 case ListDisposition::untrusted:
2257 // Charging this fee here won't hurt the peer in the normal
2258 // course of operation (ie. refresh every 5 minutes), but
2259 // will add up if the peer is misbehaving.
2260 fee_.update(Resource::feeUselessData, "untrusted");
2261 break;
2262 case ListDisposition::invalid:
2263 // This shouldn't ever happen with a well-behaved peer
2264 fee_.update(
2265 Resource::feeInvalidSignature, "invalid list disposition");
2266 break;
2267 case ListDisposition::unsupported_version:
2268 // During a version transition, this may be legitimate.
2269 // If it happens frequently, that's probably bad.
2270 fee_.update(Resource::feeInvalidData, "version");
2271 break;
2272 // LCOV_EXCL_START
2273 default:
2274 UNREACHABLE(
2275 "xrpl::PeerImp::onValidatorListMessage : invalid worst list "
2276 "disposition");
2277 // LCOV_EXCL_STOP
2278 }
2279
2280 // Log based on all the results.
2281 for (auto const& [disp, count] : applyResult.dispositions)
2282 {
2283 switch (disp)
2284 {
2285 // New list
2286 case ListDisposition::accepted:
2287 JLOG(p_journal_.debug())
2288 << "Applied " << count << " new " << messageType;
2289 break;
2290 // Newest list is expired, and that needs to be broadcast, too
2291 case ListDisposition::expired:
2292 JLOG(p_journal_.debug())
2293 << "Applied " << count << " expired " << messageType;
2294 break;
2295 // Future list
2296 case ListDisposition::pending:
2297 JLOG(p_journal_.debug())
2298 << "Processed " << count << " future " << messageType;
2299 break;
2300 case ListDisposition::same_sequence:
2301 JLOG(p_journal_.warn())
2302 << "Ignored " << count << " " << messageType
2303 << "(s) with current sequence";
2304 break;
2305 case ListDisposition::known_sequence:
2306 JLOG(p_journal_.warn())
2307 << "Ignored " << count << " " << messageType
2308 << "(s) with future sequence";
2309 break;
2310 case ListDisposition::stale:
2311 JLOG(p_journal_.warn())
                    << "Ignored " << count << " stale " << messageType;
2313 break;
2314 case ListDisposition::untrusted:
2315 JLOG(p_journal_.warn())
2316 << "Ignored " << count << " untrusted " << messageType;
2317 break;
2318 case ListDisposition::unsupported_version:
2319 JLOG(p_journal_.warn())
                    << "Ignored " << count << " unsupported version "
2321 << messageType;
2322 break;
2323 case ListDisposition::invalid:
2324 JLOG(p_journal_.warn())
                    << "Ignored " << count << " invalid " << messageType;
2326 break;
2327 // LCOV_EXCL_START
2328 default:
2329 UNREACHABLE(
2330 "xrpl::PeerImp::onValidatorListMessage : invalid list "
2331 "disposition");
2332 // LCOV_EXCL_STOP
2333 }
2334 }
2335}
2336
2337void
2339{
2340 try
2341 {
2342 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2343 {
2344 JLOG(p_journal_.debug())
2345 << "ValidatorList: received validator list from peer using "
2346 << "protocol version " << to_string(protocol_)
2347 << " which shouldn't support this feature.";
2348 fee_.update(Resource::feeUselessData, "unsupported peer");
2349 return;
2350 }
2351 onValidatorListMessage(
2352 "ValidatorList",
2353 m->manifest(),
2354 m->version(),
2355 ValidatorList::parseBlobs(*m));
2356 }
2357 catch (std::exception const& e)
2358 {
2359 JLOG(p_journal_.warn()) << "ValidatorList: Exception, " << e.what();
2360 using namespace std::string_literals;
2361 fee_.update(Resource::feeInvalidData, e.what());
2362 }
2363}
2364
2365void
2366 PeerImp::onMessage(
2367     std::shared_ptr<protocol::TMValidatorListCollection> const& m)
2368{
2369 try
2370 {
2371 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2372 {
2373 JLOG(p_journal_.debug())
2374 << "ValidatorListCollection: received validator list from peer "
2375 << "using protocol version " << to_string(protocol_)
2376 << " which shouldn't support this feature.";
2377 fee_.update(Resource::feeUselessData, "unsupported peer");
2378 return;
2379 }
2380 else if (m->version() < 2)
2381 {
2382 JLOG(p_journal_.debug())
2383 << "ValidatorListCollection: received invalid validator list "
2384 "version "
2385 << m->version() << " from peer using protocol version "
2386 << to_string(protocol_);
2387 fee_.update(Resource::feeInvalidData, "wrong version");
2388 return;
2389 }
2390 onValidatorListMessage(
2391 "ValidatorListCollection",
2392 m->manifest(),
2393 m->version(),
2394 ValidatorList::parseBlobs(*m));
2395 }
2396 catch (std::exception const& e)
2397 {
2398 JLOG(p_journal_.warn())
2399 << "ValidatorListCollection: Exception, " << e.what();
2400 using namespace std::string_literals;
2401 fee_.update(Resource::feeInvalidData, e.what());
2402 }
2403}
2404
2405 void
2406 PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
2407{
2408 if (m->validation().size() < 50)
2409 {
2410 JLOG(p_journal_.warn()) << "Validation: Too small";
2411 fee_.update(Resource::feeMalformedRequest, "too small");
2412 return;
2413 }
2414
2415 try
2416 {
2417 auto const closeTime = app_.timeKeeper().closeTime();
2418
2419         std::shared_ptr<STValidation> val;
2420         {
2421             SerialIter sit(makeSlice(m->validation()));
2422             val = std::make_shared<STValidation>(
2423 std::ref(sit),
2424 [this](PublicKey const& pk) {
2425 return calcNodeID(
2426 app_.validatorManifests().getMasterKey(pk));
2427 },
2428 false);
2429 val->setSeen(closeTime);
2430 }
2431
2432 if (!isCurrent(
2433 app_.getValidations().parms(),
2434 app_.timeKeeper().closeTime(),
2435 val->getSignTime(),
2436 val->getSeenTime()))
2437 {
2438 JLOG(p_journal_.trace()) << "Validation: Not current";
2439 fee_.update(Resource::feeUselessData, "not current");
2440 return;
2441 }
2442
2443 // RH TODO: when isTrusted = false we should probably also cache a key
2444 // suppression for 30 seconds to avoid doing a relatively expensive
2445 // lookup every time a spam packet is received
2446 auto const isTrusted =
2447 app_.validators().trusted(val->getSignerPublic());
2448
2449         // If the operator has specified that untrusted validations be
2450         // dropped, that happens here, i.e. before wasting further CPU
2451         // verifying the signature of an untrusted key.
2452 if (!isTrusted)
2453 {
2454 // increase untrusted validations received
2455 overlay_.reportInboundTraffic(
2456 TrafficCount::category::validation_untrusted,
2457 Message::messageSize(*m));
2458
2459 if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2460 return;
2461 }
2462
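        // Note: the suppression key below is the SHA512-half of the raw
        // validation blob; it is what HashRouter and the squelch accounting
        // use to recognize this validation when it arrives from other peers.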
2463 auto key = sha512Half(makeSlice(m->validation()));
2464
2465 auto [added, relayed] =
2466 app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2467
2468 if (!added)
2469 {
2470             // Count unique messages (Slots has its own 'HashRouter') that a
2471             // peer receives within IDLED seconds of the message having been
2472             // relayed.
2473 if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
2474 overlay_.updateSlotAndSquelch(
2475 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2476
2477 // increase duplicate validations received
2478 overlay_.reportInboundTraffic(
2479 TrafficCount::category::validation_duplicate,
2480 Message::messageSize(*m));
2481
2482 JLOG(p_journal_.trace()) << "Validation: duplicate";
2483 return;
2484 }
2485
2486 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2487 {
2488 JLOG(p_journal_.debug())
2489 << "Dropping untrusted validation from diverged peer";
2490 }
2491 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2492 {
2493 std::string const name = [isTrusted, val]() {
2494 std::string ret =
2495 isTrusted ? "Trusted validation" : "Untrusted validation";
2496
2497#ifdef DEBUG
2498 ret += " " +
2499 std::to_string(val->getFieldU32(sfLedgerSequence)) + ": " +
2500 to_string(val->getNodeID());
2501#endif
2502
2503 return ret;
2504 }();
2505
2506 std::weak_ptr<PeerImp> weak = shared_from_this();
2507 app_.getJobQueue().addJob(
2508 isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
2509 name,
2510 [weak, val, m, key]() {
2511 if (auto peer = weak.lock())
2512 peer->checkValidation(val, key, m);
2513 });
2514 }
2515 else
2516 {
2517 JLOG(p_journal_.debug())
2518 << "Dropping untrusted validation for load";
2519 }
2520 }
2521 catch (std::exception const& e)
2522 {
2523 JLOG(p_journal_.warn())
2524 << "Exception processing validation: " << e.what();
2525 using namespace std::string_literals;
2526 fee_.update(Resource::feeMalformedRequest, e.what());
2527 }
2528}
2529
2530 void
2531 PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
2532{
2533 protocol::TMGetObjectByHash& packet = *m;
2534
2535 JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
2536 << " " << packet.objects_size();
2537
2538 if (packet.query())
2539 {
2540 // this is a query
2541 if (send_queue_.size() >= Tuning::dropSendQueue)
2542 {
2543 JLOG(p_journal_.debug()) << "GetObject: Large send queue";
2544 return;
2545 }
2546
2547 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2548 {
2549 doFetchPack(m);
2550 return;
2551 }
2552
2553 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2554 {
2555 if (!txReduceRelayEnabled())
2556 {
2557 JLOG(p_journal_.error())
2558 << "TMGetObjectByHash: tx reduce-relay is disabled";
2559 fee_.update(Resource::feeMalformedRequest, "disabled");
2560 return;
2561 }
2562
2563 std::weak_ptr<PeerImp> weak = shared_from_this();
2564 app_.getJobQueue().addJob(
2565 jtREQUESTED_TXN, "doTransactions", [weak, m]() {
2566 if (auto peer = weak.lock())
2567 peer->doTransactions(m);
2568 });
2569 return;
2570 }
2571
2572 protocol::TMGetObjectByHash reply;
2573
2574 reply.set_query(false);
2575
2576 if (packet.has_seq())
2577 reply.set_seq(packet.seq());
2578
2579 reply.set_type(packet.type());
2580
2581 if (packet.has_ledgerhash())
2582 {
2583 if (!stringIsUint256Sized(packet.ledgerhash()))
2584 {
2585 fee_.update(Resource::feeMalformedRequest, "ledger hash");
2586 return;
2587 }
2588
2589 reply.set_ledgerhash(packet.ledgerhash());
2590 }
2591
2592 fee_.update(
2593 Resource::feeModerateBurdenPeer,
2594 " received a get object by hash request");
2595
2596 // This is a very minimal implementation
2597 for (int i = 0; i < packet.objects_size(); ++i)
2598 {
2599 auto const& obj = packet.objects(i);
2600 if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
2601 {
2602 uint256 const hash{obj.hash()};
2603                 // VFALCO TODO Move this someplace more sensible so we don't
2604 // need to inject the NodeStore interfaces.
2605 std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2606 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2607 if (nodeObject)
2608 {
2609 protocol::TMIndexedObject& newObj = *reply.add_objects();
2610 newObj.set_hash(hash.begin(), hash.size());
2611 newObj.set_data(
2612 &nodeObject->getData().front(),
2613 nodeObject->getData().size());
2614
2615 if (obj.has_nodeid())
2616 newObj.set_index(obj.nodeid());
2617 if (obj.has_ledgerseq())
2618 newObj.set_ledgerseq(obj.ledgerseq());
2619
2620 // VFALCO NOTE "seq" in the message is obsolete
2621 }
2622 }
2623 }
2624
2625 JLOG(p_journal_.trace()) << "GetObj: " << reply.objects_size() << " of "
2626 << packet.objects_size();
2627 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2628 }
2629 else
2630 {
2631 // this is a reply
2632 std::uint32_t pLSeq = 0;
2633 bool pLDo = true;
2634 bool progress = false;
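        // pLSeq is the ledger sequence of the objects currently being
        // processed, pLDo is true while that ledger is still needed locally
        // (so its nodes are added to the fetch pack), and progress records
        // whether any needed ledger data was received.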
2635
2636 for (int i = 0; i < packet.objects_size(); ++i)
2637 {
2638 protocol::TMIndexedObject const& obj = packet.objects(i);
2639
2640 if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
2641 {
2642 if (obj.has_ledgerseq())
2643 {
2644 if (obj.ledgerseq() != pLSeq)
2645 {
2646 if (pLDo && (pLSeq != 0))
2647 {
2648 JLOG(p_journal_.debug())
2649 << "GetObj: Full fetch pack for " << pLSeq;
2650 }
2651 pLSeq = obj.ledgerseq();
2652 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2653
2654 if (!pLDo)
2655 {
2656 JLOG(p_journal_.debug())
2657 << "GetObj: Late fetch pack for " << pLSeq;
2658 }
2659 else
2660 progress = true;
2661 }
2662 }
2663
2664 if (pLDo)
2665 {
2666 uint256 const hash{obj.hash()};
2667
2668 app_.getLedgerMaster().addFetchPack(
2669                         hash,
2670                         std::make_shared<Blob>(
2671 obj.data().begin(), obj.data().end()));
2672 }
2673 }
2674 }
2675
2676 if (pLDo && (pLSeq != 0))
2677 {
2678 JLOG(p_journal_.debug())
2679 << "GetObj: Partial fetch pack for " << pLSeq;
2680 }
2681 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2682 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2683 }
2684}
2685
2686 void
2687 PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)
2688{
2689 if (!txReduceRelayEnabled())
2690 {
2691 JLOG(p_journal_.error())
2692 << "TMHaveTransactions: tx reduce-relay is disabled";
2693 fee_.update(Resource::feeMalformedRequest, "disabled");
2694 return;
2695 }
2696
2697 std::weak_ptr<PeerImp> weak = shared_from_this();
2698 app_.getJobQueue().addJob(
2699 jtMISSING_TXN, "handleHaveTransactions", [weak, m]() {
2700 if (auto peer = weak.lock())
2701 peer->handleHaveTransactions(m);
2702 });
2703}
2704
2705void
2706 PeerImp::handleHaveTransactions(
2707     std::shared_ptr<protocol::TMHaveTransactions> const& m)
2708{
2709 protocol::TMGetObjectByHash tmBH;
2710 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2711 tmBH.set_query(true);
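    // Build a query for any advertised transactions that are missing from
    // the local transaction cache; hashes we already have are instead
    // removed from this peer's transaction queue below.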
2712
2713 JLOG(p_journal_.trace())
2714 << "received TMHaveTransactions " << m->hashes_size();
2715
2716 for (std::uint32_t i = 0; i < m->hashes_size(); i++)
2717 {
2718 if (!stringIsUint256Sized(m->hashes(i)))
2719 {
2720 JLOG(p_journal_.error())
2721 << "TMHaveTransactions with invalid hash size";
2722 fee_.update(Resource::feeMalformedRequest, "hash size");
2723 return;
2724 }
2725
2726 uint256 hash(m->hashes(i));
2727
2728 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2729
2730 JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn;
2731
2732 if (!txn)
2733 {
2734 JLOG(p_journal_.debug()) << "adding transaction to request";
2735
2736 auto obj = tmBH.add_objects();
2737 obj->set_hash(hash.data(), hash.size());
2738 }
2739 else
2740 {
2741             // Erase only if the peer has seen this tx. If the peer has not
2742             // seen this tx then it could not have been queued for this
2743             // peer.
2744 removeTxQueue(hash);
2745 }
2746 }
2747
2748 JLOG(p_journal_.trace())
2749 << "transaction request object is " << tmBH.objects_size();
2750
2751 if (tmBH.objects_size() > 0)
2752 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2753}
2754
2755 void
2756 PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
2757{
2758 if (!txReduceRelayEnabled())
2759 {
2760 JLOG(p_journal_.error())
2761 << "TMTransactions: tx reduce-relay is disabled";
2762 fee_.update(Resource::feeMalformedRequest, "disabled");
2763 return;
2764 }
2765
2766 JLOG(p_journal_.trace())
2767 << "received TMTransactions " << m->transactions_size();
2768
2769 overlay_.addTxMetrics(m->transactions_size());
2770
2771 for (std::uint32_t i = 0; i < m->transactions_size(); ++i)
2772         handleTransaction(
2773             std::shared_ptr<protocol::TMTransaction>(
2774 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2775 false,
2776 true);
2777}
2778
2779void
2780PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
2781{
2782     using on_message_fn =
2783         void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
2784 if (!strand_.running_in_this_thread())
2785 return post(
2786 strand_,
2787 std::bind(
2788 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2789
2790 if (!m->has_validatorpubkey())
2791 {
2792 fee_.update(Resource::feeInvalidData, "squelch no pubkey");
2793 return;
2794 }
2795 auto validator = m->validatorpubkey();
2796 auto const slice{makeSlice(validator)};
2797 if (!publicKeyType(slice))
2798 {
2799 fee_.update(Resource::feeInvalidData, "squelch bad pubkey");
2800 return;
2801 }
2802 PublicKey key(slice);
2803
2804 // Ignore the squelch for validator's own messages.
2805 if (key == app_.getValidationPublicKey())
2806 {
2807 JLOG(p_journal_.debug())
2808 << "onMessage: TMSquelch discarding validator's squelch " << slice;
2809 return;
2810 }
2811
2812 std::uint32_t duration =
2813 m->has_squelchduration() ? m->squelchduration() : 0;
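    // A false squelch lifts any existing squelch for this validator;
    // otherwise addSquelch() vets the requested duration and a rejected
    // (presumably out-of-range) value is charged as invalid data.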
2814 if (!m->squelch())
2815 squelch_.removeSquelch(key);
2816 else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
2817 fee_.update(Resource::feeInvalidData, "squelch duration");
2818
2819 JLOG(p_journal_.debug())
2820 << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
2821}
2822
2823//--------------------------------------------------------------------------
2824
2825void
2826PeerImp::addLedger(
2827 uint256 const& hash,
2828 std::lock_guard<std::mutex> const& lockedRecentLock)
2829{
2830 // lockedRecentLock is passed as a reminder that recentLock_ must be
2831 // locked by the caller.
2832 (void)lockedRecentLock;
2833
2834 if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2835 recentLedgers_.end())
2836 return;
2837
2838 recentLedgers_.push_back(hash);
2839}
2840
2841void
2842PeerImp::doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
2843{
2844 // VFALCO TODO Invert this dependency using an observer and shared state
2845 // object. Don't queue fetch pack jobs if we're under load or we already
2846 // have some queued.
2847 if (app_.getFeeTrack().isLoadedLocal() ||
2848 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2849 (app_.getJobQueue().getJobCount(jtPACK) > 10))
2850 {
2851 JLOG(p_journal_.info()) << "Too busy to make fetch pack";
2852 return;
2853 }
2854
2855 if (!stringIsUint256Sized(packet->ledgerhash()))
2856 {
2857 JLOG(p_journal_.warn()) << "FetchPack hash size malformed";
2858 fee_.update(Resource::feeMalformedRequest, "hash size");
2859 return;
2860 }
2861
2862 fee_.fee = Resource::feeHeavyBurdenPeer;
2863
2864 uint256 const hash{packet->ledgerhash()};
2865
2866 std::weak_ptr<PeerImp> weak = shared_from_this();
2867 auto elapsed = UptimeClock::now();
2868 auto const pap = &app_;
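    // Only a weak_ptr to this peer is captured, so queuing the fetch pack
    // job does not extend the peer's lifetime while the pack is built.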
2869 app_.getJobQueue().addJob(
2870 jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2871 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2872 });
2873}
2874
2875void
2876 PeerImp::doTransactions(
2877     std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
2878{
2879 protocol::TMTransactions reply;
2880
2881 JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx "
2882 << packet->objects_size();
2883
2884 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2885 {
2886 JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes";
2887 fee_.update(Resource::feeMalformedRequest, "too big");
2888 return;
2889 }
2890
2891 for (std::uint32_t i = 0; i < packet->objects_size(); ++i)
2892 {
2893 auto const& obj = packet->objects(i);
2894
2895 if (!stringIsUint256Sized(obj.hash()))
2896 {
2897 fee_.update(Resource::feeMalformedRequest, "hash size");
2898 return;
2899 }
2900
2901 uint256 hash(obj.hash());
2902
2903 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2904
2905 if (!txn)
2906 {
2907 JLOG(p_journal_.error()) << "doTransactions, transaction not found "
2908 << Slice(hash.data(), hash.size());
2909 fee_.update(Resource::feeMalformedRequest, "tx not found");
2910 return;
2911 }
2912
2913 Serializer s;
2914 auto tx = reply.add_transactions();
2915 auto sttx = txn->getSTransaction();
2916 sttx->add(s);
2917 tx->set_rawtransaction(s.data(), s.size());
2918 tx->set_status(
2919 txn->getStatus() == INCLUDED ? protocol::tsCURRENT
2920 : protocol::tsNEW);
2921 tx->set_receivetimestamp(
2922 app_.timeKeeper().now().time_since_epoch().count());
2923 tx->set_deferred(txn->getSubmitResult().queued);
2924 }
2925
2926 if (reply.transactions_size() > 0)
2927 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
2928}
2929
2930void
2931PeerImp::checkTransaction(
2932 HashRouterFlags flags,
2933 bool checkSignature,
2934 std::shared_ptr<STTx const> const& stx,
2935 bool batch)
2936{
2937 // VFALCO TODO Rewrite to not use exceptions
2938 try
2939 {
2940 // charge strongly for relaying batch txns
2941 // LCOV_EXCL_START
2942 /*
2943 There is no need to check whether the featureBatch amendment is
2944 enabled.
2945
2946 * If the `tfInnerBatchTxn` flag is set, and the amendment is
2947 enabled, then it's an invalid transaction because inner batch
2948 transactions should not be relayed.
2949 * If the `tfInnerBatchTxn` flag is set, and the amendment is *not*
2950 enabled, then the transaction is malformed because it's using an
2951 "unknown" flag. There's no need to waste the resources to send it
2952 to the transaction engine.
2953
2954 We don't normally check transaction validity at this level, but
2955 since we _need_ to check it when the amendment is enabled, we may as
2956 well drop it if the flag is set regardless.
2957 */
2958 if (stx->isFlag(tfInnerBatchTxn))
2959 {
2960 JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
2961 "tfInnerBatchTxn (checkSignature).";
2962 charge(Resource::feeModerateBurdenPeer, "inner batch txn");
2963 return;
2964 }
2965 // LCOV_EXCL_STOP
2966
2967 // Expired?
2968 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2969 (stx->getFieldU32(sfLastLedgerSequence) <
2970 app_.getLedgerMaster().getValidLedgerIndex()))
2971 {
2972 JLOG(p_journal_.info())
2973 << "Marking transaction " << stx->getTransactionID()
2974                 << " as BAD because it's expired";
2975 app_.getHashRouter().setFlags(
2976 stx->getTransactionID(), HashRouterFlags::BAD);
2977 charge(Resource::feeUselessData, "expired tx");
2978 return;
2979 }
2980
2981 if (isPseudoTx(*stx))
2982 {
2983 // Don't do anything with pseudo transactions except put them in the
2984 // TransactionMaster cache
2985 std::string reason;
2986 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2987 XRPL_ASSERT(
2988 tx->getStatus() == NEW,
2989 "xrpl::PeerImp::checkTransaction Transaction created "
2990 "correctly");
2991 if (tx->getStatus() == NEW)
2992 {
2993 JLOG(p_journal_.debug())
2994 << "Processing " << (batch ? "batch" : "unsolicited")
2995 << " pseudo-transaction tx " << tx->getID();
2996
2997 app_.getMasterTransaction().canonicalize(&tx);
2998 // Tell the overlay about it, but don't relay it.
2999 auto const toSkip =
3000 app_.getHashRouter().shouldRelay(tx->getID());
3001 if (toSkip)
3002 {
3003 JLOG(p_journal_.debug())
3004                         << "Passing skipped pseudo-transaction tx "
3005 << tx->getID();
3006 app_.overlay().relay(tx->getID(), {}, *toSkip);
3007 }
3008 if (!batch)
3009 {
3010 JLOG(p_journal_.debug())
3011 << "Charging for pseudo-transaction tx " << tx->getID();
3012 charge(Resource::feeUselessData, "pseudo tx");
3013 }
3014
3015 return;
3016 }
3017 }
3018
3019 if (checkSignature)
3020 {
3021 // Check the signature before handing off to the job queue.
3022 if (auto [valid, validReason] = checkValidity(
3023 app_.getHashRouter(),
3024 *stx,
3025 app_.getLedgerMaster().getValidatedRules(),
3026 app_.config());
3027 valid != Validity::Valid)
3028 {
3029 if (!validReason.empty())
3030 {
3031 JLOG(p_journal_.debug())
3032 << "Exception checking transaction: " << validReason;
3033 }
3034
3035 // Probably not necessary to set HashRouterFlags::BAD, but
3036 // doesn't hurt.
3037 app_.getHashRouter().setFlags(
3038 stx->getTransactionID(), HashRouterFlags::BAD);
3039 charge(
3040 Resource::feeInvalidSignature,
3041 "check transaction signature failure");
3042 return;
3043 }
3044 }
3045 else
3046         {
3047             forceValidity(
3048 app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
3049 }
3050
3051 std::string reason;
3052 auto tx = std::make_shared<Transaction>(stx, reason, app_);
3053
3054 if (tx->getStatus() == INVALID)
3055 {
3056 if (!reason.empty())
3057 {
3058 JLOG(p_journal_.debug())
3059 << "Exception checking transaction: " << reason;
3060 }
3061 app_.getHashRouter().setFlags(
3062 stx->getTransactionID(), HashRouterFlags::BAD);
3063 charge(Resource::feeInvalidSignature, "tx (impossible)");
3064 return;
3065 }
3066
3067 bool const trusted = any(flags & HashRouterFlags::TRUSTED);
3068 app_.getOPs().processTransaction(
3069 tx, trusted, false, NetworkOPs::FailHard::no);
3070 }
3071 catch (std::exception const& ex)
3072 {
3073 JLOG(p_journal_.warn())
3074 << "Exception in " << __func__ << ": " << ex.what();
3075 app_.getHashRouter().setFlags(
3076 stx->getTransactionID(), HashRouterFlags::BAD);
3077 using namespace std::string_literals;
3078 charge(Resource::feeInvalidData, "tx "s + ex.what());
3079 }
3080}
3081
3082// Called from our JobQueue
3083void
3084PeerImp::checkPropose(
3085     bool isTrusted,
3086     std::shared_ptr<protocol::TMProposeSet> const& packet,
3087 RCLCxPeerPos peerPos)
3088{
3089 JLOG(p_journal_.trace())
3090 << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal";
3091
3092 XRPL_ASSERT(packet, "xrpl::PeerImp::checkPropose : non-null packet");
3093
3094 if (!cluster() && !peerPos.checkSign())
3095 {
3096 std::string desc{"Proposal fails sig check"};
3097 JLOG(p_journal_.warn()) << desc;
3098 charge(Resource::feeInvalidSignature, desc);
3099 return;
3100 }
3101
3102 bool relay;
3103
3104 if (isTrusted)
3105 relay = app_.getOPs().processTrustedProposal(peerPos);
3106 else
3107 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3108
3109 if (relay)
3110 {
3111         // haveMessage contains the peers that are suppressed, i.e. the
3112         // peers that were the source of this message; consequently the
3113         // message should not be relayed to them, but it must still be
3114         // counted as part of the squelch logic.
3115 auto haveMessage = app_.overlay().relay(
3116 *packet, peerPos.suppressionID(), peerPos.publicKey());
3117 if (!haveMessage.empty())
3118 overlay_.updateSlotAndSquelch(
3119 peerPos.suppressionID(),
3120 peerPos.publicKey(),
3121 std::move(haveMessage),
3122 protocol::mtPROPOSE_LEDGER);
3123 }
3124}
3125
3126void
3127 PeerImp::checkValidation(
3128     std::shared_ptr<STValidation> const& val,
3129     uint256 const& key,
3130     std::shared_ptr<protocol::TMValidation> const& packet)
3131{
3132 if (!val->isValid())
3133 {
3134 std::string desc{"Validation forwarded by peer is invalid"};
3135 JLOG(p_journal_.debug()) << desc;
3136 charge(Resource::feeInvalidSignature, desc);
3137 return;
3138 }
3139
3140 // FIXME it should be safe to remove this try/catch. Investigate codepaths.
3141 try
3142 {
3143 if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
3144 cluster())
3145 {
3146             // haveMessage contains the peers that are suppressed, i.e. the
3147             // peers that were the source of this message; consequently the
3148             // message should not be relayed to them, but it must still be
3149             // counted as part of the squelch logic.
3150 auto haveMessage =
3151 overlay_.relay(*packet, key, val->getSignerPublic());
3152 if (!haveMessage.empty())
3153 {
3154 overlay_.updateSlotAndSquelch(
3155 key,
3156 val->getSignerPublic(),
3157 std::move(haveMessage),
3158 protocol::mtVALIDATION);
3159 }
3160 }
3161 }
3162 catch (std::exception const& ex)
3163 {
3164 JLOG(p_journal_.trace())
3165 << "Exception processing validation: " << ex.what();
3166 using namespace std::string_literals;
3167 charge(Resource::feeMalformedRequest, "validation "s + ex.what());
3168 }
3169}
3170
3171 // Returns the best-scoring peer that can help us get
3172 // the TX tree with the specified root hash.
3173 //
3174 static std::shared_ptr<PeerImp>
3175getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
3176 {
3177     std::shared_ptr<PeerImp> ret;
3178 int retScore = 0;
3179
3180     ov.for_each([&](std::shared_ptr<PeerImp>&& p) {
3181 if (p->hasTxSet(rootHash) && p.get() != skip)
3182 {
3183 auto score = p->getScore(true);
3184 if (!ret || (score > retScore))
3185 {
3186 ret = std::move(p);
3187 retScore = score;
3188 }
3189 }
3190 });
3191
3192 return ret;
3193}
3194
3195// Returns a random peer weighted by how likely to
3196// have the ledger and how responsive it is.
3197 //
3198 static std::shared_ptr<PeerImp>
3199 getPeerWithLedger(
3200 OverlayImpl& ov,
3201 uint256 const& ledgerHash,
3202 LedgerIndex ledger,
3203 PeerImp const* skip)
3204 {
3205     std::shared_ptr<PeerImp> ret;
3206 int retScore = 0;
3207
3208     ov.for_each([&](std::shared_ptr<PeerImp>&& p) {
3209 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3210 {
3211 auto score = p->getScore(true);
3212 if (!ret || (score > retScore))
3213 {
3214 ret = std::move(p);
3215 retScore = score;
3216 }
3217 }
3218 });
3219
3220 return ret;
3221}
3222
3223void
3224PeerImp::sendLedgerBase(
3225 std::shared_ptr<Ledger const> const& ledger,
3226 protocol::TMLedgerData& ledgerData)
3227{
3228 JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";
3229
3230 Serializer s(sizeof(LedgerHeader));
3231 addRaw(ledger->header(), s);
3232 ledgerData.add_nodes()->set_nodedata(s.getDataPtr(), s.getLength());
3233
3234 auto const& stateMap{ledger->stateMap()};
3235 if (stateMap.getHash() != beast::zero)
3236 {
3237 // Return account state root node if possible
3238 Serializer root(768);
3239
3240 stateMap.serializeRoot(root);
3241 ledgerData.add_nodes()->set_nodedata(
3242 root.getDataPtr(), root.getLength());
3243
3244 if (ledger->header().txHash != beast::zero)
3245 {
3246 auto const& txMap{ledger->txMap()};
3247 if (txMap.getHash() != beast::zero)
3248 {
3249 // Return TX root node if possible
3250 root.erase();
3251 txMap.serializeRoot(root);
3252 ledgerData.add_nodes()->set_nodedata(
3253 root.getDataPtr(), root.getLength());
3254 }
3255 }
3256 }
3257
3258 auto message{
3259 std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3260 send(message);
3261}
3262
3263 std::shared_ptr<Ledger const>
3264PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
3265{
3266 JLOG(p_journal_.trace()) << "getLedger: Ledger";
3267
3268     std::shared_ptr<Ledger const> ledger;
3269
3270 if (m->has_ledgerhash())
3271 {
3272 // Attempt to find ledger by hash
3273 uint256 const ledgerHash{m->ledgerhash()};
3274 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3275 if (!ledger)
3276 {
3277 JLOG(p_journal_.trace())
3278 << "getLedger: Don't have ledger with hash " << ledgerHash;
3279
3280 if (m->has_querytype() && !m->has_requestcookie())
3281 {
3282 // Attempt to relay the request to a peer
3283 if (auto const peer = getPeerWithLedger(
3284 overlay_,
3285 ledgerHash,
3286 m->has_ledgerseq() ? m->ledgerseq() : 0,
3287 this))
3288 {
3289 m->set_requestcookie(id());
3290 peer->send(
3291 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3292 JLOG(p_journal_.debug())
3293 << "getLedger: Request relayed to peer";
3294 return ledger;
3295 }
3296
3297 JLOG(p_journal_.trace())
3298 << "getLedger: Failed to find peer to relay request";
3299 }
3300 }
3301 }
3302 else if (m->has_ledgerseq())
3303 {
3304 // Attempt to find ledger by sequence
3305 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3306 {
3307 JLOG(p_journal_.debug())
3308 << "getLedger: Early ledger sequence request";
3309 }
3310 else
3311 {
3312 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3313 if (!ledger)
3314 {
3315 JLOG(p_journal_.debug())
3316 << "getLedger: Don't have ledger with sequence "
3317 << m->ledgerseq();
3318 }
3319 }
3320 }
3321 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3322 {
3323 ledger = app_.getLedgerMaster().getClosedLedger();
3324 }
3325
3326 if (ledger)
3327 {
3328 // Validate retrieved ledger sequence
3329 auto const ledgerSeq{ledger->header().seq};
3330 if (m->has_ledgerseq())
3331 {
3332 if (ledgerSeq != m->ledgerseq())
3333 {
3334 // Do not resource charge a peer responding to a relay
3335 if (!m->has_requestcookie())
3336 charge(
3337 Resource::feeMalformedRequest, "get_ledger ledgerSeq");
3338
3339 ledger.reset();
3340 JLOG(p_journal_.warn())
3341 << "getLedger: Invalid ledger sequence " << ledgerSeq;
3342 }
3343 }
3344 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3345 {
3346 ledger.reset();
3347 JLOG(p_journal_.debug())
3348 << "getLedger: Early ledger sequence request " << ledgerSeq;
3349 }
3350 }
3351 else
3352 {
3353 JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
3354 }
3355
3356 return ledger;
3357}
3358
3359 std::shared_ptr<SHAMap const>
3360PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
3361{
3362 JLOG(p_journal_.trace()) << "getTxSet: TX set";
3363
3364     uint256 const txSetHash{m->ledgerhash()};
3365     std::shared_ptr<SHAMap> shaMap{
3366 app_.getInboundTransactions().getSet(txSetHash, false)};
3367 if (!shaMap)
3368 {
3369 if (m->has_querytype() && !m->has_requestcookie())
3370 {
3371 // Attempt to relay the request to a peer
3372 if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
3373 {
3374 m->set_requestcookie(id());
3375 peer->send(
3376 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3377 JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
3378 }
3379 else
3380 {
3381 JLOG(p_journal_.debug())
3382 << "getTxSet: Failed to find relay peer";
3383 }
3384 }
3385 else
3386 {
3387 JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
3388 }
3389 }
3390
3391 return shaMap;
3392}
3393
3394void
3395PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
3396{
3397 // Do not resource charge a peer responding to a relay
3398 if (!m->has_requestcookie())
3399 charge(
3400 Resource::feeModerateBurdenPeer, "received a get ledger request");
3401
3402     std::shared_ptr<Ledger const> ledger;
3403     std::shared_ptr<SHAMap const> sharedMap;
3404 SHAMap const* map{nullptr};
3405 protocol::TMLedgerData ledgerData;
3406 bool fatLeaves{true};
3407 auto const itype{m->itype()};
3408
3409 if (itype == protocol::liTS_CANDIDATE)
3410 {
3411 if (sharedMap = getTxSet(m); !sharedMap)
3412 return;
3413 map = sharedMap.get();
3414
3415 // Fill out the reply
3416 ledgerData.set_ledgerseq(0);
3417 ledgerData.set_ledgerhash(m->ledgerhash());
3418 ledgerData.set_type(protocol::liTS_CANDIDATE);
3419 if (m->has_requestcookie())
3420 ledgerData.set_requestcookie(m->requestcookie());
3421
3422 // We'll already have most transactions
3423 fatLeaves = false;
3424 }
3425 else
3426 {
3427 if (send_queue_.size() >= Tuning::dropSendQueue)
3428 {
3429 JLOG(p_journal_.debug())
3430 << "processLedgerRequest: Large send queue";
3431 return;
3432 }
3433 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3434 {
3435 JLOG(p_journal_.debug()) << "processLedgerRequest: Too busy";
3436 return;
3437 }
3438
3439 if (ledger = getLedger(m); !ledger)
3440 return;
3441
3442 // Fill out the reply
3443 auto const ledgerHash{ledger->header().hash};
3444 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3445 ledgerData.set_ledgerseq(ledger->header().seq);
3446 ledgerData.set_type(itype);
3447 if (m->has_requestcookie())
3448 ledgerData.set_requestcookie(m->requestcookie());
3449
3450 switch (itype)
3451 {
3452 case protocol::liBASE:
3453 sendLedgerBase(ledger, ledgerData);
3454 return;
3455
3456 case protocol::liTX_NODE:
3457 map = &ledger->txMap();
3458 JLOG(p_journal_.trace()) << "processLedgerRequest: TX map hash "
3459 << to_string(map->getHash());
3460 break;
3461
3462 case protocol::liAS_NODE:
3463 map = &ledger->stateMap();
3464 JLOG(p_journal_.trace())
3465 << "processLedgerRequest: Account state map hash "
3466 << to_string(map->getHash());
3467 break;
3468
3469 default:
3470 // This case should not be possible here
3471 JLOG(p_journal_.error())
3472 << "processLedgerRequest: Invalid ledger info type";
3473 return;
3474 }
3475 }
3476
3477 if (!map)
3478 {
3479 JLOG(p_journal_.warn()) << "processLedgerRequest: Unable to find map";
3480 return;
3481 }
3482
3483 // Add requested node data to reply
3484 if (m->nodeids_size() > 0)
3485 {
3486 auto const queryDepth{
3487 m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
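        // A larger depth returns more levels of the tree per request;
        // high-latency peers default to a deeper query, presumably to cut
        // down on round trips.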
3488
3489         std::vector<std::pair<SHAMapNodeID, Blob>> data;
3490
3491 for (int i = 0; i < m->nodeids_size() &&
3492 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3493 ++i)
3494 {
3495 auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
3496
3497 data.clear();
3498 data.reserve(Tuning::softMaxReplyNodes);
3499
3500 try
3501 {
3502 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3503 {
3504 JLOG(p_journal_.trace())
3505 << "processLedgerRequest: getNodeFat got "
3506 << data.size() << " nodes";
3507
3508 for (auto const& d : data)
3509 {
3510 if (ledgerData.nodes_size() >=
3511 Tuning::hardMaxReplyNodes)
3512 break;
3513 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3514 node->set_nodeid(d.first.getRawString());
3515 node->set_nodedata(d.second.data(), d.second.size());
3516 }
3517 }
3518 else
3519 {
3520 JLOG(p_journal_.warn())
3521 << "processLedgerRequest: getNodeFat returns false";
3522 }
3523 }
3524 catch (std::exception const& e)
3525 {
3526 std::string info;
3527 switch (itype)
3528 {
3529 case protocol::liBASE:
3530 // This case should not be possible here
3531 info = "Ledger base";
3532 break;
3533
3534 case protocol::liTX_NODE:
3535 info = "TX node";
3536 break;
3537
3538 case protocol::liAS_NODE:
3539 info = "AS node";
3540 break;
3541
3542 case protocol::liTS_CANDIDATE:
3543 info = "TS candidate";
3544 break;
3545
3546 default:
3547 info = "Invalid";
3548 break;
3549 }
3550
3551 if (!m->has_ledgerhash())
3552 info += ", no hash specified";
3553
3554 JLOG(p_journal_.warn())
3555 << "processLedgerRequest: getNodeFat with nodeId "
3556 << *shaMapNodeId << " and ledger info type " << info
3557 << " throws exception: " << e.what();
3558 }
3559 }
3560
3561 JLOG(p_journal_.info())
3562 << "processLedgerRequest: Got request for " << m->nodeids_size()
3563 << " nodes at depth " << queryDepth << ", return "
3564 << ledgerData.nodes_size() << " nodes";
3565 }
3566
3567 if (ledgerData.nodes_size() == 0)
3568 return;
3569
3570 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3571}
3572
3573int
3574PeerImp::getScore(bool haveItem) const
3575{
3576 // Random component of score, used to break ties and avoid
3577 // overloading the "best" peer
3578 static int const spRandomMax = 9999;
3579
3580     // Score for being very likely to have the thing we are
3581     // looking for; should be roughly spRandomMax
3582 static int const spHaveItem = 10000;
3583
3584 // Score reduction for each millisecond of latency; should
3585 // be roughly spRandomMax divided by the maximum reasonable
3586 // latency
3587 static int const spLatency = 30;
3588
3589 // Penalty for unknown latency; should be roughly spRandomMax
3590 static int const spNoLatency = 8000;
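    // Net effect: score = random tie-breaker in [0, spRandomMax]
    //                      + spHaveItem if the peer is known to have the item
    //                      - spLatency per millisecond of measured latency
    //                        (or spNoLatency when latency is unknown).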
3591
3592 int score = rand_int(spRandomMax);
3593
3594 if (haveItem)
3595 score += spHaveItem;
3596
3597     std::optional<std::chrono::milliseconds> latency;
3598 {
3599 std::lock_guard sl(recentLock_);
3600 latency = latency_;
3601 }
3602
3603 if (latency)
3604 score -= latency->count() * spLatency;
3605 else
3606 score -= spNoLatency;
3607
3608 return score;
3609}
3610
3611bool
3612PeerImp::isHighLatency() const
3613{
3614 std::lock_guard sl(recentLock_);
3615 return latency_ >= peerHighLatency;
3616}
3617
3618void
3619PeerImp::Metrics::add_message(std::uint64_t bytes)
3620{
3621 using namespace std::chrono_literals;
3622 std::unique_lock lock{mutex_};
3623
3624 totalBytes_ += bytes;
3625 accumBytes_ += bytes;
3626 auto const timeElapsed = clock_type::now() - intervalStart_;
3627 auto const timeElapsedInSecs =
3628 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
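    // Once at least a second has elapsed, fold the bytes accumulated over
    // the interval into the rolling per-second average and start a new
    // interval.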
3629
3630 if (timeElapsedInSecs >= 1s)
3631 {
3632 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3633 rollingAvg_.push_back(avgBytes);
3634
3635 auto const totalBytes =
3636 std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
3637 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3638
3639 intervalStart_ = clock_type::now();
3640 accumBytes_ = 0;
3641 }
3642}
3643
3644 std::uint64_t
3645PeerImp::Metrics::average_bytes() const
3646{
3647 std::shared_lock lock{mutex_};
3648 return rollingAvgBytes_;
3649}
3650
3651 std::uint64_t
3652PeerImp::Metrics::total_bytes() const
3653{
3654 std::shared_lock lock{mutex_};
3655 return totalBytes_;
3656}
3657
3658} // namespace xrpl