20#include <xrpld/app/ledger/AccountStateSF.h> 
   21#include <xrpld/app/ledger/InboundLedger.h> 
   22#include <xrpld/app/ledger/InboundLedgers.h> 
   23#include <xrpld/app/ledger/LedgerMaster.h> 
   24#include <xrpld/app/ledger/TransactionStateSF.h> 
   25#include <xrpld/app/main/Application.h> 
   26#include <xrpld/core/JobQueue.h> 
   27#include <xrpld/overlay/Overlay.h> 
   29#include <xrpl/basics/Log.h> 
   30#include <xrpl/protocol/HashPrefix.h> 
   31#include <xrpl/protocol/jss.h> 
   32#include <xrpl/resource/Fees.h> 
   33#include <xrpl/shamap/SHAMapNodeID.h> 
   35#include <boost/iterator/function_output_iterator.hpp> 
   42using namespace std::chrono_literals;
 
   88          app.journal(
"InboundLedger"))
 
   92    , mHaveTransactions(
false)
 
   97    , mReceiveDispatched(
false)
 
   98    , mPeerSet(std::move(peerSet))
 
  100    JLOG(journal_.trace()) << 
"Acquiring ledger " << hash_;
 
 
  121    JLOG(
journal_.
debug()) << 
"Acquiring ledger we already have in " 
  122                           << 
" local store. " << 
hash_;
 
  126        "ripple::InboundLedger::init : valid ledger fees");
 
 
  142    auto const& peerIds = 
