1#include <xrpld/app/ledger/detail/TransactionAcquire.h>
3#include <xrpld/app/ledger/ConsensusTransSetSF.h>
4#include <xrpld/app/ledger/InboundTransactions.h>
5#include <xrpld/app/ledger/detail/TimeoutCounter.h>
6#include <xrpld/app/main/Application.h>
7#include <xrpld/overlay/PeerSet.h>
9#include <xrpl/basics/Log.h>
10#include <xrpl/basics/Slice.h>
11#include <xrpl/basics/base_uint.h>
12#include <xrpl/core/Job.h>
13#include <xrpl/server/NetworkOPs.h>
14#include <xrpl/shamap/SHAMap.h>
15#include <xrpl/shamap/SHAMapAddNode.h>
16#include <xrpl/shamap/SHAMapMissingNode.h>
29using namespace std::chrono_literals;
45 {.jobType =
JtTxnData, .jobName =
"TxAcq", .jobLimit = {}},
46 app.getJournal(
"TransactionAcquire"))
47 , peerSet_(std::move(peerSet))
60 JLOG(
journal_.debug()) <<
"Failed to acquire TX set " <<
hash_;
69 auto const pap = &
app_;
75 app_.getJobQueue().addJob(
JtTxnData,
"ComplAcquire", [pap, hash, map]() {
76 pap->getInboundTransactions().giveSet(hash, map,
true);
108 JLOG(
journal_.info()) <<
"trigger after complete";
113 JLOG(
journal_.info()) <<
"trigger after fail";
119 JLOG(
journal_.trace()) <<
"TransactionAcquire::trigger " << (peer ?
"havePeer" :
"noPeer")
121 protocol::TMGetLedger tmGL;
122 tmGL.set_ledgerhash(
hash_.begin(),
hash_.size());
123 tmGL.set_itype(protocol::liTS_CANDIDATE);
124 tmGL.set_querydepth(3);
127 tmGL.set_querytype(protocol::qtINDIRECT);
132 else if (!
map_->isValid())
140 auto nodes =
map_->getMissingNodes(256, &sf);
157 protocol::TMGetLedger tmGL;
158 tmGL.set_ledgerhash(
hash_.begin(),
hash_.size());
159 tmGL.set_itype(protocol::liTS_CANDIDATE);
162 tmGL.set_querytype(protocol::qtINDIRECT);
164 for (
auto const& node : nodes)
166 *tmGL.add_nodeids() = node.first.getRawString();
181 JLOG(
journal_.trace()) <<
"TX set complete";
187 JLOG(
journal_.trace()) <<
"TX set failed";
198 for (
auto const& d : data)
200 if (d.first.isRoot())
204 JLOG(
journal_.debug()) <<
"Got root TXS node, already have it";
206 else if (!
map_->addRootNode(
SHAMapHash{hash_}, d.second,
nullptr).isGood())
208 JLOG(
journal_.warn()) <<
"TX acquire got bad root node";
215 else if (!
map_->addKnownNode(d.first, d.second, &sf).isGood())
217 JLOG(
journal_.warn()) <<
"TX acquire got bad non-root node";
228 JLOG(
journal_.error()) <<
"Peer " << peer->id()
229 <<
" sent us junky transaction node data: " << ex.
what();
239 [
this](
auto peer) {
return peer->hasTxSet(
hash_); },
240 [
this](
auto peer) {
trigger(peer); });
static SHAMapAddNode useful()
static SHAMapAddNode invalid()
Identifies a node inside a SHAMap.
std::string getRawString() const
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
std::recursive_mutex mtx_
std::unique_lock< std::recursive_mutex > ScopedLockType
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
bool progress_
Whether forward progress has been made.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
void init(int startPeers)
std::shared_ptr< SHAMap > map_
void addPeers(std::size_t limit)
void trigger(std::shared_ptr< Peer > const &)
TransactionAcquire(Application &app, uint256 const &hash, std::unique_ptr< PeerSet > peerSet)
SHAMapAddNode takeNodes(std::vector< std::pair< SHAMapNodeID, Slice > > const &data, std::shared_ptr< Peer > const &)
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::unique_ptr< PeerSet > peerSet_
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
static constexpr auto kNormTimeouts
static constexpr auto kMaxTimeouts
constexpr auto kTxAcquireTimeout
T shared_from_this(T... args)