1#include <xrpld/app/consensus/RCLValidations.h>
2#include <xrpld/app/ledger/InboundLedgers.h>
3#include <xrpld/app/ledger/InboundTransactions.h>
4#include <xrpld/app/ledger/LedgerMaster.h>
5#include <xrpld/app/ledger/TransactionMaster.h>
6#include <xrpld/app/misc/HashRouter.h>
7#include <xrpld/app/misc/LoadFeeTrack.h>
8#include <xrpld/app/misc/NetworkOPs.h>
9#include <xrpld/app/misc/Transaction.h>
10#include <xrpld/app/misc/ValidatorList.h>
11#include <xrpld/app/tx/apply.h>
12#include <xrpld/overlay/Cluster.h>
13#include <xrpld/overlay/detail/PeerImp.h>
14#include <xrpld/overlay/detail/Tuning.h>
16#include <xrpl/basics/UptimeClock.h>
17#include <xrpl/basics/base64.h>
18#include <xrpl/basics/random.h>
19#include <xrpl/basics/safe_cast.h>
20#include <xrpl/core/PerfLog.h>
21#include <xrpl/protocol/TxFlags.h>
22#include <xrpl/protocol/digest.h>
24#include <boost/algorithm/string/predicate.hpp>
25#include <boost/beast/core/ostream.hpp>
34using namespace std::chrono_literals;
68 , prefix_(makePrefix(fingerprint_))
69 , sink_(app_.journal(
"Peer"), prefix_)
70 , p_sink_(app_.journal(
"Protocol"), prefix_)
73 , stream_ptr_(
std::move(stream_ptr))
74 , socket_(stream_ptr_->next_layer().socket())
75 , stream_(*stream_ptr_)
76 , strand_(
boost::asio::make_strand(socket_.get_executor()))
78 , remote_address_(slot->remote_endpoint())
84 , publicKey_(publicKey)
87 , squelch_(app_.journal(
"Squelch"))
89 , fee_{Resource::feeTrivialPeer,
""}
91 , request_(
std::move(request))
93 , compressionEnabled_(
98 app_.config().COMPRESSION)
104 app_.config().TX_REDUCE_RELAY_ENABLE))
108 app_.config().LEDGER_REPLAY))
109 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
113 <<
" vp reduce-relay base squelch enabled "
123 bool const inCluster{
cluster()};
146 if (!
strand_.running_in_this_thread())
149 auto parseLedgerHash =
163 if (
auto const iter =
headers_.find(
"Closed-Ledger");
166 closed = parseLedgerHash(iter->value());
169 fail(
"Malformed handshake data (1)");
172 if (
auto const iter =
headers_.find(
"Previous-Ledger");
175 previous = parseLedgerHash(iter->value());
178 fail(
"Malformed handshake data (2)");
181 if (previous && !closed)
182 fail(
"Malformed handshake data (3)");
204 if (!
strand_.running_in_this_thread())
224 if (!
strand_.running_in_this_thread())
234 auto validator = m->getValidatorKey();
235 if (validator && !
squelch_.expireSquelch(*validator))
245 safe_cast<TrafficCount::category>(m->getCategory()),
266 sink << n <<
" sendq: " << sendq_size;
275 boost::asio::async_write(
284 std::placeholders::_1,
285 std::placeholders::_2)));
291 if (!
strand_.running_in_this_thread())
297 protocol::TMHaveTransactions ht;
299 ht.add_hashes(hash.data(), hash.size());
310 if (!
strand_.running_in_this_thread())
327 if (!
strand_.running_in_this_thread())
332 auto removed =
txQueue_.erase(hash);
344 fail(
"charge: Resources");
353 auto const iter =
headers_.find(
"Crawl");
356 return boost::iequals(iter->value(),
"public");
382 ret[jss::inbound] =
true;
386 ret[jss::cluster] =
true;
396 if (
auto const nid =
headers_[
"Network-ID"]; !nid.empty())
413 std::chrono::duration_cast<std::chrono::seconds>(
uptime()).count());
418 if ((minSeq != 0) || (maxSeq != 0))
419 ret[jss::complete_ledgers] =
425 ret[jss::track] =
"diverged";
429 ret[jss::track] =
"unknown";
438 protocol::TMStatusChange last_status;
445 if (closedLedgerHash != beast::zero)
446 ret[jss::ledger] =
to_string(closedLedgerHash);
448 if (last_status.has_newstatus())
450 switch (last_status.newstatus())
452 case protocol::nsCONNECTING:
453 ret[jss::status] =
"connecting";
456 case protocol::nsCONNECTED:
457 ret[jss::status] =
"connected";
460 case protocol::nsMONITORING:
461 ret[jss::status] =
"monitoring";
464 case protocol::nsVALIDATING:
465 ret[jss::status] =
"validating";
468 case protocol::nsSHUTTING:
469 ret[jss::status] =
"shutting";
474 <<
"Unknown status: " << last_status.newstatus();
479 ret[jss::metrics][jss::total_bytes_recv] =
481 ret[jss::metrics][jss::total_bytes_sent] =
483 ret[jss::metrics][jss::avg_bps_recv] =
485 ret[jss::metrics][jss::avg_bps_sent] =
564 strand_.running_in_this_thread(),
565 "xrpl::PeerImp::fail : strand in this thread");
578 if (!
strand_.running_in_this_thread())
603 strand_.running_in_this_thread(),
604 "xrpl::PeerImp::tryAsyncShutdown : strand in this thread");
617 stream_.async_shutdown(bind_executor(
627 strand_.running_in_this_thread(),
628 "xrpl::PeerImp::shutdown: strand in this thread");
635 boost::beast::get_lowest_layer(
stream_).cancel();
652 (ec != boost::asio::error::eof &&
653 ec != boost::asio::error::operation_aborted &&
654 ec.message().find(
"application data after close notify") ==
670 strand_.running_in_this_thread(),
671 "xrpl::PeerImp::close : strand in this thread");
697 timer_.expires_after(interval);
705 timer_.async_wait(bind_executor(
725 strand_.running_in_this_thread(),
726 "xrpl::PeerImp::onTimer : strand in this thread");
734 if (ec == boost::asio::error::operation_aborted)
751 return fail(
"Large send queue");
755 clock_type::duration duration;
768 return fail(
"Not useful");
774 return fail(
"Ping Timeout");
779 protocol::TMPing message;
780 message.set_type(protocol::TMPing::ptPING);
807 "xrpl::PeerImp::doAccept : empty read buffer");
820 return fail(
"makeSharedValue: Unexpected failure");
852 boost::asio::async_write(
854 write_buffer->data(),
855 boost::asio::transfer_all(),
862 if (ec == boost::asio::error::operation_aborted)
865 return fail(
"onWriteResponse", ec);
866 if (write_buffer->size() == bytes_transferred)
868 return fail(
"Failed to write header");
935 strand_.running_in_this_thread(),
936 "xrpl::PeerImp::onReadMessage : strand in this thread");
945 if (ec == boost::asio::error::eof)
951 if (ec == boost::asio::error::operation_aborted)
954 return fail(
"onReadMessage", ec);
962 stream <<
"onReadMessage: "
963 << (bytes_transferred > 0
964 ?
to_string(bytes_transferred) +
" bytes"
968 metrics_.recv.add_message(bytes_transferred);
978 using namespace std::chrono_literals;
983 "invokeProtocolMessage",
993 return fail(
"onReadMessage", ec);
995 if (bytes_consumed == 0)
1018 std::placeholders::_1,
1019 std::placeholders::_2)));
1026 strand_.running_in_this_thread(),
1027 "xrpl::PeerImp::onWriteMessage : strand in this thread");
1036 if (ec == boost::asio::error::operation_aborted)
1039 return fail(
"onWriteMessage", ec);
1044 stream <<
"onWriteMessage: "
1045 << (bytes_transferred > 0
1046 ?
to_string(bytes_transferred) +
" bytes"
1050 metrics_.sent.add_message(bytes_transferred);
1054 "xrpl::PeerImp::onWriteMessage : non-empty send buffer");
1065 "xrpl::PeerImp::onWriteMessage : shutdown started");
1068 return boost::asio::async_write(
1070 boost::asio::buffer(
1077 std::placeholders::_1,
1078 std::placeholders::_2)));
1107 *m,
static_cast<protocol::MessageType
>(type),
true);
1117 if ((type == MessageType::mtTRANSACTION ||
1118 type == MessageType::mtHAVE_TRANSACTIONS ||
1119 type == MessageType::mtTRANSACTIONS ||
1131 static_cast<MessageType
>(type),
static_cast<std::uint64_t>(size));
1133 JLOG(
journal_.
trace()) <<
"onMessageBegin: " << type <<
" " << size <<
" "
1134 << uncompressed_size <<
" " << isCompressed;
1149 auto const s = m->list_size();
1169 if (m->type() == protocol::TMPing::ptPING)
1173 m->set_type(protocol::TMPing::ptPONG);
1178 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1188 auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1213 for (
int i = 0; i < m->clusternodes().size(); ++i)
1215 protocol::TMClusterNode
const& node = m->clusternodes(i);
1218 if (node.has_nodename())
1219 name = node.nodename();
1221 auto const publicKey =
1228 auto const reportTime =
1232 *publicKey,
name, node.nodeload(), reportTime);
1236 int loadSources = m->loadsources().size();
1237 if (loadSources != 0)
1240 gossip.
items.reserve(loadSources);
1241 for (
int i = 0; i < m->loadsources().size(); ++i)
1243 protocol::TMLoadSource
const& node = m->loadsources(i);
1248 gossip.
items.push_back(item);
1261 if (status.getReportTime() >= thresh)
1262 fees.push_back(status.getLoadFee());
1267 auto const index = fees.size() / 2;
1269 clusterFee = fees[index];
1285 if (m->endpoints_v2().size() >= 1024)
1292 endpoints.
reserve(m->endpoints_v2().size());
1295 for (
auto const& tm : m->endpoints_v2())
1302 << tm.endpoint() <<
"}";
1328 if (!endpoints.
empty())
1345 eraseTxQueue !=
batch,
1346 (
"xrpl::PeerImp::handleTransaction : valid inputs"));
1355 <<
"Need network ledger";
1364 uint256 txID = stx->getTransactionID();
1386 JLOG(
p_journal_.
warn()) <<
"Ignoring Network relayed Tx containing "
1387 "tfInnerBatchTxn (handleTransaction).";
1419 bool checkSignature =
true;
1422 if (!m->has_deferred() || !m->deferred())
1435 checkSignature =
false;
1442 <<
"No new transactions until synchronized";
1455 "recvTransaction->checkTransaction",
1461 if (
auto peer = weak.lock())
1462 peer->checkTransaction(
1463 flags, checkSignature, stx,
batch);
1470 <<
"Transaction invalid: " <<
strHex(m->rawtransaction())
1471 <<
". Exception: " << ex.
what();
1482 auto const itype{m->itype()};
1485 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1486 return badData(
"Invalid ledger info type");
1494 if (itype == protocol::liTS_CANDIDATE)
1496 if (!m->has_ledgerhash())
1497 return badData(
"Invalid TX candidate set, missing TX set hash");
1500 !m->has_ledgerhash() && !m->has_ledgerseq() &&
1501 !(ltype && *ltype == protocol::ltCLOSED))
1503 return badData(
"Invalid request");
1507 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1508 return badData(
"Invalid ledger type");
1512 return badData(
"Invalid ledger hash");
1515 if (m->has_ledgerseq())
1517 auto const ledgerSeq{m->ledgerseq()};
1520 using namespace std::chrono_literals;
1530 if (itype != protocol::liBASE)
1532 if (m->nodeids_size() <= 0)
1533 return badData(
"Invalid ledger node IDs");
1535 for (
auto const& nodeId : m->nodeids())
1538 return badData(
"Invalid SHAMap node ID");
1543 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1544 return badData(
"Invalid query type");
1547 if (m->has_querydepth())
1550 itype == protocol::liBASE)
1552 return badData(
"Invalid query depth");
1559 if (
auto peer = weak.
lock())
1560 peer->processLedgerRequest(m);
1580 if (
auto peer = weak.
lock())
1583 peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1584 if (reply.has_error())
1586 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1588 Resource::feeMalformedRequest,
1589 "proof_path_request");
1592 Resource::feeRequestNoReply,
"proof_path_request");
1596 peer->send(std::make_shared<Message>(
1597 reply, protocol::mtPROOF_PATH_RESPONSE));
1606 if (!ledgerReplayEnabled_)
1609 Resource::feeMalformedRequest,
"proof_path_response disabled");
1613 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1615 fee_.update(Resource::feeInvalidData,
"proof_path_response");
1622 JLOG(p_journal_.trace()) <<
"onMessage, TMReplayDeltaRequest";
1623 if (!ledgerReplayEnabled_)
1626 Resource::feeMalformedRequest,
"replay_delta_request disabled");
1630 fee_.fee = Resource::feeModerateBurdenPeer;
1632 app_.getJobQueue().addJob(
1634 if (
auto peer = weak.
lock())
1637 peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1638 if (reply.has_error())
1640 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1642 Resource::feeMalformedRequest,
1643 "replay_delta_request");
1646 Resource::feeRequestNoReply,
1647 "replay_delta_request");
1651 peer->send(std::make_shared<Message>(
1652 reply, protocol::mtREPLAY_DELTA_RESPONSE));
1661 if (!ledgerReplayEnabled_)
1664 Resource::feeMalformedRequest,
"replay_delta_response disabled");
1668 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1670 fee_.update(Resource::feeInvalidData,
"replay_delta_response");
1678 fee_.update(Resource::feeInvalidData, msg);
1679 JLOG(p_journal_.warn()) <<
"TMLedgerData: " << msg;
1684 return badData(
"Invalid ledger hash");
1688 auto const ledgerSeq{m->ledgerseq()};
1689 if (m->type() == protocol::liTS_CANDIDATE)
1700 using namespace std::chrono_literals;
1701 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1702 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1711 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1712 return badData(
"Invalid ledger info type");
1715 if (m->has_error() &&
1716 (m->error() < protocol::reNO_LEDGER ||
1717 m->error() > protocol::reBAD_REQUEST))
1719 return badData(
"Invalid reply error");
1723 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1726 "Invalid Ledger/TXset nodes " +
std::to_string(m->nodes_size()));
1730 if (m->has_requestcookie())
1732 if (
auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1734 m->clear_requestcookie();
1739 JLOG(p_journal_.info()) <<
"Unable to route TX/ledger data reply";
1744 uint256 const ledgerHash{m->ledgerhash()};
1747 if (m->type() == protocol::liTS_CANDIDATE)
1750 app_.getJobQueue().addJob(
1751 jtTXN_DATA,
"recvPeerData", [weak, ledgerHash, m]() {
1752 if (
auto peer = weak.lock())
1754 peer->app_.getInboundTransactions().gotData(
1755 ledgerHash, peer, m);
1762 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1768 protocol::TMProposeSet&
set = *m;
1777 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1779 Resource::feeInvalidSignature,
1780 " signature can't be longer than 72 bytes");
1787 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1788 fee_.update(Resource::feeMalformedRequest,
"bad hashes");
1796 auto const isTrusted = app_.validators().trusted(publicKey);
1804 overlay_.reportInboundTraffic(
1805 TrafficCount::category::proposal_untrusted,
1806 Message::messageSize(*m));
1808 if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1812 uint256 const proposeHash{
set.currenttxhash()};
1813 uint256 const prevLedger{
set.previousledger()};
1825 if (
auto [added, relayed] =
1826 app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
1831 if (relayed && (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
1832 overlay_.updateSlotAndSquelch(
1833 suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1836 overlay_.reportInboundTraffic(
1837 TrafficCount::category::proposal_duplicate,
1838 Message::messageSize(*m));
1840 JLOG(p_journal_.trace()) <<
"Proposal: duplicate";
1847 if (tracking_.load() == Tracking::diverged)
1849 JLOG(p_journal_.debug())
1850 <<
"Proposal: Dropping untrusted (peer divergence)";
1854 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1856 JLOG(p_journal_.debug()) <<
"Proposal: Dropping untrusted (load)";
1861 JLOG(p_journal_.trace())
1862 <<
"Proposal: " << (isTrusted ?
"trusted" :
"untrusted");
1873 app_.timeKeeper().closeTime(),
1874 calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
1877 app_.getJobQueue().addJob(
1879 "recvPropose->checkPropose",
1881 if (
auto peer = weak.lock())
1882 peer->checkPropose(isTrusted, m,
proposal);
1889 JLOG(p_journal_.trace()) <<
"Status: Change";
1891 if (!m->has_networktime())
1892 m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());
1896 if (!last_status_.has_newstatus() || m->has_newstatus())
1901 protocol::NodeStatus status = last_status_.newstatus();
1903 m->set_newstatus(status);
1907 if (m->newevent() == protocol::neLOST_SYNC)
1909 bool outOfSync{
false};
1914 if (!closedLedgerHash_.isZero())
1917 closedLedgerHash_.zero();
1919 previousLedgerHash_.zero();
1923 JLOG(p_journal_.debug()) <<
"Status: Out of sync";
1930 bool const peerChangedLedgers{
1937 if (peerChangedLedgers)
1939 closedLedgerHash_ = m->ledgerhash();
1940 closedLedgerHash = closedLedgerHash_;
1941 addLedger(closedLedgerHash, sl);
1945 closedLedgerHash_.zero();
1948 if (m->has_ledgerhashprevious() &&
1951 previousLedgerHash_ = m->ledgerhashprevious();
1952 addLedger(previousLedgerHash_, sl);
1956 previousLedgerHash_.zero();
1959 if (peerChangedLedgers)
1961 JLOG(p_journal_.debug()) <<
"LCL is " << closedLedgerHash;
1965 JLOG(p_journal_.debug()) <<
"Status: No ledger";
1969 if (m->has_firstseq() && m->has_lastseq())
1973 minLedger_ = m->firstseq();
1974 maxLedger_ = m->lastseq();
1976 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1977 minLedger_ = maxLedger_ = 0;
1980 if (m->has_ledgerseq() &&
1981 app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1984 m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1987 app_.getOPs().pubPeerStatus([=,
this]() ->
Json::Value {
1990 if (m->has_newstatus())
1992 switch (m->newstatus())
1994 case protocol::nsCONNECTING:
1995 j[jss::status] =
"CONNECTING";
1997 case protocol::nsCONNECTED:
1998 j[jss::status] =
"CONNECTED";
2000 case protocol::nsMONITORING:
2001 j[jss::status] =
"MONITORING";
2003 case protocol::nsVALIDATING:
2004 j[jss::status] =
"VALIDATING";
2006 case protocol::nsSHUTTING:
2007 j[jss::status] =
"SHUTTING";
2012 if (m->has_newevent())
2014 switch (m->newevent())
2016 case protocol::neCLOSING_LEDGER:
2017 j[jss::action] =
"CLOSING_LEDGER";
2019 case protocol::neACCEPTED_LEDGER:
2020 j[jss::action] =
"ACCEPTED_LEDGER";
2022 case protocol::neSWITCHED_LEDGER:
2023 j[jss::action] =
"SWITCHED_LEDGER";
2025 case protocol::neLOST_SYNC:
2026 j[jss::action] =
"LOST_SYNC";
2031 if (m->has_ledgerseq())
2033 j[jss::ledger_index] = m->ledgerseq();
2036 if (m->has_ledgerhash())
2038 uint256 closedLedgerHash{};
2041 closedLedgerHash = closedLedgerHash_;
2043 j[jss::ledger_hash] = to_string(closedLedgerHash);
2046 if (m->has_networktime())
2051 if (m->has_firstseq() && m->has_lastseq())
2053 j[jss::ledger_index_min] =
Json::UInt(m->firstseq());
2054 j[jss::ledger_index_max] =
Json::UInt(m->lastseq());
2070 serverSeq = maxLedger_;
2076 checkTracking(serverSeq, validationSeq);
2085 if (diff < Tuning::convergedLedgerLimit)
2088 tracking_ = Tracking::converged;
2091 if ((diff > Tuning::divergedLedgerLimit) &&
2092 (tracking_.load() != Tracking::diverged))
2097 tracking_ = Tracking::diverged;
2098 trackingTime_ = clock_type::now();
2107 fee_.update(Resource::feeMalformedRequest,
"bad hash");
2111 uint256 const hash{m->hash()};
2113 if (m->status() == protocol::tsHAVE)
2117 if (
std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
2118 recentTxSets_.end())
2120 fee_.update(Resource::feeUselessData,
"duplicate (tsHAVE)");
2124 recentTxSets_.push_back(hash);
2129PeerImp::onValidatorListMessage(
2139 JLOG(p_journal_.warn()) <<
"Ignored malformed " << messageType;
2141 fee_.update(Resource::feeHeavyBurdenPeer,
"no blobs");
2147 JLOG(p_journal_.debug()) <<
"Received " << messageType;
2149 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2151 JLOG(p_journal_.debug())
2152 << messageType <<
": received duplicate " << messageType;
2156 fee_.update(Resource::feeUselessData,
"duplicate");
2160 auto const applyResult = app_.validators().applyListsAndBroadcast(
2164 remote_address_.to_string(),
2167 app_.getHashRouter(),
2170 JLOG(p_journal_.debug())
2171 <<
"Processed " << messageType <<
" version " << version <<
" from "
2172 << (applyResult.publisherKey ?
strHex(*applyResult.publisherKey)
2173 :
"unknown or invalid publisher")
2174 <<
" with best result " << to_string(applyResult.bestDisposition());
2177 switch (applyResult.bestDisposition())
2180 case ListDisposition::accepted:
2182 case ListDisposition::expired:
2184 case ListDisposition::pending: {
2188 applyResult.publisherKey,
2189 "xrpl::PeerImp::onValidatorListMessage : publisher key is "
2191 auto const& pubKey = *applyResult.publisherKey;
2193 if (
auto const iter = publisherListSequences_.find(pubKey);
2194 iter != publisherListSequences_.end())
2197 iter->second < applyResult.sequence,
2198 "xrpl::PeerImp::onValidatorListMessage : lower sequence");
2201 publisherListSequences_[pubKey] = applyResult.sequence;
2204 case ListDisposition::same_sequence:
2205 case ListDisposition::known_sequence:
2210 applyResult.sequence && applyResult.publisherKey,
2211 "xrpl::PeerImp::onValidatorListMessage : nonzero sequence "
2212 "and set publisher key");
2214 publisherListSequences_[*applyResult.publisherKey] <=
2215 applyResult.sequence,
2216 "xrpl::PeerImp::onValidatorListMessage : maximum sequence");
2221 case ListDisposition::stale:
2222 case ListDisposition::untrusted:
2223 case ListDisposition::invalid:
2224 case ListDisposition::unsupported_version:
2229 "xrpl::PeerImp::onValidatorListMessage : invalid best list "
2235 switch (applyResult.worstDisposition())
2237 case ListDisposition::accepted:
2238 case ListDisposition::expired:
2239 case ListDisposition::pending:
2242 case ListDisposition::same_sequence:
2243 case ListDisposition::known_sequence:
2248 Resource::feeUselessData,
2249 " duplicate (same_sequence or known_sequence)");
2251 case ListDisposition::stale:
2254 fee_.update(Resource::feeInvalidData,
"expired");
2256 case ListDisposition::untrusted:
2260 fee_.update(Resource::feeUselessData,
"untrusted");
2262 case ListDisposition::invalid:
2265 Resource::feeInvalidSignature,
"invalid list disposition");
2267 case ListDisposition::unsupported_version:
2270 fee_.update(Resource::feeInvalidData,
"version");
2275 "xrpl::PeerImp::onValidatorListMessage : invalid worst list "
2281 for (
auto const& [disp, count] : applyResult.dispositions)
2286 case ListDisposition::accepted:
2287 JLOG(p_journal_.debug())
2288 <<
"Applied " << count <<
" new " << messageType;
2291 case ListDisposition::expired:
2292 JLOG(p_journal_.debug())
2293 <<
"Applied " << count <<
" expired " << messageType;
2296 case ListDisposition::pending:
2297 JLOG(p_journal_.debug())
2298 <<
"Processed " << count <<
" future " << messageType;
2300 case ListDisposition::same_sequence:
2301 JLOG(p_journal_.warn())
2302 <<
"Ignored " << count <<
" " << messageType
2303 <<
"(s) with current sequence";
2305 case ListDisposition::known_sequence:
2306 JLOG(p_journal_.warn())
2307 <<
"Ignored " << count <<
" " << messageType
2308 <<
"(s) with future sequence";
2310 case ListDisposition::stale:
2311 JLOG(p_journal_.warn())
2312 <<
"Ignored " << count <<
"stale " << messageType;
2314 case ListDisposition::untrusted:
2315 JLOG(p_journal_.warn())
2316 <<
"Ignored " << count <<
" untrusted " << messageType;
2318 case ListDisposition::unsupported_version:
2319 JLOG(p_journal_.warn())
2320 <<
"Ignored " << count <<
"unsupported version "
2323 case ListDisposition::invalid:
2324 JLOG(p_journal_.warn())
2325 <<
"Ignored " << count <<
"invalid " << messageType;
2330 "xrpl::PeerImp::onValidatorListMessage : invalid list "
2342 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2344 JLOG(p_journal_.debug())
2345 <<
"ValidatorList: received validator list from peer using "
2346 <<
"protocol version " << to_string(protocol_)
2347 <<
" which shouldn't support this feature.";
2348 fee_.update(Resource::feeUselessData,
"unsupported peer");
2351 onValidatorListMessage(
2355 ValidatorList::parseBlobs(*m));
2359 JLOG(p_journal_.warn()) <<
"ValidatorList: Exception, " << e.
what();
2360 using namespace std::string_literals;
2361 fee_.update(Resource::feeInvalidData, e.
what());
2371 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2373 JLOG(p_journal_.debug())
2374 <<
"ValidatorListCollection: received validator list from peer "
2375 <<
"using protocol version " << to_string(protocol_)
2376 <<
" which shouldn't support this feature.";
2377 fee_.update(Resource::feeUselessData,
"unsupported peer");
2380 else if (m->version() < 2)
2382 JLOG(p_journal_.debug())
2383 <<
"ValidatorListCollection: received invalid validator list "
2385 << m->version() <<
" from peer using protocol version "
2386 << to_string(protocol_);
2387 fee_.update(Resource::feeInvalidData,
"wrong version");
2390 onValidatorListMessage(
2391 "ValidatorListCollection",
2394 ValidatorList::parseBlobs(*m));
2398 JLOG(p_journal_.warn())
2399 <<
"ValidatorListCollection: Exception, " << e.
what();
2400 using namespace std::string_literals;
2401 fee_.update(Resource::feeInvalidData, e.
what());
2408 if (m->validation().size() < 50)
2410 JLOG(p_journal_.warn()) <<
"Validation: Too small";
2411 fee_.update(Resource::feeMalformedRequest,
"too small");
2417 auto const closeTime = app_.timeKeeper().closeTime();
2426 app_.validatorManifests().getMasterKey(pk));
2429 val->setSeen(closeTime);
2433 app_.getValidations().parms(),
2434 app_.timeKeeper().closeTime(),
2436 val->getSeenTime()))
2438 JLOG(p_journal_.trace()) <<
"Validation: Not current";
2439 fee_.update(Resource::feeUselessData,
"not current");
2446 auto const isTrusted =
2447 app_.validators().trusted(val->getSignerPublic());
2455 overlay_.reportInboundTraffic(
2456 TrafficCount::category::validation_untrusted,
2457 Message::messageSize(*m));
2459 if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2465 auto [added, relayed] =
2466 app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2473 if (relayed && (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
2474 overlay_.updateSlotAndSquelch(
2475 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2478 overlay_.reportInboundTraffic(
2479 TrafficCount::category::validation_duplicate,
2480 Message::messageSize(*m));
2482 JLOG(p_journal_.trace()) <<
"Validation: duplicate";
2486 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2488 JLOG(p_journal_.debug())
2489 <<
"Dropping untrusted validation from diverged peer";
2491 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2495 isTrusted ?
"Trusted validation" :
"Untrusted validation";
2500 to_string(val->getNodeID());
2507 app_.getJobQueue().addJob(
2510 [weak, val, m, key]() {
2511 if (
auto peer = weak.
lock())
2512 peer->checkValidation(val, key, m);
2517 JLOG(p_journal_.debug())
2518 <<
"Dropping untrusted validation for load";
2523 JLOG(p_journal_.warn())
2524 <<
"Exception processing validation: " << e.
what();
2525 using namespace std::string_literals;
2526 fee_.update(Resource::feeMalformedRequest, e.
what());
2533 protocol::TMGetObjectByHash& packet = *m;
2535 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash " << packet.type()
2536 <<
" " << packet.objects_size();
2541 if (send_queue_.size() >= Tuning::dropSendQueue)
2543 JLOG(p_journal_.debug()) <<
"GetObject: Large send queue";
2547 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2553 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2555 if (!txReduceRelayEnabled())
2557 JLOG(p_journal_.error())
2558 <<
"TMGetObjectByHash: tx reduce-relay is disabled";
2559 fee_.update(Resource::feeMalformedRequest,
"disabled");
2564 app_.getJobQueue().addJob(
2566 if (
auto peer = weak.
lock())
2567 peer->doTransactions(m);
2572 protocol::TMGetObjectByHash reply;
2574 reply.set_query(
false);
2576 if (packet.has_seq())
2577 reply.set_seq(packet.seq());
2579 reply.set_type(packet.type());
2581 if (packet.has_ledgerhash())
2585 fee_.update(Resource::feeMalformedRequest,
"ledger hash");
2589 reply.set_ledgerhash(packet.ledgerhash());
2593 Resource::feeModerateBurdenPeer,
2594 " received a get object by hash request");
2597 for (
int i = 0; i < packet.objects_size(); ++i)
2599 auto const& obj = packet.objects(i);
2602 uint256 const hash{obj.hash()};
2605 std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2606 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2609 protocol::TMIndexedObject& newObj = *reply.add_objects();
2610 newObj.set_hash(hash.begin(), hash.size());
2612 &nodeObject->getData().front(),
2613 nodeObject->getData().size());
2615 if (obj.has_nodeid())
2616 newObj.set_index(obj.nodeid());
2617 if (obj.has_ledgerseq())
2618 newObj.set_ledgerseq(obj.ledgerseq());
2625 JLOG(p_journal_.trace()) <<
"GetObj: " << reply.objects_size() <<
" of "
2626 << packet.objects_size();
2634 bool progress =
false;
2636 for (
int i = 0; i < packet.objects_size(); ++i)
2638 protocol::TMIndexedObject
const& obj = packet.objects(i);
2642 if (obj.has_ledgerseq())
2644 if (obj.ledgerseq() != pLSeq)
2646 if (pLDo && (pLSeq != 0))
2648 JLOG(p_journal_.debug())
2649 <<
"GetObj: Full fetch pack for " << pLSeq;
2651 pLSeq = obj.ledgerseq();
2652 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2656 JLOG(p_journal_.debug())
2657 <<
"GetObj: Late fetch pack for " << pLSeq;
2666 uint256 const hash{obj.hash()};
2668 app_.getLedgerMaster().addFetchPack(
2671 obj.data().begin(), obj.data().end()));
2676 if (pLDo && (pLSeq != 0))
2678 JLOG(p_journal_.debug())
2679 <<
"GetObj: Partial fetch pack for " << pLSeq;
2681 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2682 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2689 if (!txReduceRelayEnabled())
2691 JLOG(p_journal_.error())
2692 <<
"TMHaveTransactions: tx reduce-relay is disabled";
2693 fee_.update(Resource::feeMalformedRequest,
"disabled");
2698 app_.getJobQueue().addJob(
2700 if (
auto peer = weak.
lock())
2701 peer->handleHaveTransactions(m);
2706PeerImp::handleHaveTransactions(
2709 protocol::TMGetObjectByHash tmBH;
2710 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2711 tmBH.set_query(
true);
2713 JLOG(p_journal_.trace())
2714 <<
"received TMHaveTransactions " << m->hashes_size();
2720 JLOG(p_journal_.error())
2721 <<
"TMHaveTransactions with invalid hash size";
2722 fee_.update(Resource::feeMalformedRequest,
"hash size");
2728 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2730 JLOG(p_journal_.trace()) <<
"checking transaction " << (bool)txn;
2734 JLOG(p_journal_.debug()) <<
"adding transaction to request";
2736 auto obj = tmBH.add_objects();
2737 obj->set_hash(hash.
data(), hash.
size());
2744 removeTxQueue(hash);
2748 JLOG(p_journal_.trace())
2749 <<
"transaction request object is " << tmBH.objects_size();
2751 if (tmBH.objects_size() > 0)
2758 if (!txReduceRelayEnabled())
2760 JLOG(p_journal_.error())
2761 <<
"TMTransactions: tx reduce-relay is disabled";
2762 fee_.update(Resource::feeMalformedRequest,
"disabled");
2766 JLOG(p_journal_.trace())
2767 <<
"received TMTransactions " << m->transactions_size();
2769 overlay_.addTxMetrics(m->transactions_size());
2774 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2782 using on_message_fn =
2784 if (!strand_.running_in_this_thread())
2788 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2790 if (!m->has_validatorpubkey())
2792 fee_.update(Resource::feeInvalidData,
"squelch no pubkey");
2795 auto validator = m->validatorpubkey();
2799 fee_.update(Resource::feeInvalidData,
"squelch bad pubkey");
2805 if (key == app_.getValidationPublicKey())
2807 JLOG(p_journal_.debug())
2808 <<
"onMessage: TMSquelch discarding validator's squelch " << slice;
2813 m->has_squelchduration() ? m->squelchduration() : 0;
2815 squelch_.removeSquelch(key);
2817 fee_.update(Resource::feeInvalidData,
"squelch duration");
2819 JLOG(p_journal_.debug())
2820 <<
"onMessage: TMSquelch " << slice <<
" " << id() <<
" " << duration;
2832 (void)lockedRecentLock;
2834 if (
std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2835 recentLedgers_.end())
2838 recentLedgers_.push_back(hash);
2847 if (app_.getFeeTrack().isLoadedLocal() ||
2848 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2849 (app_.getJobQueue().getJobCount(
jtPACK) > 10))
2851 JLOG(p_journal_.info()) <<
"Too busy to make fetch pack";
2857 JLOG(p_journal_.warn()) <<
"FetchPack hash size malformed";
2858 fee_.update(Resource::feeMalformedRequest,
"hash size");
2862 fee_.fee = Resource::feeHeavyBurdenPeer;
2864 uint256 const hash{packet->ledgerhash()};
2867 auto elapsed = UptimeClock::now();
2868 auto const pap = &app_;
2869 app_.getJobQueue().addJob(
2870 jtPACK,
"MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2871 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2876PeerImp::doTransactions(
2879 protocol::TMTransactions reply;
2881 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash requesting tx "
2882 << packet->objects_size();
2884 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2886 JLOG(p_journal_.error()) <<
"doTransactions, invalid number of hashes";
2887 fee_.update(Resource::feeMalformedRequest,
"too big");
2893 auto const& obj = packet->objects(i);
2897 fee_.update(Resource::feeMalformedRequest,
"hash size");
2903 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2907 JLOG(p_journal_.error()) <<
"doTransactions, transaction not found "
2909 fee_.update(Resource::feeMalformedRequest,
"tx not found");
2914 auto tx = reply.add_transactions();
2915 auto sttx = txn->getSTransaction();
2917 tx->set_rawtransaction(s.
data(), s.
size());
2919 txn->getStatus() ==
INCLUDED ? protocol::tsCURRENT
2921 tx->set_receivetimestamp(
2922 app_.timeKeeper().now().time_since_epoch().count());
2923 tx->set_deferred(txn->getSubmitResult().queued);
2926 if (reply.transactions_size() > 0)
2931PeerImp::checkTransaction(
2933 bool checkSignature,
2960 JLOG(p_journal_.warn()) <<
"Ignoring Network relayed Tx containing "
2961 "tfInnerBatchTxn (checkSignature).";
2962 charge(Resource::feeModerateBurdenPeer,
"inner batch txn");
2968 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2969 (stx->getFieldU32(sfLastLedgerSequence) <
2970 app_.getLedgerMaster().getValidLedgerIndex()))
2972 JLOG(p_journal_.info())
2973 <<
"Marking transaction " << stx->getTransactionID()
2974 <<
"as BAD because it's expired";
2975 app_.getHashRouter().setFlags(
2976 stx->getTransactionID(), HashRouterFlags::BAD);
2977 charge(Resource::feeUselessData,
"expired tx");
2988 tx->getStatus() ==
NEW,
2989 "xrpl::PeerImp::checkTransaction Transaction created "
2991 if (tx->getStatus() ==
NEW)
2993 JLOG(p_journal_.debug())
2994 <<
"Processing " << (
batch ?
"batch" :
"unsolicited")
2995 <<
" pseudo-transaction tx " << tx->getID();
2997 app_.getMasterTransaction().canonicalize(&tx);
3000 app_.getHashRouter().shouldRelay(tx->getID());
3003 JLOG(p_journal_.debug())
3004 <<
"Passing skipped pseudo pseudo-transaction tx "
3006 app_.overlay().relay(tx->getID(), {}, *toSkip);
3010 JLOG(p_journal_.debug())
3011 <<
"Charging for pseudo-transaction tx " << tx->getID();
3012 charge(Resource::feeUselessData,
"pseudo tx");
3023 app_.getHashRouter(),
3025 app_.getLedgerMaster().getValidatedRules(),
3027 valid != Validity::Valid)
3029 if (!validReason.empty())
3031 JLOG(p_journal_.debug())
3032 <<
"Exception checking transaction: " << validReason;
3037 app_.getHashRouter().setFlags(
3038 stx->getTransactionID(), HashRouterFlags::BAD);
3040 Resource::feeInvalidSignature,
3041 "check transaction signature failure");
3048 app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
3054 if (tx->getStatus() ==
INVALID)
3056 if (!reason.
empty())
3058 JLOG(p_journal_.debug())
3059 <<
"Exception checking transaction: " << reason;
3061 app_.getHashRouter().setFlags(
3062 stx->getTransactionID(), HashRouterFlags::BAD);
3063 charge(Resource::feeInvalidSignature,
"tx (impossible)");
3067 bool const trusted = any(flags & HashRouterFlags::TRUSTED);
3068 app_.getOPs().processTransaction(
3069 tx, trusted,
false, NetworkOPs::FailHard::no);
3073 JLOG(p_journal_.warn())
3074 <<
"Exception in " << __func__ <<
": " << ex.
what();
3075 app_.getHashRouter().setFlags(
3076 stx->getTransactionID(), HashRouterFlags::BAD);
3077 using namespace std::string_literals;
3078 charge(Resource::feeInvalidData,
"tx "s + ex.
what());
3084PeerImp::checkPropose(
3089 JLOG(p_journal_.trace())
3090 <<
"Checking " << (isTrusted ?
"trusted" :
"UNTRUSTED") <<
" proposal";
3092 XRPL_ASSERT(packet,
"xrpl::PeerImp::checkPropose : non-null packet");
3097 JLOG(p_journal_.warn()) << desc;
3098 charge(Resource::feeInvalidSignature, desc);
3105 relay = app_.getOPs().processTrustedProposal(peerPos);
3107 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3115 auto haveMessage = app_.overlay().relay(
3117 if (!haveMessage.empty())
3118 overlay_.updateSlotAndSquelch(
3121 std::move(haveMessage),
3122 protocol::mtPROPOSE_LEDGER);
3127PeerImp::checkValidation(
3132 if (!val->isValid())
3134 std::string desc{
"Validation forwarded by peer is invalid"};
3135 JLOG(p_journal_.debug()) << desc;
3136 charge(Resource::feeInvalidSignature, desc);
3151 overlay_.relay(*packet, key, val->getSignerPublic());
3152 if (!haveMessage.empty())
3154 overlay_.updateSlotAndSquelch(
3156 val->getSignerPublic(),
3157 std::move(haveMessage),
3158 protocol::mtVALIDATION);
3164 JLOG(p_journal_.trace())
3165 <<
"Exception processing validation: " << ex.
what();
3166 using namespace std::string_literals;
3167 charge(Resource::feeMalformedRequest,
"validation "s + ex.
what());
3181 if (p->hasTxSet(rootHash) && p.get() != skip)
3183 auto score = p->getScore(true);
3184 if (!ret || (score > retScore))
3209 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3211 auto score = p->getScore(true);
3212 if (!ret || (score > retScore))
3224PeerImp::sendLedgerBase(
3226 protocol::TMLedgerData& ledgerData)
3228 JLOG(p_journal_.trace()) <<
"sendLedgerBase: Base data";
3231 addRaw(ledger->header(), s);
3234 auto const& stateMap{ledger->stateMap()};
3235 if (stateMap.getHash() != beast::zero)
3240 stateMap.serializeRoot(
root);
3241 ledgerData.add_nodes()->set_nodedata(
3242 root.getDataPtr(),
root.getLength());
3244 if (ledger->header().txHash != beast::zero)
3246 auto const& txMap{ledger->txMap()};
3247 if (txMap.getHash() != beast::zero)
3251 txMap.serializeRoot(
root);
3252 ledgerData.add_nodes()->set_nodedata(
3253 root.getDataPtr(),
root.getLength());
3266 JLOG(p_journal_.trace()) <<
"getLedger: Ledger";
3270 if (m->has_ledgerhash())
3273 uint256 const ledgerHash{m->ledgerhash()};
3274 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3277 JLOG(p_journal_.trace())
3278 <<
"getLedger: Don't have ledger with hash " << ledgerHash;
3280 if (m->has_querytype() && !m->has_requestcookie())
3286 m->has_ledgerseq() ? m->ledgerseq() : 0,
3289 m->set_requestcookie(
id());
3292 JLOG(p_journal_.debug())
3293 <<
"getLedger: Request relayed to peer";
3297 JLOG(p_journal_.trace())
3298 <<
"getLedger: Failed to find peer to relay request";
3302 else if (m->has_ledgerseq())
3305 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3307 JLOG(p_journal_.debug())
3308 <<
"getLedger: Early ledger sequence request";
3312 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3315 JLOG(p_journal_.debug())
3316 <<
"getLedger: Don't have ledger with sequence "
3321 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3323 ledger = app_.getLedgerMaster().getClosedLedger();
3329 auto const ledgerSeq{ledger->header().seq};
3330 if (m->has_ledgerseq())
3332 if (ledgerSeq != m->ledgerseq())
3335 if (!m->has_requestcookie())
3337 Resource::feeMalformedRequest,
"get_ledger ledgerSeq");
3340 JLOG(p_journal_.warn())
3341 <<
"getLedger: Invalid ledger sequence " << ledgerSeq;
3344 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3347 JLOG(p_journal_.debug())
3348 <<
"getLedger: Early ledger sequence request " << ledgerSeq;
3353 JLOG(p_journal_.debug()) <<
"getLedger: Unable to find ledger";
3362 JLOG(p_journal_.trace()) <<
"getTxSet: TX set";
3364 uint256 const txSetHash{m->ledgerhash()};
3366 app_.getInboundTransactions().getSet(txSetHash,
false)};
3369 if (m->has_querytype() && !m->has_requestcookie())
3374 m->set_requestcookie(
id());
3377 JLOG(p_journal_.debug()) <<
"getTxSet: Request relayed";
3381 JLOG(p_journal_.debug())
3382 <<
"getTxSet: Failed to find relay peer";
3387 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find TX set";
3398 if (!m->has_requestcookie())
3400 Resource::feeModerateBurdenPeer,
"received a get ledger request");
3404 SHAMap const* map{
nullptr};
3405 protocol::TMLedgerData ledgerData;
3406 bool fatLeaves{
true};
3407 auto const itype{m->itype()};
3409 if (itype == protocol::liTS_CANDIDATE)
3411 if (sharedMap = getTxSet(m); !sharedMap)
3413 map = sharedMap.
get();
3416 ledgerData.set_ledgerseq(0);
3417 ledgerData.set_ledgerhash(m->ledgerhash());
3418 ledgerData.set_type(protocol::liTS_CANDIDATE);
3419 if (m->has_requestcookie())
3420 ledgerData.set_requestcookie(m->requestcookie());
3427 if (send_queue_.size() >= Tuning::dropSendQueue)
3429 JLOG(p_journal_.debug())
3430 <<
"processLedgerRequest: Large send queue";
3433 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3435 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Too busy";
3439 if (ledger = getLedger(m); !ledger)
3443 auto const ledgerHash{ledger->header().hash};
3444 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3445 ledgerData.set_ledgerseq(ledger->header().seq);
3446 ledgerData.set_type(itype);
3447 if (m->has_requestcookie())
3448 ledgerData.set_requestcookie(m->requestcookie());
3452 case protocol::liBASE:
3453 sendLedgerBase(ledger, ledgerData);
3456 case protocol::liTX_NODE:
3457 map = &ledger->txMap();
3458 JLOG(p_journal_.trace()) <<
"processLedgerRequest: TX map hash "
3459 << to_string(map->getHash());
3462 case protocol::liAS_NODE:
3463 map = &ledger->stateMap();
3464 JLOG(p_journal_.trace())
3465 <<
"processLedgerRequest: Account state map hash "
3466 << to_string(map->getHash());
3471 JLOG(p_journal_.error())
3472 <<
"processLedgerRequest: Invalid ledger info type";
3479 JLOG(p_journal_.warn()) <<
"processLedgerRequest: Unable to find map";
3484 if (m->nodeids_size() > 0)
3486 auto const queryDepth{
3487 m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
3491 for (
int i = 0; i < m->nodeids_size() &&
3492 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3498 data.reserve(Tuning::softMaxReplyNodes);
3502 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3504 JLOG(p_journal_.trace())
3505 <<
"processLedgerRequest: getNodeFat got "
3506 << data.size() <<
" nodes";
3508 for (
auto const& d : data)
3510 if (ledgerData.nodes_size() >=
3511 Tuning::hardMaxReplyNodes)
3513 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3514 node->set_nodeid(d.first.getRawString());
3515 node->set_nodedata(d.second.data(), d.second.size());
3520 JLOG(p_journal_.warn())
3521 <<
"processLedgerRequest: getNodeFat returns false";
3529 case protocol::liBASE:
3531 info =
"Ledger base";
3534 case protocol::liTX_NODE:
3538 case protocol::liAS_NODE:
3542 case protocol::liTS_CANDIDATE:
3543 info =
"TS candidate";
3551 if (!m->has_ledgerhash())
3552 info +=
", no hash specified";
3554 JLOG(p_journal_.warn())
3555 <<
"processLedgerRequest: getNodeFat with nodeId "
3556 << *shaMapNodeId <<
" and ledger info type " << info
3557 <<
" throws exception: " << e.
what();
3561 JLOG(p_journal_.info())
3562 <<
"processLedgerRequest: Got request for " << m->nodeids_size()
3563 <<
" nodes at depth " << queryDepth <<
", return "
3564 << ledgerData.nodes_size() <<
" nodes";
3567 if (ledgerData.nodes_size() == 0)
3574PeerImp::getScore(
bool haveItem)
const
3578 static int const spRandomMax = 9999;
3582 static int const spHaveItem = 10000;
3587 static int const spLatency = 30;
3590 static int const spNoLatency = 8000;
3595 score += spHaveItem;
3604 score -= latency->count() * spLatency;
3606 score -= spNoLatency;
3612PeerImp::isHighLatency()
const
3615 return latency_ >= peerHighLatency;
3621 using namespace std::chrono_literals;
3624 totalBytes_ += bytes;
3625 accumBytes_ += bytes;
3626 auto const timeElapsed = clock_type::now() - intervalStart_;
3627 auto const timeElapsedInSecs =
3628 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3630 if (timeElapsedInSecs >= 1s)
3632 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3633 rollingAvg_.push_back(avgBytes);
3635 auto const totalBytes =
3637 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3639 intervalStart_ = clock_type::now();
3645PeerImp::Metrics::average_bytes()
const
3648 return rollingAvgBytes_;
3652PeerImp::Metrics::total_bytes()
const
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Stream trace() const
Severity stream access functions.
virtual HashRouter & getHashRouter()=0
virtual Cluster & cluster()=0
virtual Config & config()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual TimeKeeper & timeKeeper()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual JobQueue & getJobQueue()=0
virtual ValidatorList & validators()=0
virtual NetworkOPs & getOPs()=0
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
std::size_t size() const
The number of nodes in the cluster list.
std::chrono::seconds MAX_DIVERGED_TIME
bool TX_REDUCE_RELAY_METRICS
std::chrono::seconds MAX_UNKNOWN_TIME
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
bool shouldProcess(uint256 const &key, PeerShortID peer, HashRouterFlags &flags, std::chrono::seconds tx_interval)
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
int getJobCount(JobType t) const
Jobs waiting at this priority.
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
std::chrono::seconds getValidatedLedgerAge()
LedgerIndex getValidLedgerIndex()
void setClusterFee(std::uint32_t fee)
static std::size_t messageSize(::google::protobuf::Message const &message)
virtual bool isNeedNetworkLedger()=0
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void incPeerDisconnectCharges() override
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
PeerFinder::Manager & peerFinder()
Resource::Manager & resourceManager()
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void onPeerDeactivate(Peer::id_t id)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
This class manages established peer-to-peer connections, handles message exchange,...
std::optional< std::chrono::milliseconds > latency_
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
std::unique_ptr< LoadEvent > load_event_
beast::Journal const p_journal_
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
ProtocolVersion protocol_
bool txReduceRelayEnabled_
void close()
Forcibly closes the underlying socket connection.
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
http_request_type request_
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
bool txReduceRelayEnabled() const override
std::shared_ptr< PeerFinder::Slot > const slot_
boost::beast::http::fields const & headers_
Compressed compressionEnabled_
void onTimer(error_code const &ec)
Handles the expiration of the peer activity timer.
boost::system::error_code error_code
void tryAsyncShutdown()
Attempts to perform a graceful SSL shutdown if conditions are met.
std::string const & fingerprint() const override
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
bool ledgerReplayEnabled_
void sendTxQueue() override
Send aggregated transactions' hashes.
uint256 closedLedgerHash_
reduce_relay::Squelch< UptimeClock > squelch_
PeerImp(PeerImp const &)=delete
void cycleStatus() override
std::shared_mutex nameMutex_
beast::IP::Endpoint const remote_address_
std::string domain() const
std::atomic< Tracking > tracking_
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
clock_type::duration uptime() const
PublicKey const publicKey_
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
Json::Value json() override
bool cluster() const override
Returns true if this connection is a member of the cluster.
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
std::queue< std::shared_ptr< Message > > send_queue_
static std::string makePrefix(std::string const &fingerprint)
void onShutdown(error_code ec)
Handles the completion of the asynchronous SSL shutdown.
void onMessageUnknown(std::uint16_t type)
protocol::TMStatusChange last_status_
void onReadMessage(error_code ec, std::size_t bytes_transferred)
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
std::unique_ptr< stream_type > stream_ptr_
struct xrpl::PeerImp::@22 metrics_
boost::beast::multi_buffer read_buffer_
void send(std::shared_ptr< Message > const &m) override
hash_set< uint256 > txQueue_
bool supportsFeature(ProtocolFeature f) const override
void shutdown()
Initiates the peer disconnection sequence.
boost::asio::strand< boost::asio::executor > strand_
clock_type::time_point lastPingTime_
boost::circular_buffer< uint256 > recentLedgers_
bool hasTxSet(uint256 const &hash) const override
boost::circular_buffer< uint256 > recentTxSets_
clock_type::time_point trackingTime_
beast::Journal const journal_
void cancelTimer() noexcept
Cancels any pending wait on the peer activity timer.
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
uint256 previousLedgerHash_
Resource::Consumer usage_
std::optional< std::uint32_t > lastPingSeq_
bool crawl() const
Returns true if this connection will publicly share its IP address.
void setTimer(std::chrono::seconds interval)
Sets and starts the peer timer.
void fail(std::string const &name, error_code ec)
Handles a failure associated with a specific error code.
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
Represents a peer connection in the overlay.
A peer's signed, proposed position for use in RCLConsensus.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
bool checkSign() const
Verify the signing hash of the proposal.
An endpoint that consumes resources.
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
int balance()
Returns the credit balance representing consumption.
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
void const * getDataPtr() const
std::size_t size() const noexcept
void const * data() const noexcept
An immutable linear range of bytes.
time_point now() const override
Returns the current time, using the server's clock.
static category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
static constexpr std::size_t size()
T emplace_back(T... args)
@ objectValue
object value (collection of name/value pairs).
Charge const feeInvalidData
Charge const feeModerateBurdenPeer
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeTrivialPeer
Charge const feeUselessData
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ sendQueueLogFreq
How often to log send queue size.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ maxQueryDepth
The maximum number of levels to search.
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
static constexpr std::size_t MAX_TX_QUEUE_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
static constexpr char FEATURE_COMPR[]
Stopwatch & stopwatch()
Returns an instance of a wall clock.
constexpr std::uint32_t tfInnerBatchTxn
std::string to_string(base_uint< Bits, Tag > const &a)
std::string strHex(FwdIt begin, FwdIt end)
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
std::string base64_decode(std::string_view data)
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
@ ValidatorListPropagation
@ ValidatorList2Propagation
Number root(Number f, unsigned d)
static bool stringIsUint256Sized(std::string const &pBuffStr)
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
std::string getFingerprint(beast::IP::Endpoint const &address, std::optional< PublicKey > const &publicKey=std::nullopt, std::optional< std::string > const &id=std::nullopt)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
@ proposal
proposal for signing
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
static constexpr char FEATURE_LEDGER_REPLAY[]
static constexpr char FEATURE_VPRR[]
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
static constexpr char FEATURE_TXRR[]
T shared_from_this(T... args)
std::optional< std::uint32_t > networkID
beast::IP::Address public_ip
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Describes a single consumer.
beast::IP::Endpoint address
Data format for exchanging consumption information across peers.
std::vector< Item > items