mPeerSet->getPeerIds();
 
  143    return std::count_if(peerIds.begin(), peerIds.end(), [
this](
auto id) {
 
  144        return (app_.overlay().findPeerByShortID(id) != nullptr);
 
 
  154    if ((seq != 0) && (
mSeq == 0))
 
 
  186        if (entry.second->
type() == protocol::liAS_NODE)
 
  192            << 
"Acquire " << 
hash_ << 
" abort " 
 
  217            for (
auto const& n : mn)
 
 
  245        auto makeLedger = [&, 
this](
Blob const& data) {
 
  246            JLOG(
journal_.
trace()) << 
"Ledger header found in fetch pack";
 
  257                    << 
" cannot be a ledger";
 
  266            JLOG(
journal_.
trace()) << 
"Ledger header found in local store";
 
  268            makeLedger(nodeObject->getData());
 
  273            auto& dstDB{
mLedger->stateMap().family().db()};
 
  276                Blob blob{nodeObject->getData()};
 
  288            JLOG(
journal_.
trace()) << 
"Ledger header found in fetch pack";
 
  295            mLedger->stateMap().family().db().store(
 
  308        if (
mLedger->info().txHash.isZero())
 
  317            if (
mLedger->txMap().fetchRoot(
 
  331        if (
mLedger->info().accountHash.isZero())
 
  334                << 
"We are acquiring a ledger with a zero account hash";
 
  340        if (
mLedger->stateMap().fetchRoot(
 
  341                SHAMapHash{mLedger->info().accountHash}, &filter))
 
  358            "ripple::InboundLedger::tryDB : valid ledger fees");
 
 
  401            << 
"No progress(" << pc << 
") for ledger " << 
hash_;
 
 
  421        [
this](
auto peer) { 
return peer->hasLedger(
hash_, 
mSeq); },
 
 
  454        "ripple::InboundLedger::done : complete or failed");
 
  461            "ripple::InboundLedger::done : valid ledger fees");
 
  477            if (self->complete_ && !self->failed_)
 
  479                self->app_.getLedgerMaster().checkAccept(self->getLedger());
 
  480                self->app_.getLedgerMaster().tryAdvance();
 
  483                self->app_.getInboundLedgers().logFailure(
 
  484                    self->hash_, self->mSeq);
 
 
  498            << 
"Trigger on ledger: " << 
hash_ << (
complete_ ? 
" completed" : 
"")
 
  506        ss << 
"Trigger acquiring ledger " << 
hash_;
 
  508            ss << 
" from " << peer;
 
  528    protocol::TMGetLedger tmGL;
 
  534        tmGL.set_querytype(protocol::qtINDIRECT);
 
  543                protocol::TMGetObjectByHash tmBH;
 
  544                bool typeSet = 
false;
 
  545                tmBH.set_query(
true);
 
  547                for (
auto const& p : need)
 
  553                        tmBH.set_type(p.first);
 
  557                    if (p.first == tmBH.type())
 
  559                        protocol::TMIndexedObject* io = tmBH.add_objects();
 
  560                        io->set_hash(p.second.begin(), p.second.size());
 
  562                            io->set_ledgerseq(
mSeq);
 
  568                auto const& peerIds = 
mPeerSet->getPeerIds();
 
  570                    peerIds.begin(), peerIds.end(), [
this, &packet](
auto id) {
 
  571                        if (auto p = app_.overlay().findPeerByShortID(id))
 
  581                    << 
"getNeededHashes says acquire is complete";
 
  592    if (!mHaveHeader && !failed_)
 
  594        tmGL.set_itype(protocol::liBASE);
 
  596            tmGL.set_ledgerseq(mSeq);
 
  597        JLOG(journal_.trace()) << 
"Sending header request to " 
  598                               << (peer ? 
"selected peer" : 
"all peers");
 
  599        mPeerSet->sendRequest(tmGL, peer);
 
  604        tmGL.set_ledgerseq(mLedger->info().seq);
 
  606    if (reason != TriggerReason::reply)
 
  609        tmGL.set_querydepth(0);
 
  611    else if (peer && peer->isHighLatency())
 
  614        tmGL.set_querydepth(2);
 
  617        tmGL.set_querydepth(1);
 
  621    if (mHaveHeader && !mHaveState && !failed_)
 
  625            "ripple::InboundLedger::trigger : non-null ledger to read state " 
  628        if (!mLedger->stateMap().isValid())
 
  632        else if (mLedger->stateMap().getHash().isZero())
 
  635            tmGL.set_itype(protocol::liAS_NODE);
 
  636            *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
 
  637            JLOG(journal_.trace()) << 
"Sending AS root request to " 
  638                                   << (peer ? 
"selected peer" : 
"all peers");
 
  639            mPeerSet->sendRequest(tmGL, peer);
 
  644            AccountStateSF filter(
 
  645                mLedger->stateMap().family().db(), app_.getLedgerMaster());
 
  654            if (!failed_ && !complete_ && !mHaveState)
 
  658                    if (!mLedger->stateMap().isValid())
 
  664                        if (mHaveTransactions)
 
  670                    filterNodes(nodes, reason);
 
  674                        tmGL.set_itype(protocol::liAS_NODE);
 
  675                        for (
auto const& 
id : nodes)
 
  677                            *(tmGL.add_nodeids()) = 
id.first.getRawString();
 
  680                        JLOG(journal_.trace())
 
  681                            << 
"Sending AS node request (" << nodes.size()
 
  683                            << (peer ? 
"selected peer" : 
"all peers");
 
  684                        mPeerSet->sendRequest(tmGL, peer);
 
  689                        JLOG(journal_.trace()) << 
"All AS nodes filtered";
 
  696    if (mHaveHeader && !mHaveTransactions && !failed_)
 
  700            "ripple::InboundLedger::trigger : non-null ledger to read " 
  701            "transactions from");
 
  703        if (!mLedger->txMap().isValid())
 
  707        else if (mLedger->txMap().getHash().isZero())
 
  710            tmGL.set_itype(protocol::liTX_NODE);
 
  711            *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
 
  712            JLOG(journal_.trace()) << 
"Sending TX root request to " 
  713                                   << (peer ? 
"selected peer" : 
"all peers");
 
  714            mPeerSet->sendRequest(tmGL, peer);
 
  719            TransactionStateSF filter(
 
  720                mLedger->txMap().family().db(), app_.getLedgerMaster());
 
  727                if (!mLedger->txMap().isValid())
 
  731                    mHaveTransactions = 
true;
 
  739                filterNodes(nodes, reason);
 
  743                    tmGL.set_itype(protocol::liTX_NODE);
 
  744                    for (
auto const& n : nodes)
 
  746                        *(tmGL.add_nodeids()) = n.first.getRawString();
 
  748                    JLOG(journal_.trace())
 
  749                        << 
"Sending TX node request (" << nodes.size()
 
  750                        << 
") to " << (peer ? 
"selected peer" : 
"all peers");
 
  751                    mPeerSet->sendRequest(tmGL, peer);
 
  756                    JLOG(journal_.trace()) << 
"All TX nodes filtered";
 
  762    if (complete_ || failed_)
 
  764        JLOG(journal_.debug())
 
  765            << 
"Done:" << (complete_ ? 
" complete" : 
"")
 
  766            << (failed_ ? 
" failed " : 
" ") << mLedger->info().seq;
 
 
  773InboundLedger::filterNodes(
 
  780        nodes.begin(), nodes.end(), [
this](
auto const& item) {
 
  781            return mRecentNodes.count(item.second) == 0;
 
  787    if (dup == nodes.begin())
 
  789        JLOG(journal_.trace()) << 
"filterNodes: all duplicates";
 
  791        if (reason != TriggerReason::timeout)
 
  799        JLOG(journal_.trace()) << 
"filterNodes: pruning duplicates";
 
  801        nodes.erase(dup, nodes.end());
 
  807    if (nodes.size() > limit)
 
  810    for (
auto const& n : nodes)
 
  811        mRecentNodes.insert(n.second);
 
 
  822    JLOG(journal_.trace()) << 
"got header acquiring ledger " << hash_;
 
  824    if (complete_ || failed_ || mHaveHeader)
 
  827    auto* f = &app_.getNodeFamily();
 
  830    if (mLedger->info().hash != hash_ ||
 
  831        (mSeq != 0 && mSeq != mLedger->info().seq))
 
  833        JLOG(journal_.warn())
 
  834            << 
"Acquire hash mismatch: " << mLedger->info().hash
 
  840        mSeq = mLedger->info().seq;
 
  841    mLedger->stateMap().setLedgerSeq(mSeq);
 
  842    mLedger->txMap().setLedgerSeq(mSeq);
 
  846    s.
add32(HashPrefix::ledgerMaster);
 
  847    s.
addRaw(data.data(), data.size());
 
  850    if (mLedger->info().txHash.isZero())
 
  851        mHaveTransactions = 
true;
 
  853    if (mLedger->info().accountHash.isZero())
 
  856    mLedger->txMap().setSynching();
 
  857    mLedger->stateMap().setSynching();
 
 
  866InboundLedger::receiveNode(protocol::TMLedgerData& packet, 
SHAMapAddNode& san)
 
  870        JLOG(journal_.warn()) << 
"Missing ledger header";
 
  874    if (packet.type() == protocol::liTX_NODE)
 
  876        if (mHaveTransactions || failed_)
 
  882    else if (mHaveState || failed_)
 
  888    auto [map, rootHash, filter] = [&]()
 
  890        if (packet.type() == protocol::liTX_NODE)
 
  895                    mLedger->txMap().family().db(), app_.getLedgerMaster())};
 
  900                mLedger->stateMap().family().db(), app_.getLedgerMaster())};
 
  905        auto const f = filter.get();
 
  907        for (
auto const& node : packet.nodes())
 
  914            if (nodeID->isRoot())
 
  916                san += map.addRootNode(rootHash, 
makeSlice(node.nodedata()), f);
 
  920                san += map.addKnownNode(*nodeID, 
makeSlice(node.nodedata()), f);
 
  925                JLOG(journal_.warn()) << 
"Received bad node data";
 
  932        JLOG(journal_.error()) << 
"Received bad node data: " << e.
what();
 
  937    if (!map.isSynching())
 
  939        if (packet.type() == protocol::liTX_NODE)
 
  940            mHaveTransactions = 
true;
 
  944        if (mHaveTransactions && mHaveState)
 
 
  958    if (failed_ || mHaveState)
 
  967        UNREACHABLE(
"ripple::InboundLedger::takeAsRootNode : no ledger header");
 
  973        mLedger->stateMap().family().db(), app_.getLedgerMaster());
 
  974    san += mLedger->stateMap().addRootNode(
 
  975        SHAMapHash{mLedger->info().accountHash}, data, &filter);
 
 
  985    if (failed_ || mHaveTransactions)
 
  994        UNREACHABLE(
"ripple::InboundLedger::takeTxRootNode : no ledger header");
 
 1000        mLedger->txMap().family().db(), app_.getLedgerMaster());
 
 1001    san += mLedger->txMap().addRootNode(
 
 1002        SHAMapHash{mLedger->info().txHash}, data, &filter);
 
 
 1007InboundLedger::getNeededHashes()
 
 1021            mLedger->stateMap().family().db(), app_.getLedgerMaster());
 
 1022        for (
auto const& h : neededStateHashes(4, &filter))
 
 1029    if (!mHaveTransactions)
 
 1032            mLedger->txMap().family().db(), app_.getLedgerMaster());
 
 1033        for (
auto const& h : neededTxHashes(4, &filter))
 
 1036                protocol::TMGetObjectByHash::otTRANSACTION_NODE, h));
 
 
 1047InboundLedger::gotData(
 
 1056    mReceivedData.emplace_back(peer, data);
 
 1058    if (mReceiveDispatched)
 
 1061    mReceiveDispatched = 
true;
 
 
 1074InboundLedger::processData(
 
 1076    protocol::TMLedgerData& packet)
 
 1078    if (packet.type() == protocol::liBASE)
 
 1080        if (packet.nodes().empty())
 
 1082            JLOG(journal_.warn()) << peer->id() << 
": empty header data";
 
 1084                Resource::feeMalformedRequest, 
"ledger_data empty header");
 
 1096                if (!takeHeader(packet.nodes(0).nodedata()))
 
 1098                    JLOG(journal_.warn()) << 
