1#include <xrpld/app/ledger/InboundLedger.h>
3#include <xrpld/app/ledger/AccountStateSF.h>
4#include <xrpld/app/ledger/InboundLedgers.h>
5#include <xrpld/app/ledger/LedgerMaster.h>
6#include <xrpld/app/ledger/TransactionStateSF.h>
7#include <xrpld/app/ledger/detail/TimeoutCounter.h>
8#include <xrpld/app/main/Application.h>
9#include <xrpld/overlay/Message.h>
10#include <xrpld/overlay/Overlay.h>
11#include <xrpld/overlay/PeerSet.h>
13#include <xrpl/basics/Blob.h>
14#include <xrpl/basics/Log.h>
15#include <xrpl/basics/Slice.h>
16#include <xrpl/basics/base_uint.h>
17#include <xrpl/beast/utility/instrumentation.h>
18#include <xrpl/core/Job.h>
19#include <xrpl/core/JobQueue.h>
20#include <xrpl/json/json_value.h>
21#include <xrpl/nodestore/Database.h>
22#include <xrpl/nodestore/NodeObject.h>
23#include <xrpl/protocol/HashPrefix.h>
24#include <xrpl/protocol/Indexes.h>
25#include <xrpl/protocol/LedgerHeader.h>
26#include <xrpl/protocol/Rules.h>
27#include <xrpl/protocol/Serializer.h>
28#include <xrpl/protocol/SystemParameters.h>
29#include <xrpl/protocol/jss.h>
30#include <xrpl/resource/Fees.h>
31#include <xrpl/shamap/SHAMapNodeID.h>
32#include <xrpl/shamap/SHAMapSyncFilter.h>
34#include <boost/iterator/function_output_iterator.hpp>
56using namespace std::chrono_literals;
81 {.jobType =
JtLedgerData, .jobName =
"InboundLedger", .jobLimit = 5},
82 app.getJournal(
"InboundLedger"))
86 , peerSet_(std::move(peerSet))
88 JLOG(journal_.trace()) <<
"Acquiring ledger " << hash_;
109 JLOG(
journal_.debug()) <<
"Acquiring ledger we already have in "
110 <<
" local store. " <<
hash_;
113 "xrpl::InboundLedger::init : valid ledger fees");
129 auto const& peerIds =
peerSet_->getPeerIds();
131 peerIds, [
this](
auto id) {
return (
app_.getOverlay().findPeerByShortID(
id) !=
nullptr); });
140 if ((seq != 0) && (
seq_ == 0))
176 if (entry.second->
type() == protocol::liAS_NODE)
177 app_.getInboundLedgers().gotStaleData(entry.second);
204 for (
auto const& n : mn)
231 auto makeLedger = [&,
this](
Blob const& data) {
232 JLOG(
journal_.trace()) <<
"Ledger header found in fetch pack";
233 Rules const rules{
app_.config().features};
249 JLOG(
journal_.trace()) <<
"Ledger header found in local store";
251 makeLedger(nodeObject->getData());
256 auto& dstDB{
ledger_->stateMap().family().db()};
259 Blob blob{nodeObject->getData()};
266 auto data =
app_.getLedgerMaster().getFetchPack(
hash_);
270 JLOG(
journal_.trace()) <<
"Ledger header found in fetch pack";
277 ledger_->stateMap().family().db().store(
290 if (
ledger_->header().txHash.isZero())
292 JLOG(
journal_.trace()) <<
"No TXNs to fetch";
302 JLOG(
journal_.trace()) <<
"Had full txn map locally";
311 if (
ledger_->header().accountHash.isZero())
313 JLOG(
journal_.fatal()) <<
"We are acquiring a ledger with a zero account hash";
318 if (
ledger_->stateMap().fetchRoot(
SHAMapHash{ledger_->header().accountHash}, &filter))
322 JLOG(
journal_.trace()) <<
"Had full AS map locally";
330 JLOG(
journal_.debug()) <<
"Had everything locally";
334 "xrpl::InboundLedger::tryDB : valid ledger fees");
374 JLOG(
journal_.debug()) <<
"No progress(" << pc <<
") for ledger " <<
hash_;
394 [
this](
auto peer) {
return peer->hasLedger(
hash_,
seq_); },
424 XRPL_ASSERT(
complete_ ||
failed_,
"xrpl::InboundLedger::done : complete or failed");
430 "xrpl::InboundLedger::done : valid ledger fees");
435 app_.getInboundLedgers().onLedgerFetched();
445 if (self->complete_ && !self->failed_)
447 self->app_.getLedgerMaster().checkAccept(self->getLedger());
448 self->app_.getLedgerMaster().tryAdvance();
452 self->app_.getInboundLedgers().logFailure(self->hash_, self->seq_);
474 ss <<
"Trigger acquiring ledger " <<
hash_;
476 ss <<
" from " << peer;
499 protocol::TMGetLedger tmGL;
500 tmGL.set_ledgerhash(
hash_.begin(),
hash_.size());
505 tmGL.set_querytype(protocol::qtINDIRECT);
513 protocol::TMGetObjectByHash tmBH;
514 bool typeSet =
false;
515 tmBH.set_query(
true);
516 tmBH.set_ledgerhash(
hash_.begin(),
hash_.size());
517 for (
auto const& p : need)
519 JLOG(
journal_.debug()) <<
"Want: " << p.second;
523 tmBH.set_type(p.first);
527 if (p.first == tmBH.type())
529 protocol::TMIndexedObject* io = tmBH.add_objects();
530 io->set_hash(p.second.begin(), p.second.size());
532 io->set_ledgerseq(
seq_);
537 auto const& peerIds =
peerSet_->getPeerIds();
539 if (
auto p =
app_.getOverlay().findPeerByShortID(
id))
548 JLOG(
journal_.info()) <<
"getNeededHashes says acquire is complete";
561 tmGL.set_itype(protocol::liBASE);
563 tmGL.set_ledgerseq(
seq_);
564 JLOG(
journal_.trace()) <<
"Sending header request to "
565 << (peer ?
"selected peer" :
"all peers");
571 tmGL.set_ledgerseq(
ledger_->header().seq);
576 tmGL.set_querydepth(0);
578 else if (peer && peer->isHighLatency())
581 tmGL.set_querydepth(2);
585 tmGL.set_querydepth(1);
594 "xrpl::InboundLedger::trigger : non-null ledger to read state "
597 if (!
ledger_->stateMap().isValid())
601 else if (
ledger_->stateMap().getHash().isZero())
604 tmGL.set_itype(protocol::liAS_NODE);
607 <<
"Sending AS root request to " << (peer ?
"selected peer" :
"all peers");
625 if (!
ledger_->stateMap().isValid())
643 tmGL.set_itype(protocol::liAS_NODE);
644 for (
auto const&
id : nodes)
646 *(tmGL.add_nodeids()) =
id.first.getRawString();
649 JLOG(
journal_.trace()) <<
"Sending AS node request (" << nodes.size()
650 <<
") to " << (peer ?
"selected peer" :
"all peers");
655 JLOG(
journal_.trace()) <<
"All AS nodes filtered";
665 "xrpl::InboundLedger::trigger : non-null ledger to read "
666 "transactions from");
668 if (!
ledger_->txMap().isValid())
672 else if (
ledger_->txMap().getHash().isZero())
675 tmGL.set_itype(protocol::liTX_NODE);
678 <<
"Sending TX root request to " << (peer ?
"selected peer" :
"all peers");
690 if (!
ledger_->txMap().isValid())
708 tmGL.set_itype(protocol::liTX_NODE);
709 for (
auto const& n : nodes)
711 *(tmGL.add_nodeids()) = n.first.getRawString();
713 JLOG(
journal_.trace()) <<
"Sending TX node request (" << nodes.size() <<
") to "
714 << (peer ?
"selected peer" :
"all peers");
719 JLOG(
journal_.trace()) <<
"All TX nodes filtered";
741 nodes, [
this](
auto const& item) {
return recentNodes_.count(item.second) == 0; });
746 if (dup.begin() == nodes.begin())
748 JLOG(
journal_.trace()) <<
"filterNodes: all duplicates";
758 JLOG(
journal_.trace()) <<
"filterNodes: pruning duplicates";
760 nodes.erase(dup.begin(), dup.end());
765 if (nodes.size() > limit)
768 for (
auto const& n : nodes)
780 JLOG(
journal_.trace()) <<
"got header acquiring ledger " <<
hash_;
785 auto* f = &
app_.getNodeFamily();
786 Rules const rules{
app_.config().features};
790 JLOG(
journal_.warn()) <<
"Acquire hash mismatch: " <<
ledger_->header().hash
803 s.
addRaw(data.data(), data.size());
806 if (
ledger_->header().txHash.isZero())
809 if (
ledger_->header().accountHash.isZero())
812 ledger_->txMap().setSynching();
813 ledger_->stateMap().setSynching();
826 JLOG(
journal_.warn()) <<
"Missing ledger header";
830 if (packet.type() == protocol::liTX_NODE)
844 auto [map, rootHash, filter] =
846 if (packet.type() == protocol::liTX_NODE)
852 ledger_->txMap().family().db(),
app_.getLedgerMaster())};
858 ledger_->stateMap().family().db(),
app_.getLedgerMaster())};
863 auto const f = filter.get();
865 for (
auto const& node : packet.nodes())
872 if (nodeID->isRoot())
874 san += map.addRootNode(rootHash,
makeSlice(node.nodedata()), f);
878 san += map.addKnownNode(*nodeID,
makeSlice(node.nodedata()), f);
883 JLOG(
journal_.warn()) <<
"Received bad node data";
890 JLOG(
journal_.error()) <<
"Received bad node data: " << e.
what();
895 if (!map.isSynching())
897 if (packet.type() == protocol::liTX_NODE)
929 UNREACHABLE(
"xrpl::InboundLedger::takeAsRootNode : no ledger header");
955 UNREACHABLE(
"xrpl::InboundLedger::takeTxRootNode : no ledger header");
981 ret.
emplace_back(protocol::TMGetObjectByHash::otSTATE_NODE, h);
990 ret.
emplace_back(protocol::TMGetObjectByHash::otTRANSACTION_NODE, h);
1030 if (packet.type() == protocol::liBASE)
1032 if (packet.nodes().empty())
1034 JLOG(
journal_.warn()) << peer->id() <<
": empty header data";
1049 JLOG(
journal_.warn()) <<
"Got invalid header data";
1057 if (!
haveState_ && (packet.nodes().size() > 1) &&
1060 JLOG(
journal_.warn()) <<
"Included AS root invalid";
1066 JLOG(
journal_.warn()) <<
"Included TX root invalid";
1071 JLOG(
journal_.warn()) <<
"Included AS/TX root invalid: " << ex.
what();
1072 using namespace std::string_literals;
1084 if ((packet.type() == protocol::liTX_NODE) || (packet.type() == protocol::liAS_NODE))
1086 if (packet.nodes().empty())
1088 JLOG(
journal_.info()) << peer->id() <<
": response with no nodes";
1096 for (
auto const& node : packet.nodes())
1098 if (!node.has_nodeid() || !node.has_nodedata())
1100 JLOG(
journal_.warn()) <<
"Got bad node";
1109 JLOG(
journal_.debug()) <<
"Ledger "
1110 << ((packet.type() == protocol::liTX_NODE) ?
"TX" :
"AS")
1111 <<
" node stats: " << san.
get();
1139 auto i =
counts.find(peer);
1142 counts.emplace(std::move(peer), dataCount);
1145 i->second =
std::max(i->second, dataCount);
1156 while (i !=
counts.end())
1158 if (i->second < thresh)
1178 auto outFunc = [&f](
auto&& v) { f(v.first); };
1190 counts.begin(),
counts.end(), boost::make_function_output_iterator(outFunc), n, rng);
1228 for (
auto& entry : data)
1230 if (
auto peer = entry.first.lock())
1232 int const count =
processData(peer, *(entry.second));
1233 dataCounts.
update(std::move(peer), count);
1256 ret[jss::complete] =
true;
1259 ret[jss::failed] =
true;
1262 ret[jss::peers] =
static_cast<int>(
peerSet_->getPeerIds().size());
1281 ret[jss::needed_state_hashes] = hv;
1291 ret[jss::needed_transaction_hashes] = hv;
T back_inserter(T... args)
Value & append(Value const &value)
Append value to array at the end.
json::Value getJson(int)
Return a json::ValueType::Object.
void trigger(std::shared_ptr< Peer > const &, TriggerReason)
Request more nodes, perhaps from a specific peer.
InboundLedger(Application &app, uint256 const &hash, std::uint32_t seq, Reason reason, clock_type &, std::unique_ptr< PeerSet > peerSet)
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
void runData()
Process pending TMLedgerData Query the a random sample of the 'best' peers.
void tryDB(NodeStore::Database &srcDB)
std::size_t getPeerCount() const
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Called with a lock by the PeerSet when the timer expires.
std::vector< uint256 > neededStateHashes(int max, SHAMapSyncFilter const *filter) const
int processData(std::shared_ptr< Peer > peer, protocol::TMLedgerData const &data)
Process one TMLedgerData Returns the number of useful nodes.
void filterNodes(std::vector< std::pair< SHAMapNodeID, uint256 > > &nodes, TriggerReason reason)
std::shared_ptr< Ledger > ledger_
bool takeAsRootNode(Slice const &data, SHAMapAddNode &)
Process AS root node received from a peer Call with a lock.
std::vector< std::pair< std::weak_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData > > > receivedData_
bool takeHeader(std::string const &data)
Take ledger header data Call with a lock.
std::mutex receivedDataLock_
bool takeTxRootNode(Slice const &data, SHAMapAddNode &)
Process AS root node received from a peer Call with a lock.
std::unique_ptr< PeerSet > peerSet_
void addPeers()
Add more peers to the set, if possible.
beast::AbstractClock< std::chrono::steady_clock > clock_type
void init(ScopedLockType &collectionLock)
void update(std::uint32_t seq)
void receiveNode(protocol::TMLedgerData const &packet, SHAMapAddNode &)
Process node data received from a peer Call with a lock.
std::vector< uint256 > neededTxHashes(int max, SHAMapSyncFilter const *filter) const
std::set< uint256 > recentNodes_
bool gotData(std::weak_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData > const &)
Stash a TMLedgerData received from a peer for later processing Returns 'true' if we need to dispatch.
std::vector< neededHash_t > getNeededHashes()
~InboundLedger() override
Persistency layer for NodeObject.
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::Synchronous, bool duplicate=false)
Fetch a node object.
Rules controlling protocol behavior.
Identifies a node inside a SHAMap.
std::string getRawString() const
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
std::vector< std::pair< SHAMapNodeID, uint256 > > getMissingNodes(int maxNodes, SHAMapSyncFilter const *filter)
Check for nodes in the SHAMap not available.
SHAMapHash getHash() const
int addRaw(Blob const &vector)
An immutable linear range of bytes.
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
std::recursive_mutex mtx_
std::unique_lock< std::recursive_mutex > ScopedLockType
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
bool progress_
Whether forward progress has been made.
T emplace_back(T... args)
@ Array
array value (ordered list)
@ Object
object value (collection of name/value pairs).
Charge const kFeeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const kFeeInvalidData
Keylet const & feeSettings() noexcept
The (fixed) index of the object containing the ledger fees.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
static constexpr auto kReqNodesReply
Number root(Number f, unsigned d)
LedgerHeader deserializeHeader(Slice data, bool hasHash=false)
Deserialize a ledger header from a byte array.
std::string to_string(BaseUInt< Bits, Tag > const &a)
static constexpr std::uint32_t kXrpLedgerEarliestFees
The XRP Ledger mainnet's earliest ledger with a FeeSettings object.
static constexpr auto kMissingNodesFind
static constexpr auto kLedgerBecomeAggressiveThreshold
static std::vector< uint256 > neededHashes(uint256 const &root, SHAMap &map, int max, SHAMapSyncFilter const *filter)
static constexpr auto kPeerCountAdd
static constexpr auto kLedgerTimeoutRetriesMax
static constexpr auto kPeerCountStart
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
@ LedgerMaster
ledger master data for signing
constexpr auto kLedgerAcquireTimeout
std::vector< unsigned char > Blob
Storage for linear binary data.
static constexpr auto kReqNodes
LedgerHeader deserializePrefixedHeader(Slice data, bool hasHash=false)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
std::enable_if_t< std::is_same_v< T, char >||std::is_same_v< T, unsigned char >, Slice > makeSlice(std::array< T, N > const &a)
T shared_from_this(T... args)
T stable_partition(T... args)
std::unordered_map< std::shared_ptr< Peer >, int > counts
void update(std::shared_ptr< Peer > &&peer, int dataCount)
void sampleN(std::size_t n, F &&f)