rippled
Loading...
Searching...
No Matches
PeerImp.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/consensus/RCLValidations.h>
21#include <xrpld/app/ledger/InboundLedgers.h>
22#include <xrpld/app/ledger/InboundTransactions.h>
23#include <xrpld/app/ledger/LedgerMaster.h>
24#include <xrpld/app/ledger/TransactionMaster.h>
25#include <xrpld/app/misc/HashRouter.h>
26#include <xrpld/app/misc/LoadFeeTrack.h>
27#include <xrpld/app/misc/NetworkOPs.h>
28#include <xrpld/app/misc/Transaction.h>
29#include <xrpld/app/misc/ValidatorList.h>
30#include <xrpld/app/tx/apply.h>
31#include <xrpld/overlay/Cluster.h>
32#include <xrpld/overlay/detail/PeerImp.h>
33#include <xrpld/overlay/detail/Tuning.h>
34#include <xrpld/perflog/PerfLog.h>
35
36#include <xrpl/basics/UptimeClock.h>
37#include <xrpl/basics/base64.h>
38#include <xrpl/basics/random.h>
39#include <xrpl/basics/safe_cast.h>
40#include <xrpl/protocol/TxFlags.h>
41#include <xrpl/protocol/digest.h>
42
43#include <boost/algorithm/string/predicate.hpp>
44#include <boost/beast/core/ostream.hpp>
45
46#include <algorithm>
47#include <chrono>
48#include <memory>
49#include <mutex>
50#include <numeric>
51#include <sstream>
52
53using namespace std::chrono_literals;
54
55namespace ripple {
56
57namespace {
59std::chrono::milliseconds constexpr peerHighLatency{300};
60
62std::chrono::seconds constexpr peerTimerInterval{60};
63
65std::chrono::seconds constexpr shutdownTimerInterval{5};
66
67} // namespace
68
69// TODO: Remove this exclusion once unit tests are added after the hotfix
70// release.
71
73 Application& app,
74 id_t id,
76 http_request_type&& request,
77 PublicKey const& publicKey,
79 Resource::Consumer consumer,
81 OverlayImpl& overlay)
82 : Child(overlay)
83 , app_(app)
84 , id_(id)
85 , fingerprint_(
86 getFingerprint(slot->remote_endpoint(), publicKey, to_string(id)))
87 , prefix_(makePrefix(fingerprint_))
88 , sink_(app_.journal("Peer"), prefix_)
89 , p_sink_(app_.journal("Protocol"), prefix_)
90 , journal_(sink_)
91 , p_journal_(p_sink_)
92 , stream_ptr_(std::move(stream_ptr))
93 , socket_(stream_ptr_->next_layer().socket())
94 , stream_(*stream_ptr_)
95 , strand_(boost::asio::make_strand(socket_.get_executor()))
96 , timer_(waitable_timer{socket_.get_executor()})
97 , remote_address_(slot->remote_endpoint())
98 , overlay_(overlay)
99 , inbound_(true)
100 , protocol_(protocol)
101 , tracking_(Tracking::unknown)
102 , trackingTime_(clock_type::now())
103 , publicKey_(publicKey)
104 , lastPingTime_(clock_type::now())
105 , creationTime_(clock_type::now())
106 , squelch_(app_.journal("Squelch"))
107 , usage_(consumer)
108 , fee_{Resource::feeTrivialPeer, ""}
109 , slot_(slot)
110 , request_(std::move(request))
111 , headers_(request_)
112 , compressionEnabled_(
114 headers_,
116 "lz4",
117 app_.config().COMPRESSION)
118 ? Compressed::On
119 : Compressed::Off)
120 , txReduceRelayEnabled_(peerFeatureEnabled(
121 headers_,
123 app_.config().TX_REDUCE_RELAY_ENABLE))
124 , ledgerReplayEnabled_(peerFeatureEnabled(
125 headers_,
127 app_.config().LEDGER_REPLAY))
128 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
129{
130 JLOG(journal_.info())
131 << "compression enabled " << (compressionEnabled_ == Compressed::On)
132 << " vp reduce-relay base squelch enabled "
134 headers_,
137 << " tx reduce-relay enabled " << txReduceRelayEnabled_;
138}
139
141{
142 bool const inCluster{cluster()};
143
148
149 if (inCluster)
150 {
151 JLOG(journal_.warn()) << name() << " left cluster";
152 }
153}
154
155// Helper function to check for valid uint256 values in protobuf buffers
156static bool
158{
159 return pBuffStr.size() == uint256::size();
160}
161
162void
164{
165 if (!strand_.running_in_this_thread())
167
168 auto parseLedgerHash =
170 if (uint256 ret; ret.parseHex(value))
171 return ret;
172
173 if (auto const s = base64_decode(value); s.size() == uint256::size())
174 return uint256{s};
175
176 return std::nullopt;
177 };
178
180 std::optional<uint256> previous;
181
182 if (auto const iter = headers_.find("Closed-Ledger");
183 iter != headers_.end())
184 {
185 closed = parseLedgerHash(iter->value());
186
187 if (!closed)
188 fail("Malformed handshake data (1)");
189 }
190
191 if (auto const iter = headers_.find("Previous-Ledger");
192 iter != headers_.end())
193 {
194 previous = parseLedgerHash(iter->value());
195
196 if (!previous)
197 fail("Malformed handshake data (2)");
198 }
199
200 if (previous && !closed)
201 fail("Malformed handshake data (3)");
202
203 {
205 if (closed)
206 closedLedgerHash_ = *closed;
207 if (previous)
208 previousLedgerHash_ = *previous;
209 }
210
211 if (inbound_)
212 doAccept();
213 else
215
216 // Anything else that needs to be done with the connection should be
217 // done in doProtocolStart
218}
219
220void
222{
223 if (!strand_.running_in_this_thread())
225
226 if (!socket_.is_open())
227 return;
228
229 // The rationale for using different severity levels is that
230 // outbound connections are under our control and may be logged
231 // at a higher level, but inbound connections are more numerous and
232 // uncontrolled so to prevent log flooding the severity is reduced.
233 JLOG(journal_.debug()) << "stop: Stop";
234
235 shutdown();
236}
237
238//------------------------------------------------------------------------------
239
240void
242{
243 if (!strand_.running_in_this_thread())
244 return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
245
246 if (!socket_.is_open())
247 return;
248
249 // we are in progress of closing the connection
250 if (shutdown_)
251 return tryAsyncShutdown();
252
253 auto validator = m->getValidatorKey();
254 if (validator && !squelch_.expireSquelch(*validator))
255 {
258 static_cast<int>(m->getBuffer(compressionEnabled_).size()));
259 return;
260 }
261
262 // report categorized outgoing traffic
264 safe_cast<TrafficCount::category>(m->getCategory()),
265 static_cast<int>(m->getBuffer(compressionEnabled_).size()));
266
267 // report total outgoing traffic
270 static_cast<int>(m->getBuffer(compressionEnabled_).size()));
271
272 auto sendq_size = send_queue_.size();
273
274 if (sendq_size < Tuning::targetSendQueue)
275 {
276 // To detect a peer that does not read from their
277 // side of the connection, we expect a peer to have
278 // a small sendq periodically
279 large_sendq_ = 0;
280 }
281 else if (auto sink = journal_.debug();
282 sink && (sendq_size % Tuning::sendQueueLogFreq) == 0)
283 {
284 std::string const n = name();
285 sink << n << " sendq: " << sendq_size;
286 }
287
288 send_queue_.push(m);
289
290 if (sendq_size != 0)
291 return;
292
293 writePending_ = true;
294 boost::asio::async_write(
295 stream_,
296 boost::asio::buffer(
297 send_queue_.front()->getBuffer(compressionEnabled_)),
298 bind_executor(
299 strand_,
300 std::bind(
303 std::placeholders::_1,
304 std::placeholders::_2)));
305}
306
307void
309{
310 if (!strand_.running_in_this_thread())
311 return post(
313
314 if (!txQueue_.empty())
315 {
316 protocol::TMHaveTransactions ht;
317 std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) {
318 ht.add_hashes(hash.data(), hash.size());
319 });
320 JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size();
321 txQueue_.clear();
322 send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
323 }
324}
325
326void
328{
329 if (!strand_.running_in_this_thread())
330 return post(
332
334 {
335 JLOG(p_journal_.warn()) << "addTxQueue exceeds the cap";
336 sendTxQueue();
337 }
338
339 txQueue_.insert(hash);
340 JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size();
341}
342
343void
345{
346 if (!strand_.running_in_this_thread())
347 return post(
348 strand_,
350
351 auto removed = txQueue_.erase(hash);
352 JLOG(p_journal_.trace()) << "removeTxQueue " << removed;
353}
354
355void
357{
358 if ((usage_.charge(fee, context) == Resource::drop) &&
359 usage_.disconnect(p_journal_) && strand_.running_in_this_thread())
360 {
361 // Sever the connection
363 fail("charge: Resources");
364 }
365}
366
367//------------------------------------------------------------------------------
368
369bool
371{
372 auto const iter = headers_.find("Crawl");
373 if (iter == headers_.end())
374 return false;
375 return boost::iequals(iter->value(), "public");
376}
377
378bool
380{
381 return static_cast<bool>(app_.cluster().member(publicKey_));
382}
383
386{
387 if (inbound_)
388 return headers_["User-Agent"];
389 return headers_["Server"];
390}
391
394{
396
397 ret[jss::public_key] = toBase58(TokenType::NodePublic, publicKey_);
398 ret[jss::address] = remote_address_.to_string();
399
400 if (inbound_)
401 ret[jss::inbound] = true;
402
403 if (cluster())
404 {
405 ret[jss::cluster] = true;
406
407 if (auto const n = name(); !n.empty())
408 // Could move here if Json::Value supported moving from a string
409 ret[jss::name] = n;
410 }
411
412 if (auto const d = domain(); !d.empty())
413 ret[jss::server_domain] = std::string{d};
414
415 if (auto const nid = headers_["Network-ID"]; !nid.empty())
416 ret[jss::network_id] = std::string{nid};
417
418 ret[jss::load] = usage_.balance();
419
420 if (auto const version = getVersion(); !version.empty())
421 ret[jss::version] = std::string{version};
422
423 ret[jss::protocol] = to_string(protocol_);
424
425 {
427 if (latency_)
428 ret[jss::latency] = static_cast<Json::UInt>(latency_->count());
429 }
430
431 ret[jss::uptime] = static_cast<Json::UInt>(
432 std::chrono::duration_cast<std::chrono::seconds>(uptime()).count());
433
434 std::uint32_t minSeq, maxSeq;
435 ledgerRange(minSeq, maxSeq);
436
437 if ((minSeq != 0) || (maxSeq != 0))
438 ret[jss::complete_ledgers] =
439 std::to_string(minSeq) + " - " + std::to_string(maxSeq);
440
441 switch (tracking_.load())
442 {
444 ret[jss::track] = "diverged";
445 break;
446
448 ret[jss::track] = "unknown";
449 break;
450
452 // Nothing to do here
453 break;
454 }
455
456 uint256 closedLedgerHash;
457 protocol::TMStatusChange last_status;
458 {
460 closedLedgerHash = closedLedgerHash_;
461 last_status = last_status_;
462 }
463
464 if (closedLedgerHash != beast::zero)
465 ret[jss::ledger] = to_string(closedLedgerHash);
466
467 if (last_status.has_newstatus())
468 {
469 switch (last_status.newstatus())
470 {
471 case protocol::nsCONNECTING:
472 ret[jss::status] = "connecting";
473 break;
474
475 case protocol::nsCONNECTED:
476 ret[jss::status] = "connected";
477 break;
478
479 case protocol::nsMONITORING:
480 ret[jss::status] = "monitoring";
481 break;
482
483 case protocol::nsVALIDATING:
484 ret[jss::status] = "validating";
485 break;
486
487 case protocol::nsSHUTTING:
488 ret[jss::status] = "shutting";
489 break;
490
491 default:
492 JLOG(p_journal_.warn())
493 << "Unknown status: " << last_status.newstatus();
494 }
495 }
496
497 ret[jss::metrics] = Json::Value(Json::objectValue);
498 ret[jss::metrics][jss::total_bytes_recv] =
499 std::to_string(metrics_.recv.total_bytes());
500 ret[jss::metrics][jss::total_bytes_sent] =
501 std::to_string(metrics_.sent.total_bytes());
502 ret[jss::metrics][jss::avg_bps_recv] =
503 std::to_string(metrics_.recv.average_bytes());
504 ret[jss::metrics][jss::avg_bps_sent] =
505 std::to_string(metrics_.sent.average_bytes());
506
507 return ret;
508}
509
510bool
512{
513 switch (f)
514 {
516 return protocol_ >= make_protocol(2, 1);
518 return protocol_ >= make_protocol(2, 2);
521 }
522 return false;
523}
524
525//------------------------------------------------------------------------------
526
527bool
529{
530 {
532 if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
534 return true;
535 if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
536 recentLedgers_.end())
537 return true;
538 }
539 return false;
540}
541
542void
544{
546
547 minSeq = minLedger_;
548 maxSeq = maxLedger_;
549}
550
551bool
552PeerImp::hasTxSet(uint256 const& hash) const
553{
555 return std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
556 recentTxSets_.end();
557}
558
559void
561{
562 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
563 // guarded by recentLock_.
567}
568
569bool
571{
573 return (tracking_ != Tracking::diverged) && (uMin >= minLedger_) &&
574 (uMax <= maxLedger_);
575}
576
577//------------------------------------------------------------------------------
578
579void
581{
582 XRPL_ASSERT(
583 strand_.running_in_this_thread(),
584 "ripple::PeerImp::fail : strand in this thread");
585
586 if (!socket_.is_open())
587 return;
588
589 JLOG(journal_.warn()) << name << ": " << ec.message();
590
591 shutdown();
592}
593
594void
596{
597 if (!strand_.running_in_this_thread())
598 return post(
599 strand_,
600 std::bind(
601 (void(Peer::*)(std::string const&)) & PeerImp::fail,
603 reason));
604
605 if (!socket_.is_open())
606 return;
607
608 // Call to name() locks, log only if the message will be outputed
610 {
611 std::string const n = name();
612 JLOG(journal_.warn()) << n << " failed: " << reason;
613 }
614
615 shutdown();
616}
617
618void
620{
621 XRPL_ASSERT(
622 strand_.running_in_this_thread(),
623 "ripple::PeerImp::tryAsyncShutdown : strand in this thread");
624
626 return;
627
629 return;
630
631 shutdownStarted_ = true;
632
633 setTimer(shutdownTimerInterval);
634
635 // gracefully shutdown the SSL socket, performing a shutdown handshake
636 stream_.async_shutdown(bind_executor(
637 strand_,
638 std::bind(
639 &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
640}
641
642void
644{
645 XRPL_ASSERT(
646 strand_.running_in_this_thread(),
647 "ripple::PeerImp::shutdown: strand in this thread");
648
649 if (!socket_.is_open() || shutdown_)
650 return;
651
652 shutdown_ = true;
653
654 boost::beast::get_lowest_layer(stream_).cancel();
655
657}
658
659void
661{
662 cancelTimer();
663 if (ec)
664 {
665 // - eof: the stream was cleanly closed
666 // - operation_aborted: an expired timer (slow shutdown)
667 // - stream_truncated: the tcp connection closed (no handshake) it could
668 // occur if a peer does not perform a graceful disconnect
669 // - broken_pipe: the peer is gone
670 bool shouldLog =
671 (ec != boost::asio::error::eof &&
672 ec != boost::asio::error::operation_aborted &&
673 ec.message().find("application data after close notify") ==
674 std::string::npos);
675
676 if (shouldLog)
677 {
678 JLOG(journal_.debug()) << "onShutdown: " << ec.message();
679 }
680 }
681
682 close();
683}
684
685void
687{
688 XRPL_ASSERT(
689 strand_.running_in_this_thread(),
690 "ripple::PeerImp::close : strand in this thread");
691
692 if (!socket_.is_open())
693 return;
694
695 cancelTimer();
696
697 error_code ec;
698 socket_.close(ec);
699
701
702 // The rationale for using different severity levels is that
703 // outbound connections are under our control and may be logged
704 // at a higher level, but inbound connections are more numerous and
705 // uncontrolled so to prevent log flooding the severity is reduced.
706 JLOG((inbound_ ? journal_.debug() : journal_.info())) << "close: Closed";
707}
708
709//------------------------------------------------------------------------------
710
711void
713{
714 try
715 {
716 timer_.expires_after(interval);
717 }
718 catch (std::exception const& ex)
719 {
720 JLOG(journal_.error()) << "setTimer: " << ex.what();
721 return shutdown();
722 }
723
724 timer_.async_wait(bind_executor(
725 strand_,
726 std::bind(
727 &PeerImp::onTimer, shared_from_this(), std::placeholders::_1)));
728}
729
730//------------------------------------------------------------------------------
731
734{
736 ss << "[" << fingerprint << "] ";
737 return ss.str();
738}
739
740void
742{
743 XRPL_ASSERT(
744 strand_.running_in_this_thread(),
745 "ripple::PeerImp::onTimer : strand in this thread");
746
747 if (!socket_.is_open())
748 return;
749
750 if (ec)
751 {
752 // do not initiate shutdown, timers are frequently cancelled
753 if (ec == boost::asio::error::operation_aborted)
754 return;
755
756 // This should never happen
757 JLOG(journal_.error()) << "onTimer: " << ec.message();
758 return close();
759 }
760
761 // the timer expired before the shutdown completed
762 // force close the connection
763 if (shutdown_)
764 {
765 JLOG(journal_.debug()) << "onTimer: shutdown timer expired";
766 return close();
767 }
768
770 return fail("Large send queue");
771
772 if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
773 {
774 clock_type::duration duration;
775
776 {
778 duration = clock_type::now() - trackingTime_;
779 }
780
781 if ((t == Tracking::diverged &&
782 (duration > app_.config().MAX_DIVERGED_TIME)) ||
783 (t == Tracking::unknown &&
784 (duration > app_.config().MAX_UNKNOWN_TIME)))
785 {
787 return fail("Not useful");
788 }
789 }
790
791 // Already waiting for PONG
792 if (lastPingSeq_)
793 return fail("Ping Timeout");
794
796 lastPingSeq_ = rand_int<std::uint32_t>();
797
798 protocol::TMPing message;
799 message.set_type(protocol::TMPing::ptPING);
800 message.set_seq(*lastPingSeq_);
801
802 send(std::make_shared<Message>(message, protocol::mtPING));
803
804 setTimer(peerTimerInterval);
805}
806
807void
809{
810 try
811 {
812 timer_.cancel();
813 }
814 catch (std::exception const& ex)
815 {
816 JLOG(journal_.error()) << "cancelTimer: " << ex.what();
817 }
818}
819
820//------------------------------------------------------------------------------
821void
823{
824 XRPL_ASSERT(
825 read_buffer_.size() == 0,
826 "ripple::PeerImp::doAccept : empty read buffer");
827
828 JLOG(journal_.debug()) << "doAccept";
829
830 // a shutdown was initiated before the handshake, there is nothing to do
831 if (shutdown_)
832 return tryAsyncShutdown();
833
834 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
835
836 // This shouldn't fail since we already computed
837 // the shared value successfully in OverlayImpl
838 if (!sharedValue)
839 return fail("makeSharedValue: Unexpected failure");
840
841 JLOG(journal_.debug()) << "Protocol: " << to_string(protocol_);
842
843 if (auto member = app_.cluster().member(publicKey_))
844 {
845 {
847 name_ = *member;
848 }
849 JLOG(journal_.info()) << "Cluster name: " << *member;
850 }
851
853
854 // XXX Set timer: connection is in grace period to be useful.
855 // XXX Set timer: connection idle (idle may vary depending on connection
856 // type.)
857
859
860 boost::beast::ostream(*write_buffer) << makeResponse(
862 request_,
865 *sharedValue,
867 protocol_,
868 app_);
869
870 // Write the whole buffer and only start protocol when that's done.
871 boost::asio::async_write(
872 stream_,
873 write_buffer->data(),
874 boost::asio::transfer_all(),
875 bind_executor(
876 strand_,
877 [this, write_buffer, self = shared_from_this()](
878 error_code ec, std::size_t bytes_transferred) {
879 if (!socket_.is_open())
880 return;
881 if (ec == boost::asio::error::operation_aborted)
882 return tryAsyncShutdown();
883 if (ec)
884 return fail("onWriteResponse", ec);
885 if (write_buffer->size() == bytes_transferred)
886 return doProtocolStart();
887 return fail("Failed to write header");
888 }));
889}
890
893{
894 std::shared_lock read_lock{nameMutex_};
895 return name_;
896}
897
900{
901 return headers_["Server-Domain"];
902}
903
904//------------------------------------------------------------------------------
905
906// Protocol logic
907
908void
910{
911 // a shutdown was initiated before the handshake, there is nothing to do
912 if (shutdown_)
913 return tryAsyncShutdown();
914
916
917 // Send all the validator lists that have been loaded
919 {
921 [&](std::string const& manifest,
922 std::uint32_t version,
924 PublicKey const& pubKey,
925 std::size_t maxSequence,
926 uint256 const& hash) {
928 *this,
929 0,
930 pubKey,
931 maxSequence,
932 version,
933 manifest,
934 blobInfos,
936 p_journal_);
937
938 // Don't send it next time.
940 });
941 }
942
943 if (auto m = overlay_.getManifestsMessage())
944 send(m);
945
946 setTimer(peerTimerInterval);
947}
948
949// Called repeatedly with protocol message data
950void
952{
953 XRPL_ASSERT(
954 strand_.running_in_this_thread(),
955 "ripple::PeerImp::onReadMessage : strand in this thread");
956
957 readPending_ = false;
958
959 if (!socket_.is_open())
960 return;
961
962 if (ec)
963 {
964 if (ec == boost::asio::error::eof)
965 {
966 JLOG(journal_.debug()) << "EOF";
967 return shutdown();
968 }
969
970 if (ec == boost::asio::error::operation_aborted)
971 return tryAsyncShutdown();
972
973 return fail("onReadMessage", ec);
974 }
975 // we started shutdown, no reason to process further data
976 if (shutdown_)
977 return tryAsyncShutdown();
978
979 if (auto stream = journal_.trace())
980 {
981 stream << "onReadMessage: "
982 << (bytes_transferred > 0
983 ? to_string(bytes_transferred) + " bytes"
984 : "");
985 }
986
987 metrics_.recv.add_message(bytes_transferred);
988
989 read_buffer_.commit(bytes_transferred);
990
991 auto hint = Tuning::readBufferBytes;
992
993 while (read_buffer_.size() > 0)
994 {
995 std::size_t bytes_consumed;
996
997 using namespace std::chrono_literals;
998 std::tie(bytes_consumed, ec) = perf::measureDurationAndLog(
999 [&]() {
1000 return invokeProtocolMessage(read_buffer_.data(), *this, hint);
1001 },
1002 "invokeProtocolMessage",
1003 350ms,
1004 journal_);
1005
1006 if (!socket_.is_open())
1007 return;
1008
1009 // the error_code is produced by invokeProtocolMessage
1010 // it could be due to a bad message
1011 if (ec)
1012 return fail("onReadMessage", ec);
1013
1014 if (bytes_consumed == 0)
1015 break;
1016
1017 read_buffer_.consume(bytes_consumed);
1018 }
1019
1020 // check if a shutdown was initiated while processing messages
1021 if (shutdown_)
1022 return tryAsyncShutdown();
1023
1024 readPending_ = true;
1025
1026 XRPL_ASSERT(
1027 !shutdownStarted_, "ripple::PeerImp::onReadMessage : shutdown started");
1028
1029 // Timeout on writes only
1030 stream_.async_read_some(
1032 bind_executor(
1033 strand_,
1034 std::bind(
1037 std::placeholders::_1,
1038 std::placeholders::_2)));
1039}
1040
1041void
1043{
1044 XRPL_ASSERT(
1045 strand_.running_in_this_thread(),
1046 "ripple::PeerImp::onWriteMessage : strand in this thread");
1047
1048 writePending_ = false;
1049
1050 if (!socket_.is_open())
1051 return;
1052
1053 if (ec)
1054 {
1055 if (ec == boost::asio::error::operation_aborted)
1056 return tryAsyncShutdown();
1057
1058 return fail("onWriteMessage", ec);
1059 }
1060
1061 if (auto stream = journal_.trace())
1062 {
1063 stream << "onWriteMessage: "
1064 << (bytes_transferred > 0
1065 ? to_string(bytes_transferred) + " bytes"
1066 : "");
1067 }
1068
1069 metrics_.sent.add_message(bytes_transferred);
1070
1071 XRPL_ASSERT(
1072 !send_queue_.empty(),
1073 "ripple::PeerImp::onWriteMessage : non-empty send buffer");
1074 send_queue_.pop();
1075
1076 if (shutdown_)
1077 return tryAsyncShutdown();
1078
1079 if (!send_queue_.empty())
1080 {
1081 writePending_ = true;
1082 XRPL_ASSERT(
1084 "ripple::PeerImp::onWriteMessage : shutdown started");
1085
1086 // Timeout on writes only
1087 return boost::asio::async_write(
1088 stream_,
1089 boost::asio::buffer(
1090 send_queue_.front()->getBuffer(compressionEnabled_)),
1091 bind_executor(
1092 strand_,
1093 std::bind(
1096 std::placeholders::_1,
1097 std::placeholders::_2)));
1098 }
1099}
1100
1101//------------------------------------------------------------------------------
1102//
1103// ProtocolHandler
1104//
1105//------------------------------------------------------------------------------
1106
1107void
1109{
1110 // TODO
1111}
1112
1113void
1115 std::uint16_t type,
1117 std::size_t size,
1118 std::size_t uncompressed_size,
1119 bool isCompressed)
1120{
1121 auto const name = protocolMessageName(type);
1124
1125 auto const category = TrafficCount::categorize(
1126 *m, static_cast<protocol::MessageType>(type), true);
1127
1128 // report total incoming traffic
1130 TrafficCount::category::total, static_cast<int>(size));
1131
1132 // increase the traffic received for a specific category
1133 overlay_.reportInboundTraffic(category, static_cast<int>(size));
1134
1135 using namespace protocol;
1136 if ((type == MessageType::mtTRANSACTION ||
1137 type == MessageType::mtHAVE_TRANSACTIONS ||
1138 type == MessageType::mtTRANSACTIONS ||
1139 // GET_OBJECTS
1141 // GET_LEDGER
1144 // LEDGER_DATA
1148 {
1150 static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
1151 }
1152 JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
1153 << uncompressed_size << " " << isCompressed;
1154}
1155
1156void
1164
1165void
1167{
1168 auto const s = m->list_size();
1169
1170 if (s == 0)
1171 {
1173 return;
1174 }
1175
1176 if (s > 100)
1178
1180 jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() {
1181 overlay_.onManifests(m, that);
1182 });
1183}
1184
1185void
1187{
1188 if (m->type() == protocol::TMPing::ptPING)
1189 {
1190 // We have received a ping request, reply with a pong
1192 m->set_type(protocol::TMPing::ptPONG);
1193 send(std::make_shared<Message>(*m, protocol::mtPING));
1194 return;
1195 }
1196
1197 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1198 {
1199 // Only reset the ping sequence if we actually received a
1200 // PONG with the correct cookie. That way, any peers which
1201 // respond with incorrect cookies will eventually time out.
1202 if (m->seq() == lastPingSeq_)
1203 {
1205
1206 // Update latency estimate
1207 auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1209
1211
1212 if (latency_)
1213 latency_ = (*latency_ * 7 + rtt) / 8;
1214 else
1215 latency_ = rtt;
1216 }
1217
1218 return;
1219 }
1220}
1221
1222void
1224{
1225 // VFALCO NOTE I think we should drop the peer immediately
1226 if (!cluster())
1227 {
1228 fee_.update(Resource::feeUselessData, "unknown cluster");
1229 return;
1230 }
1231
1232 for (int i = 0; i < m->clusternodes().size(); ++i)
1233 {
1234 protocol::TMClusterNode const& node = m->clusternodes(i);
1235
1237 if (node.has_nodename())
1238 name = node.nodename();
1239
1240 auto const publicKey =
1241 parseBase58<PublicKey>(TokenType::NodePublic, node.publickey());
1242
1243 // NIKB NOTE We should drop the peer immediately if
1244 // they send us a public key we can't parse
1245 if (publicKey)
1246 {
1247 auto const reportTime =
1248 NetClock::time_point{NetClock::duration{node.reporttime()}};
1249
1251 *publicKey, name, node.nodeload(), reportTime);
1252 }
1253 }
1254
1255 int loadSources = m->loadsources().size();
1256 if (loadSources != 0)
1257 {
1258 Resource::Gossip gossip;
1259 gossip.items.reserve(loadSources);
1260 for (int i = 0; i < m->loadsources().size(); ++i)
1261 {
1262 protocol::TMLoadSource const& node = m->loadsources(i);
1264 item.address = beast::IP::Endpoint::from_string(node.name());
1265 item.balance = node.cost();
1266 if (item.address != beast::IP::Endpoint())
1267 gossip.items.push_back(item);
1268 }
1270 }
1271
1272 // Calculate the cluster fee:
1273 auto const thresh = app_.timeKeeper().now() - 90s;
1274 std::uint32_t clusterFee = 0;
1275
1277 fees.reserve(app_.cluster().size());
1278
1279 app_.cluster().for_each([&fees, thresh](ClusterNode const& status) {
1280 if (status.getReportTime() >= thresh)
1281 fees.push_back(status.getLoadFee());
1282 });
1283
1284 if (!fees.empty())
1285 {
1286 auto const index = fees.size() / 2;
1287 std::nth_element(fees.begin(), fees.begin() + index, fees.end());
1288 clusterFee = fees[index];
1289 }
1290
1291 app_.getFeeTrack().setClusterFee(clusterFee);
1292}
1293
1294void
1296{
1297 // Don't allow endpoints from peers that are not known tracking or are
1298 // not using a version of the message that we support:
1299 if (tracking_.load() != Tracking::converged || m->version() != 2)
1300 return;
1301
1302 // The number is arbitrary and doesn't have any real significance or
1303 // implication for the protocol.
1304 if (m->endpoints_v2().size() >= 1024)
1305 {
1306 fee_.update(Resource::feeUselessData, "endpoints too large");
1307 return;
1308 }
1309
1311 endpoints.reserve(m->endpoints_v2().size());
1312
1313 auto malformed = 0;
1314 for (auto const& tm : m->endpoints_v2())
1315 {
1316 auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
1317
1318 if (!result)
1319 {
1320 JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
1321 << tm.endpoint() << "}";
1322 malformed++;
1323 continue;
1324 }
1325
1326 // If hops == 0, this Endpoint describes the peer we are connected
1327 // to -- in that case, we take the remote address seen on the
1328 // socket and store that in the IP::Endpoint. If this is the first
1329 // time, then we'll verify that their listener can receive incoming
1330 // by performing a connectivity test. if hops > 0, then we just
1331 // take the address/port we were given
1332 if (tm.hops() == 0)
1333 result = remote_address_.at_port(result->port());
1334
1335 endpoints.emplace_back(*result, tm.hops());
1336 }
1337
1338 // Charge the peer for each malformed endpoint. As there still may be
1339 // multiple valid endpoints we don't return early.
1340 if (malformed > 0)
1341 {
1342 fee_.update(
1343 Resource::feeInvalidData * malformed,
1344 std::to_string(malformed) + " malformed endpoints");
1345 }
1346
1347 if (!endpoints.empty())
1348 overlay_.peerFinder().on_endpoints(slot_, endpoints);
1349}
1350
1351void
1356
// Process a transaction relayed by a peer: deserialize it, filter spam and
// duplicates via the HashRouter, then queue a job to check and apply it.
// NOTE(review): the function signature line and several interior condition
// lines are elided in this excerpt; annotations below describe only what is
// visible.
void
    bool eraseTxQueue,
    bool batch)
{
    // Exactly one of eraseTxQueue / batch must be set by the caller.
    XRPL_ASSERT(
        eraseTxQueue != batch,
        ("ripple::PeerImp::handleTransaction : valid inputs"));
        // NOTE(review): the guard condition for this early return is elided
        // in this excerpt.
        return;

    {
        // If we've never been in synch, there's nothing we can do
        // with a transaction
        JLOG(p_journal_.debug()) << "Ignoring incoming transaction: "
                                 << "Need network ledger";
        return;
    }

    SerialIter sit(makeSlice(m->rawtransaction()));

    try
    {
        // Deserialization throws on malformed input; handled below.
        auto stx = std::make_shared<STTx const>(sit);
        uint256 txID = stx->getTransactionID();

        // Charge strongly for attempting to relay a txn with tfInnerBatchTxn
        // LCOV_EXCL_START
        if (stx->isFlag(tfInnerBatchTxn) &&
            getCurrentTransactionRules()->enabled(featureBatch))
        {
            JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
                                       "tfInnerBatchTxn (handleTransaction).";
            fee_.update(Resource::feeModerateBurdenPeer, "inner batch txn");
            return;
        }
        // LCOV_EXCL_STOP

        HashRouterFlags flags;
        constexpr std::chrono::seconds tx_interval = 10s;

        // Suppress transactions already seen within tx_interval.
        if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
        {
            // we have seen this transaction recently
            if (any(flags & HashRouterFlags::BAD))
            {
                fee_.update(Resource::feeUselessData, "known bad");
                JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
            }

            // Erase only if the server has seen this tx. If the server has not
            // seen this tx then the tx could not has been queued for this peer.
            else if (eraseTxQueue && txReduceRelayEnabled())
                removeTxQueue(txID);

            return;
        }

        JLOG(p_journal_.debug()) << "Got tx " << txID;

        bool checkSignature = true;
        if (cluster())
        {
            if (!m->has_deferred() || !m->deferred())
            {
                // Skip local checks if a server we trust
                // put the transaction in its open ledger
                flags |= HashRouterFlags::TRUSTED;
            }

            // for non-validator nodes only -- localPublicKey is set for
            // validators only
            // NOTE(review): the guard condition for this branch is elided in
            // this excerpt.
            {
                // For now, be paranoid and have each validator
                // check each transaction, regardless of source
                checkSignature = false;
            }
        }

        // NOTE(review): the conditions for the following branches are elided
        // in this excerpt.
        {
            JLOG(p_journal_.trace())
                << "No new transactions until synchronized";
        }
        else if (
            {
                JLOG(p_journal_.info()) << "Transaction queue is full";
            }
        else
        {
            // Queue the signature/validity check on the job queue; the
            // lambda holds only a weak_ptr so it cannot extend peer lifetime.
                "recvTransaction->checkTransaction",
                flags,
                checkSignature,
                batch,
                stx]() {
                if (auto peer = weak.lock())
                    peer->checkTransaction(
                        flags, checkSignature, stx, batch);
            });
        }
    }
    catch (std::exception const& ex)
    {
        JLOG(p_journal_.warn())
            << "Transaction invalid: " << strHex(m->rawtransaction())
            << ". Exception: " << ex.what();
    }
}
1478
// Handle a TMGetLedger request: validate every field of the request before
// queueing a jtLEDGER_REQ job to build the reply. Each validation failure
// charges the peer feeInvalidData and drops the request.
// NOTE(review): the onMessage signature line is elided in this excerpt.
void
{
    // Charge the peer and log; used as `return badData(...)` so each invalid
    // field exits immediately.
    auto badData = [&](std::string const& msg) {
        fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
        JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
    };
    auto const itype{m->itype()};

    // Verify ledger info type
    if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
        return badData("Invalid ledger info type");

    // Optional ledger type; std::nullopt when the field is absent.
    auto const ltype = [&m]() -> std::optional<::protocol::TMLedgerType> {
        if (m->has_ltype())
            return m->ltype();
        return std::nullopt;
    }();

    if (itype == protocol::liTS_CANDIDATE)
    {
        if (!m->has_ledgerhash())
            return badData("Invalid TX candidate set, missing TX set hash");
    }
    else if (
        !m->has_ledgerhash() && !m->has_ledgerseq() &&
        !(ltype && *ltype == protocol::ltCLOSED))
    {
        return badData("Invalid request");
    }

    // Verify ledger type
    if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
        return badData("Invalid ledger type");

    // Verify ledger hash
    if (m->has_ledgerhash() && !stringIsUint256Sized(m->ledgerhash()))
        return badData("Invalid ledger hash");

    // Verify ledger sequence
    if (m->has_ledgerseq())
    {
        auto const ledgerSeq{m->ledgerseq()};

        // Check if within a reasonable range
        using namespace std::chrono_literals;
        // NOTE(review): the first half of this condition (presumably a
        // validated-ledger-age check) is elided in this excerpt.
            ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
        {
            return badData(
                "Invalid ledger sequence " + std::to_string(ledgerSeq));
        }
    }

    // Verify ledger node IDs
    if (itype != protocol::liBASE)
    {
        if (m->nodeids_size() <= 0)
            return badData("Invalid ledger node IDs");

        for (auto const& nodeId : m->nodeids())
        {
            // NOTE(review): the per-node validity condition is elided in
            // this excerpt.
                return badData("Invalid SHAMap node ID");
        }
    }

    // Verify query type
    if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
        return badData("Invalid query type");

    // Verify query depth
    if (m->has_querydepth())
    {
        if (m->querydepth() > Tuning::maxQueryDepth ||
            itype == protocol::liBASE)
        {
            return badData("Invalid query depth");
        }
    }

    // Queue a job to process the request
    app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() {
        if (auto peer = weak.lock())
            peer->processLedgerRequest(m);
    });
}
1567
// Handle a TMProofPathRequest: if ledger replay is enabled, charge for the
// burden and queue a job that builds and sends a proof-path response (or
// charges the peer when the request cannot be answered).
// NOTE(review): the onMessage signature and the guard condition for the first
// block are elided in this excerpt.
void
{
    JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
    {
        fee_.update(
            Resource::feeMalformedRequest, "proof_path_request disabled");
        return;
    }

    fee_.update(
        Resource::feeModerateBurdenPeer, "received a proof path request");
        // NOTE(review): the weak_ptr declaration and addJob call preceding
        // this lambda are elided in this excerpt.
        jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() {
            if (auto peer = weak.lock())
            {
                auto reply =
                    peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
                if (reply.has_error())
                {
                    // Distinguish malformed requests from ones we simply
                    // cannot answer.
                    if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
                        peer->charge(
                            Resource::feeMalformedRequest,
                            "proof_path_request");
                    else
                        peer->charge(
                            Resource::feeRequestNoReply, "proof_path_request");
                }
                else
                {
                    peer->send(std::make_shared<Message>(
                        reply, protocol::mtPROOF_PATH_RESPONSE));
                }
            }
        });
}
1606
// Handle a TMProofPathResponse: reject (and charge) if ledger replay is not
// enabled, otherwise forward to the replay message handler and charge when
// the payload does not validate.
// NOTE(review): the onMessage signature line is elided in this excerpt.
void
{
    if (!ledgerReplayEnabled_)
    {
        // We never asked for this data; receiving it is a protocol violation.
        fee_.update(
            Resource::feeMalformedRequest, "proof_path_response disabled");
        return;
    }

    if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
    {
        fee_.update(Resource::feeInvalidData, "proof_path_response");
    }
}
1622
// Handle a TMReplayDeltaRequest: if ledger replay is enabled, charge for the
// burden and queue a job that builds and sends a replay-delta response (or
// charges the peer when the request cannot be answered).
// NOTE(review): the onMessage signature line is elided in this excerpt.
void
{
    JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
    if (!ledgerReplayEnabled_)
    {
        fee_.update(
            Resource::feeMalformedRequest, "replay_delta_request disabled");
        return;
    }

    // NOTE(review): this assigns fee_.fee directly, unlike the sibling
    // handlers which call fee_.update(fee, context) — confirm this is
    // intentional (it bypasses whatever bookkeeping update() performs).
    fee_.fee = Resource::feeModerateBurdenPeer;
    std::weak_ptr<PeerImp> weak = shared_from_this();
    app_.getJobQueue().addJob(
        jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() {
            if (auto peer = weak.lock())
            {
                auto reply =
                    peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
                if (reply.has_error())
                {
                    // Distinguish malformed requests from ones we simply
                    // cannot answer.
                    if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
                        peer->charge(
                            Resource::feeMalformedRequest,
                            "replay_delta_request");
                    else
                        peer->charge(
                            Resource::feeRequestNoReply,
                            "replay_delta_request");
                }
                else
                {
                    peer->send(std::make_shared<Message>(
                        reply, protocol::mtREPLAY_DELTA_RESPONSE));
                }
            }
        });
}
1661
// Handle a TMReplayDeltaResponse: reject (and charge) if ledger replay is not
// enabled, otherwise forward to the replay message handler and charge when
// the payload does not validate.
// NOTE(review): the onMessage signature line is elided in this excerpt.
void
{
    if (!ledgerReplayEnabled_)
    {
        // We never asked for this data; receiving it is a protocol violation.
        fee_.update(
            Resource::feeMalformedRequest, "replay_delta_response disabled");
        return;
    }

    if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
    {
        fee_.update(Resource::feeInvalidData, "replay_delta_response");
    }
}
1677
// Handle a TMLedgerData reply: validate all fields, relay the reply to the
// originating peer when a request cookie is present, otherwise hand the data
// to InboundTransactions (TX candidate sets, via a job) or InboundLedgers.
// NOTE(review): the onMessage signature line is elided in this excerpt.
void
{
    // Charge the peer and log; used as `return badData(...)` so each invalid
    // field exits immediately.
    auto badData = [&](std::string const& msg) {
        fee_.update(Resource::feeInvalidData, msg);
        JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
    };

    // Verify ledger hash
    if (!stringIsUint256Sized(m->ledgerhash()))
        return badData("Invalid ledger hash");

    // Verify ledger sequence
    {
        auto const ledgerSeq{m->ledgerseq()};
        if (m->type() == protocol::liTS_CANDIDATE)
        {
            // TX candidate sets are not tied to a ledger sequence.
            if (ledgerSeq != 0)
            {
                return badData(
                    "Invalid ledger sequence " + std::to_string(ledgerSeq));
            }
        }
        else
        {
            // Check if within a reasonable range
            using namespace std::chrono_literals;
            if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
                ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
            {
                return badData(
                    "Invalid ledger sequence " + std::to_string(ledgerSeq));
            }
        }
    }

    // Verify ledger info type
    if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
        return badData("Invalid ledger info type");

    // Verify reply error
    if (m->has_error() &&
        (m->error() < protocol::reNO_LEDGER ||
         m->error() > protocol::reBAD_REQUEST))
    {
        return badData("Invalid reply error");
    }

    // Verify ledger nodes.
    if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
    {
        return badData(
            "Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
    }

    // If there is a request cookie, attempt to relay the message
    if (m->has_requestcookie())
    {
        if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
        {
            // Strip the cookie so the recipient treats the reply as its own.
            m->clear_requestcookie();
            peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
        }
        else
        {
            JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
        }
        return;
    }

    uint256 const ledgerHash{m->ledgerhash()};

    // Otherwise check if received data for a candidate transaction set
    if (m->type() == protocol::liTS_CANDIDATE)
    {
        // Process off the network thread; weak_ptr avoids extending the
        // peer's lifetime.
        std::weak_ptr<PeerImp> weak{shared_from_this()};
        app_.getJobQueue().addJob(
            jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() {
                if (auto peer = weak.lock())
                {
                    peer->app_.getInboundTransactions().gotData(
                        ledgerHash, peer, m);
                }
            });
        return;
    }

    // Consume the message
    app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
}
1768
// Handle a TMProposeSet (consensus proposal): sanity-check signature and
// hashes, drop untrusted proposals per configuration, suppress duplicates,
// then queue a job to verify and apply the proposal.
// NOTE(review): the onMessage signature line is elided in this excerpt.
void
{
    protocol::TMProposeSet& set = *m;

    auto const sig = makeSlice(set.signature());

    // Preliminary check for the validity of the signature: A DER encoded
    // signature can't be longer than 72 bytes.
    if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
        (publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
    {
        JLOG(p_journal_.warn()) << "Proposal: malformed";
        fee_.update(
            Resource::feeInvalidSignature,
            " signature can't be longer than 72 bytes");
        return;
    }

    if (!stringIsUint256Sized(set.currenttxhash()) ||
        !stringIsUint256Sized(set.previousledger()))
    {
        JLOG(p_journal_.warn()) << "Proposal: malformed";
        fee_.update(Resource::feeMalformedRequest, "bad hashes");
        return;
    }

    // RH TODO: when isTrusted = false we should probably also cache a key
    // suppression for 30 seconds to avoid doing a relatively expensive lookup
    // every time a spam packet is received
    PublicKey const publicKey{makeSlice(set.nodepubkey())};
    auto const isTrusted = app_.validators().trusted(publicKey);

    // If the operator has specified that untrusted proposals be dropped then
    // this happens here I.e. before further wasting CPU verifying the signature
    // of an untrusted key
    if (!isTrusted)
    {
        // report untrusted proposal messages
        overlay_.reportInboundTraffic(
            TrafficCount::category::proposal_untrusted,
            Message::messageSize(*m));

        if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
            return;
    }

    uint256 const proposeHash{set.currenttxhash()};
    uint256 const prevLedger{set.previousledger()};

    NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};

    // Unique id used by the HashRouter to suppress re-processing/relay.
    uint256 const suppression = proposalUniqueId(
        proposeHash,
        prevLedger,
        set.proposeseq(),
        closeTime,
        publicKey.slice(),
        sig);

    if (auto [added, relayed] =
            app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
        !added)
    {
        // Count unique messages (Slots has it's own 'HashRouter'), which a peer
        // receives within IDLED seconds since the message has been relayed.
        if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
            overlay_.updateSlotAndSquelch(
                suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);

        // report duplicate proposal messages
        overlay_.reportInboundTraffic(
            TrafficCount::category::proposal_duplicate,
            Message::messageSize(*m));

        JLOG(p_journal_.trace()) << "Proposal: duplicate";

        return;
    }

    if (!isTrusted)
    {
        if (tracking_.load() == Tracking::diverged)
        {
            JLOG(p_journal_.debug())
                << "Proposal: Dropping untrusted (peer divergence)";
            return;
        }

        if (!cluster() && app_.getFeeTrack().isLoadedLocal())
        {
            JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
            return;
        }
    }

    JLOG(p_journal_.trace())
        << "Proposal: " << (isTrusted ? "trusted" : "untrusted");

    auto proposal = RCLCxPeerPos(
        publicKey,
        sig,
        suppression,
        // NOTE(review): one constructor-argument line (presumably opening the
        // inner Proposal aggregate) is elided in this excerpt.
        prevLedger,
        set.proposeseq(),
        proposeHash,
        closeTime,
        app_.timeKeeper().closeTime(),
        calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});

    // Queue verification off the network thread; trusted proposals get the
    // higher-priority job type.
    std::weak_ptr<PeerImp> weak = shared_from_this();
    app_.getJobQueue().addJob(
        isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
        "recvPropose->checkPropose",
        [weak, isTrusted, m, proposal]() {
            if (auto peer = weak.lock())
                peer->checkPropose(isTrusted, m, proposal);
        });
}
1889
// Handle a TMStatusChange: record the peer's latest status, track its
// closed/previous ledger hashes and ledger range, update sync tracking, and
// publish the status to subscribers.
// NOTE(review): the onMessage signature line is elided in this excerpt.
void
{
    JLOG(p_journal_.trace()) << "Status: Change";

    if (!m->has_networktime())
        m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());

    {
        std::lock_guard sl(recentLock_);
        if (!last_status_.has_newstatus() || m->has_newstatus())
            last_status_ = *m;
        else
        {
            // preserve old status
            protocol::NodeStatus status = last_status_.newstatus();
            last_status_ = *m;
            m->set_newstatus(status);
        }
    }

    if (m->newevent() == protocol::neLOST_SYNC)
    {
        bool outOfSync{false};
        {
            // Operations on closedLedgerHash_ and previousLedgerHash_ must be
            // guarded by recentLock_.
            std::lock_guard sl(recentLock_);
            if (!closedLedgerHash_.isZero())
            {
                outOfSync = true;
                closedLedgerHash_.zero();
            }
            previousLedgerHash_.zero();
        }
        if (outOfSync)
        {
            JLOG(p_journal_.debug()) << "Status: Out of sync";
        }
        return;
    }

    {
        uint256 closedLedgerHash{};
        bool const peerChangedLedgers{
            m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};

        {
            // Operations on closedLedgerHash_ and previousLedgerHash_ must be
            // guarded by recentLock_.
            std::lock_guard sl(recentLock_);
            if (peerChangedLedgers)
            {
                closedLedgerHash_ = m->ledgerhash();
                closedLedgerHash = closedLedgerHash_;
                addLedger(closedLedgerHash, sl);
            }
            else
            {
                closedLedgerHash_.zero();
            }

            if (m->has_ledgerhashprevious() &&
                stringIsUint256Sized(m->ledgerhashprevious()))
            {
                previousLedgerHash_ = m->ledgerhashprevious();
                addLedger(previousLedgerHash_, sl);
            }
            else
            {
                previousLedgerHash_.zero();
            }
        }
        if (peerChangedLedgers)
        {
            JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;
        }
        else
        {
            JLOG(p_journal_.debug()) << "Status: No ledger";
        }
    }

    // Record the ledger range the peer claims to have.
    if (m->has_firstseq() && m->has_lastseq())
    {
        std::lock_guard sl(recentLock_);

        minLedger_ = m->firstseq();
        maxLedger_ = m->lastseq();

        // An inverted or zero-bounded range is meaningless; clear it.
        if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
            minLedger_ = maxLedger_ = 0;
    }

    // Only compare against our validated ledger if it is recent enough to be
    // a meaningful reference point.
    if (m->has_ledgerseq() &&
        app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
    {
        checkTracking(
            m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
    }

    // Lazily build the JSON status for subscribers.
    app_.getOPs().pubPeerStatus([=, this]() -> Json::Value {
        // NOTE(review): the declaration of the Json::Value `j` is elided in
        // this excerpt.

        if (m->has_newstatus())
        {
            switch (m->newstatus())
            {
                case protocol::nsCONNECTING:
                    j[jss::status] = "CONNECTING";
                    break;
                case protocol::nsCONNECTED:
                    j[jss::status] = "CONNECTED";
                    break;
                case protocol::nsMONITORING:
                    j[jss::status] = "MONITORING";
                    break;
                case protocol::nsVALIDATING:
                    j[jss::status] = "VALIDATING";
                    break;
                case protocol::nsSHUTTING:
                    j[jss::status] = "SHUTTING";
                    break;
            }
        }

        if (m->has_newevent())
        {
            switch (m->newevent())
            {
                case protocol::neCLOSING_LEDGER:
                    j[jss::action] = "CLOSING_LEDGER";
                    break;
                case protocol::neACCEPTED_LEDGER:
                    j[jss::action] = "ACCEPTED_LEDGER";
                    break;
                case protocol::neSWITCHED_LEDGER:
                    j[jss::action] = "SWITCHED_LEDGER";
                    break;
                case protocol::neLOST_SYNC:
                    j[jss::action] = "LOST_SYNC";
                    break;
            }
        }

        if (m->has_ledgerseq())
        {
            j[jss::ledger_index] = m->ledgerseq();
        }

        if (m->has_ledgerhash())
        {
            uint256 closedLedgerHash{};
            {
                std::lock_guard sl(recentLock_);
                closedLedgerHash = closedLedgerHash_;
            }
            j[jss::ledger_hash] = to_string(closedLedgerHash);
        }

        if (m->has_networktime())
        {
            j[jss::date] = Json::UInt(m->networktime());
        }

        if (m->has_firstseq() && m->has_lastseq())
        {
            j[jss::ledger_index_min] = Json::UInt(m->firstseq());
            j[jss::ledger_index_max] = Json::UInt(m->lastseq());
        }

        return j;
    });
}
2064
2065void
2066PeerImp::checkTracking(std::uint32_t validationSeq)
2067{
2068 std::uint32_t serverSeq;
2069 {
2070 // Extract the sequence number of the highest
2071 // ledger this peer has
2072 std::lock_guard sl(recentLock_);
2073
2074 serverSeq = maxLedger_;
2075 }
2076 if (serverSeq != 0)
2077 {
2078 // Compare the peer's ledger sequence to the
2079 // sequence of a recently-validated ledger
2080 checkTracking(serverSeq, validationSeq);
2081 }
2082}
2083
2084void
2085PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
2086{
2087 int diff = std::max(seq1, seq2) - std::min(seq1, seq2);
2088
2089 if (diff < Tuning::convergedLedgerLimit)
2090 {
2091 // The peer's ledger sequence is close to the validation's
2092 tracking_ = Tracking::converged;
2093 }
2094
2095 if ((diff > Tuning::divergedLedgerLimit) &&
2096 (tracking_.load() != Tracking::diverged))
2097 {
2098 // The peer's ledger sequence is way off the validation's
2099 std::lock_guard sl(recentLock_);
2100
2101 tracking_ = Tracking::diverged;
2102 trackingTime_ = clock_type::now();
2103 }
2104}
2105
// Handle a TMHaveTransactionSet: validate the hash and, for a tsHAVE status,
// remember that this peer has the transaction set (charging for duplicates).
// NOTE(review): the onMessage signature line is elided in this excerpt.
void
{
    if (!stringIsUint256Sized(m->hash()))
    {
        fee_.update(Resource::feeMalformedRequest, "bad hash");
        return;
    }

    uint256 const hash{m->hash()};

    if (m->status() == protocol::tsHAVE)
    {
        // recentTxSets_ is guarded by recentLock_.
        std::lock_guard sl(recentLock_);

        // A peer re-announcing a set it already announced is wasting our
        // time; charge it.
        if (std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
            recentTxSets_.end())
        {
            fee_.update(Resource::feeUselessData, "duplicate (tsHAVE)");
            return;
        }

        recentTxSets_.push_back(hash);
    }
}
2131
2132void
2133PeerImp::onValidatorListMessage(
2134 std::string const& messageType,
2135 std::string const& manifest,
2136 std::uint32_t version,
2137 std::vector<ValidatorBlobInfo> const& blobs)
2138{
2139 // If there are no blobs, the message is malformed (possibly because of
2140 // ValidatorList class rules), so charge accordingly and skip processing.
2141 if (blobs.empty())
2142 {
2143 JLOG(p_journal_.warn()) << "Ignored malformed " << messageType;
2144 // This shouldn't ever happen with a well-behaved peer
2145 fee_.update(Resource::feeHeavyBurdenPeer, "no blobs");
2146 return;
2147 }
2148
2149 auto const hash = sha512Half(manifest, blobs, version);
2150
2151 JLOG(p_journal_.debug()) << "Received " << messageType;
2152
2153 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2154 {
2155 JLOG(p_journal_.debug())
2156 << messageType << ": received duplicate " << messageType;
2157 // Charging this fee here won't hurt the peer in the normal
2158 // course of operation (ie. refresh every 5 minutes), but
2159 // will add up if the peer is misbehaving.
2160 fee_.update(Resource::feeUselessData, "duplicate");
2161 return;
2162 }
2163
2164 auto const applyResult = app_.validators().applyListsAndBroadcast(
2165 manifest,
2166 version,
2167 blobs,
2168 remote_address_.to_string(),
2169 hash,
2170 app_.overlay(),
2171 app_.getHashRouter(),
2172 app_.getOPs());
2173
2174 JLOG(p_journal_.debug())
2175 << "Processed " << messageType << " version " << version << " from "
2176 << (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
2177 : "unknown or invalid publisher")
2178 << " with best result " << to_string(applyResult.bestDisposition());
2179
2180 // Act based on the best result
2181 switch (applyResult.bestDisposition())
2182 {
2183 // New list
2184 case ListDisposition::accepted:
2185 // Newest list is expired, and that needs to be broadcast, too
2186 case ListDisposition::expired:
2187 // Future list
2188 case ListDisposition::pending: {
2189 std::lock_guard<std::mutex> sl(recentLock_);
2190
2191 XRPL_ASSERT(
2192 applyResult.publisherKey,
2193 "ripple::PeerImp::onValidatorListMessage : publisher key is "
2194 "set");
2195 auto const& pubKey = *applyResult.publisherKey;
2196#ifndef NDEBUG
2197 if (auto const iter = publisherListSequences_.find(pubKey);
2198 iter != publisherListSequences_.end())
2199 {
2200 XRPL_ASSERT(
2201 iter->second < applyResult.sequence,
2202 "ripple::PeerImp::onValidatorListMessage : lower sequence");
2203 }
2204#endif
2205 publisherListSequences_[pubKey] = applyResult.sequence;
2206 }
2207 break;
2208 case ListDisposition::same_sequence:
2209 case ListDisposition::known_sequence:
2210#ifndef NDEBUG
2211 {
2212 std::lock_guard<std::mutex> sl(recentLock_);
2213 XRPL_ASSERT(
2214 applyResult.sequence && applyResult.publisherKey,
2215 "ripple::PeerImp::onValidatorListMessage : nonzero sequence "
2216 "and set publisher key");
2217 XRPL_ASSERT(
2218 publisherListSequences_[*applyResult.publisherKey] <=
2219 applyResult.sequence,
2220 "ripple::PeerImp::onValidatorListMessage : maximum sequence");
2221 }
2222#endif // !NDEBUG
2223
2224 break;
2225 case ListDisposition::stale:
2226 case ListDisposition::untrusted:
2227 case ListDisposition::invalid:
2228 case ListDisposition::unsupported_version:
2229 break;
2230 // LCOV_EXCL_START
2231 default:
2232 UNREACHABLE(
2233 "ripple::PeerImp::onValidatorListMessage : invalid best list "
2234 "disposition");
2235 // LCOV_EXCL_STOP
2236 }
2237
2238 // Charge based on the worst result
2239 switch (applyResult.worstDisposition())
2240 {
2241 case ListDisposition::accepted:
2242 case ListDisposition::expired:
2243 case ListDisposition::pending:
2244 // No charges for good data
2245 break;
2246 case ListDisposition::same_sequence:
2247 case ListDisposition::known_sequence:
2248 // Charging this fee here won't hurt the peer in the normal
2249 // course of operation (ie. refresh every 5 minutes), but
2250 // will add up if the peer is misbehaving.
2251 fee_.update(
2252 Resource::feeUselessData,
2253 " duplicate (same_sequence or known_sequence)");
2254 break;
2255 case ListDisposition::stale:
2256 // There are very few good reasons for a peer to send an
2257 // old list, particularly more than once.
2258 fee_.update(Resource::feeInvalidData, "expired");
2259 break;
2260 case ListDisposition::untrusted:
2261 // Charging this fee here won't hurt the peer in the normal
2262 // course of operation (ie. refresh every 5 minutes), but
2263 // will add up if the peer is misbehaving.
2264 fee_.update(Resource::feeUselessData, "untrusted");
2265 break;
2266 case ListDisposition::invalid:
2267 // This shouldn't ever happen with a well-behaved peer
2268 fee_.update(
2269 Resource::feeInvalidSignature, "invalid list disposition");
2270 break;
2271 case ListDisposition::unsupported_version:
2272 // During a version transition, this may be legitimate.
2273 // If it happens frequently, that's probably bad.
2274 fee_.update(Resource::feeInvalidData, "version");
2275 break;
2276 // LCOV_EXCL_START
2277 default:
2278 UNREACHABLE(
2279 "ripple::PeerImp::onValidatorListMessage : invalid worst list "
2280 "disposition");
2281 // LCOV_EXCL_STOP
2282 }
2283
2284 // Log based on all the results.
2285 for (auto const& [disp, count] : applyResult.dispositions)
2286 {
2287 switch (disp)
2288 {
2289 // New list
2290 case ListDisposition::accepted:
2291 JLOG(p_journal_.debug())
2292 << "Applied " << count << " new " << messageType;
2293 break;
2294 // Newest list is expired, and that needs to be broadcast, too
2295 case ListDisposition::expired:
2296 JLOG(p_journal_.debug())
2297 << "Applied " << count << " expired " << messageType;
2298 break;
2299 // Future list
2300 case ListDisposition::pending:
2301 JLOG(p_journal_.debug())
2302 << "Processed " << count << " future " << messageType;
2303 break;
2304 case ListDisposition::same_sequence:
2305 JLOG(p_journal_.warn())
2306 << "Ignored " << count << " " << messageType
2307 << "(s) with current sequence";
2308 break;
2309 case ListDisposition::known_sequence:
2310 JLOG(p_journal_.warn())
2311 << "Ignored " << count << " " << messageType
2312 << "(s) with future sequence";
2313 break;
2314 case ListDisposition::stale:
2315 JLOG(p_journal_.warn())
2316 << "Ignored " << count << "stale " << messageType;
2317 break;
2318 case ListDisposition::untrusted:
2319 JLOG(p_journal_.warn())
2320 << "Ignored " << count << " untrusted " << messageType;
2321 break;
2322 case ListDisposition::unsupported_version:
2323 JLOG(p_journal_.warn())
2324 << "Ignored " << count << "unsupported version "
2325 << messageType;
2326 break;
2327 case ListDisposition::invalid:
2328 JLOG(p_journal_.warn())
2329 << "Ignored " << count << "invalid " << messageType;
2330 break;
2331 // LCOV_EXCL_START
2332 default:
2333 UNREACHABLE(
2334 "ripple::PeerImp::onValidatorListMessage : invalid list "
2335 "disposition");
2336 // LCOV_EXCL_STOP
2337 }
2338 }
2339}
2340
// Handle a TMValidatorList: verify the peer's protocol supports validator
// list propagation, then delegate to onValidatorListMessage. Any parsing or
// processing exception charges the peer feeInvalidData.
// NOTE(review): the onMessage signature line is elided in this excerpt.
void
{
    try
    {
        if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
        {
            JLOG(p_journal_.debug())
                << "ValidatorList: received validator list from peer using "
                << "protocol version " << to_string(protocol_)
                << " which shouldn't support this feature.";
            fee_.update(Resource::feeUselessData, "unsupported peer");
            return;
        }
        onValidatorListMessage(
            "ValidatorList",
            m->manifest(),
            m->version(),
            ValidatorList::parseBlobs(*m));
    }
    catch (std::exception const& e)
    {
        JLOG(p_journal_.warn()) << "ValidatorList: Exception, " << e.what();
        using namespace std::string_literals;
        fee_.update(Resource::feeInvalidData, e.what());
    }
}
2368
// Handle a TMValidatorListCollection: verify the peer's protocol supports
// collection propagation and the message version is at least 2, then
// delegate to onValidatorListMessage. Exceptions charge feeInvalidData.
// NOTE(review): the parameter line of the signature is elided in this
// excerpt.
void
PeerImp::onMessage(
{
    try
    {
        if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
        {
            JLOG(p_journal_.debug())
                << "ValidatorListCollection: received validator list from peer "
                << "using protocol version " << to_string(protocol_)
                << " which shouldn't support this feature.";
            fee_.update(Resource::feeUselessData, "unsupported peer");
            return;
        }
        else if (m->version() < 2)
        {
            // Collections only exist from format version 2 onward.
            JLOG(p_journal_.debug())
                << "ValidatorListCollection: received invalid validator list "
                   "version "
                << m->version() << " from peer using protocol version "
                << to_string(protocol_);
            fee_.update(Resource::feeInvalidData, "wrong version");
            return;
        }
        onValidatorListMessage(
            "ValidatorListCollection",
            m->manifest(),
            m->version(),
            ValidatorList::parseBlobs(*m));
    }
    catch (std::exception const& e)
    {
        JLOG(p_journal_.warn())
            << "ValidatorListCollection: Exception, " << e.what();
        using namespace std::string_literals;
        fee_.update(Resource::feeInvalidData, e.what());
    }
}
2408
// Handle a TMValidation: deserialize and timestamp the validation, drop
// stale or duplicate ones, apply untrusted-validation policy, then queue a
// job to verify it. Exceptions charge feeMalformedRequest.
// NOTE(review): the onMessage signature line and the STValidation
// construction lines are elided in this excerpt.
void
{
    // A serialized validation cannot plausibly be this small.
    if (m->validation().size() < 50)
    {
        JLOG(p_journal_.warn()) << "Validation: Too small";
        fee_.update(Resource::feeMalformedRequest, "too small");
        return;
    }

    try
    {
        auto const closeTime = app_.timeKeeper().closeTime();

        {
            SerialIter sit(makeSlice(m->validation()));
            // NOTE(review): the declaration constructing the validation
            // object (`val`) from `sit` is elided in this excerpt.
                std::ref(sit),
                [this](PublicKey const& pk) {
                    return calcNodeID(
                        app_.validatorManifests().getMasterKey(pk));
                },
                false);
            val->setSeen(closeTime);
        }

        if (!isCurrent(
                app_.getValidations().parms(),
                app_.timeKeeper().closeTime(),
                val->getSignTime(),
                val->getSeenTime()))
        {
            JLOG(p_journal_.trace()) << "Validation: Not current";
            fee_.update(Resource::feeUselessData, "not current");
            return;
        }

        // RH TODO: when isTrusted = false we should probably also cache a key
        // suppression for 30 seconds to avoid doing a relatively expensive
        // lookup every time a spam packet is received
        auto const isTrusted =
            app_.validators().trusted(val->getSignerPublic());

        // If the operator has specified that untrusted validations be
        // dropped then this happens here I.e. before further wasting CPU
        // verifying the signature of an untrusted key
        if (!isTrusted)
        {
            // increase untrusted validations received
            overlay_.reportInboundTraffic(
                TrafficCount::category::validation_untrusted,
                Message::messageSize(*m));

            if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
                return;
        }

        // Hash of the raw payload identifies this validation for suppression.
        auto key = sha512Half(makeSlice(m->validation()));

        auto [added, relayed] =
            app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);

        if (!added)
        {
            // Count unique messages (Slots has it's own 'HashRouter'), which a
            // peer receives within IDLED seconds since the message has been
            // relayed.
            if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
                overlay_.updateSlotAndSquelch(
                    key, val->getSignerPublic(), id_, protocol::mtVALIDATION);

            // increase duplicate validations received
            overlay_.reportInboundTraffic(
                TrafficCount::category::validation_duplicate,
                Message::messageSize(*m));

            JLOG(p_journal_.trace()) << "Validation: duplicate";
            return;
        }

        if (!isTrusted && (tracking_.load() == Tracking::diverged))
        {
            JLOG(p_journal_.debug())
                << "Dropping untrusted validation from diverged peer";
        }
        else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
        {
            // Job name; in debug builds include ledger sequence and node id.
            std::string const name = [isTrusted, val]() {
                std::string ret =
                    isTrusted ? "Trusted validation" : "Untrusted validation";

#ifdef DEBUG
                ret += " " +
                    std::to_string(val->getFieldU32(sfLedgerSequence)) + ": " +
                    to_string(val->getNodeID());
#endif

                return ret;
            }();

            // Queue verification; trusted validations get the
            // higher-priority job type.
            std::weak_ptr<PeerImp> weak = shared_from_this();
            app_.getJobQueue().addJob(
                isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
                name,
                [weak, val, m, key]() {
                    if (auto peer = weak.lock())
                        peer->checkValidation(val, key, m);
                });
        }
        else
        {
            JLOG(p_journal_.debug())
                << "Dropping untrusted validation for load";
        }
    }
    catch (std::exception const& e)
    {
        JLOG(p_journal_.warn())
            << "Exception processing validation: " << e.what();
        using namespace std::string_literals;
        fee_.update(Resource::feeMalformedRequest, e.what());
    }
}
2533
// Handles a TMGetObjectByHash message. When the message is a query, the
// peer either serves a fetch pack, queues a reduce-relay transaction
// request, or looks up the requested node-store objects and replies.
// Otherwise the message is a reply whose objects carry fetch-pack data
// that is handed to the LedgerMaster.
// NOTE(review): the function signature line is elided in this rendering.
void
{
    protocol::TMGetObjectByHash& packet = *m;

    JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
                             << " " << packet.objects_size();

    if (packet.query())
    {
        // this is a query
        // Refuse to add to an already backed-up send queue.
        if (send_queue_.size() >= Tuning::dropSendQueue)
        {
            JLOG(p_journal_.debug()) << "GetObject: Large send queue";
            return;
        }

        if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
        {
            doFetchPack(m);
            return;
        }

        if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
        {
            if (!txReduceRelayEnabled())
            {
                JLOG(p_journal_.error())
                    << "TMGetObjectByHash: tx reduce-relay is disabled";
                fee_.update(Resource::feeMalformedRequest, "disabled");
                return;
            }

            // Weak capture so a disconnected peer is not kept alive by
            // the queued job.
            std::weak_ptr<PeerImp> weak = shared_from_this();
            app_.getJobQueue().addJob(
                jtREQUESTED_TXN, "doTransactions", [weak, m]() {
                    if (auto peer = weak.lock())
                        peer->doTransactions(m);
                });
            return;
        }

        protocol::TMGetObjectByHash reply;

        reply.set_query(false);

        if (packet.has_seq())
            reply.set_seq(packet.seq());

        reply.set_type(packet.type());

        if (packet.has_ledgerhash())
        {
            if (!stringIsUint256Sized(packet.ledgerhash()))
            {
                fee_.update(Resource::feeMalformedRequest, "ledger hash");
                return;
            }

            reply.set_ledgerhash(packet.ledgerhash());
        }

        fee_.update(
            Resource::feeModerateBurdenPeer,
            " received a get object by hash request");

        // This is a very minimal implementation
        for (int i = 0; i < packet.objects_size(); ++i)
        {
            auto const& obj = packet.objects(i);
            if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
            {
                uint256 const hash{obj.hash()};
                // VFALCO TODO Move this someplace more sensible so we dont
                // need to inject the NodeStore interfaces.
                std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
                auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
                if (nodeObject)
                {
                    protocol::TMIndexedObject& newObj = *reply.add_objects();
                    newObj.set_hash(hash.begin(), hash.size());
                    newObj.set_data(
                        &nodeObject->getData().front(),
                        nodeObject->getData().size());

                    if (obj.has_nodeid())
                        newObj.set_index(obj.nodeid());
                    if (obj.has_ledgerseq())
                        newObj.set_ledgerseq(obj.ledgerseq());

                    // VFALCO NOTE "seq" in the message is obsolete
                }
            }
        }

        JLOG(p_journal_.trace()) << "GetObj: " << reply.objects_size() << " of "
                                 << packet.objects_size();
        send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
    }
    else
    {
        // this is a reply
        // pLSeq: ledger sequence of the fetch-pack run currently being
        // processed; pLDo: whether we still need data for that ledger.
        std::uint32_t pLSeq = 0;
        bool pLDo = true;
        bool progress = false;

        for (int i = 0; i < packet.objects_size(); ++i)
        {
            protocol::TMIndexedObject const& obj = packet.objects(i);

            if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
            {
                if (obj.has_ledgerseq())
                {
                    // A new ledger sequence starts a new fetch-pack run.
                    if (obj.ledgerseq() != pLSeq)
                    {
                        if (pLDo && (pLSeq != 0))
                        {
                            JLOG(p_journal_.debug())
                                << "GetObj: Full fetch pack for " << pLSeq;
                        }
                        pLSeq = obj.ledgerseq();
                        pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);

                        if (!pLDo)
                        {
                            JLOG(p_journal_.debug())
                                << "GetObj: Late fetch pack for " << pLSeq;
                        }
                        else
                            progress = true;
                    }
                }

                if (pLDo)
                {
                    uint256 const hash{obj.hash()};

                    app_.getLedgerMaster().addFetchPack(
                        hash,
                        obj.data().begin(), obj.data().end()));
                }
            }
        }

        if (pLDo && (pLSeq != 0))
        {
            JLOG(p_journal_.debug())
                << "GetObj: Partial fetch pack for " << pLSeq;
        }
        if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
            app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
    }
}
2689
// Handles TMHaveTransactions (tx reduce-relay): offloads processing of
// the peer's announced transaction hashes to the job queue. Charges the
// peer if the feature is not enabled on this node.
// NOTE(review): the function signature line is elided in this rendering.
void
{
    if (!txReduceRelayEnabled())
    {
        JLOG(p_journal_.error())
            << "TMHaveTransactions: tx reduce-relay is disabled";
        fee_.update(Resource::feeMalformedRequest, "disabled");
        return;
    }

    // Weak capture so a disconnected peer is not kept alive by the job.
    std::weak_ptr<PeerImp> weak = shared_from_this();
    app_.getJobQueue().addJob(
        jtMISSING_TXN, "handleHaveTransactions", [weak, m]() {
            if (auto peer = weak.lock())
                peer->handleHaveTransactions(m);
        });
}
2708
// Job-queue worker for TMHaveTransactions: for each hash the peer
// announced, requests the transactions we do not already have cached
// (via a TMGetObjectByHash query) and removes queued relays for those we
// do have.
void
PeerImp::handleHaveTransactions(
{
    protocol::TMGetObjectByHash tmBH;
    tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
    tmBH.set_query(true);

    JLOG(p_journal_.trace())
        << "received TMHaveTransactions " << m->hashes_size();

    for (std::uint32_t i = 0; i < m->hashes_size(); i++)
    {
        // A malformed hash aborts the whole request and bills the peer.
        if (!stringIsUint256Sized(m->hashes(i)))
        {
            JLOG(p_journal_.error())
                << "TMHaveTransactions with invalid hash size";
            fee_.update(Resource::feeMalformedRequest, "hash size");
            return;
        }

        uint256 hash(m->hashes(i));

        auto txn = app_.getMasterTransaction().fetch_from_cache(hash);

        JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn;

        if (!txn)
        {
            // Unknown transaction: add its hash to the outgoing request.
            JLOG(p_journal_.debug()) << "adding transaction to request";

            auto obj = tmBH.add_objects();
            obj->set_hash(hash.data(), hash.size());
        }
        else
        {
            // Erase only if a peer has seen this tx. If the peer has not
            // seen this tx then the tx could not has been queued for this
            // peer.
            removeTxQueue(hash);
        }
    }

    JLOG(p_journal_.trace())
        << "transaction request object is " << tmBH.objects_size();

    // Only send a request when at least one transaction was missing.
    if (tmBH.objects_size() > 0)
        send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
}
2758
// Handles TMTransactions (tx reduce-relay): transactions received in
// response to an earlier request are fed individually to the regular
// transaction handler, flagged as solicited (not to be blindly relayed).
// NOTE(review): the function signature line is elided in this rendering.
void
{
    if (!txReduceRelayEnabled())
    {
        JLOG(p_journal_.error())
            << "TMTransactions: tx reduce-relay is disabled";
        fee_.update(Resource::feeMalformedRequest, "disabled");
        return;
    }

    JLOG(p_journal_.trace())
        << "received TMTransactions " << m->transactions_size();

    overlay_.addTxMetrics(m->transactions_size());

    // Each contained transaction is wrapped with a no-op deleter:
    // ownership stays with the enclosing message `m`.
    for (std::uint32_t i = 0; i < m->transactions_size(); ++i)
        handleTransaction(
            m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
            false,
            true);
}
2782
// Handles TMSquelch: a request from an upstream peer to stop (or resume)
// relaying messages originated by a particular validator.
void
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
{
    // Squelch state must be mutated on this peer's strand; repost if we
    // are on a different thread. (Alias target elided in this rendering.)
    using on_message_fn =
    if (!strand_.running_in_this_thread())
        return post(
            strand_,
            std::bind(
                (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));

    if (!m->has_validatorpubkey())
    {
        fee_.update(Resource::feeInvalidData, "squelch no pubkey");
        return;
    }
    auto validator = m->validatorpubkey();
    auto const slice{makeSlice(validator)};
    if (!publicKeyType(slice))
    {
        fee_.update(Resource::feeInvalidData, "squelch bad pubkey");
        return;
    }
    PublicKey key(slice);

    // Ignore the squelch for validator's own messages.
    if (key == app_.getValidationPublicKey())
    {
        JLOG(p_journal_.debug())
            << "onMessage: TMSquelch discarding validator's squelch " << slice;
        return;
    }

    // Missing duration is treated as zero; addSquelch validates range.
    std::uint32_t duration =
        m->has_squelchduration() ? m->squelchduration() : 0;
    if (!m->squelch())
        squelch_.removeSquelch(key);
    else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
        fee_.update(Resource::feeInvalidData, "squelch duration");

    JLOG(p_journal_.debug())
        << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
}
2826
2827//--------------------------------------------------------------------------
2828
2829void
2830PeerImp::addLedger(
2831 uint256 const& hash,
2832 std::lock_guard<std::mutex> const& lockedRecentLock)
2833{
2834 // lockedRecentLock is passed as a reminder that recentLock_ must be
2835 // locked by the caller.
2836 (void)lockedRecentLock;
2837
2838 if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2839 recentLedgers_.end())
2840 return;
2841
2842 recentLedgers_.push_back(hash);
2843}
2844
2845void
2846PeerImp::doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
2847{
2848 // VFALCO TODO Invert this dependency using an observer and shared state
2849 // object. Don't queue fetch pack jobs if we're under load or we already
2850 // have some queued.
2851 if (app_.getFeeTrack().isLoadedLocal() ||
2852 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2853 (app_.getJobQueue().getJobCount(jtPACK) > 10))
2854 {
2855 JLOG(p_journal_.info()) << "Too busy to make fetch pack";
2856 return;
2857 }
2858
2859 if (!stringIsUint256Sized(packet->ledgerhash()))
2860 {
2861 JLOG(p_journal_.warn()) << "FetchPack hash size malformed";
2862 fee_.update(Resource::feeMalformedRequest, "hash size");
2863 return;
2864 }
2865
2866 fee_.fee = Resource::feeHeavyBurdenPeer;
2867
2868 uint256 const hash{packet->ledgerhash()};
2869
2870 std::weak_ptr<PeerImp> weak = shared_from_this();
2871 auto elapsed = UptimeClock::now();
2872 auto const pap = &app_;
2873 app_.getJobQueue().addJob(
2874 jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2875 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2876 });
2877}
2878
// Job-queue worker answering a reduce-relay TMGetObjectByHash query for
// transactions: serializes each requested cached transaction into a
// TMTransactions reply. Malformed or unknown hashes abort the reply and
// bill the peer.
void
PeerImp::doTransactions(
{
    protocol::TMTransactions reply;

    JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx "
                             << packet->objects_size();

    // Cap the request size to the reduce-relay queue limit.
    if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
    {
        JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes";
        fee_.update(Resource::feeMalformedRequest, "too big");
        return;
    }

    for (std::uint32_t i = 0; i < packet->objects_size(); ++i)
    {
        auto const& obj = packet->objects(i);

        if (!stringIsUint256Sized(obj.hash()))
        {
            fee_.update(Resource::feeMalformedRequest, "hash size");
            return;
        }

        uint256 hash(obj.hash());

        auto txn = app_.getMasterTransaction().fetch_from_cache(hash);

        if (!txn)
        {
            JLOG(p_journal_.error()) << "doTransactions, transaction not found "
                                     << Slice(hash.data(), hash.size());
            fee_.update(Resource::feeMalformedRequest, "tx not found");
            return;
        }

        // Serialize the transaction and copy status/metadata into the
        // reply entry.
        Serializer s;
        auto tx = reply.add_transactions();
        auto sttx = txn->getSTransaction();
        sttx->add(s);
        tx->set_rawtransaction(s.data(), s.size());
        tx->set_status(
            txn->getStatus() == INCLUDED ? protocol::tsCURRENT
                                         : protocol::tsNEW);
        tx->set_receivetimestamp(
            app_.timeKeeper().now().time_since_epoch().count());
        tx->set_deferred(txn->getSubmitResult().queued);
    }

    if (reply.transactions_size() > 0)
        send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
}
2933
// Validates a relayed transaction (job-queue context): rejects inner
// batch transactions and expired transactions, caches pseudo
// transactions without relaying, optionally verifies the signature, and
// finally submits the transaction to NetworkOPs for processing.
// `flags` carries HashRouter state (e.g. TRUSTED); `batch` indicates the
// tx arrived via a solicited reduce-relay batch rather than gossip.
void
PeerImp::checkTransaction(
    HashRouterFlags flags,
    bool checkSignature,
    std::shared_ptr<STTx const> const& stx,
    bool batch)
{
    // VFALCO TODO Rewrite to not use exceptions
    try
    {
        // charge strongly for relaying batch txns
        // LCOV_EXCL_START
        if (stx->isFlag(tfInnerBatchTxn) &&
            getCurrentTransactionRules()->enabled(featureBatch))
        {
            JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
                                       "tfInnerBatchTxn (checkSignature).";
            charge(Resource::feeModerateBurdenPeer, "inner batch txn");
            return;
        }
        // LCOV_EXCL_STOP

        // Expired?
        if (stx->isFieldPresent(sfLastLedgerSequence) &&
            (stx->getFieldU32(sfLastLedgerSequence) <
             app_.getLedgerMaster().getValidLedgerIndex()))
        {
            JLOG(p_journal_.info())
                << "Marking transaction " << stx->getTransactionID()
                << "as BAD because it's expired";
            app_.getHashRouter().setFlags(
                stx->getTransactionID(), HashRouterFlags::BAD);
            charge(Resource::feeUselessData, "expired tx");
            return;
        }

        if (isPseudoTx(*stx))
        {
            // Don't do anything with pseudo transactions except put them in the
            // TransactionMaster cache
            std::string reason;
            auto tx = std::make_shared<Transaction>(stx, reason, app_);
            XRPL_ASSERT(
                tx->getStatus() == NEW,
                "ripple::PeerImp::checkTransaction Transaction created "
                "correctly");
            if (tx->getStatus() == NEW)
            {
                JLOG(p_journal_.debug())
                    << "Processing " << (batch ? "batch" : "unsolicited")
                    << " pseudo-transaction tx " << tx->getID();

                app_.getMasterTransaction().canonicalize(&tx);
                // Tell the overlay about it, but don't relay it.
                auto const toSkip =
                    app_.getHashRouter().shouldRelay(tx->getID());
                if (toSkip)
                {
                    JLOG(p_journal_.debug())
                        << "Passing skipped pseudo pseudo-transaction tx "
                        << tx->getID();
                    app_.overlay().relay(tx->getID(), {}, *toSkip);
                }
                if (!batch)
                {
                    // Unsolicited pseudo transactions are useless to us.
                    JLOG(p_journal_.debug())
                        << "Charging for pseudo-transaction tx " << tx->getID();
                    charge(Resource::feeUselessData, "pseudo tx");
                }

                return;
            }
        }

        if (checkSignature)
        {
            // Check the signature before handing off to the job queue.
            if (auto [valid, validReason] = checkValidity(
                    app_.getHashRouter(),
                    *stx,
                    app_.getLedgerMaster().getValidatedRules(),
                    app_.config());
                valid != Validity::Valid)
            {
                if (!validReason.empty())
                {
                    JLOG(p_journal_.debug())
                        << "Exception checking transaction: " << validReason;
                }

                // Probably not necessary to set HashRouterFlags::BAD, but
                // doesn't hurt.
                app_.getHashRouter().setFlags(
                    stx->getTransactionID(), HashRouterFlags::BAD);
                charge(
                    Resource::feeInvalidSignature,
                    "check transaction signature failure");
                return;
            }
        }
        else
        {
            // Signature already verified elsewhere; record validity.
            // (Call name elided in this rendering.)
                app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
        }

        std::string reason;
        auto tx = std::make_shared<Transaction>(stx, reason, app_);

        if (tx->getStatus() == INVALID)
        {
            if (!reason.empty())
            {
                JLOG(p_journal_.debug())
                    << "Exception checking transaction: " << reason;
            }
            app_.getHashRouter().setFlags(
                stx->getTransactionID(), HashRouterFlags::BAD);
            charge(Resource::feeInvalidSignature, "tx (impossible)");
            return;
        }

        bool const trusted = any(flags & HashRouterFlags::TRUSTED);
        app_.getOPs().processTransaction(
            tx, trusted, false, NetworkOPs::FailHard::no);
    }
    catch (std::exception const& ex)
    {
        JLOG(p_journal_.warn())
            << "Exception in " << __func__ << ": " << ex.what();
        app_.getHashRouter().setFlags(
            stx->getTransactionID(), HashRouterFlags::BAD);
        using namespace std::string_literals;
        charge(Resource::feeInvalidData, "tx "s + ex.what());
    }
}
3070
3071// Called from our JobQueue
// Job-queue worker validating a consensus proposal relayed by this peer.
// Verifies the signature (unless the peer is a cluster member), hands
// trusted proposals to NetworkOPs, and relays when appropriate while
// feeding the squelch logic.
void
PeerImp::checkPropose(
    bool isTrusted,
    RCLCxPeerPos peerPos)
{
    JLOG(p_journal_.trace())
        << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal";

    XRPL_ASSERT(packet, "ripple::PeerImp::checkPropose : non-null packet");

    // Cluster members are exempt from the signature check.
    if (!cluster() && !peerPos.checkSign())
    {
        std::string desc{"Proposal fails sig check"};
        JLOG(p_journal_.warn()) << desc;
        charge(Resource::feeInvalidSignature, desc);
        return;
    }

    bool relay;

    if (isTrusted)
        relay = app_.getOPs().processTrustedProposal(peerPos);
    else
        relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();

    if (relay)
    {
        // haveMessage contains peers, which are suppressed; i.e. the peers
        // are the source of the message, consequently the message should
        // not be relayed to these peers. But the message must be counted
        // as part of the squelch logic.
        auto haveMessage = app_.overlay().relay(
            *packet, peerPos.suppressionID(), peerPos.publicKey());
        if (!haveMessage.empty())
            overlay_.updateSlotAndSquelch(
                peerPos.suppressionID(),
                peerPos.publicKey(),
                std::move(haveMessage),
                protocol::mtPROPOSE_LEDGER);
    }
}
3114
// Job-queue worker checking a validation relayed by this peer. Invalid
// validations are charged; accepted ones (or any from a cluster peer)
// are relayed to peers that have not yet seen them, feeding the squelch
// logic.
void
PeerImp::checkValidation(
    uint256 const& key,
{
    if (!val->isValid())
    {
        std::string desc{"Validation forwarded by peer is invalid"};
        JLOG(p_journal_.debug()) << desc;
        charge(Resource::feeInvalidSignature, desc);
        return;
    }

    // FIXME it should be safe to remove this try/catch. Investigate codepaths.
    try
    {
        if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
            cluster())
        {
            // haveMessage contains peers, which are suppressed; i.e. the peers
            // are the source of the message, consequently the message should
            // not be relayed to these peers. But the message must be counted
            // as part of the squelch logic.
            auto haveMessage =
                overlay_.relay(*packet, key, val->getSignerPublic());
            if (!haveMessage.empty())
            {
                overlay_.updateSlotAndSquelch(
                    key,
                    val->getSignerPublic(),
                    std::move(haveMessage),
                    protocol::mtVALIDATION);
            }
        }
    }
    catch (std::exception const& ex)
    {
        JLOG(p_journal_.trace())
            << "Exception processing validation: " << ex.what();
        using namespace std::string_literals;
        charge(Resource::feeMalformedRequest, "validation "s + ex.what());
    }
}
3159
3160// Returns the set of peers that can help us get
3161// the TX tree with the specified root hash.
3162//
// Selects the best-scoring connected peer (excluding `skip`) that claims
// to have the transaction set with the given root hash. Ties are broken
// by the random component inside getScore.
// NOTE(review): return type, result declaration, and the for_each lambda
// header are elided in this rendering.
getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
{
    int retScore = 0;

        if (p->hasTxSet(rootHash) && p.get() != skip)
        {
            auto score = p->getScore(true);
            // Keep the highest-scoring candidate seen so far.
            if (!ret || (score > retScore))
            {
                ret = std::move(p);
                retScore = score;
            }
        }
    });

    return ret;
}
3183
3184// Returns a random peer weighted by how likely to
3185// have the ledger and how responsive it is.
3186//
    OverlayImpl& ov,
    uint256 const& ledgerHash,
    LedgerIndex ledger,
    PeerImp const* skip)
{
    // Selects the best-scoring connected peer (excluding `skip`) that
    // claims to have the requested ledger. NOTE(review): return type,
    // name, result declaration, and the for_each lambda header are
    // elided in this rendering.
    int retScore = 0;

        if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
        {
            auto score = p->getScore(true);
            // Keep the highest-scoring candidate seen so far.
            if (!ret || (score > retScore))
            {
                ret = std::move(p);
                retScore = score;
            }
        }
    });

    return ret;
}
3211
3212void
3213PeerImp::sendLedgerBase(
3214 std::shared_ptr<Ledger const> const& ledger,
3215 protocol::TMLedgerData& ledgerData)
3216{
3217 JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";
3218
3219 Serializer s(sizeof(LedgerInfo));
3220 addRaw(ledger->info(), s);
3221 ledgerData.add_nodes()->set_nodedata(s.getDataPtr(), s.getLength());
3222
3223 auto const& stateMap{ledger->stateMap()};
3224 if (stateMap.getHash() != beast::zero)
3225 {
3226 // Return account state root node if possible
3227 Serializer root(768);
3228
3229 stateMap.serializeRoot(root);
3230 ledgerData.add_nodes()->set_nodedata(
3231 root.getDataPtr(), root.getLength());
3232
3233 if (ledger->info().txHash != beast::zero)
3234 {
3235 auto const& txMap{ledger->txMap()};
3236 if (txMap.getHash() != beast::zero)
3237 {
3238 // Return TX root node if possible
3239 root.erase();
3240 txMap.serializeRoot(root);
3241 ledgerData.add_nodes()->set_nodedata(
3242 root.getDataPtr(), root.getLength());
3243 }
3244 }
3245 }
3246
3247 auto message{
3248 std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3249 send(message);
3250}
3251
// Attempts to locate the ledger requested by a TMGetLedger message: by
// hash (relaying the request to a better-placed peer if we lack it), by
// sequence, or the closed ledger. Returns an empty pointer when the
// ledger cannot be provided. NOTE(review): the return type line and the
// `ledger` declaration are elided in this rendering.
PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
{
    JLOG(p_journal_.trace()) << "getLedger: Ledger";

    if (m->has_ledgerhash())
    {
        // Attempt to find ledger by hash
        uint256 const ledgerHash{m->ledgerhash()};
        ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
        if (!ledger)
        {
            JLOG(p_journal_.trace())
                << "getLedger: Don't have ledger with hash " << ledgerHash;

            // Only relay original queries (no request cookie yet).
            if (m->has_querytype() && !m->has_requestcookie())
            {
                // Attempt to relay the request to a peer
                if (auto const peer = getPeerWithLedger(
                        overlay_,
                        ledgerHash,
                        m->has_ledgerseq() ? m->ledgerseq() : 0,
                        this))
                {
                    m->set_requestcookie(id());
                    peer->send(
                        std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
                    JLOG(p_journal_.debug())
                        << "getLedger: Request relayed to peer";
                    return ledger;
                }

                JLOG(p_journal_.trace())
                    << "getLedger: Failed to find peer to relay request";
            }
        }
    }
    else if (m->has_ledgerseq())
    {
        // Attempt to find ledger by sequence
        if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
        {
            JLOG(p_journal_.debug())
                << "getLedger: Early ledger sequence request";
        }
        else
        {
            ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
            if (!ledger)
            {
                JLOG(p_journal_.debug())
                    << "getLedger: Don't have ledger with sequence "
                    << m->ledgerseq();
            }
        }
    }
    else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
    {
        ledger = app_.getLedgerMaster().getClosedLedger();
    }

    if (ledger)
    {
        // Validate retrieved ledger sequence
        auto const ledgerSeq{ledger->info().seq};
        if (m->has_ledgerseq())
        {
            if (ledgerSeq != m->ledgerseq())
            {
                // Do not resource charge a peer responding to a relay
                if (!m->has_requestcookie())
                    charge(
                        Resource::feeMalformedRequest, "get_ledger ledgerSeq");

                ledger.reset();
                JLOG(p_journal_.warn())
                    << "getLedger: Invalid ledger sequence " << ledgerSeq;
            }
        }
        else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
        {
            ledger.reset();
            JLOG(p_journal_.debug())
                << "getLedger: Early ledger sequence request " << ledgerSeq;
        }
    }
    else
    {
        JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
    }

    return ledger;
}
3347
// Looks up the candidate transaction set requested by a TMGetLedger
// message; when we don't have it, optionally relays the original query
// to a peer that does. NOTE(review): the return type line and the
// `shaMap` declaration opening are elided in this rendering.
PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
{
    JLOG(p_journal_.trace()) << "getTxSet: TX set";

    uint256 const txSetHash{m->ledgerhash()};
        app_.getInboundTransactions().getSet(txSetHash, false)};
    if (!shaMap)
    {
        // Only relay original queries (no request cookie yet).
        if (m->has_querytype() && !m->has_requestcookie())
        {
            // Attempt to relay the request to a peer
            if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
            {
                m->set_requestcookie(id());
                peer->send(
                    std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
                JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
            }
            else
            {
                JLOG(p_journal_.debug())
                    << "getTxSet: Failed to find relay peer";
            }
        }
        else
        {
            JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
        }
    }

    return shaMap;
}
3382
// Answers a TMGetLedger request: resolves the target SHAMap (candidate
// transaction set, transaction tree, or account-state tree), then
// gathers the requested nodes (with "fat" leaves and a latency-based
// query depth) into a TMLedgerData reply. NOTE(review): declarations of
// `ledger`, `sharedMap`, and the node `data` vector are elided in this
// rendering.
void
PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
{
    // Do not resource charge a peer responding to a relay
    if (!m->has_requestcookie())
        charge(
            Resource::feeModerateBurdenPeer, "received a get ledger request");

    SHAMap const* map{nullptr};
    protocol::TMLedgerData ledgerData;
    bool fatLeaves{true};
    auto const itype{m->itype()};

    if (itype == protocol::liTS_CANDIDATE)
    {
        if (sharedMap = getTxSet(m); !sharedMap)
            return;
        map = sharedMap.get();

        // Fill out the reply
        ledgerData.set_ledgerseq(0);
        ledgerData.set_ledgerhash(m->ledgerhash());
        ledgerData.set_type(protocol::liTS_CANDIDATE);
        if (m->has_requestcookie())
            ledgerData.set_requestcookie(m->requestcookie());

        // We'll already have most transactions
        fatLeaves = false;
    }
    else
    {
        // Drop the request if this peer's send queue is backed up or the
        // server is under local load (cluster peers are exempt).
        if (send_queue_.size() >= Tuning::dropSendQueue)
        {
            JLOG(p_journal_.debug())
                << "processLedgerRequest: Large send queue";
            return;
        }
        if (app_.getFeeTrack().isLoadedLocal() && !cluster())
        {
            JLOG(p_journal_.debug()) << "processLedgerRequest: Too busy";
            return;
        }

        if (ledger = getLedger(m); !ledger)
            return;

        // Fill out the reply
        auto const ledgerHash{ledger->info().hash};
        ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
        ledgerData.set_ledgerseq(ledger->info().seq);
        ledgerData.set_type(itype);
        if (m->has_requestcookie())
            ledgerData.set_requestcookie(m->requestcookie());

        switch (itype)
        {
            case protocol::liBASE:
                sendLedgerBase(ledger, ledgerData);
                return;

            case protocol::liTX_NODE:
                map = &ledger->txMap();
                JLOG(p_journal_.trace()) << "processLedgerRequest: TX map hash "
                                         << to_string(map->getHash());
                break;

            case protocol::liAS_NODE:
                map = &ledger->stateMap();
                JLOG(p_journal_.trace())
                    << "processLedgerRequest: Account state map hash "
                    << to_string(map->getHash());
                break;

            default:
                // This case should not be possible here
                JLOG(p_journal_.error())
                    << "processLedgerRequest: Invalid ledger info type";
                return;
        }
    }

    if (!map)
    {
        JLOG(p_journal_.warn()) << "processLedgerRequest: Unable to find map";
        return;
    }

    // Add requested node data to reply
    if (m->nodeids_size() > 0)
    {
        // Slow peers get a deeper query so fewer round trips are needed.
        auto const queryDepth{
            m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};

        for (int i = 0; i < m->nodeids_size() &&
                 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
             ++i)
        {
            auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};

            data.clear();
            data.reserve(Tuning::softMaxReplyNodes);

            try
            {
                if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
                {
                    JLOG(p_journal_.trace())
                        << "processLedgerRequest: getNodeFat got "
                        << data.size() << " nodes";

                    for (auto const& d : data)
                    {
                        // Hard cap on the total reply size.
                        if (ledgerData.nodes_size() >=
                            Tuning::hardMaxReplyNodes)
                            break;
                        protocol::TMLedgerNode* node{ledgerData.add_nodes()};
                        node->set_nodeid(d.first.getRawString());
                        node->set_nodedata(d.second.data(), d.second.size());
                    }
                }
                else
                {
                    JLOG(p_journal_.warn())
                        << "processLedgerRequest: getNodeFat returns false";
                }
            }
            catch (std::exception const& e)
            {
                // Log enough context to diagnose the failing request.
                std::string info;
                switch (itype)
                {
                    case protocol::liBASE:
                        // This case should not be possible here
                        info = "Ledger base";
                        break;

                    case protocol::liTX_NODE:
                        info = "TX node";
                        break;

                    case protocol::liAS_NODE:
                        info = "AS node";
                        break;

                    case protocol::liTS_CANDIDATE:
                        info = "TS candidate";
                        break;

                    default:
                        info = "Invalid";
                        break;
                }

                if (!m->has_ledgerhash())
                    info += ", no hash specified";

                JLOG(p_journal_.warn())
                    << "processLedgerRequest: getNodeFat with nodeId "
                    << *shaMapNodeId << " and ledger info type " << info
                    << " throws exception: " << e.what();
            }
        }

        JLOG(p_journal_.info())
            << "processLedgerRequest: Got request for " << m->nodeids_size()
            << " nodes at depth " << queryDepth << ", return "
            << ledgerData.nodes_size() << " nodes";
    }

    if (ledgerData.nodes_size() == 0)
        return;

    send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
}
3561
// Computes a relative desirability score used when choosing a peer to
// fetch data from: a random tie-breaker, plus a large bonus when the
// peer is known to have the item, minus a latency-based penalty.
// NOTE(review): the `latency` local declaration is elided in this
// rendering.
int
PeerImp::getScore(bool haveItem) const
{
    // Random component of score, used to break ties and avoid
    // overloading the "best" peer
    static int const spRandomMax = 9999;

    // Score for being very likely to have the thing we are
    // look for; should be roughly spRandomMax
    static int const spHaveItem = 10000;

    // Score reduction for each millisecond of latency; should
    // be roughly spRandomMax divided by the maximum reasonable
    // latency
    static int const spLatency = 30;

    // Penalty for unknown latency; should be roughly spRandomMax
    static int const spNoLatency = 8000;

    int score = rand_int(spRandomMax);

    if (haveItem)
        score += spHaveItem;

    // Snapshot latency_ under the lock that guards it.
    {
        std::lock_guard sl(recentLock_);
        latency = latency_;
    }

    if (latency)
        score -= latency->count() * spLatency;
    else
        score -= spNoLatency;

    return score;
}
3599
3600bool
3601PeerImp::isHighLatency() const
3602{
3603 std::lock_guard sl(recentLock_);
3604 return latency_ >= peerHighLatency;
3605}
3606
3607void
3608PeerImp::Metrics::add_message(std::uint64_t bytes)
3609{
3610 using namespace std::chrono_literals;
3611 std::unique_lock lock{mutex_};
3612
3613 totalBytes_ += bytes;
3614 accumBytes_ += bytes;
3615 auto const timeElapsed = clock_type::now() - intervalStart_;
3616 auto const timeElapsedInSecs =
3617 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3618
3619 if (timeElapsedInSecs >= 1s)
3620 {
3621 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3622 rollingAvg_.push_back(avgBytes);
3623
3624 auto const totalBytes =
3625 std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
3626 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3627
3628 intervalStart_ = clock_type::now();
3629 accumBytes_ = 0;
3630 }
3631}
3632
// Thread-safe read of the rolling average bytes-per-second figure
// maintained by add_message.
PeerImp::Metrics::average_bytes() const
{
    std::shared_lock lock{mutex_};
    return rollingAvgBytes_;
}
3639
// Thread-safe read of the lifetime byte counter maintained by
// add_message.
PeerImp::Metrics::total_bytes() const
{
    std::shared_lock lock{mutex_};
    return totalBytes_;
}
3646
3647} // namespace ripple
T accumulate(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:149
A version-independent IP address and port combination.
Definition IPEndpoint.h:38
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:75
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:68
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
Stream error() const
Definition Journal.h:346
Stream debug() const
Definition Journal.h:328
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Definition Journal.h:314
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
Stream warn() const
Definition Journal.h:340
virtual Config & config()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual TimeKeeper & timeKeeper()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual LedgerMaster & getLedgerMaster()=0
virtual Cluster & cluster()=0
virtual HashRouter & getHashRouter()=0
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
Definition Cluster.cpp:83
std::size_t size() const
The number of nodes in the cluster list.
Definition Cluster.cpp:49
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
Definition Cluster.cpp:57
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:38
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
Definition Config.h:248
bool TX_REDUCE_RELAY_METRICS
Definition Config.h:265
int MAX_TRANSACTIONS
Definition Config.h:226
std::chrono::seconds MAX_DIVERGED_TIME
Definition Config.h:284
std::chrono::seconds MAX_UNKNOWN_TIME
Definition Config.h:281
bool shouldProcess(uint256 const &key, PeerShortID peer, HashRouterFlags &flags, std::chrono::seconds tx_interval)
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:179
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:142
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:168
LedgerIndex getValidLedgerIndex()
std::chrono::seconds getValidatedLedgerAge()
void setClusterFee(std::uint32_t fee)
static std::size_t messageSize(::google::protobuf::Message const &message)
Definition Message.cpp:55
virtual bool isNeedNetworkLedger()=0
PeerFinder::Manager & peerFinder()
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void onPeerDeactivate(Peer::id_t id)
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
Resource::Manager & resourceManager()
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void incPeerDisconnectCharges() override
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
This class manages established peer-to-peer connections, handles message exchange,...
Definition PeerImp.h:118
std::queue< std::shared_ptr< Message > > send_queue_
Definition PeerImp.h:243
std::unique_ptr< LoadEvent > load_event_
Definition PeerImp.h:258
boost::beast::http::fields const & headers_
Definition PeerImp.h:242
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
Definition PeerImp.cpp:1157
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
Definition PeerImp.cpp:528
clock_type::duration uptime() const
Definition PeerImp.h:443
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
Definition PeerImp.cpp:344
protocol::TMStatusChange last_status_
Definition PeerImp.h:235
std::string name_
Definition PeerImp.h:167
boost::circular_buffer< uint256 > recentTxSets_
Definition PeerImp.h:178
std::unique_ptr< stream_type > stream_ptr_
Definition PeerImp.h:143
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
Definition PeerImp.cpp:1166
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
Definition PeerImp.h:121
Compressed compressionEnabled_
Definition PeerImp.h:263
uint256 closedLedgerHash_
Definition PeerImp.h:174
std::string domain() const
Definition PeerImp.cpp:899
std::optional< std::uint32_t > lastPingSeq_
Definition PeerImp.h:181
std::string const & fingerprint() const override
Definition PeerImp.h:695
beast::Journal const journal_
Definition PeerImp.h:141
virtual void run()
Definition PeerImp.cpp:163
void tryAsyncShutdown()
Attempts to perform a graceful SSL shutdown if conditions are met.
Definition PeerImp.cpp:619
LedgerIndex maxLedger_
Definition PeerImp.h:173
beast::Journal const p_journal_
Definition PeerImp.h:142
bool const inbound_
Definition PeerImp.h:158
PeerImp(PeerImp const &)=delete
Application & app_
Definition PeerImp.h:135
void stop() override
Definition PeerImp.cpp:221
void shutdown()
Initiates the peer disconnection sequence.
Definition PeerImp.cpp:643
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
Definition PeerImp.cpp:570
bool hasTxSet(uint256 const &hash) const override
Definition PeerImp.cpp:552
clock_type::time_point lastPingTime_
Definition PeerImp.h:182
void onMessageUnknown(std::uint16_t type)
Definition PeerImp.cpp:1108
std::shared_ptr< PeerFinder::Slot > const slot_
Definition PeerImp.h:238
std::shared_mutex nameMutex_
Definition PeerImp.h:168
boost::circular_buffer< uint256 > recentLedgers_
Definition PeerImp.h:177
id_t const id_
Definition PeerImp.h:136
std::optional< std::chrono::milliseconds > latency_
Definition PeerImp.h:180
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
Definition PeerImp.cpp:1358
beast::IP::Endpoint const remote_address_
Definition PeerImp.h:153
Json::Value json() override
Definition PeerImp.cpp:393
PublicKey const publicKey_
Definition PeerImp.h:166
void close()
Forcibly closes the underlying socket connection.
Definition PeerImp.cpp:686
hash_set< uint256 > txQueue_
Definition PeerImp.h:268
std::mutex recentLock_
Definition PeerImp.h:234
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
Definition PeerImp.cpp:1114
bool txReduceRelayEnabled_
Definition PeerImp.h:270
void fail(std::string const &name, error_code ec)
Handles a failure associated with a specific error code.
Definition PeerImp.cpp:580
clock_type::time_point trackingTime_
Definition PeerImp.h:164
void cancelTimer() noexcept
Cancels any pending wait on the peer activity timer.
Definition PeerImp.cpp:808
bool shutdownStarted_
Definition PeerImp.h:249
socket_type & socket_
Definition PeerImp.h:144
ProtocolVersion protocol_
Definition PeerImp.h:161
reduce_relay::Squelch< UptimeClock > squelch_
Definition PeerImp.h:185
bool writePending_
Definition PeerImp.h:255
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
Definition PeerImp.cpp:385
struct ripple::PeerImp::@22 metrics_
uint256 previousLedgerHash_
Definition PeerImp.h:175
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
Definition PeerImp.cpp:356
void onTimer(error_code const &ec)
Handles the expiration of the peer activity timer.
Definition PeerImp.cpp:741
void send(std::shared_ptr< Message > const &m) override
Definition PeerImp.cpp:241
std::string name() const
Definition PeerImp.cpp:892
boost::system::error_code error_code
Definition PeerImp.h:125
void onReadMessage(error_code ec, std::size_t bytes_transferred)
Definition PeerImp.cpp:951
bool ledgerReplayEnabled_
Definition PeerImp.h:272
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
Definition PeerImp.h:132
bool crawl() const
Returns true if this connection will publicly share its IP address.
Definition PeerImp.cpp:370
waitable_timer timer_
Definition PeerImp.h:149
void setTimer(std::chrono::seconds interval)
Sets and starts the peer timer.
Definition PeerImp.cpp:712
void sendTxQueue() override
Send aggregated transactions' hashes.
Definition PeerImp.cpp:308
bool txReduceRelayEnabled() const override
Definition PeerImp.h:513
bool supportsFeature(ProtocolFeature f) const override
Definition PeerImp.cpp:511
ChargeWithContext fee_
Definition PeerImp.h:237
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
Definition PeerImp.cpp:1042
http_request_type request_
Definition PeerImp.h:240
OverlayImpl & overlay_
Definition PeerImp.h:157
LedgerIndex minLedger_
Definition PeerImp.h:172
virtual ~PeerImp()
Definition PeerImp.cpp:140
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
Definition PeerImp.cpp:327
stream_type & stream_
Definition PeerImp.h:145
static std::string makePrefix(std::string const &fingerprint)
Definition PeerImp.cpp:733
bool cluster() const override
Returns true if this connection is a member of the cluster.
Definition PeerImp.cpp:379
void onShutdown(error_code ec)
Handles the completion of the asynchronous SSL shutdown.
Definition PeerImp.cpp:660
boost::asio::strand< boost::asio::executor > strand_
Definition PeerImp.h:146
void cycleStatus() override
Definition PeerImp.cpp:560
boost::beast::multi_buffer read_buffer_
Definition PeerImp.h:239
Resource::Consumer usage_
Definition PeerImp.h:236
bool readPending_
Definition PeerImp.h:252
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
Definition PeerImp.cpp:543
void doProtocolStart()
Definition PeerImp.cpp:909
std::atomic< Tracking > tracking_
Definition PeerImp.h:163
Represents a peer connection in the overlay.
A public key.
Definition PublicKey.h:62
A peer's signed, proposed position for use in RCLConsensus.
bool checkSign() const
Verify the signing hash of the proposal.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
A consumption charge.
Definition Charge.h:30
An endpoint that consumes resources.
Definition Consumer.h:36
int balance()
Returns the credit balance representing consumption.
Definition Consumer.cpp:137
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Definition Consumer.cpp:124
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
Definition Consumer.cpp:106
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition SHAMap.h:97
std::size_t size() const noexcept
Definition Serializer.h:72
void const * data() const noexcept
Definition Serializer.h:78
int getLength() const
Definition Serializer.h:233
void const * getDataPtr() const
Definition Serializer.h:223
An immutable linear range of bytes.
Definition Slice.h:46
time_point now() const override
Returns the current time, using the server's clock.
Definition TimeKeeper.h:64
static category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
static constexpr std::size_t size()
Definition base_uint.h:526
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
Definition base_uint.h:503
T emplace_back(T... args)
T empty(T... args)
T find(T... args)
T for_each(T... args)
T get(T... args)
T is_same_v
T load(T... args)
T lock(T... args)
T max(T... args)
T min(T... args)
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:45
unsigned int UInt
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeInvalidData
Charge const feeUselessData
Charge const feeTrivialPeer
Charge const feeModerateBurdenPeer
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ maxQueryDepth
The maximum number of levels to search.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ sendQueueLogFreq
How often to log send queue size.
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
Definition PerfLog.h:187
static constexpr std::size_t MAX_TX_QUEUE_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:870
static constexpr char FEATURE_COMPR[]
Definition Handshake.h:141
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
std::string base64_decode(std::string_view data)
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
static bool stringIsUint256Sized(std::string const &pBuffStr)
Definition PeerImp.cpp:157
static constexpr char FEATURE_LEDGER_REPLAY[]
Definition Handshake.h:147
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
HashRouterFlags
Definition HashRouter.h:34
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:30
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
Definition PeerImp.cpp:3188
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:244
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:119
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:33
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
std::string getFingerprint(beast::IP::Endpoint const &address, std::optional< PublicKey > const &publicKey=std::nullopt, std::optional< std::string > const &id=std::nullopt)
Definition PublicKey.h:269
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
Definition PeerImp.cpp:3164
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
Definition Handshake.h:198
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
Definition apply.cpp:118
static constexpr char FEATURE_TXRR[]
Definition Handshake.h:145
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:630
std::optional< Rules > const & getCurrentTransactionRules()
Definition Rules.cpp:47
Number root(Number f, unsigned d)
Definition Number.cpp:636
@ manifest
Manifest.
@ proposal
proposal for signing
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
Definition apply.cpp:44
@ jtLEDGER_REQ
Definition Job.h:59
@ jtPROPOSAL_ut
Definition Job.h:60
@ jtREPLAY_REQ
Definition Job.h:58
@ jtTRANSACTION
Definition Job.h:62
@ jtPEER
Definition Job.h:80
@ jtREQUESTED_TXN
Definition Job.h:64
@ jtMISSING_TXN
Definition Job.h:63
@ jtVALIDATION_t
Definition Job.h:71
@ jtMANIFEST
Definition Job.h:55
@ jtTXN_DATA
Definition Job.h:69
@ jtPACK
Definition Job.h:43
@ jtVALIDATION_ut
Definition Job.h:54
@ jtPROPOSAL_t
Definition Job.h:74
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
Definition digest.h:224
static constexpr char FEATURE_VPRR[]
Definition Handshake.h:143
constexpr std::uint32_t tfInnerBatchTxn
Definition TxFlags.h:61
STL namespace.
T nth_element(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)
Information about the notional ledger backing the view.
beast::IP::Address public_ip
Definition Overlay.h:69
std::optional< std::uint32_t > networkID
Definition Overlay.h:72
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Definition PeerImp.h:220
Describes a single consumer.
Definition Gossip.h:37
beast::IP::Endpoint address
Definition Gossip.h:41
Data format for exchanging consumption information across peers.
Definition Gossip.h:32
std::vector< Item > items
Definition Gossip.h:44
T tie(T... args)
T to_string(T... args)
T what(T... args)