"Got invalid header data";
 
 1100                        Resource::feeMalformedRequest,
 
 1101                        "ledger_data invalid header");
 
 1108            if (!mHaveState && (packet.nodes().size() > 1) &&
 
 1109                !takeAsRootNode(
makeSlice(packet.nodes(1).nodedata()), san))
 
 1111                JLOG(journal_.warn()) << 
"Included AS root invalid";
 
 1114            if (!mHaveTransactions && (packet.nodes().size() > 2) &&
 
 1115                !takeTxRootNode(
makeSlice(packet.nodes(2).nodedata()), san))
 
 1117                JLOG(journal_.warn()) << 
"Included TX root invalid";
 
 1122            JLOG(journal_.warn())
 
 1123                << 
"Included AS/TX root invalid: " << ex.
what();
 
 1124            using namespace std::string_literals;
 
 1125            peer->charge(Resource::feeInvalidData, 
"ledger_data "s + ex.
what());
 
 1136    if ((packet.type() == protocol::liTX_NODE) ||
 
 1137        (packet.type() == protocol::liAS_NODE))
 
 1139        std::string type = packet.type() == protocol::liTX_NODE ? 
"liTX_NODE: " 
 1142        if (packet.nodes().empty())
 
 1144            JLOG(journal_.info()) << peer->id() << 
