1#include <xrpld/app/consensus/RCLValidations.h>
2#include <xrpld/app/ledger/InboundLedgers.h>
3#include <xrpld/app/ledger/InboundTransactions.h>
4#include <xrpld/app/ledger/LedgerMaster.h>
5#include <xrpld/app/ledger/TransactionMaster.h>
6#include <xrpld/app/misc/Transaction.h>
7#include <xrpld/app/misc/ValidatorList.h>
8#include <xrpld/overlay/Cluster.h>
9#include <xrpld/overlay/detail/PeerImp.h>
10#include <xrpld/overlay/detail/Tuning.h>
12#include <xrpl/basics/UptimeClock.h>
13#include <xrpl/basics/base64.h>
14#include <xrpl/basics/random.h>
15#include <xrpl/basics/safe_cast.h>
16#include <xrpl/core/HashRouter.h>
17#include <xrpl/core/PerfLog.h>
18#include <xrpl/protocol/TxFlags.h>
19#include <xrpl/protocol/digest.h>
20#include <xrpl/server/LoadFeeTrack.h>
21#include <xrpl/server/NetworkOPs.h>
22#include <xrpl/tx/apply.h>
24#include <boost/algorithm/string/predicate.hpp>
25#include <boost/beast/core/ostream.hpp>
34using namespace std::chrono_literals;
67 , prefix_(makePrefix(fingerprint_))
68 , sink_(app_.getJournal(
"Peer"), prefix_)
69 , p_sink_(app_.getJournal(
"Protocol"), prefix_)
72 , stream_ptr_(
std::move(stream_ptr))
73 , socket_(stream_ptr_->next_layer().socket())
74 , stream_(*stream_ptr_)
75 , strand_(
boost::asio::make_strand(socket_.get_executor()))
77 , remote_address_(slot->remote_endpoint())
83 , publicKey_(publicKey)
86 , squelch_(app_.getJournal(
"Squelch"))
88 , fee_{Resource::feeTrivialPeer,
""}
90 , request_(
std::move(request))
92 , compressionEnabled_(
96 , txReduceRelayEnabled_(
98 , ledgerReplayEnabled_(
100 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
103 <<
" vp reduce-relay base squelch enabled "
113 bool const inCluster{
cluster()};
136 if (!
strand_.running_in_this_thread())
155 if (
auto const iter =
headers_.find(
"Closed-Ledger"); iter !=
headers_.end())
157 closed = parseLedgerHash(iter->value());
160 fail(
"Malformed handshake data (1)");
163 if (
auto const iter =
headers_.find(
"Previous-Ledger"); iter !=
headers_.end())
165 previous = parseLedgerHash(iter->value());
168 fail(
"Malformed handshake data (2)");
171 if (previous && !closed)
172 fail(
"Malformed handshake data (3)");
198 if (!
strand_.running_in_this_thread())
221 if (!
strand_.running_in_this_thread())
237 auto validator = m->getValidatorKey();
238 if (validator && !
squelch_.expireSquelch(*validator))
248 safe_cast<TrafficCount::category>(m->getCategory()),
267 sink << n <<
" sendq: " << sendq_size;
276 boost::asio::async_write(
284 std::placeholders::_1,
285 std::placeholders::_2)));
291 if (!
strand_.running_in_this_thread())
299 protocol::TMHaveTransactions ht;
301 ht.add_hashes(hash.data(), hash.size());
312 if (!
strand_.running_in_this_thread())
331 if (!
strand_.running_in_this_thread())
337 auto removed =
txQueue_.erase(hash);
345 strand_.running_in_this_thread())
349 fail(
"charge: Resources");
358 auto const iter =
headers_.find(
"Crawl");
361 return boost::iequals(iter->value(),
"public");
387 ret[jss::inbound] =
true;
391 ret[jss::cluster] =
true;
403 if (
auto const nid =
headers_[
"Network-ID"]; !nid.empty())
420 static_cast<Json::UInt>(std::chrono::duration_cast<std::chrono::seconds>(
uptime()).count());
425 if ((minSeq != 0) || (maxSeq != 0))
431 ret[jss::track] =
"diverged";
435 ret[jss::track] =
"unknown";
444 protocol::TMStatusChange last_status;
451 if (closedLedgerHash != beast::zero)
452 ret[jss::ledger] =
to_string(closedLedgerHash);
454 if (last_status.has_newstatus())
456 switch (last_status.newstatus())
458 case protocol::nsCONNECTING:
459 ret[jss::status] =
"connecting";
462 case protocol::nsCONNECTED:
463 ret[jss::status] =
"connected";
466 case protocol::nsMONITORING:
467 ret[jss::status] =
"monitoring";
470 case protocol::nsVALIDATING:
471 ret[jss::status] =
"validating";
474 case protocol::nsSHUTTING:
475 ret[jss::status] =
"shutting";
479 JLOG(
p_journal_.
warn()) <<
"Unknown status: " << last_status.newstatus();
561 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::fail : strand in this thread");
574 if (!
strand_.running_in_this_thread())
600 strand_.running_in_this_thread(),
601 "xrpl::PeerImp::tryAsyncShutdown : strand in this thread");
614 stream_.async_shutdown(bind_executor(
621 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::shutdown: strand in this thread");
628 boost::beast::get_lowest_layer(
stream_).cancel();
644 bool const shouldLog =
645 (ec != boost::asio::error::eof && ec != boost::asio::error::operation_aborted &&
646 ec.message().find(
"application data after close notify") == std::string::npos);
660 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::close : strand in this thread");
686 timer_.expires_after(interval);
695 timer_.async_wait(bind_executor(
712 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::onTimer : strand in this thread");
720 if (ec == boost::asio::error::operation_aborted)
740 fail(
"Large send queue");
746 clock_type::duration duration;
765 fail(
"Ping Timeout");
772 protocol::TMPing message;
773 message.set_type(protocol::TMPing::ptPING);
798 XRPL_ASSERT(
read_buffer_.size() == 0,
"xrpl::PeerImp::doAccept : empty read buffer");
815 fail(
"makeSharedValue: Unexpected failure");
849 boost::asio::async_write(
851 write_buffer->data(),
852 boost::asio::transfer_all(),
859 if (ec == boost::asio::error::operation_aborted)
866 fail(
"onWriteResponse", ec);
869 if (write_buffer->size() == bytes_transferred)
874 fail(
"Failed to write header");
945 strand_.running_in_this_thread(),
"xrpl::PeerImp::onReadMessage : strand in this thread");
954 if (ec == boost::asio::error::eof)
961 if (ec == boost::asio::error::operation_aborted)
967 fail(
"onReadMessage", ec);
979 stream <<
"onReadMessage: "
980 << (bytes_transferred > 0 ?
to_string(bytes_transferred) +
" bytes" :
"");
983 metrics_.recv.add_message(bytes_transferred);
993 using namespace std::chrono_literals;
996 "invokeProtocolMessage",
1007 fail(
"onReadMessage", ec);
1011 if (bytes_consumed == 0)
1026 XRPL_ASSERT(!
shutdownStarted_,
"xrpl::PeerImp::onReadMessage : shutdown started");
1036 std::placeholders::_1,
1037 std::placeholders::_2)));
1044 strand_.running_in_this_thread(),
"xrpl::PeerImp::onWriteMessage : strand in this thread");
1053 if (ec == boost::asio::error::operation_aborted)
1059 fail(
"onWriteMessage", ec);
1065 stream <<
"onWriteMessage: "
1066 << (bytes_transferred > 0 ?
to_string(bytes_transferred) +
" bytes" :
"");
1069 metrics_.sent.add_message(bytes_transferred);
1071 XRPL_ASSERT(!
send_queue_.empty(),
"xrpl::PeerImp::onWriteMessage : non-empty send buffer");
1083 XRPL_ASSERT(!
shutdownStarted_,
"xrpl::PeerImp::onWriteMessage : shutdown started");
1086 boost::asio::async_write(
1094 std::placeholders::_1,
1095 std::placeholders::_2)));
1124 auto const category =
1134 if ((type == MessageType::mtTRANSACTION || type == MessageType::mtHAVE_TRANSACTIONS ||
1135 type == MessageType::mtTRANSACTIONS ||
1148 JLOG(
journal_.
trace()) <<
"onMessageBegin: " << type <<
" " << size <<
" " << uncompressed_size
1149 <<
" " << isCompressed;
1162 auto const s = m->list_size();
1181 if (m->type() == protocol::TMPing::ptPING)
1185 m->set_type(protocol::TMPing::ptPONG);
1190 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1229 for (
int i = 0; i < m->clusternodes().size(); ++i)
1231 protocol::TMClusterNode
const& node = m->clusternodes(i);
1234 if (node.has_nodename())
1235 name = node.nodename();
1249 int const loadSources = m->loadsources().size();
1250 if (loadSources != 0)
1253 gossip.
items.reserve(loadSources);
1254 for (
int i = 0; i < m->loadsources().size(); ++i)
1256 protocol::TMLoadSource
const& node = m->loadsources(i);
1261 gossip.
items.push_back(item);
1274 if (status.getReportTime() >= thresh)
1275 fees.push_back(status.getLoadFee());
1280 auto const index = fees.size() / 2;
1282 clusterFee = fees[index];
1298 if (m->endpoints_v2().size() >= 1024)
1305 endpoints.
reserve(m->endpoints_v2().size());
1308 for (
auto const& tm : m->endpoints_v2())
1315 <<
"failed to parse incoming endpoint: {" << tm.endpoint() <<
"}";
1341 if (!endpoints.
empty())
1357 XRPL_ASSERT(eraseTxQueue !=
batch, (
"xrpl::PeerImp::handleTransaction : valid inputs"));
1365 JLOG(
p_journal_.
debug()) <<
"Ignoring incoming transaction: Need network ledger";
1374 uint256 const txID = stx->getTransactionID();
1396 JLOG(
p_journal_.
warn()) <<
"Ignoring Network relayed Tx containing "
1397 "tfInnerBatchTxn (handleTransaction).";
1430 bool checkSignature =
true;
1433 if (!m->has_deferred() || !m->deferred())
1446 checkSignature =
false;
1469 if (
auto peer = weak.lock())
1470 peer->checkTransaction(flags, checkSignature, stx,
batch);
1477 <<
". Exception: " << ex.
what();
1488 auto const itype{m->itype()};
1491 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1493 badData(
"Invalid ledger info type");
1503 if (itype == protocol::liTS_CANDIDATE)
1505 if (!m->has_ledgerhash())
1507 badData(
"Invalid TX candidate set, missing TX set hash");
1512 !m->has_ledgerhash() && !m->has_ledgerseq() && (!ltype || *ltype != protocol::ltCLOSED))
1514 badData(
"Invalid request");
1519 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1521 badData(
"Invalid ledger type");
1528 badData(
"Invalid ledger hash");
1533 if (m->has_ledgerseq())
1535 auto const ledgerSeq{m->ledgerseq()};
1538 using namespace std::chrono_literals;
1548 if (itype != protocol::liBASE)
1550 if (m->nodeids_size() <= 0)
1552 badData(
"Invalid ledger node IDs");
1556 for (
auto const& nodeId : m->nodeids())
1560 badData(
"Invalid SHAMap node ID");
1567 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1569 badData(
"Invalid query type");
1574 if (m->has_querydepth())
1578 badData(
"Invalid query depth");
1586 if (
auto peer = weak.
lock())
1587 peer->processLedgerRequest(m);
1604 if (
auto peer = weak.
lock())
1606 auto reply = peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1607 if (reply.has_error())
1609 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1611 peer->charge(Resource::feeMalformedRequest,
"proof_path_request");
1615 peer->charge(Resource::feeRequestNoReply,
"proof_path_request");
1620 peer->send(std::make_shared<Message>(reply, protocol::mtPROOF_PATH_RESPONSE));
1629 if (!ledgerReplayEnabled_)
1631 fee_.update(Resource::feeMalformedRequest,
"proof_path_response disabled");
1635 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1637 fee_.update(Resource::feeInvalidData,
"proof_path_response");
1644 JLOG(p_journal_.trace()) <<
"onMessage, TMReplayDeltaRequest";
1645 if (!ledgerReplayEnabled_)
1647 fee_.update(Resource::feeMalformedRequest,
"replay_delta_request disabled");
1651 fee_.fee = Resource::feeModerateBurdenPeer;
1653 app_.getJobQueue().addJob(
jtREPLAY_REQ,
"RcvReplDReq", [weak, m]() {
1654 if (
auto peer = weak.
lock())
1656 auto reply = peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1657 if (reply.has_error())
1659 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1661 peer->charge(Resource::feeMalformedRequest,
"replay_delta_request");
1665 peer->charge(Resource::feeRequestNoReply,
"replay_delta_request");
1670 peer->send(std::make_shared<Message>(reply, protocol::mtREPLAY_DELTA_RESPONSE));
1679 if (!ledgerReplayEnabled_)
1681 fee_.update(Resource::feeMalformedRequest,
"replay_delta_response disabled");
1685 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1687 fee_.update(Resource::feeInvalidData,
"replay_delta_response");
1695 fee_.update(Resource::feeInvalidData, msg);
1696 JLOG(p_journal_.warn()) <<
"TMLedgerData: " << msg;
1702 badData(
"Invalid ledger hash");
1708 auto const ledgerSeq{m->ledgerseq()};
1709 if (m->type() == protocol::liTS_CANDIDATE)
1720 using namespace std::chrono_literals;
1721 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1722 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1731 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1733 badData(
"Invalid ledger info type");
1738 if (m->has_error() &&
1739 (m->error() < protocol::reNO_LEDGER || m->error() > protocol::reBAD_REQUEST))
1741 badData(
"Invalid reply error");
1746 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1748 badData(
"Invalid Ledger/TXset nodes " +
std::to_string(m->nodes_size()));
1753 if (m->has_requestcookie())
1755 if (
auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1757 m->clear_requestcookie();
1762 JLOG(p_journal_.info()) <<
"Unable to route TX/ledger data reply";
1767 uint256 const ledgerHash{m->ledgerhash()};
1770 if (m->type() == protocol::liTS_CANDIDATE)
1773 app_.getJobQueue().addJob(
jtTXN_DATA,
"RcvPeerData", [weak, ledgerHash, m]() {
1774 if (
auto peer = weak.lock())
1776 peer->app_.getInboundTransactions().gotData(ledgerHash, peer, m);
1783 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1789 protocol::TMProposeSet
const&
set = *m;
1798 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1799 fee_.update(Resource::feeInvalidSignature,
" signature can't be longer than 72 bytes");
1805 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1806 fee_.update(Resource::feeMalformedRequest,
"bad hashes");
1814 auto const isTrusted = app_.getValidators().trusted(publicKey);
1822 overlay_.reportInboundTraffic(
1823 TrafficCount::category::proposal_untrusted, Message::messageSize(*m));
1825 if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1829 uint256 const proposeHash{
set.currenttxhash()};
1830 uint256 const prevLedger{
set.previousledger()};
1835 proposeHash, prevLedger,
set.proposeseq(), closeTime, publicKey.slice(), sig);
1837 if (
auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
1842 if (relayed && (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
1843 overlay_.updateSlotAndSquelch(suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1846 overlay_.reportInboundTraffic(
1847 TrafficCount::category::proposal_duplicate, Message::messageSize(*m));
1849 JLOG(p_journal_.trace()) <<
"Proposal: duplicate";
1856 if (tracking_.load() == Tracking::diverged)
1858 JLOG(p_journal_.debug()) <<
"Proposal: Dropping untrusted (peer divergence)";
1862 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1864 JLOG(p_journal_.debug()) <<
"Proposal: Dropping untrusted (load)";
1869 JLOG(p_journal_.trace()) <<
"Proposal: " << (isTrusted ?
"trusted" :
"untrusted");
1880 app_.getTimeKeeper().closeTime(),
1881 calcNodeID(app_.getValidatorManifests().getMasterKey(publicKey))});
1884 app_.getJobQueue().addJob(
1886 if (
auto peer = weak.lock())
1887 peer->checkPropose(isTrusted, m,
proposal);
1894 JLOG(p_journal_.trace()) <<
"Status: Change";
1896 if (!m->has_networktime())
1897 m->set_networktime(app_.getTimeKeeper().now().time_since_epoch().count());
1901 if (!last_status_.has_newstatus() || m->has_newstatus())
1908 protocol::NodeStatus
const status = last_status_.newstatus();
1910 m->set_newstatus(status);
1914 if (m->newevent() == protocol::neLOST_SYNC)
1916 bool outOfSync{
false};
1921 if (!closedLedgerHash_.isZero())
1924 closedLedgerHash_.zero();
1926 previousLedgerHash_.zero();
1930 JLOG(p_journal_.debug()) <<
"Status: Out of sync";
1943 if (peerChangedLedgers)
1945 closedLedgerHash_ = m->ledgerhash();
1946 closedLedgerHash = closedLedgerHash_;
1947 addLedger(closedLedgerHash, sl);
1951 closedLedgerHash_.zero();
1956 previousLedgerHash_ = m->ledgerhashprevious();
1957 addLedger(previousLedgerHash_, sl);
1961 previousLedgerHash_.zero();
1964 if (peerChangedLedgers)
1966 JLOG(p_journal_.debug()) <<
"LCL is " << closedLedgerHash;
1970 JLOG(p_journal_.debug()) <<
"Status: No ledger";
1974 if (m->has_firstseq() && m->has_lastseq())
1978 minLedger_ = m->firstseq();
1979 maxLedger_ = m->lastseq();
1981 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1982 minLedger_ = maxLedger_ = 0;
1985 if (m->has_ledgerseq() && app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1987 checkTracking(m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1990 app_.getOPs().pubPeerStatus([m,
this]() ->
Json::Value {
1993 if (m->has_newstatus())
1995 switch (m->newstatus())
1997 case protocol::nsCONNECTING:
1998 j[jss::status] =
"CONNECTING";
2000 case protocol::nsCONNECTED:
2001 j[jss::status] =
"CONNECTED";
2003 case protocol::nsMONITORING:
2004 j[jss::status] =
"MONITORING";
2006 case protocol::nsVALIDATING:
2007 j[jss::status] =
"VALIDATING";
2009 case protocol::nsSHUTTING:
2010 j[jss::status] =
"SHUTTING";
2015 if (m->has_newevent())
2017 switch (m->newevent())
2019 case protocol::neCLOSING_LEDGER:
2020 j[jss::action] =
"CLOSING_LEDGER";
2022 case protocol::neACCEPTED_LEDGER:
2023 j[jss::action] =
"ACCEPTED_LEDGER";
2025 case protocol::neSWITCHED_LEDGER:
2026 j[jss::action] =
"SWITCHED_LEDGER";
2028 case protocol::neLOST_SYNC:
2029 j[jss::action] =
"LOST_SYNC";
2034 if (m->has_ledgerseq())
2036 j[jss::ledger_index] = m->ledgerseq();
2039 if (m->has_ledgerhash())
2041 uint256 closedLedgerHash{};
2044 closedLedgerHash = closedLedgerHash_;
2046 j[jss::ledger_hash] = to_string(closedLedgerHash);
2049 if (m->has_networktime())
2054 if (m->has_firstseq() && m->has_lastseq())
2056 j[jss::ledger_index_min] =
Json::UInt(m->firstseq());
2057 j[jss::ledger_index_max] =
Json::UInt(m->lastseq());
2073 serverSeq = maxLedger_;
2079 checkTracking(serverSeq, validationSeq);
2088 if (diff < Tuning::convergedLedgerLimit)
2091 tracking_ = Tracking::converged;
2094 if ((diff > Tuning::divergedLedgerLimit) && (tracking_.load() != Tracking::diverged))
2099 tracking_ = Tracking::diverged;
2100 trackingTime_ = clock_type::now();
2109 fee_.update(Resource::feeMalformedRequest,
"bad hash");
2113 uint256 const hash{m->hash()};
2115 if (m->status() == protocol::tsHAVE)
2119 if (
std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) != recentTxSets_.end())
2121 fee_.update(Resource::feeUselessData,
"duplicate (tsHAVE)");
2125 recentTxSets_.push_back(hash);
2130PeerImp::onValidatorListMessage(
2140 JLOG(p_journal_.warn()) <<
"Ignored malformed " << messageType;
2142 fee_.update(Resource::feeHeavyBurdenPeer,
"no blobs");
2148 JLOG(p_journal_.debug()) <<
"Received " << messageType;
2150 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2152 JLOG(p_journal_.debug()) << messageType <<
": received duplicate " << messageType;
2156 fee_.update(Resource::feeUselessData,
"duplicate");
2160 auto const applyResult = app_.getValidators().applyListsAndBroadcast(
2164 remote_address_.to_string(),
2167 app_.getHashRouter(),
2170 JLOG(p_journal_.debug()) <<
"Processed " << messageType <<
" version " << version <<
" from "
2171 << (applyResult.publisherKey ?
strHex(*applyResult.publisherKey)
2172 :
"unknown or invalid publisher")
2173 <<
" with best result " << to_string(applyResult.bestDisposition());
2176 switch (applyResult.bestDisposition())
2179 case ListDisposition::accepted:
2181 case ListDisposition::expired:
2183 case ListDisposition::pending: {
2187 applyResult.publisherKey,
2188 "xrpl::PeerImp::onValidatorListMessage : publisher key is "
2190 auto const& pubKey = *applyResult.publisherKey;
2192 if (
auto const iter = publisherListSequences_.find(pubKey);
2193 iter != publisherListSequences_.end())
2196 iter->second < applyResult.sequence,
2197 "xrpl::PeerImp::onValidatorListMessage : lower sequence");
2200 publisherListSequences_[pubKey] = applyResult.sequence;
2203 case ListDisposition::same_sequence:
2204 case ListDisposition::known_sequence:
2209 applyResult.sequence && applyResult.publisherKey,
2210 "xrpl::PeerImp::onValidatorListMessage : nonzero sequence "
2211 "and set publisher key");
2213 publisherListSequences_[*applyResult.publisherKey] <= applyResult.sequence,
2214 "xrpl::PeerImp::onValidatorListMessage : maximum sequence");
2219 case ListDisposition::stale:
2220 case ListDisposition::untrusted:
2221 case ListDisposition::invalid:
2222 case ListDisposition::unsupported_version:
2227 "xrpl::PeerImp::onValidatorListMessage : invalid best list "
2233 switch (applyResult.worstDisposition())
2235 case ListDisposition::accepted:
2236 case ListDisposition::expired:
2237 case ListDisposition::pending:
2240 case ListDisposition::same_sequence:
2241 case ListDisposition::known_sequence:
2245 fee_.update(Resource::feeUselessData,
" duplicate (same_sequence or known_sequence)");
2247 case ListDisposition::stale:
2250 fee_.update(Resource::feeInvalidData,
"expired");
2252 case ListDisposition::untrusted:
2256 fee_.update(Resource::feeUselessData,
"untrusted");
2258 case ListDisposition::invalid:
2260 fee_.update(Resource::feeInvalidSignature,
"invalid list disposition");
2262 case ListDisposition::unsupported_version:
2265 fee_.update(Resource::feeInvalidData,
"version");
2270 "xrpl::PeerImp::onValidatorListMessage : invalid worst list "
2276 for (
auto const& [disp, count] : applyResult.dispositions)
2281 case ListDisposition::accepted:
2282 JLOG(p_journal_.debug()) <<
"Applied " << count <<
" new " << messageType;
2285 case ListDisposition::expired:
2286 JLOG(p_journal_.debug()) <<
"Applied " << count <<
" expired " << messageType;
2289 case ListDisposition::pending:
2290 JLOG(p_journal_.debug()) <<
"Processed " << count <<
" future " << messageType;
2292 case ListDisposition::same_sequence:
2293 JLOG(p_journal_.warn())
2294 <<
"Ignored " << count <<
" " << messageType <<
"(s) with current sequence";
2296 case ListDisposition::known_sequence:
2297 JLOG(p_journal_.warn())
2298 <<
"Ignored " << count <<
" " << messageType <<
"(s) with future sequence";
2300 case ListDisposition::stale:
2301 JLOG(p_journal_.warn()) <<
"Ignored " << count <<
"stale " << messageType;
2303 case ListDisposition::untrusted:
2304 JLOG(p_journal_.warn()) <<
"Ignored " << count <<
" untrusted " << messageType;
2306 case ListDisposition::unsupported_version:
2307 JLOG(p_journal_.warn())
2308 <<
"Ignored " << count <<
"unsupported version " << messageType;
2310 case ListDisposition::invalid:
2311 JLOG(p_journal_.warn()) <<
"Ignored " << count <<
"invalid " << messageType;
2316 "xrpl::PeerImp::onValidatorListMessage : invalid list "
2328 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2330 JLOG(p_journal_.debug()) <<
"ValidatorList: received validator list from peer using "
2331 <<
"protocol version " << to_string(protocol_)
2332 <<
" which shouldn't support this feature.";
2333 fee_.update(Resource::feeUselessData,
"unsupported peer");
2336 onValidatorListMessage(
2337 "ValidatorList", m->manifest(), m->version(), ValidatorList::parseBlobs(*m));
2341 JLOG(p_journal_.warn()) <<
"ValidatorList: Exception, " << e.
what();
2342 using namespace std::string_literals;
2343 fee_.update(Resource::feeInvalidData, e.
what());
2352 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2354 JLOG(p_journal_.debug())
2355 <<
"ValidatorListCollection: received validator list from peer "
2356 <<
"using protocol version " << to_string(protocol_)
2357 <<
" which shouldn't support this feature.";
2358 fee_.update(Resource::feeUselessData,
"unsupported peer");
2361 if (m->version() < 2)
2363 JLOG(p_journal_.debug())
2364 <<
"ValidatorListCollection: received invalid validator list "
2366 << m->version() <<
" from peer using protocol version " << to_string(protocol_);
2367 fee_.update(Resource::feeInvalidData,
"wrong version");
2370 onValidatorListMessage(
2371 "ValidatorListCollection", m->manifest(), m->version(), ValidatorList::parseBlobs(*m));
2375 JLOG(p_journal_.warn()) <<
"ValidatorListCollection: Exception, " << e.
what();
2376 using namespace std::string_literals;
2377 fee_.update(Resource::feeInvalidData, e.
what());
2384 if (m->validation().size() < 50)
2386 JLOG(p_journal_.warn()) <<
"Validation: Too small";
2387 fee_.update(Resource::feeMalformedRequest,
"too small");
2393 auto const closeTime = app_.getTimeKeeper().closeTime();
2401 return calcNodeID(app_.getValidatorManifests().getMasterKey(pk));
2404 val->setSeen(closeTime);
2408 app_.getValidations().parms(),
2409 app_.getTimeKeeper().closeTime(),
2411 val->getSeenTime()))
2413 JLOG(p_journal_.trace()) <<
"Validation: Not current";
2414 fee_.update(Resource::feeUselessData,
"not current");
2421 auto const isTrusted = app_.getValidators().trusted(val->getSignerPublic());
2429 overlay_.reportInboundTraffic(
2430 TrafficCount::category::validation_untrusted, Message::messageSize(*m));
2432 if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2438 auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2445 if (relayed && (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
2447 overlay_.updateSlotAndSquelch(
2448 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2452 overlay_.reportInboundTraffic(
2453 TrafficCount::category::validation_duplicate, Message::messageSize(*m));
2455 JLOG(p_journal_.trace()) <<
"Validation: duplicate";
2459 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2461 JLOG(p_journal_.debug()) <<
"Dropping untrusted validation from diverged peer";
2463 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2465 std::string const name = isTrusted ?
"ChkTrust" :
"ChkUntrust";
2468 app_.getJobQueue().addJob(
2470 if (
auto peer = weak.
lock())
2471 peer->checkValidation(val, key, m);
2476 JLOG(p_journal_.debug()) <<
"Dropping untrusted validation for load";
2481 JLOG(p_journal_.warn()) <<
"Exception processing validation: " << e.
what();
2482 using namespace std::string_literals;
2483 fee_.update(Resource::feeMalformedRequest, e.
what());
2490 protocol::TMGetObjectByHash
const& packet = *m;
2492 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash " << packet.type() <<
" "
2493 << packet.objects_size();
2498 if (send_queue_.size() >= Tuning::dropSendQueue)
2500 JLOG(p_journal_.debug()) <<
"GetObject: Large send queue";
2504 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2510 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2512 if (!txReduceRelayEnabled())
2514 JLOG(p_journal_.error()) <<
"TMGetObjectByHash: tx reduce-relay is disabled";
2515 fee_.update(Resource::feeMalformedRequest,
"disabled");
2521 if (
auto peer = weak.
lock())
2522 peer->doTransactions(m);
2527 protocol::TMGetObjectByHash reply;
2529 reply.set_query(
false);
2531 if (packet.has_seq())
2532 reply.set_seq(packet.seq());
2534 reply.set_type(packet.type());
2536 if (packet.has_ledgerhash())
2540 fee_.update(Resource::feeMalformedRequest,
"ledger hash");
2544 reply.set_ledgerhash(packet.ledgerhash());
2547 fee_.update(Resource::feeModerateBurdenPeer,
" received a get object by hash request");
2550 for (
int i = 0; i < packet.objects_size(); ++i)
2552 auto const& obj = packet.objects(i);
2555 uint256 const hash{obj.hash()};
2558 std::uint32_t const seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2559 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2562 protocol::TMIndexedObject& newObj = *reply.add_objects();
2563 newObj.set_hash(hash.begin(), hash.size());
2564 newObj.set_data(&nodeObject->getData().front(), nodeObject->getData().size());
2566 if (obj.has_nodeid())
2567 newObj.set_index(obj.nodeid());
2568 if (obj.has_ledgerseq())
2569 newObj.set_ledgerseq(obj.ledgerseq());
2575 if (reply.objects_size() >= Tuning::hardMaxReplyNodes)
2578 Resource::feeModerateBurdenPeer,
2579 " Reply limit reached. Truncating reply.");
2586 JLOG(p_journal_.trace()) <<
"GetObj: " << reply.objects_size() <<
" of "
2587 << packet.objects_size();
2595 bool progress =
false;
2597 for (
int i = 0; i < packet.objects_size(); ++i)
2599 protocol::TMIndexedObject
const& obj = packet.objects(i);
2603 if (obj.has_ledgerseq())
2605 if (obj.ledgerseq() != pLSeq)
2607 if (pLDo && (pLSeq != 0))
2609 JLOG(p_journal_.debug()) <<
"GetObj: Full fetch pack for " << pLSeq;
2611 pLSeq = obj.ledgerseq();
2612 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2616 JLOG(p_journal_.debug()) <<
"GetObj: Late fetch pack for " << pLSeq;
2627 uint256 const hash{obj.hash()};
2629 app_.getLedgerMaster().addFetchPack(
2635 if (pLDo && (pLSeq != 0))
2637 JLOG(p_journal_.debug()) <<
"GetObj: Partial fetch pack for " << pLSeq;
2639 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2640 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2647 if (!txReduceRelayEnabled())
2649 JLOG(p_journal_.error()) <<
"TMHaveTransactions: tx reduce-relay is disabled";
2650 fee_.update(Resource::feeMalformedRequest,
"disabled");
2655 app_.getJobQueue().addJob(
jtMISSING_TXN,
"HandleHaveTxs", [weak, m]() {
2656 if (
auto peer = weak.
lock())
2657 peer->handleHaveTransactions(m);
2664 protocol::TMGetObjectByHash tmBH;
2665 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2666 tmBH.set_query(
true);
2668 JLOG(p_journal_.trace()) <<
"received TMHaveTransactions " << m->hashes_size();
2674 JLOG(p_journal_.error()) <<
"TMHaveTransactions with invalid hash size";
2675 fee_.update(Resource::feeMalformedRequest,
"hash size");
2681 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2683 JLOG(p_journal_.trace()) <<
"checking transaction " << (bool)txn;
2687 JLOG(p_journal_.debug()) <<
"adding transaction to request";
2689 auto obj = tmBH.add_objects();
2690 obj->set_hash(hash.
data(), hash.
size());
2697 removeTxQueue(hash);
2701 JLOG(p_journal_.trace()) <<
"transaction request object is " << tmBH.objects_size();
2703 if (tmBH.objects_size() > 0)
2710 if (!txReduceRelayEnabled())
2712 JLOG(p_journal_.error()) <<
"TMTransactions: tx reduce-relay is disabled";
2713 fee_.update(Resource::feeMalformedRequest,
"disabled");
2717 JLOG(p_journal_.trace()) <<
"received TMTransactions " << m->transactions_size();
2719 overlay_.addTxMetrics(m->transactions_size());
2725 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2735 if (!strand_.running_in_this_thread())
2737 post(strand_,
std::bind((on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2741 if (!m->has_validatorpubkey())
2743 fee_.update(Resource::feeInvalidData,
"squelch no pubkey");
2746 auto validator = m->validatorpubkey();
2750 fee_.update(Resource::feeInvalidData,
"squelch bad pubkey");
2756 if (key == app_.getValidationPublicKey())
2758 JLOG(p_journal_.debug()) <<
"onMessage: TMSquelch discarding validator's squelch " << slice;
2762 std::uint32_t const duration = m->has_squelchduration() ? m->squelchduration() : 0;
2765 squelch_.removeSquelch(key);
2769 fee_.update(Resource::feeInvalidData,
"squelch duration");
2772 JLOG(p_journal_.debug()) <<
"onMessage: TMSquelch " << slice <<
" " << id() <<
" " << duration;
2782 (void)lockedRecentLock;
2784 if (
std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) != recentLedgers_.end())
2787 recentLedgers_.push_back(hash);
2796 if (app_.getFeeTrack().isLoadedLocal() ||
2797 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2798 (app_.getJobQueue().getJobCount(
jtPACK) > 10))
2800 JLOG(p_journal_.info()) <<
"Too busy to make fetch pack";
2806 JLOG(p_journal_.warn()) <<
"FetchPack hash size malformed";
2807 fee_.update(Resource::feeMalformedRequest,
"hash size");
2811 fee_.fee = Resource::feeHeavyBurdenPeer;
2813 uint256 const hash{packet->ledgerhash()};
2816 auto elapsed = UptimeClock::now();
2817 auto const pap = &app_;
2818 app_.getJobQueue().addJob(
jtPACK,
"MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2819 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2826 protocol::TMTransactions reply;
2828 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash requesting tx "
2829 << packet->objects_size();
2831 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2833 JLOG(p_journal_.error()) <<
"doTransactions, invalid number of hashes";
2834 fee_.update(Resource::feeMalformedRequest,
"too big");
2840 auto const& obj = packet->objects(i);
2844 fee_.update(Resource::feeMalformedRequest,
"hash size");
2850 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2854 JLOG(p_journal_.error())
2855 <<
"doTransactions, transaction not found " <<
Slice(hash.
data(), hash.
size());
2856 fee_.update(Resource::feeMalformedRequest,
"tx not found");
2861 auto tx = reply.add_transactions();
2862 auto sttx = txn->getSTransaction();
2864 tx->set_rawtransaction(s.
data(), s.
size());
2865 tx->set_status(txn->getStatus() ==
INCLUDED ? protocol::tsCURRENT : protocol::tsNEW);
2866 tx->set_receivetimestamp(app_.getTimeKeeper().now().time_since_epoch().count());
2867 tx->set_deferred(txn->getSubmitResult().queued);
2870 if (reply.transactions_size() > 0)
2875PeerImp::checkTransaction(
2877 bool checkSignature,
2904 JLOG(p_journal_.warn()) <<
"Ignoring Network relayed Tx containing "
2905 "tfInnerBatchTxn (checkSignature).";
2906 charge(Resource::feeModerateBurdenPeer,
"inner batch txn");
2912 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2913 (stx->getFieldU32(sfLastLedgerSequence) < app_.getLedgerMaster().getValidLedgerIndex()))
2915 JLOG(p_journal_.info()) <<
"Marking transaction " << stx->getTransactionID()
2916 <<
"as BAD because it's expired";
2917 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2918 charge(Resource::feeUselessData,
"expired tx");
2929 tx->getStatus() ==
NEW,
2930 "xrpl::PeerImp::checkTransaction Transaction created "
2932 if (tx->getStatus() ==
NEW)
2934 JLOG(p_journal_.debug()) <<
"Processing " << (
batch ?
"batch" :
"unsolicited")
2935 <<
" pseudo-transaction tx " << tx->getID();
2937 app_.getMasterTransaction().canonicalize(&tx);
2939 auto const toSkip = app_.getHashRouter().shouldRelay(tx->getID());
2942 JLOG(p_journal_.debug())
2943 <<
"Passing skipped pseudo pseudo-transaction tx " << tx->getID();
2944 app_.getOverlay().relay(tx->getID(), {}, *toSkip);
2948 JLOG(p_journal_.debug())
2949 <<
"Charging for pseudo-transaction tx " << tx->getID();
2950 charge(Resource::feeUselessData,
"pseudo tx");
2961 app_.getHashRouter(), *stx, app_.getLedgerMaster().getValidatedRules());
2962 valid != Validity::Valid)
2964 if (!validReason.empty())
2966 JLOG(p_journal_.debug()) <<
"Exception checking transaction: " << validReason;
2971 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2972 charge(Resource::feeInvalidSignature,
"check transaction signature failure");
2978 forceValidity(app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
2984 if (tx->getStatus() ==
INVALID)
2986 if (!reason.
empty())
2988 JLOG(p_journal_.debug()) <<
"Exception checking transaction: " << reason;
2990 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2991 charge(Resource::feeInvalidSignature,
"tx (impossible)");
2995 bool const trusted = any(flags & HashRouterFlags::TRUSTED);
2996 app_.getOPs().processTransaction(tx, trusted,
false, NetworkOPs::FailHard::no);
3000 JLOG(p_journal_.warn()) <<
"Exception in " << __func__ <<
": " << ex.
what();
3001 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
3002 using namespace std::string_literals;
3003 charge(Resource::feeInvalidData,
"tx "s + ex.
what());
3009PeerImp::checkPropose(
3014 JLOG(p_journal_.trace()) <<
"Checking " << (isTrusted ?
"trusted" :
"UNTRUSTED") <<
" proposal";
3016 XRPL_ASSERT(packet,
"xrpl::PeerImp::checkPropose : non-null packet");
3020 std::string const desc{
"Proposal fails sig check"};
3021 JLOG(p_journal_.warn()) << desc;
3022 charge(Resource::feeInvalidSignature, desc);
3030 relay = app_.getOPs().processTrustedProposal(peerPos);
3034 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3045 if (!haveMessage.empty())
3047 overlay_.updateSlotAndSquelch(
3050 std::move(haveMessage),
3051 protocol::mtPROPOSE_LEDGER);
3057PeerImp::checkValidation(
3062 if (!val->isValid())
3064 std::string const desc{
"Validation forwarded by peer is invalid"};
3065 JLOG(p_journal_.debug()) << desc;
3066 charge(Resource::feeInvalidSignature, desc);
3073 if (app_.getOPs().recvValidation(val,
std::to_string(
id())) || cluster())
3079 auto haveMessage = overlay_.relay(*packet, key, val->getSignerPublic());
3080 if (!haveMessage.empty())
3082 overlay_.updateSlotAndSquelch(
3083 key, val->getSignerPublic(), std::move(haveMessage), protocol::mtVALIDATION);
3089 JLOG(p_journal_.trace()) <<
"Exception processing validation: " << ex.
what();
3090 using namespace std::string_literals;
3091 charge(Resource::feeMalformedRequest,
"validation "s + ex.
what());
3105 if (p->hasTxSet(rootHash) && p.get() != skip)
3107 auto score = p->getScore(true);
3108 if (!ret || (score > retScore))
3133 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3135 auto score = p->getScore(true);
3136 if (!ret || (score > retScore))
3148PeerImp::sendLedgerBase(
3150 protocol::TMLedgerData& ledgerData)
3152 JLOG(p_journal_.trace()) <<
"sendLedgerBase: Base data";
3155 addRaw(ledger->header(), s);
3158 auto const& stateMap{ledger->stateMap()};
3159 if (stateMap.getHash() != beast::zero)
3164 stateMap.serializeRoot(
root);
3165 ledgerData.add_nodes()->set_nodedata(
root.getDataPtr(),
root.getLength());
3167 if (ledger->header().txHash != beast::zero)
3169 auto const& txMap{ledger->txMap()};
3170 if (txMap.getHash() != beast::zero)
3174 txMap.serializeRoot(
root);
3175 ledgerData.add_nodes()->set_nodedata(
root.getDataPtr(),
root.getLength());
3187 JLOG(p_journal_.trace()) <<
"getLedger: Ledger";
3191 if (m->has_ledgerhash())
3194 uint256 const ledgerHash{m->ledgerhash()};
3195 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3198 JLOG(p_journal_.trace()) <<
"getLedger: Don't have ledger with hash " << ledgerHash;
3200 if (m->has_querytype() && !m->has_requestcookie())
3204 overlay_, ledgerHash, m->has_ledgerseq() ? m->ledgerseq() : 0,
this))
3206 m->set_requestcookie(
id());
3208 JLOG(p_journal_.debug()) <<
"getLedger: Request relayed to peer";
3212 JLOG(p_journal_.trace()) <<
"getLedger: Failed to find peer to relay request";
3216 else if (m->has_ledgerseq())
3219 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3221 JLOG(p_journal_.debug()) <<
"getLedger: Early ledger sequence request";
3225 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3228 JLOG(p_journal_.debug())
3229 <<
"getLedger: Don't have ledger with sequence " << m->ledgerseq();
3233 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3235 ledger = app_.getLedgerMaster().getClosedLedger();
3241 auto const ledgerSeq{ledger->header().seq};
3242 if (m->has_ledgerseq())
3244 if (ledgerSeq != m->ledgerseq())
3247 if (!m->has_requestcookie())
3248 charge(Resource::feeMalformedRequest,
"get_ledger ledgerSeq");
3251 JLOG(p_journal_.warn()) <<
"getLedger: Invalid ledger sequence " << ledgerSeq;
3254 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3257 JLOG(p_journal_.debug()) <<
"getLedger: Early ledger sequence request " << ledgerSeq;
3262 JLOG(p_journal_.debug()) <<
"getLedger: Unable to find ledger";
3271 JLOG(p_journal_.trace()) <<
"getTxSet: TX set";
3273 uint256 const txSetHash{m->ledgerhash()};
3277 if (m->has_querytype() && !m->has_requestcookie())
3282 m->set_requestcookie(
id());
3284 JLOG(p_journal_.debug()) <<
"getTxSet: Request relayed";
3288 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find relay peer";
3293 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find TX set";
3304 if (!m->has_requestcookie())
3305 charge(Resource::feeModerateBurdenPeer,
"received a get ledger request");
3309 SHAMap const* map{
nullptr};
3310 protocol::TMLedgerData ledgerData;
3311 bool fatLeaves{
true};
3312 auto const itype{m->itype()};
3314 if (itype == protocol::liTS_CANDIDATE)
3316 if (sharedMap = getTxSet(m); !sharedMap)
3318 map = sharedMap.
get();
3321 ledgerData.set_ledgerseq(0);
3322 ledgerData.set_ledgerhash(m->ledgerhash());
3323 ledgerData.set_type(protocol::liTS_CANDIDATE);
3324 if (m->has_requestcookie())
3325 ledgerData.set_requestcookie(m->requestcookie());
3332 if (send_queue_.size() >= Tuning::dropSendQueue)
3334 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Large send queue";
3337 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3339 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Too busy";
3343 if (ledger = getLedger(m); !ledger)
3347 auto const ledgerHash{ledger->header().hash};
3348 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3349 ledgerData.set_ledgerseq(ledger->header().seq);
3350 ledgerData.set_type(itype);
3351 if (m->has_requestcookie())
3352 ledgerData.set_requestcookie(m->requestcookie());
3356 case protocol::liBASE:
3357 sendLedgerBase(ledger, ledgerData);
3360 case protocol::liTX_NODE:
3361 map = &ledger->txMap();
3362 JLOG(p_journal_.trace())
3363 <<
"processLedgerRequest: TX map hash " << to_string(map->getHash());
3366 case protocol::liAS_NODE:
3367 map = &ledger->stateMap();
3368 JLOG(p_journal_.trace())
3369 <<
"processLedgerRequest: Account state map hash " << to_string(map->getHash());
3374 JLOG(p_journal_.error()) <<
"processLedgerRequest: Invalid ledger info type";
3381 JLOG(p_journal_.warn()) <<
"processLedgerRequest: Unable to find map";
3386 if (m->nodeids_size() > 0)
3389 auto const queryDepth{m->has_querydepth() ? m->querydepth() : defaultDepth};
3394 i < m->nodeids_size() && ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3400 data.reserve(Tuning::softMaxReplyNodes);
3404 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3406 JLOG(p_journal_.trace())
3407 <<
"processLedgerRequest: getNodeFat got " << data.size() <<
" nodes";
3409 for (
auto const& d : data)
3411 if (ledgerData.nodes_size() >= Tuning::hardMaxReplyNodes)
3413 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3414 node->set_nodeid(d.first.getRawString());
3415 node->set_nodedata(d.second.data(), d.second.size());
3420 JLOG(p_journal_.warn()) <<
"processLedgerRequest: getNodeFat returns false";
3428 case protocol::liBASE:
3430 info =
"Ledger base";
3433 case protocol::liTX_NODE:
3437 case protocol::liAS_NODE:
3441 case protocol::liTS_CANDIDATE:
3442 info =
"TS candidate";
3450 if (!m->has_ledgerhash())
3451 info +=
", no hash specified";
3453 JLOG(p_journal_.warn())
3454 <<
"processLedgerRequest: getNodeFat with nodeId " << *shaMapNodeId
3455 <<
" and ledger info type " << info <<
" throws exception: " << e.
what();
3459 JLOG(p_journal_.info()) <<
"processLedgerRequest: Got request for " << m->nodeids_size()
3460 <<
" nodes at depth " << queryDepth <<
", return "
3461 << ledgerData.nodes_size() <<
" nodes";
3464 if (ledgerData.nodes_size() == 0)
3471PeerImp::getScore(
bool haveItem)
const
3475 static int const spRandomMax = 9999;
3479 static int const spHaveItem = 10000;
3484 static int const spLatency = 30;
3487 static int const spNoLatency = 8000;
3492 score += spHaveItem;
3502 score -= latency->count() * spLatency;
3506 score -= spNoLatency;
3513PeerImp::isHighLatency()
const
3516 return latency_ >= peerHighLatency;
3522 using namespace std::chrono_literals;
3525 totalBytes_ += bytes;
3526 accumBytes_ += bytes;
3527 auto const timeElapsed = clock_type::now() - intervalStart_;
3528 auto const timeElapsedInSecs = std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3530 if (timeElapsedInSecs >= 1s)
3532 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3533 rollingAvg_.push_back(avgBytes);
3535 auto const totalBytes =
std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
3536 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3538 intervalStart_ = clock_type::now();
3544PeerImp::Metrics::average_bytes()
const
3547 return rollingAvgBytes_;
3551PeerImp::Metrics::total_bytes()
const
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Stream trace() const
Severity stream access functions.
virtual Config & config()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
std::size_t size() const
The number of nodes in the cluster list.
std::chrono::seconds MAX_DIVERGED_TIME
bool TX_REDUCE_RELAY_METRICS
std::chrono::seconds MAX_UNKNOWN_TIME
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
bool shouldProcess(uint256 const &key, PeerShortID peer, HashRouterFlags &flags, std::chrono::seconds tx_interval)
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
int getJobCount(JobType t) const
Jobs waiting at this priority.
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
std::chrono::seconds getValidatedLedgerAge()
LedgerIndex getValidLedgerIndex()
void setClusterFee(std::uint32_t fee)
static std::size_t messageSize(::google::protobuf::Message const &message)
virtual bool isNeedNetworkLedger()=0
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void incPeerDisconnectCharges() override
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
PeerFinder::Manager & peerFinder()
Resource::Manager & resourceManager()
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void onPeerDeactivate(Peer::id_t id)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
This class manages established peer-to-peer connections, handles message exchange,...
std::optional< std::chrono::milliseconds > latency_
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
std::unique_ptr< LoadEvent > load_event_
beast::Journal const p_journal_
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
ProtocolVersion protocol_
bool txReduceRelayEnabled_
void close()
Forcibly closes the underlying socket connection.
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
http_request_type request_
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
bool txReduceRelayEnabled() const override
std::shared_ptr< PeerFinder::Slot > const slot_
boost::beast::http::fields const & headers_
Compressed compressionEnabled_
void onTimer(error_code const &ec)
Handles the expiration of the peer activity timer.
boost::system::error_code error_code
void tryAsyncShutdown()
Attempts to perform a graceful SSL shutdown if conditions are met.
std::string const & fingerprint() const override
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
bool ledgerReplayEnabled_
void sendTxQueue() override
Send aggregated transactions' hashes.
uint256 closedLedgerHash_
reduce_relay::Squelch< UptimeClock > squelch_
PeerImp(PeerImp const &)=delete
void cycleStatus() override
std::shared_mutex nameMutex_
beast::IP::Endpoint const remote_address_
std::string domain() const
std::atomic< Tracking > tracking_
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
clock_type::duration uptime() const
PublicKey const publicKey_
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
Json::Value json() override
bool cluster() const override
Returns true if this connection is a member of the cluster.
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
std::queue< std::shared_ptr< Message > > send_queue_
static std::string makePrefix(std::string const &fingerprint)
void onShutdown(error_code ec)
Handles the completion of the asynchronous SSL shutdown.
void onMessageUnknown(std::uint16_t type)
protocol::TMStatusChange last_status_
void onReadMessage(error_code ec, std::size_t bytes_transferred)
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
std::unique_ptr< stream_type > stream_ptr_
struct xrpl::PeerImp::@22 metrics_
boost::beast::multi_buffer read_buffer_
void send(std::shared_ptr< Message > const &m) override
hash_set< uint256 > txQueue_
bool supportsFeature(ProtocolFeature f) const override
void shutdown()
Initiates the peer disconnection sequence.
boost::asio::strand< boost::asio::executor > strand_
clock_type::time_point lastPingTime_
boost::circular_buffer< uint256 > recentLedgers_
bool hasTxSet(uint256 const &hash) const override
boost::circular_buffer< uint256 > recentTxSets_
clock_type::time_point trackingTime_
beast::Journal const journal_
void cancelTimer() noexcept
Cancels any pending wait on the peer activity timer.
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
uint256 previousLedgerHash_
Resource::Consumer usage_
std::optional< std::uint32_t > lastPingSeq_
bool crawl() const
Returns true if this connection will publicly share its IP address.
void setTimer(std::chrono::seconds interval)
Sets and starts the peer timer.
void fail(std::string const &name, error_code ec)
Handles a failure associated with a specific error code.
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
Represents a peer connection in the overlay.
A peer's signed, proposed position for use in RCLConsensus.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
bool checkSign() const
Verify the signing hash of the proposal.
An endpoint that consumes resources.
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
int balance()
Returns the credit balance representing consumption.
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
void const * getDataPtr() const
std::size_t size() const noexcept
void const * data() const noexcept
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & getValidators()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual HashRouter & getHashRouter()=0
virtual Cluster & getCluster()=0
virtual TimeKeeper & getTimeKeeper()=0
An immutable linear range of bytes.
time_point now() const override
Returns the current time, using the server's clock.
static category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
static constexpr std::size_t size()
T emplace_back(T... args)
@ objectValue
object value (collection of name/value pairs).
Charge const feeInvalidData
Charge const feeModerateBurdenPeer
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeTrivialPeer
Charge const feeUselessData
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ sendQueueLogFreq
How often to log send queue size.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ maxQueryDepth
The maximum number of levels to search.
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
static constexpr std::size_t MAX_TX_QUEUE_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
constexpr FlagValue tfInnerBatchTxn
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
static constexpr char FEATURE_COMPR[]
Stopwatch & stopwatch()
Returns an instance of a wall clock.
std::string to_string(base_uint< Bits, Tag > const &a)
std::string strHex(FwdIt begin, FwdIt end)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules)
Checks transaction signature and local checks.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
std::string base64_decode(std::string_view data)
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
@ ValidatorListPropagation
@ ValidatorList2Propagation
Number root(Number f, unsigned d)
static bool stringIsUint256Sized(std::string const &pBuffStr)
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
std::string getFingerprint(beast::IP::Endpoint const &address, std::optional< PublicKey > const &publicKey=std::nullopt, std::optional< std::string > const &id=std::nullopt)
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
@ proposal
proposal for signing
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
static constexpr char FEATURE_LEDGER_REPLAY[]
static constexpr char FEATURE_VPRR[]
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
static constexpr char FEATURE_TXRR[]
T shared_from_this(T... args)
std::optional< std::uint32_t > networkID
beast::IP::Address public_ip
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Describes a single consumer.
beast::IP::Endpoint address
Data format for exchanging consumption information across peers.
std::vector< Item > items