1#include <xrpld/overlay/detail/PeerImp.h>
3#include <xrpld/app/consensus/RCLCxPeerPos.h>
4#include <xrpld/app/consensus/RCLValidations.h>
5#include <xrpld/app/ledger/InboundLedgers.h>
6#include <xrpld/app/ledger/InboundTransactions.h>
7#include <xrpld/app/ledger/LedgerMaster.h>
8#include <xrpld/app/ledger/TransactionMaster.h>
9#include <xrpld/app/misc/Transaction.h>
10#include <xrpld/app/misc/ValidatorList.h>
11#include <xrpld/consensus/Validations.h>
12#include <xrpld/overlay/Cluster.h>
13#include <xrpld/overlay/ClusterNode.h>
14#include <xrpld/overlay/Peer.h>
15#include <xrpld/overlay/ReduceRelayCommon.h>
16#include <xrpld/overlay/detail/Handshake.h>
17#include <xrpld/overlay/detail/OverlayImpl.h>
18#include <xrpld/overlay/detail/ProtocolMessage.h>
19#include <xrpld/overlay/detail/ProtocolVersion.h>
20#include <xrpld/overlay/detail/TrafficCount.h>
21#include <xrpld/overlay/detail/Tuning.h>
22#include <xrpld/peerfinder/PeerfinderManager.h>
23#include <xrpld/peerfinder/Slot.h>
25#include <xrpl/basics/Blob.h>
26#include <xrpl/basics/Log.h>
27#include <xrpl/basics/SHAMapHash.h>
28#include <xrpl/basics/Slice.h>
29#include <xrpl/basics/ToString.h>
30#include <xrpl/basics/UptimeClock.h>
31#include <xrpl/basics/base64.h>
32#include <xrpl/basics/base_uint.h>
33#include <xrpl/basics/chrono.h>
34#include <xrpl/basics/random.h>
35#include <xrpl/basics/safe_cast.h>
36#include <xrpl/basics/strHex.h>
37#include <xrpl/beast/utility/Journal.h>
38#include <xrpl/beast/utility/Zero.h>
39#include <xrpl/beast/utility/instrumentation.h>
40#include <xrpl/core/HashRouter.h>
41#include <xrpl/core/Job.h>
42#include <xrpl/core/PerfLog.h>
43#include <xrpl/json/json_forwards.h>
44#include <xrpl/json/json_value.h>
45#include <xrpl/ledger/Ledger.h>
46#include <xrpl/protocol/KeyType.h>
47#include <xrpl/protocol/LedgerHeader.h>
48#include <xrpl/protocol/Protocol.h>
49#include <xrpl/protocol/PublicKey.h>
50#include <xrpl/protocol/SField.h>
51#include <xrpl/protocol/STTx.h>
52#include <xrpl/protocol/Serializer.h>
53#include <xrpl/protocol/TxFlags.h>
54#include <xrpl/protocol/digest.h>
55#include <xrpl/protocol/jss.h>
56#include <xrpl/protocol/tokens.h>
57#include <xrpl/resource/Charge.h>
58#include <xrpl/resource/Consumer.h>
59#include <xrpl/resource/Disposition.h>
60#include <xrpl/resource/Fees.h>
61#include <xrpl/resource/Gossip.h>
62#include <xrpl/server/LoadFeeTrack.h>
63#include <xrpl/server/NetworkOPs.h>
64#include <xrpl/shamap/SHAMapNodeID.h>
65#include <xrpl/tx/apply.h>
67#include <boost/algorithm/string/predicate.hpp>
68#include <boost/asio/bind_executor.hpp>
69#include <boost/asio/buffer.hpp>
70#include <boost/asio/completion_condition.hpp>
71#include <boost/asio/dispatch.hpp>
72#include <boost/asio/error.hpp>
73#include <boost/asio/strand.hpp>
74#include <boost/asio/write.hpp>
75#include <boost/beast/core/multi_buffer.hpp>
76#include <boost/beast/core/ostream.hpp>
77#include <boost/system/system_error.hpp>
79#include <google/protobuf/message.h>
103using namespace std::chrono_literals;
109constexpr std::chrono::milliseconds kPeerHighLatency{300};
112constexpr std::chrono::seconds kPeerTimerInterval{60};
170 <<
" vp reduce-relay base squelch enabled "
177 bool const inCluster{
cluster()};
202 if (
uint256 ret; ret.parseHex(value))
214 if (
auto const iter = self->headers_.find(
"Closed-Ledger"); iter != self->headers_.end())
216 closed = parseLedgerHash(iter->value());
219 self->fail(
"Malformed handshake data (1)");
222 if (
auto const iter = self->headers_.find(
"Previous-Ledger"); iter != self->headers_.end())
224 previous = parseLedgerHash(iter->value());
227 self->fail(
"Malformed handshake data (2)");
230 if (previous && !closed)
231 self->fail(
"Malformed handshake data (3)");
236 self->closedLedgerHash_ = *closed;
238 self->previousLedgerHash_ = *previous;
247 self->doProtocolStart();
259 if (!self->socket_.is_open())
272 if (self->gracefulClose_)
274 if (self->detaching_)
276 if (!self->socket_.is_open())
279 auto validator = m->getValidatorKey();
280 if (validator && !self->squelch_.expireSquelch(*validator))
282 self->overlay_.reportOutboundTraffic(
284 static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
289 self->overlay_.reportOutboundTraffic(
291 static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
294 self->overlay_.reportOutboundTraffic(
296 static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
298 auto sendqSize = self->sendQueue_.size();
305 self->largeSendq_ = 0;
308 auto sink = self->journal_.debug();
312 sink << n <<
" sendq: " << sendqSize;
315 self->sendQueue_.push(m);
320 boost::asio::async_write(
322 boost::asio::buffer(self->sendQueue_.front()->getBuffer(self->compressionEnabled_)),
334 if (!self->txQueue_.empty())
336 protocol::TMHaveTransactions ht;
338 self->txQueue_, [&](
auto const& hash) { ht.add_hashes(hash.data(), hash.size()); });
339 JLOG(self->pJournal_.trace()) <<
"sendTxQueue " << self->txQueue_.size();
340 self->txQueue_.clear();
352 JLOG(self->pJournal_.warn()) <<
"addTxQueue exceeds the cap";
356 self->txQueue_.insert(hash);
357 JLOG(self->pJournal_.trace()) <<
"addTxQueue " << self->txQueue_.size();
365 auto removed = self->txQueue_.erase(hash);
366 JLOG(self->pJournal_.trace()) <<
"removeTxQueue " << removed;
375 self->usage_.disconnect(self->pJournal_))
383 bool expected =
false;
384 if (self->chargeDisconnectFired_.compare_exchange_strong(
385 expected,
true, std::memory_order_acq_rel))
387 self->overlay_.incPeerDisconnectCharges();
388 self->fail(
"charge: Resources");
399 auto const iter =
headers_.find(
"Crawl");
402 return boost::iequals(iter->value(),
"public");
428 ret[jss::inbound] =
true;
432 ret[jss::cluster] =
true;
444 if (
auto const nid =
headers_[
"Network-ID"]; !nid.empty())
447 ret[jss::load] =
usage_.balance();
466 if ((minSeq != 0) || (maxSeq != 0))
472 ret[jss::track] =
"diverged";
476 ret[jss::track] =
"unknown";
485 protocol::TMStatusChange lastStatus;
492 if (closedLedgerHash != beast::kZero)
493 ret[jss::ledger] =
to_string(closedLedgerHash);
495 if (lastStatus.has_newstatus())
497 switch (lastStatus.newstatus())
499 case protocol::nsCONNECTING:
500 ret[jss::status] =
"connecting";
503 case protocol::nsCONNECTED:
504 ret[jss::status] =
"connected";
507 case protocol::nsMONITORING:
508 ret[jss::status] =
"monitoring";
511 case protocol::nsVALIDATING:
512 ret[jss::status] =
"validating";
515 case protocol::nsSHUTTING:
516 ret[jss::status] =
"shutting";
520 JLOG(
pJournal_.warn()) <<
"Unknown status: " << lastStatus.newstatus();
602 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::close : strand in this thread");
623 JLOG(self->journal_.warn()) << n <<
" failed: " << reason;
632 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::fail : strand in this thread");
645 strand_.running_in_this_thread(),
"xrpl::PeerImp::gracefulClose : strand in this thread");
646 XRPL_ASSERT(
socket_.is_open(),
"xrpl::PeerImp::gracefulClose : socket is open");
647 XRPL_ASSERT(!
gracefulClose_,
"xrpl::PeerImp::gracefulClose : socket is not closing");
652 stream_.async_shutdown(bind_executor(
661 timer_.expires_after(kPeerTimerInterval);
663 catch (boost::system::system_error
const& e)
665 JLOG(
journal_.error()) <<
"setTimer: " << e.code();
668 timer_.async_wait(bind_executor(
680 catch (boost::system::system_error
const&)
704 if (ec == boost::asio::error::operation_aborted)
708 JLOG(
journal_.error()) <<
"onTimer: " << ec.message();
715 fail(
"Large send queue");
721 clock_type::duration duration;
740 fail(
"Ping Timeout");
747 protocol::TMPing message;
748 message.set_type(protocol::TMPing::ptPING);
768 bool const shouldLog =
769 (ec != boost::asio::error::eof && ec != boost::asio::error::operation_aborted &&
770 !ec.message().contains(
"application data after close notify"));
774 JLOG(
journal_.debug()) <<
"onShutdown: " << ec.message();
785 XRPL_ASSERT(
readBuffer_.size() == 0,
"xrpl::PeerImp::doAccept : empty read buffer");
793 fail(
"makeSharedValue: Unexpected failure");
805 JLOG(
journal_.info()) <<
"Cluster name: " << *member;
817 !
overlay_.peerFinder().config().peerPrivate,
827 boost::asio::async_write(
830 boost::asio::transfer_all(),
839 if (ec == boost::asio::error::operation_aborted)
842 fail(
"onWriteResponse", ec);
846 if (writeBuffer->size() == bytesTransferred)
851 fail(
"Failed to write header");
881 app_.getValidators().forEachAvailable(
896 app_.getHashRouter(),
900 app_.getHashRouter().addSuppressionPeer(hash,
id_);
904 if (
auto m =
overlay_.getManifestsMessage())
919 if (ec == boost::asio::error::operation_aborted)
922 if (ec == boost::asio::error::eof)
929 fail(
"onReadMessage", ec);
935 stream <<
"onReadMessage: "
936 << (bytesTransferred > 0 ?
to_string(bytesTransferred) +
" bytes" :
"");
939 metrics_.recv.addMessage(bytesTransferred);
949 using namespace std::chrono_literals;
952 "invokeProtocolMessage",
958 fail(
"onReadMessage", ec);
968 if (bytesConsumed == 0)
981 std::placeholders::_1,
982 std::placeholders::_2)));
993 if (ec == boost::asio::error::operation_aborted)
996 fail(
"onWriteMessage", ec);
1001 stream <<
"onWriteMessage: "
1002 << (bytesTransferred > 0 ?
to_string(bytesTransferred) +
" bytes" :
"");
1005 metrics_.sent.addMessage(bytesTransferred);
1007 XRPL_ASSERT(!
sendQueue_.empty(),
"xrpl::PeerImp::onWriteMessage : non-empty send buffer");
1012 boost::asio::async_write(
1020 std::placeholders::_1,
1021 std::placeholders::_2)));
1027 stream_.async_shutdown(bind_executor(
1057 auto const category =
1064 overlay_.reportInboundTraffic(category,
static_cast<int>(size));
1067 if ((type == MessageType::mtTRANSACTION || type == MessageType::mtHAVE_TRANSACTIONS ||
1068 type == MessageType::mtTRANSACTIONS ||
1081 JLOG(
journal_.trace()) <<
"onMessageBegin: " << type <<
" " << size <<
" " << uncompressedSize
1082 <<
" " << isCompressed;
1095 auto const s = m->list_size();
1114 if (m->type() == protocol::TMPing::ptPING)
1118 m->set_type(protocol::TMPing::ptPONG);
1123 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1162 for (
int i = 0; i < m->clusternodes().size(); ++i)
1164 protocol::TMClusterNode
const& node = m->clusternodes(i);
1167 if (node.has_nodename())
1168 name = node.nodename();
1178 app_.getCluster().update(*publicKey,
name, node.nodeload(), reportTime);
1182 int const loadSources = m->loadsources().size();
1183 if (loadSources != 0)
1186 gossip.
items.reserve(loadSources);
1187 for (
int i = 0; i < m->loadsources().size(); ++i)
1189 protocol::TMLoadSource
const& node = m->loadsources(i);
1194 gossip.
items.push_back(item);
1196 overlay_.resourceManager().importConsumers(
name(), gossip);
1200 auto const thresh =
app_.getTimeKeeper().now() - 90s;
1206 app_.getCluster().forEach([&fees, thresh](
ClusterNode const& status) {
1207 if (status.getReportTime() >= thresh)
1213 auto const index = fees.
size() / 2;
1215 clusterFee = fees[index];
1218 app_.getFeeTrack().setClusterFee(clusterFee);
1231 if (m->endpoints_v2().size() >= 1024)
1238 endpoints.
reserve(m->endpoints_v2().size());
1241 for (
auto const& tm : m->endpoints_v2())
1248 <<
"failed to parse incoming endpoint: {" << tm.endpoint() <<
"}";
1274 if (!endpoints.
empty())
1290 XRPL_ASSERT(eraseTxQueue != batch, (
"xrpl::PeerImp::handleTransaction : valid inputs"));
1294 if (
app_.getOPs().isNeedNetworkLedger())
1298 JLOG(
pJournal_.debug()) <<
"Ignoring incoming transaction: Need network ledger";
1307 uint256 const txID = stx->getTransactionID();
1329 JLOG(
pJournal_.warn()) <<
"Ignoring Network relayed Tx containing "
1330 "tfInnerBatchTxn (handleTransaction).";
1339 if (!
app_.getHashRouter().shouldProcess(txID,
id_, flags, kTxInterval))
1345 JLOG(
pJournal_.debug()) <<
"Ignoring known bad tx " << txID;
1361 JLOG(
pJournal_.debug()) <<
"Got tx " << txID;
1363 bool checkSignature =
true;
1366 if (!m->has_deferred() || !m->deferred())
1375 if (!
app_.getValidationPublicKey())
1379 checkSignature =
false;
1383 if (
app_.getLedgerMaster().getValidatedLedgerAge() > 4min)
1385 JLOG(
pJournal_.trace()) <<
"No new transactions until synchronized";
1390 JLOG(
pJournal_.info()) <<
"Transaction queue is full";
1394 app_.getJobQueue().addJob(
1402 if (
auto peer = weak.lock())
1403 peer->checkTransaction(flags, checkSignature, stx, batch);
1409 JLOG(
pJournal_.warn()) <<
"Transaction invalid: " <<
strHex(m->rawtransaction())
1410 <<
". Exception: " << ex.
what();
1419 JLOG(
pJournal_.warn()) <<
"TMGetLedger: " << msg;
1421 auto const itype{m->itype()};
1424 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1426 badData(
"Invalid ledger info type");
1433 return std::nullopt;
1436 if (itype == protocol::liTS_CANDIDATE)
1438 if (!m->has_ledgerhash())
1440 badData(
"Invalid TX candidate set, missing TX set hash");
1445 !m->has_ledgerhash() && !m->has_ledgerseq() && (!ltype || *ltype != protocol::ltCLOSED))
1447 badData(
"Invalid request");
1452 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1454 badData(
"Invalid ledger type");
1461 badData(
"Invalid ledger hash");
1466 if (m->has_ledgerseq())
1468 auto const ledgerSeq{m->ledgerseq()};
1471 using namespace std::chrono_literals;
1472 if (
app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1473 ledgerSeq >
app_.getLedgerMaster().getValidLedgerIndex() + 10)
1481 if (itype != protocol::liBASE)
1483 if (m->nodeids_size() <= 0)
1485 badData(
"Invalid ledger node IDs");
1489 for (
auto const& nodeId : m->nodeids())
1493 badData(
"Invalid SHAMap node ID");
1500 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1502 badData(
"Invalid query type");
1507 if (m->has_querydepth())
1511 badData(
"Invalid query depth");
1519 if (
auto peer = weak.
lock())
1520 peer->processLedgerRequest(m);
1527 JLOG(
pJournal_.trace()) <<
"onMessage, TMProofPathRequest";
1537 if (
auto peer = weak.
lock())
1539 auto reply = peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1540 if (reply.has_error())
1542 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1577 JLOG(
pJournal_.trace()) <<
"onMessage, TMReplayDeltaRequest";
1587 if (
auto peer = weak.
lock())
1589 auto reply = peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1590 if (reply.has_error())
1592 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1629 JLOG(
pJournal_.warn()) <<
"TMLedgerData: " << msg;
1635 badData(
"Invalid ledger hash");
1641 auto const ledgerSeq{m->ledgerseq()};
1642 if (m->type() == protocol::liTS_CANDIDATE)
1653 using namespace std::chrono_literals;
1654 if (
app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1655 ledgerSeq >
app_.getLedgerMaster().getValidLedgerIndex() + 10)
1664 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1666 badData(
"Invalid ledger info type");
1671 if (m->has_error() &&
1672 (m->error() < protocol::reNO_LEDGER || m->error() > protocol::reBAD_REQUEST))
1674 badData(
"Invalid reply error");
1681 badData(
"Invalid Ledger/TXset nodes " +
std::to_string(m->nodes_size()));
1686 if (m->has_requestcookie())
1688 if (
auto peer =
overlay_.findPeerByShortID(m->requestcookie()))
1690 m->clear_requestcookie();
1695 JLOG(
pJournal_.info()) <<
"Unable to route TX/ledger data reply";
1703 if (m->type() == protocol::liTS_CANDIDATE)
1706 app_.getJobQueue().addJob(
JtTxnData,
"RcvPeerData", [weak, ledgerHash, m]() {
1707 if (
auto peer = weak.
lock())
1709 peer->app_.getInboundTransactions().gotData(ledgerHash, peer, m);
1722 protocol::TMProposeSet
const&
set = *m;
1731 JLOG(
pJournal_.warn()) <<
"Proposal: malformed";
1738 JLOG(
pJournal_.warn()) <<
"Proposal: malformed";
1747 auto const isTrusted =
app_.getValidators().trusted(publicKey);
1758 if (
app_.config().relayUntrustedProposals == -1)
1768 proposeHash, prevLedger,
set.proposeseq(), closeTime, publicKey.
slice(), sig);
1770 if (
auto [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(suppression,
id_);
1776 overlay_.updateSlotAndSquelch(suppression, publicKey,
id_, protocol::mtPROPOSE_LEDGER);
1782 JLOG(
pJournal_.trace()) <<
"Proposal: duplicate";
1791 JLOG(
pJournal_.debug()) <<
"Proposal: Dropping untrusted (peer divergence)";
1795 if (!
cluster() &&
app_.getFeeTrack().isLoadedLocal())
1797 JLOG(
pJournal_.debug()) <<
"Proposal: Dropping untrusted (load)";
1802 JLOG(
pJournal_.trace()) <<
"Proposal: " << (isTrusted ?
"trusted" :
"untrusted");
1813 app_.getTimeKeeper().closeTime(),
1814 calcNodeID(
app_.getValidatorManifests().getMasterKey(publicKey))});
1817 app_.getJobQueue().addJob(
1819 if (
auto peer = weak.lock())
1820 peer->checkPropose(isTrusted, m, proposal);
1827 JLOG(
pJournal_.trace()) <<
"Status: Change";
1829 if (!m->has_networktime())
1830 m->set_networktime(
app_.getTimeKeeper().now().time_since_epoch().count());
1834 if (!
lastStatus_.has_newstatus() || m->has_newstatus())
1841 protocol::NodeStatus
const status =
lastStatus_.newstatus();
1843 m->set_newstatus(status);
1847 if (m->newevent() == protocol::neLOST_SYNC)
1849 bool outOfSync{
false};
1863 JLOG(
pJournal_.debug()) <<
"Status: Out of sync";
1876 if (peerChangedLedgers)
1897 if (peerChangedLedgers)
1899 JLOG(
pJournal_.debug()) <<
"LCL is " << closedLedgerHash;
1903 JLOG(
pJournal_.debug()) <<
"Status: No ledger";
1907 if (m->has_firstseq() && m->has_lastseq())
1918 if (m->has_ledgerseq() &&
app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1926 if (m->has_newstatus())
1928 switch (m->newstatus())
1930 case protocol::nsCONNECTING:
1931 j[jss::status] =
"CONNECTING";
1933 case protocol::nsCONNECTED:
1934 j[jss::status] =
"CONNECTED";
1936 case protocol::nsMONITORING:
1937 j[jss::status] =
"MONITORING";
1939 case protocol::nsVALIDATING:
1940 j[jss::status] =
"VALIDATING";
1942 case protocol::nsSHUTTING:
1943 j[jss::status] =
"SHUTTING";
1948 if (m->has_newevent())
1950 switch (m->newevent())
1952 case protocol::neCLOSING_LEDGER:
1953 j[jss::action] =
"CLOSING_LEDGER";
1955 case protocol::neACCEPTED_LEDGER:
1956 j[jss::action] =
"ACCEPTED_LEDGER";
1958 case protocol::neSWITCHED_LEDGER:
1959 j[jss::action] =
"SWITCHED_LEDGER";
1961 case protocol::neLOST_SYNC:
1962 j[jss::action] =
"LOST_SYNC";
1967 if (m->has_ledgerseq())
1969 j[jss::ledger_index] = m->ledgerseq();
1972 if (m->has_ledgerhash())
1979 j[jss::ledger_hash] =
to_string(closedLedgerHash);
1982 if (m->has_networktime())
1987 if (m->has_firstseq() && m->has_lastseq())
1989 j[jss::ledger_index_min] =
json::UInt(m->firstseq());
1990 j[jss::ledger_index_max] =
json::UInt(m->lastseq());
2048 if (m->status() == protocol::tsHAVE)
2073 JLOG(
pJournal_.warn()) <<
"Ignored malformed " << messageType;
2079 auto const hash =
sha512Half(manifest, blobs, version);
2081 JLOG(
pJournal_.debug()) <<
"Received " << messageType;
2083 if (!
app_.getHashRouter().addSuppressionPeer(hash,
id_))
2085 JLOG(
pJournal_.debug()) << messageType <<
": received duplicate " << messageType;
2093 auto const applyResult =
app_.getValidators().applyListsAndBroadcast(
2100 app_.getHashRouter(),
2103 JLOG(
pJournal_.debug()) <<
"Processed " << messageType <<
" version " << version <<
" from "
2104 << (applyResult.publisherKey ?
strHex(*applyResult.publisherKey)
2105 :
"unknown or invalid publisher")
2106 <<
" with best result " <<
to_string(applyResult.bestDisposition());
2109 switch (applyResult.bestDisposition())
2120 applyResult.publisherKey,
2121 "xrpl::PeerImp::onValidatorListMessage : publisher key is "
2124 auto const& pubKey = *applyResult.publisherKey;
2130 iter->second < applyResult.sequence,
2131 "xrpl::PeerImp::onValidatorListMessage : lower sequence");
2143 applyResult.sequence && applyResult.publisherKey,
2144 "xrpl::PeerImp::onValidatorListMessage : nonzero sequence "
2145 "and set publisher key");
2148 "xrpl::PeerImp::onValidatorListMessage : maximum sequence");
2161 "xrpl::PeerImp::onValidatorListMessage : invalid best list "
2167 switch (applyResult.worstDisposition())
2204 "xrpl::PeerImp::onValidatorListMessage : invalid worst list "
2210 for (
auto const& [disp, count] : applyResult.dispositions)
2216 JLOG(
pJournal_.debug()) <<
"Applied " << count <<
" new " << messageType;
2220 JLOG(
pJournal_.debug()) <<
"Applied " << count <<
" expired " << messageType;
2224 JLOG(
pJournal_.debug()) <<
"Processed " << count <<
" future " << messageType;
2228 <<
"Ignored " << count <<
" " << messageType <<
"(s) with current sequence";
2232 <<
"Ignored " << count <<
" " << messageType <<
"(s) with future sequence";
2235 JLOG(
pJournal_.warn()) <<
"Ignored " << count <<
"stale " << messageType;
2238 JLOG(
pJournal_.warn()) <<
"Ignored " << count <<
" untrusted " << messageType;
2242 <<
"Ignored " << count <<
"unsupported version " << messageType;
2245 JLOG(
pJournal_.warn()) <<
"Ignored " << count <<
"invalid " << messageType;
2250 "xrpl::PeerImp::onValidatorListMessage : invalid list "
2264 JLOG(
pJournal_.debug()) <<
"ValidatorList: received validator list from peer using "
2266 <<
" which shouldn't support this feature.";
2275 JLOG(
pJournal_.warn()) <<
"ValidatorList: Exception, " << e.
what();
2276 using namespace std::string_literals;
2288 JLOG(
pJournal_.debug()) <<
"ValidatorListCollection: received validator list from peer "
2290 <<
" which shouldn't support this feature.";
2294 if (m->version() < 2)
2297 <<
"ValidatorListCollection: received invalid validator list "
2308 JLOG(
pJournal_.warn()) <<
"ValidatorListCollection: Exception, " << e.
what();
2309 using namespace std::string_literals;
2317 if (m->validation().size() < 50)
2319 JLOG(
pJournal_.warn()) <<
"Validation: Too small";
2326 auto const closeTime =
app_.getTimeKeeper().closeTime();
2334 return calcNodeID(
app_.getValidatorManifests().getMasterKey(pk));
2337 val->setSeen(closeTime);
2341 app_.getValidations().parms(),
2342 app_.getTimeKeeper().closeTime(),
2344 val->getSeenTime()))
2346 JLOG(
pJournal_.trace()) <<
"Validation: Not current";
2354 auto const isTrusted =
app_.getValidators().trusted(val->getSignerPublic());
2365 if (
app_.config().relayUntrustedValidations == -1)
2371 auto [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(key,
id_);
2381 key, val->getSignerPublic(),
id_, protocol::mtVALIDATION);
2388 JLOG(
pJournal_.trace()) <<
"Validation: duplicate";
2394 JLOG(
pJournal_.debug()) <<
"Dropping untrusted validation from diverged peer";
2396 else if (isTrusted || !
app_.getFeeTrack().isLoadedLocal())
2401 app_.getJobQueue().addJob(
2403 if (
auto peer = weak.
lock())
2404 peer->checkValidation(val, key, m);
2409 JLOG(
pJournal_.debug()) <<
"Dropping untrusted validation for load";
2414 JLOG(
pJournal_.warn()) <<
"Exception processing validation: " << e.
what();
2415 using namespace std::string_literals;
2423 protocol::TMGetObjectByHash
const& packet = *m;
2425 JLOG(
pJournal_.trace()) <<
"received TMGetObjectByHash " << packet.type() <<
" "
2426 << packet.objects_size();
2433 JLOG(
pJournal_.debug()) <<
"GetObject: Large send queue";
2437 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2443 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2447 JLOG(
pJournal_.error()) <<
"TMGetObjectByHash: tx reduce-relay is disabled";
2454 if (
auto peer = weak.
lock())
2455 peer->doTransactions(m);
2460 if (packet.has_ledgerhash())
2464 JLOG(
pJournal_.debug()) <<
"GetObj: malformed ledgerhash from peer " <<
id_;
2475 <<
"GetObj: oversized request from peer " <<
id_ <<
" (" << packet.objects_size()
2485 bool const queued =
app_.getJobQueue().addJob(
JtLedgerReq,
"RcvGetObjByHash", [weak, m]() {
2486 auto peer = weak.
lock();
2491 peer->processGetObjectByHash(m);
2498 JLOG(peer->pJournal_.warn()) <<
"GetObj: handler threw: " << e.
what();
2506 JLOG(
pJournal_.warn()) <<
"GetObj: job queue refused request from peer " <<
id_;
2523 bool progress =
false;
2525 for (
int i = 0; i < packet.objects_size(); ++i)
2527 protocol::TMIndexedObject
const& obj = packet.objects(i);
2531 if (obj.has_ledgerseq())
2533 if (obj.ledgerseq() != pLSeq)
2535 if (pLDo && (pLSeq != 0))
2537 JLOG(
pJournal_.debug()) <<
"GetObj: Full fetch pack for " << pLSeq;
2539 pLSeq = obj.ledgerseq();
2540 pLDo = !
app_.getLedgerMaster().haveLedger(pLSeq);
2544 JLOG(
pJournal_.debug()) <<
"GetObj: Late fetch pack for " << pLSeq;
2557 app_.getLedgerMaster().addFetchPack(
2563 if (pLDo && (pLSeq != 0))
2565 JLOG(
pJournal_.debug()) <<
"GetObj: Partial fetch pack for " << pLSeq;
2567 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2568 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2575 protocol::TMGetObjectByHash
const& packet = *m;
2577 protocol::TMGetObjectByHash reply;
2578 reply.set_query(
false);
2579 reply.set_type(packet.type());
2581 if (packet.has_ledgerhash())
2583 reply.set_ledgerhash(packet.ledgerhash());
2594 int const requested = packet.objects_size();
2597 for (
int i = 0; i < iterLimit; ++i)
2599 auto const& obj = packet.objects(i);
2606 std::uint32_t const seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2607 auto const nodeObject =
app_.getNodeStore().fetchNodeObject(hash, seq);
2611 protocol::TMIndexedObject& newObj = *reply.add_objects();
2612 newObj.set_hash(hash.
begin(), hash.
size());
2613 auto const& data = nodeObject->getData();
2614 newObj.set_data(data.data(), data.size());
2615 if (obj.has_nodeid())
2616 newObj.set_index(obj.nodeid());
2617 if (obj.has_ledgerseq())
2618 newObj.set_ledgerseq(obj.ledgerseq());
2629 "processed get object by hash request");
2631 JLOG(
pJournal_.trace()) <<
"GetObj: " << reply.objects_size() <<
" of " << requested;
2640 JLOG(
pJournal_.error()) <<
"TMHaveTransactions: tx reduce-relay is disabled";
2647 if (
auto peer = weak.
lock())
2648 peer->handleHaveTransactions(m);
2655 protocol::TMGetObjectByHash tmBH;
2656 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2657 tmBH.set_query(
true);
2659 JLOG(
pJournal_.trace()) <<
"received TMHaveTransactions " << m->hashes_size();
2665 JLOG(
pJournal_.error()) <<
"TMHaveTransactions with invalid hash size";
2672 auto txn =
app_.getMasterTransaction().fetchFromCache(hash);
2674 JLOG(
pJournal_.trace()) <<
"checking transaction " << (bool)txn;
2678 JLOG(
pJournal_.debug()) <<
"adding transaction to request";
2680 auto obj = tmBH.add_objects();
2681 obj->set_hash(hash.
data(), hash.
size());
2692 JLOG(
pJournal_.trace()) <<
"transaction request object is " << tmBH.objects_size();
2694 if (tmBH.objects_size() > 0)
2703 JLOG(
pJournal_.error()) <<
"TMTransactions: tx reduce-relay is disabled";
2708 JLOG(
pJournal_.trace()) <<
"received TMTransactions " << m->transactions_size();
2710 overlay_.addTxMetrics(m->transactions_size());
2716 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2726 if (!m->has_validatorpubkey())
2731 auto validator = m->validatorpubkey();
2741 if (key == self->app_.getValidationPublicKey())
2743 JLOG(self->pJournal_.debug())
2744 <<
"onMessage: TMSquelch discarding validator's squelch " << slice;
2748 std::uint32_t const duration = m->has_squelchduration() ? m->squelchduration() : 0;
2751 self->squelch_.removeSquelch(key);
2758 JLOG(self->pJournal_.debug())
2759 <<
"onMessage: TMSquelch " << slice <<
" " << self->id() <<
" " << duration;
2770 (void)lockedRecentLock;
2784 if (
app_.getFeeTrack().isLoadedLocal() ||
2785 (
app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2786 (
app_.getJobQueue().getJobCount(
JtPack) > 10))
2788 JLOG(
pJournal_.info()) <<
"Too busy to make fetch pack";
2794 JLOG(
pJournal_.warn()) <<
"FetchPack hash size malformed";
2805 auto const pap = &
app_;
2806 app_.getJobQueue().addJob(
JtPack,
"MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2807 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2814 protocol::TMTransactions reply;
2816 JLOG(
pJournal_.trace()) <<
"received TMGetObjectByHash requesting tx "
2817 << packet->objects_size();
2821 JLOG(
pJournal_.error()) <<
"doTransactions, invalid number of hashes";
2828 auto const& obj = packet->objects(i);
2838 auto txn =
app_.getMasterTransaction().fetchFromCache(hash);
2843 <<
"doTransactions, transaction not found " <<
Slice(hash.
data(), hash.
size());
2849 auto tx = reply.add_transactions();
2850 auto sttx = txn->getSTransaction();
2852 tx->set_rawtransaction(s.
data(), s.
size());
2855 tx->set_receivetimestamp(
app_.getTimeKeeper().now().time_since_epoch().count());
2856 tx->set_deferred(txn->getSubmitResult().queued);
2859 if (reply.transactions_size() > 0)
2866 bool checkSignature,
2893 JLOG(
pJournal_.warn()) <<
"Ignoring Network relayed Tx containing "
2894 "tfInnerBatchTxn (checkSignature).";
2901 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2902 (stx->getFieldU32(sfLastLedgerSequence) <
app_.getLedgerMaster().getValidLedgerIndex()))
2904 JLOG(
pJournal_.info()) <<
"Marking transaction " << stx->getTransactionID()
2905 <<
"as BAD because it's expired";
2919 "xrpl::PeerImp::checkTransaction Transaction created "
2923 JLOG(
pJournal_.debug()) <<
"Processing " << (batch ?
"batch" :
"unsolicited")
2924 <<
" pseudo-transaction tx " << tx->getID();
2926 app_.getMasterTransaction().canonicalize(&tx);
2928 auto const toSkip =
app_.getHashRouter().shouldRelay(tx->getID());
2932 <<
"Passing skipped pseudo pseudo-transaction tx " << tx->getID();
2933 app_.getOverlay().relay(tx->getID(), {}, *toSkip);
2937 JLOG(
pJournal_.debug()) <<
"Charging for pseudo-transaction tx " << tx->getID();
2949 app_.getHashRouter(), *stx,
app_.getLedgerMaster().getValidatedRules());
2952 if (!validReason.empty())
2954 JLOG(
pJournal_.debug()) <<
"Exception checking transaction: " << validReason;
2974 if (!reason.
empty())
2976 JLOG(
pJournal_.debug()) <<
"Exception checking transaction: " << reason;
2988 JLOG(
pJournal_.warn()) <<
"Exception in " << __func__ <<
": " << ex.
what();
2990 using namespace std::string_literals;
3002 JLOG(
pJournal_.trace()) <<
"Checking " << (isTrusted ?
"trusted" :
"UNTRUSTED") <<
" proposal";
3004 XRPL_ASSERT(packet,
"xrpl::PeerImp::checkPropose : non-null packet");
3008 std::string const desc{
"Proposal fails sig check"};
3018 relay =
app_.getOPs().processTrustedProposal(peerPos);
3022 relay =
app_.config().relayUntrustedProposals == 1 ||
cluster();
3033 if (!haveMessage.empty())
3038 std::move(haveMessage),
3039 protocol::mtPROPOSE_LEDGER);
3050 if (!val->isValid())
3052 std::string const desc{
"Validation forwarded by peer is invalid"};
3067 auto haveMessage =
overlay_.relay(*packet, key, val->getSignerPublic());
3068 if (!haveMessage.empty())
3071 key, val->getSignerPublic(), std::move(haveMessage), protocol::mtVALIDATION);
3077 JLOG(
pJournal_.trace()) <<
"Exception processing validation: " << ex.
what();
3078 using namespace std::string_literals;
3093 if (p->hasTxSet(rootHash) && p.get() != skip)
3095 auto score = p->getScore(
true);
3096 if (!ret || (score > retScore))
3121 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3123 auto score = p->getScore(
true);
3124 if (!ret || (score > retScore))
3138 protocol::TMLedgerData& ledgerData)
3140 JLOG(
pJournal_.trace()) <<
"sendLedgerBase: Base data";
3143 addRaw(ledger->header(), s);
3146 auto const& stateMap{ledger->stateMap()};
3147 if (stateMap.getHash() != beast::kZero)
3152 stateMap.serializeRoot(
root);
3153 ledgerData.add_nodes()->set_nodedata(
root.getDataPtr(),
root.getLength());
3155 if (ledger->header().txHash != beast::kZero)
3157 auto const& txMap{ledger->txMap()};
3158 if (txMap.getHash() != beast::kZero)
3162 txMap.serializeRoot(
root);
3163 ledgerData.add_nodes()->set_nodedata(
root.getDataPtr(),
root.getLength());
3175 JLOG(
pJournal_.trace()) <<
"getLedger: Ledger";
3179 if (m->has_ledgerhash())
3183 ledger =
app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3186 JLOG(
pJournal_.trace()) <<
"getLedger: Don't have ledger with hash " << ledgerHash;
3188 if (m->has_querytype() && !m->has_requestcookie())
3192 overlay_, ledgerHash, m->has_ledgerseq() ? m->ledgerseq() : 0,
this))
3194 m->set_requestcookie(
id());
3196 JLOG(
pJournal_.debug()) <<
"getLedger: Request relayed to peer";
3200 JLOG(
pJournal_.trace()) <<
"getLedger: Failed to find peer to relay request";
3204 else if (m->has_ledgerseq())
3207 if (m->ledgerseq() <
app_.getLedgerMaster().getEarliestFetch())
3209 JLOG(
pJournal_.debug()) <<
"getLedger: Early ledger sequence request";
3213 ledger =
app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3217 <<
"getLedger: Don't have ledger with sequence " << m->ledgerseq();
3221 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3223 ledger =
app_.getLedgerMaster().getClosedLedger();
3229 auto const ledgerSeq{ledger->header().seq};
3230 if (m->has_ledgerseq())
3232 if (ledgerSeq != m->ledgerseq())
3235 if (!m->has_requestcookie())
3239 JLOG(
pJournal_.warn()) <<
"getLedger: Invalid ledger sequence " << ledgerSeq;
3242 else if (ledgerSeq <
app_.getLedgerMaster().getEarliestFetch())
3245 JLOG(
pJournal_.debug()) <<
"getLedger: Early ledger sequence request " << ledgerSeq;
3250 JLOG(
pJournal_.debug()) <<
"getLedger: Unable to find ledger";
3259 JLOG(
pJournal_.trace()) <<
"getTxSet: TX set";
3265 if (m->has_querytype() && !m->has_requestcookie())
3270 m->set_requestcookie(
id());
3272 JLOG(
pJournal_.debug()) <<
"getTxSet: Request relayed";
3276 JLOG(
pJournal_.debug()) <<
"getTxSet: Failed to find relay peer";
3281 JLOG(
pJournal_.debug()) <<
"getTxSet: Failed to find TX set";
3292 if (!m->has_requestcookie())
3297 SHAMap const* map{
nullptr};
3298 protocol::TMLedgerData ledgerData;
3299 bool fatLeaves{
true};
3300 auto const itype{m->itype()};
3302 if (itype == protocol::liTS_CANDIDATE)
3304 if (sharedMap =
getTxSet(m); !sharedMap)
3306 map = sharedMap.
get();
3309 ledgerData.set_ledgerseq(0);
3310 ledgerData.set_ledgerhash(m->ledgerhash());
3311 ledgerData.set_type(protocol::liTS_CANDIDATE);
3312 if (m->has_requestcookie())
3313 ledgerData.set_requestcookie(m->requestcookie());
3322 JLOG(
pJournal_.debug()) <<
"processLedgerRequest: Large send queue";
3325 if (
app_.getFeeTrack().isLoadedLocal() && !
cluster())
3327 JLOG(
pJournal_.debug()) <<
"processLedgerRequest: Too busy";
3335 auto const ledgerHash{ledger->header().hash};
3336 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3337 ledgerData.set_ledgerseq(ledger->header().seq);
3338 ledgerData.set_type(itype);
3339 if (m->has_requestcookie())
3340 ledgerData.set_requestcookie(m->requestcookie());
3344 case protocol::liBASE:
3348 case protocol::liTX_NODE:
3349 map = &ledger->txMap();
3354 case protocol::liAS_NODE:
3355 map = &ledger->stateMap();
3357 <<
"processLedgerRequest: Account state map hash " <<
to_string(map->
getHash());
3362 JLOG(
pJournal_.error()) <<
"processLedgerRequest: Invalid ledger info type";
3369 JLOG(
pJournal_.warn()) <<
"processLedgerRequest: Unable to find map";
3374 if (m->nodeids_size() > 0)
3377 auto const queryDepth{m->has_querydepth() ? m->querydepth() : defaultDepth};
3393 if (map->
getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3396 <<
"processLedgerRequest: getNodeFat got " << data.size() <<
" nodes";
3398 for (
auto const& d : data)
3402 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3403 node->set_nodeid(d.first.getRawString());
3404 node->set_nodedata(d.second.data(), d.second.size());
3409 JLOG(
pJournal_.warn()) <<
"processLedgerRequest: getNodeFat returns false";
3417 case protocol::liBASE:
3419 info =
"Ledger base";
3422 case protocol::liTX_NODE:
3426 case protocol::liAS_NODE:
3430 case protocol::liTS_CANDIDATE:
3431 info =
"TS candidate";
3439 if (!m->has_ledgerhash())
3440 info +=
", no hash specified";
3443 <<
"processLedgerRequest: getNodeFat with nodeId " << *shaMapNodeId
3444 <<
" and ledger info type " << info <<
" throws exception: " << e.
what();
3448 JLOG(
pJournal_.info()) <<
"processLedgerRequest: Got request for " << m->nodeids_size()
3449 <<
" nodes at depth " << queryDepth <<
", return "
3450 << ledgerData.nodes_size() <<
" nodes";
3453 if (ledgerData.nodes_size() == 0)
3486 int const missed =
std::max(0, requested - found);
3487 int const billableMisses =
std::min(missed, billable);
3488 int const billableHits = billable - billableMisses;
3511 static int const kSpRandomMax = 9999;
3515 static int const kSpHaveItem = 10000;
3520 static int const kSpLatency = 30;
3523 static int const kSpNoLatency = 8000;
3525 int score =
randInt(kSpRandomMax);
3528 score += kSpHaveItem;
3538 score -= latency->count() * kSpLatency;
3542 score -= kSpNoLatency;
3552 return latency_ >= kPeerHighLatency;
3558 using namespace std::chrono_literals;
3566 if (timeElapsedInSecs >= 1s)
3568 auto const avgBytes =
accumBytes_ / timeElapsedInSecs.count();
A version-independent IP address and port combination.
static std::optional< Endpoint > fromStringChecked(std::string const &s)
Create an Endpoint from a string.
static Endpoint fromString(std::string const &s)
static BaseUInt fromRaw(Container const &c)
static constexpr std::size_t size()
static std::size_t messageSize(::google::protobuf::Message const &message)
std::chrono::time_point< NetClock > time_point
std::chrono::duration< rep, period > duration
Child(OverlayImpl &overlay)
void forEach(UnaryFunc &&f) const
std::uint64_t rollingAvgBytes_
clock_type::time_point intervalStart_
boost::circular_buffer< std::uint64_t > rollingAvg_
void addMessage(std::uint64_t bytes)
std::uint64_t averageBytes() const
std::uint64_t totalBytes() const
std::uint64_t accumBytes_
std::uint64_t totalBytes_
void checkTracking(std::uint32_t validationSeq)
Check if the peer is tracking.
void onTimer(boost::system::error_code const &ec)
std::optional< std::chrono::milliseconds > latency_
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
std::string getVersion() const
Return the version of xrpld that the peer is running, if reported.
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
ProtocolVersion protocol_
bool txReduceRelayEnabled_
void checkTransaction(HashRouterFlags flags, bool checkSignature, std::shared_ptr< STTx const > const &stx, bool batch)
std::shared_ptr< PeerFinder::Slot > const & slot()
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
http_request_type request_
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
bool txReduceRelayEnabled() const override
std::shared_ptr< PeerFinder::Slot > const slot_
compression::Compressed Compressed
boost::beast::http::fields const & headers_
Compressed compressionEnabled_
std::chrono::steady_clock clock_type
boost::system::error_code error_code
static Resource::Charge computeGetObjectByHashFee(int const requested, int const found)
Compute the per-message resource charge for a TMGetObjectByHash request based on how much work was ac...
void addLedger(uint256 const &hash, std::scoped_lock< std::mutex > const &lockedRecentLock)
boost::beast::multi_buffer readBuffer_
std::string const & fingerprint() const override
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
bool ledgerReplayEnabled_
void sendTxQueue() override
Send aggregated transactions' hashes.
struct xrpl::PeerImp::@337373043150231020277011015352151251117171316327 metrics_
std::unique_ptr< stream_type > streamPtr_
uint256 closedLedgerHash_
reduce_relay::Squelch< UptimeClock > squelch_
PeerImp(PeerImp const &)=delete
void checkValidation(std::shared_ptr< STValidation > const &val, uint256 const &key, std::shared_ptr< protocol::TMValidation > const &packet)
void cycleStatus() override
std::shared_mutex nameMutex_
std::string domain() const
std::atomic< Tracking > tracking_
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
clock_type::duration uptime() const
beast::WrappedSink pSink_
PublicKey const publicKey_
void processGetObjectByHash(std::shared_ptr< protocol::TMGetObjectByHash > const &m)
Process a generic-query TMGetObjectByHash message.
bool cluster() const override
Returns true if this connection is a member of the cluster.
std::queue< std::shared_ptr< Message > > sendQueue_
void checkPropose(bool isTrusted, std::shared_ptr< protocol::TMProposeSet > const &packet, RCLCxPeerPos peerPos)
void sendLedgerBase(std::shared_ptr< Ledger const > const &ledger, protocol::TMLedgerData &ledgerData)
Peer::id_t id() const override
beast::Journal const pJournal_
static std::string makePrefix(std::string const &fingerprint)
void onShutdown(error_code ec)
void handleHaveTransactions(std::shared_ptr< protocol::TMHaveTransactions > const &m)
Handle protocol message with hashes of transactions that have not been relayed by an upstream node do...
void onMessageUnknown(std::uint16_t type)
void processLedgerRequest(std::shared_ptr< protocol::TMGetLedger > const &m)
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
protocol::TMStatusChange lastStatus_
clock_type::time_point const creationTime_
LedgerReplayMsgHandler ledgerReplayMsgHandler_
void fail(std::string const &reason)
std::unique_ptr< LoadEvent > loadEvent_
void send(std::shared_ptr< Message > const &m) override
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressedSize, bool isCompressed)
void doFetchPack(std::shared_ptr< protocol::TMGetObjectByHash > const &packet)
bool supportsFeature(ProtocolFeature f) const override
std::shared_ptr< Ledger const > getLedger(std::shared_ptr< protocol::TMGetLedger > const &m)
boost::asio::strand< boost::asio::executor > strand_
clock_type::time_point lastPingTime_
boost::circular_buffer< uint256 > recentLedgers_
int getScore(bool haveItem) const override
bool hasTxSet(uint256 const &hash) const override
boost::circular_buffer< uint256 > recentTxSets_
clock_type::time_point trackingTime_
beast::Journal const journal_
bool isHighLatency() const override
void cancelTimer() noexcept
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
std::shared_ptr< SHAMap const > getTxSet(std::shared_ptr< protocol::TMGetLedger > const &m) const
uint256 previousLedgerHash_
Resource::Consumer usage_
std::optional< std::uint32_t > lastPingSeq_
void onWriteMessage(error_code ec, std::size_t bytesTransferred)
bool crawl() const
Returns true if this connection will publicly share its IP address.
beast::IP::Endpoint const remoteAddress_
void onValidatorListMessage(std::string const &messageType, std::string const &manifest, std::uint32_t version, std::vector< ValidatorBlobInfo > const &blobs)
void doTransactions(std::shared_ptr< protocol::TMGetObjectByHash > const &packet)
Process peer's request to send missing transactions.
void onReadMessage(error_code ec, std::size_t bytesTransferred)
hash_map< PublicKey, std::size_t > publisherListSequences_
json::Value json() override
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
std::uint32_t id_t
Uniquely identifies a peer.
Slice slice() const noexcept
A peer's signed, proposed position for use in RCLConsensus.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
ConsensusProposal< NodeID, uint256, uint256 > Proposal
bool checkSign() const
Verify the signing hash of the proposal.
An endpoint that consumes resources.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
bool getNodeFat(SHAMapNodeID const &wanted, std::vector< std::pair< SHAMapNodeID, Blob > > &data, bool fatLeaves, std::uint32_t depth) const
SHAMapHash getHash() const
void const * getDataPtr() const
std::size_t size() const noexcept
void const * data() const noexcept
An immutable linear range of bytes.
static Category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
static std::vector< ValidatorBlobInfo > parseBlobs(std::uint32_t version, json::Value const &body)
Pull the blob/signature/manifest information out of the appropriate Json body fields depending on the...
T duration_cast(T... args)
T emplace_back(T... args)
@ Object
object value (collection of name/value pairs).
Charge const kFeeRequestNoReply
Charge const kFeeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const kFeeTrivialPeer
Charge const kFeeInvalidData
Charge const kFeeHeavyBurdenPeer
Charge const kFeeModerateBurdenPeer
Charge const kFeeInvalidSignature
Charge const kFeeUselessData
static constexpr auto kCostBandMedium
static constexpr auto kCostBandLarge
constexpr std::size_t kReadBufferBytes
Size of buffer used to read from the socket.
static constexpr auto kSoftMaxReplyNodes
The soft cap on the number of ledger entries in a single reply.
static constexpr auto kBandMediumMax
static constexpr auto kSendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
static constexpr auto kCostPerLookupHit
Cost of one cache-hit lookup.
static constexpr auto kFreeObjectsPerRequest
TMGetObjectByHash differential pricing.
static constexpr auto kTargetSendQueue
How many messages we consider reasonable sustained on a send queue.
static constexpr auto kCostPerLookupMiss
Cost of one node-store miss, in units of kCostPerLookupHit.
static constexpr auto kCostBandSmall
Size-band surcharges.
static constexpr auto kHardMaxReplyNodes
The hard cap on the number of ledger entries in a single reply.
static constexpr auto kMaxQueryDepth
The maximum number of levels to search.
static constexpr std::uint32_t kDivergedLedgerLimit
How many ledgers off a server has to be before we consider it diverged.
static constexpr auto kBandSmallMax
Cutoffs that decide which size band a request falls into.
static constexpr auto kDropSendQueue
How many messages on a send queue before we refuse queries.
static constexpr std::uint32_t kConvergedLedgerLimit
How many ledgers off a server can be and we will still consider it converged.
static constexpr auto kSendQueueLogFreq
How often to log send queue size.
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
static constexpr std::size_t kMaxTxQueueSize
static constexpr auto kIdled
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
std::string base64Decode(std::string_view data)
constexpr FlagValue tfInnerBatchTxn
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
@ Valid
Signature and local checks are good / passed.
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::optional< AccountID > parseBase58(std::string const &s)
Parse AccountID from checked, base58 string.
std::uint32_t LedgerIndex
A ledger index.
Stopwatch & stopwatch()
Returns an instance of a wall clock.
std::string strHex(FwdIt begin, FwdIt end)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules)
Checks transaction signature and local checks.
@ UnsupportedVersion
List version is not supported.
@ Expired
List is expired, but has the largest non-pending sequence seen so far.
@ SameSequence
Same sequence as current list.
@ KnownSequence
Future sequence already seen.
@ Pending
List will be valid in the future.
@ Invalid
Invalid format or signature.
@ Untrusted
List signed by untrusted publisher key.
@ Stale
Trusted publisher key, but seq is too old.
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safeCast(Src s) noexcept
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
@ ValidatorListPropagation
@ ValidatorList2Propagation
std::enable_if_t< std::is_integral_v< Integral >, Integral > randInt()
static constexpr char kFeatureLedgerReplay[]
Number root(Number f, unsigned d)
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
std::string to_string(BaseUInt< Bits, Tag > const &a)
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
static constexpr char kFeatureTxrr[]
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address publicIp, beast::IP::Address remoteIp, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
constexpr ProtocolVersion makeProtocol(std::uint16_t major, std::uint16_t minor)
std::string getFingerprint(beast::IP::Endpoint const &address, std::optional< PublicKey > const &publicKey=std::nullopt, std::optional< std::string > const &id=std::nullopt)
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
bool peerFeatureEnabled(Headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
std::pair< std::uint16_t, std::uint16_t > ProtocolVersion
Represents a particular version of the peer-to-peer protocol.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
static bool stringIsUInt256Sized(std::string const &pBuffStr)
static constexpr char kFeatureCompr[]
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
static constexpr char kFeatureVprr[]
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
std::enable_if_t< std::is_same_v< T, char >||std::is_same_v< T, unsigned char >, Slice > makeSlice(std::array< T, N > const &a)
constexpr bool any(HashRouterFlags flags)
T shared_from_this(T... args)
Describes a single consumer.
beast::IP::Endpoint address
Data format for exchanging consumption information across peers.
std::vector< Item > items