": response with no nodes";
 
 1145            peer->charge(Resource::feeMalformedRequest, 
"ledger_data no nodes");
 
 1152        for (
auto const& node : packet.nodes())
 
 1154            if (!node.has_nodeid() || !node.has_nodedata())
 
 1156                JLOG(journal_.warn()) << 
"Got bad node";
 
 1158                    Resource::feeMalformedRequest, 
"ledger_data bad node");
 
 1164        receiveNode(packet, san);
 
 1166        JLOG(journal_.debug())
 
 1168            << ((packet.type() == protocol::liTX_NODE) ? 
"TX" : 
"AS")
 
 1169            << 
" node stats: " << san.
get();
 
 
 1196        maxCount = 
std::max(maxCount, dataCount);
 
 1197        auto i = counts.
find(peer);
 
 1198        if (i == counts.
end())
 
 1200            counts.
emplace(std::move(peer), dataCount);
 
 1203        i->second = 
std::max(i->second, dataCount);
 
 
 1212        auto const thresh = maxCount / 2;
 
 1213        auto i = counts.
begin();
 
 1214        while (i != counts.
end())
 
 1216            if (i->second < thresh)
 
 1217                i = counts.
erase(i);
 
 
 1232        auto outFunc = [&f](
auto&& v) { f(v.first); };
 
 1247            boost::make_function_output_iterator(outFunc),
 
 
 
 1259InboundLedger::runData()
 
 1264    decltype(mReceivedData) data;
 
 1278            if (mReceivedData.empty())
 
 1280                mReceiveDispatched = 
false;
 
 1284            data.swap(mReceivedData);
 
 1287        for (
auto& entry : data)
 
 1289            if (
auto peer = entry.first.lock())
 
 1291                int count = processData(peer, *(entry.second));
 
 1292                dataCounts.
update(std::move(peer), count);
 
 1301        trigger(peer, TriggerReason::reply);
 
 
 1306InboundLedger::getJson(
int)
 
 1312    ret[jss::hash] = to_string(hash_);
 
 1315        ret[jss::complete] = 
true;
 
 1318        ret[jss::failed] = 
true;
 
 1320    if (!complete_ && !failed_)
 
 1321        ret[jss::peers] = 
static_cast<int>(mPeerSet->getPeerIds().size());
 
 1323    ret[jss::have_header] = mHaveHeader;
 
 1327        ret[jss::have_state] = mHaveState;
 
 1328        ret[jss::have_transactions] = mHaveTransactions;
 
 1331    ret[jss::timeouts] = timeouts_;
 
 1333    if (mHaveHeader && !mHaveState)
 
 1336        for (
auto const& h : neededStateHashes(16, 
nullptr))
 
 1340        ret[jss::needed_state_hashes] = hv;
 
 1343    if (mHaveHeader && !mHaveTransactions)
 
 1346        for (
auto const& h : neededTxHashes(16, 
nullptr))
 
 1350        ret[jss::needed_transaction_hashes] = hv;
 
 
T back_inserter(T... args)
 
Value & append(Value const &value)
Append value to array at the end.
 
Stream trace() const
Severity stream access functions.
 
virtual Config & config()=0
 
virtual JobQueue & getJobQueue()=0
 
virtual InboundLedgers & getInboundLedgers()=0
 
virtual Family & getNodeFamily()=0
 
virtual LedgerMaster & getLedgerMaster()=0
 
virtual NodeStore::Database & db()=0
 
std::size_t getPeerCount() const
 
void trigger(std::shared_ptr< Peer > const &, TriggerReason)
Request more nodes, perhaps from a specific peer.
 
void init(ScopedLockType &collectionLock)
 
std::set< uint256 > mRecentNodes
 
void addPeers()
Add more peers to the set, if possible.
 
std::shared_ptr< Ledger > mLedger
 
std::vector< uint256 > neededTxHashes(int max, SHAMapSyncFilter *filter) const
 
InboundLedger(Application &app, uint256 const &hash, std::uint32_t seq, Reason reason, clock_type &, std::unique_ptr< PeerSet > peerSet)
 
void tryDB(NodeStore::Database &srcDB)
 
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Called with a lock by the PeerSet when the timer expires.
 
std::vector< uint256 > neededStateHashes(int max, SHAMapSyncFilter *filter) const
 
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
 
std::vector< std::pair< std::weak_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData > > > mReceivedData
 
std::vector< neededHash_t > getNeededHashes()
 
void update(std::uint32_t seq)
 
std::unique_ptr< PeerSet > mPeerSet
 
virtual void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet)=0
 
virtual void onLedgerFetched()=0
Called when a complete ledger is obtained.
 
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
 
void checkAccept(std::shared_ptr< Ledger const > const &ledger)
 
std::optional< Blob > getFetchPack(uint256 const &hash) override
Retrieves partial ledger data of the corresponding hash from peers.
 
bool storeLedger(std::shared_ptr< Ledger const > ledger)
 
Persistency layer for NodeObject.
 
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
 
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
 
SHAMapHash getHash() const
 
std::vector< std::pair< SHAMapNodeID, uint256 > > getMissingNodes(int maxNodes, SHAMapSyncFilter *filter)
Check for nodes in the SHAMap not available.
 
int addRaw(Blob const &vector)
 
An immutable linear range of bytes.
 
This class is an "active" object.
 
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
 
bool progress_
Whether forward progress has been made.
 
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
 
std::recursive_mutex mtx_
 
static constexpr std::size_t size()
 
@ arrayValue
array value (ordered list)
 
@ objectValue
object value (collection of name/value pairs).
 
Keylet const & fees() noexcept
The (fixed) index of the object containing the ledger fees.
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
LedgerHeader deserializePrefixedHeader(Slice data, bool hasHash=false)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
 
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
 
auto constexpr ledgerAcquireTimeout
 
@ ledgerBecomeAggressiveThreshold
 
@ ledgerTimeoutRetriesMax
 
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_FEES
The XRP Ledger mainnet's earliest ledger with a FeeSettings object.
 
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
 
Number root(Number f, unsigned d)
 
LedgerHeader deserializeHeader(Slice data, bool hasHash=false)
Deserialize a ledger header from a byte array.
 
static std::vector< uint256 > neededHashes(uint256 const &root, SHAMap &map, int max, SHAMapSyncFilter *filter)
 
T shared_from_this(T... args)
 
T stable_partition(T... args)
 
std::unordered_map< std::shared_ptr< Peer >, int > counts
 
void sampleN(std::size_t n, F &&f)
 
void update(std::shared_ptr< Peer > &&peer, int dataCount)