rippled
Loading...
Searching...
No Matches
TransactionAcquire.cpp
1#include <xrpld/app/ledger/ConsensusTransSetSF.h>
2#include <xrpld/app/ledger/InboundLedgers.h>
3#include <xrpld/app/ledger/InboundTransactions.h>
4#include <xrpld/app/ledger/detail/TransactionAcquire.h>
5#include <xrpld/app/main/Application.h>
6
7#include <xrpl/server/NetworkOPs.h>
8
9#include <memory>
10
11namespace xrpl {
12
// Brings the `ms` duration literal used by the constant below into scope.
13using namespace std::chrono_literals;
14
15// Timeout interval in milliseconds
// (a std::chrono duration; the constructor hands it to the TimeoutCounter base)
16auto constexpr TX_ACQUIRE_TIMEOUT = 250ms;
17
// NOTE(review): the enumerator lines (source lines 19-20) are missing from this
// extract, so the members of this unnamed enum cannot be documented from here.
18enum {
21};
22
// Constructor (initializer list and body).
// NOTE(review): the signature line (source line 23) is missing from this
// extract; the code below consumes `app`, `hash` and `peerSet`.
//
// Base TimeoutCounter receives: the application, the hash being acquired, the
// TX_ACQUIRE_TIMEOUT interval, a job descriptor {jtTXN_DATA, "TxAcq", {}} and
// the "TransactionAcquire" journal.
24 : TimeoutCounter(app, hash, TX_ACQUIRE_TIMEOUT, {jtTXN_DATA, "TxAcq", {}}, app.journal("TransactionAcquire"))
 // No root node has been received yet.
25 , mHaveRoot(false)
 // Take ownership of the caller-supplied peer set.
26 , mPeerSet(std::move(peerSet))
27{
 // Create the (initially empty) transaction-type SHAMap identified by `hash`,
 // using the application's node family.
28 mMap = std::make_shared<SHAMap>(SHAMapType::TRANSACTION, hash, app_.getNodeFamily());
 // NOTE(review): presumably marks the map as not backed by persistent
 // storage — confirm against SHAMap::setUnbacked().
29 mMap->setUnbacked();
30}
31
// Finishes the acquisition: logs the outcome and, on success, makes the map
// immutable and hands it to InboundTransactions via a queued job.
// NOTE(review): the signature line (source line 33) is missing from this
// extract; other functions in this file invoke this code as done().
32void
34{
35 // We hold a PeerSet lock and so cannot do real work here
36
37 if (failed_)
38 {
 // Failure path: only log; nothing is handed on.
39 JLOG(journal_.debug()) << "Failed to acquire TX set " << hash_;
40 }
41 else
42 {
43 JLOG(journal_.debug()) << "Acquired TX set " << hash_;
 // The set is now final; no further nodes will be added.
44 mMap->setImmutable();
45
 // Local aliases so the lambda below can capture the hash, the map pointer
 // and the application pointer by value instead of capturing `this`.
46 uint256 const& hash(hash_);
47 std::shared_ptr<SHAMap> const& map(mMap);
48 auto const pap = &app_;
49 // Note that, when we're in the process of shutting down, addJob()
50 // may reject the request. If that happens then giveSet() will
51 // not be called. That's fine. According to David the giveSet() call
52 // just updates the consensus and related structures when we acquire
53 // a transaction set. No need to update them if we're shutting down.
 // NOTE(review): source line 54 (the head of the call whose arguments
 // follow) is missing from this extract; per the comment above it is
 // presumably an addJob() call on the application's job queue.
55 jtTXN_DATA, "ComplAcquire", [pap, hash, map]() { pap->getInboundTransactions().giveSet(hash, map, true); });
56 }
57}
58
// Timer-driven step of the acquisition: may give up, may re-issue a request
// without a specific peer, and always tries to add one more peer.
// NOTE(review): the signature line (source line 60) is missing from this
// extract; this looks like the onTimer() hook — confirm.
59void
61{
 // NOTE(review): source line 62 is missing from this extract; it presumably
 // holds the condition that guards this give-up branch.
63 {
 // Mark the acquisition as failed and run the completion logic.
64 failed_ = true;
65 done();
66 return;
67 }
68
 // NOTE(review): source line 69 is missing from this extract; it may be a
 // condition guarding the call below — confirm before relying on this call
 // being unconditional.
 // trigger() with a null peer sends the request without naming a peer.
70 trigger(nullptr);
71
 // Ask the peer set for one more peer.
72 addPeers(1);
73}
74
80
// Sends the next TMGetLedger request for this transaction set through the
// peer set: the root node if it has not been received yet, otherwise the nodes
// the map reports as missing. Marks the acquisition complete or failed (and
// calls done()) when nothing is left to request or the map is invalid.
// NOTE(review): the signature line (source line 82) is missing from this
// extract; the body uses a parameter named `peer`, which may be null.
81void
83{
 // Nothing to do once the acquisition has finished either way.
84 if (complete_)
85 {
86 JLOG(journal_.info()) << "trigger after complete";
87 return;
88 }
89 if (failed_)
90 {
91 JLOG(journal_.info()) << "trigger after fail";
92 return;
93 }
94
95 if (!mHaveRoot)
96 {
 // No root node yet: request it.
97 JLOG(journal_.trace()) << "TransactionAcquire::trigger " << (peer ? "havePeer" : "noPeer") << " no root";
98 protocol::TMGetLedger tmGL;
99 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
100 tmGL.set_itype(protocol::liTS_CANDIDATE);
101 tmGL.set_querydepth(3); // We probably need the whole thing
102
 // After at least one timeout, mark the query as indirect.
103 if (timeouts_ != 0)
104 tmGL.set_querytype(protocol::qtINDIRECT);
105
 // A default-constructed SHAMapNodeID is used as the requested node id
 // (presumably the root's id — confirm against SHAMapNodeID).
106 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
107 mPeerSet->sendRequest(tmGL, peer);
108 }
109 else if (!mMap->isValid())
110 {
 // We have a root but the map is not valid: give up.
111 failed_ = true;
112 done();
113 }
114 else
115 {
 // NOTE(review): source line 116, which declares `sf` (passed by address
 // below), is missing from this extract.
 // 256 is presumably the maximum number of missing nodes to report — confirm.
117 auto nodes = mMap->getMissingNodes(256, &sf);
118
119 if (nodes.empty())
120 {
 // Nothing missing: the outcome depends on the map's validity.
121 if (mMap->isValid())
122 complete_ = true;
123 else
124 failed_ = true;
125
126 done();
127 return;
128 }
129
 // Request every reported missing node in a single message.
130 protocol::TMGetLedger tmGL;
131 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
132 tmGL.set_itype(protocol::liTS_CANDIDATE);
133
134 if (timeouts_ != 0)
135 tmGL.set_querytype(protocol::qtINDIRECT);
136
137 for (auto const& node : nodes)
138 {
 // `node.first` supplies the node id that is serialized into the request.
139 *tmGL.add_nodeids() = node.first.getRawString();
140 }
141 mPeerSet->sendRequest(tmGL, peer);
142 }
143}
144
// Feeds node data received from a peer into the map.
// Returns a default SHAMapAddNode when the acquisition has already completed
// or failed, SHAMapAddNode::invalid() for empty data, a rejected non-root node
// or an exception, and SHAMapAddNode::useful() otherwise.
// NOTE(review): the head of the signature (source lines 145-147, return type,
// name and the `data` parameter) is missing from this extract.
148 std::shared_ptr<Peer> const& peer)
149{
 // NOTE(review): source line 150 is missing from this extract (possibly a
 // lock acquisition — confirm).
151
152 if (complete_)
153 {
154 JLOG(journal_.trace()) << "TX set complete";
155 return SHAMapAddNode();
156 }
157
158 if (failed_)
159 {
160 JLOG(journal_.trace()) << "TX set failed";
161 return SHAMapAddNode();
162 }
163
164 try
165 {
 // An empty payload is treated as invalid.
166 if (data.empty())
167 return SHAMapAddNode::invalid();
168
 // NOTE(review): source line 169, which declares `sf` (passed by address
 // to addKnownNode below), is missing from this extract.
170
171 for (auto const& d : data)
172 {
173 if (d.first.isRoot())
174 {
 // Root node: accept it only if we do not already have one. A bad root
 // is logged but does not abort processing of the remaining nodes.
175 if (mHaveRoot)
176 JLOG(journal_.debug()) << "Got root TXS node, already have it";
177 else if (!mMap->addRootNode(SHAMapHash{hash_}, d.second, nullptr).isGood())
178 {
179 JLOG(journal_.warn()) << "TX acquire got bad root node";
180 }
181 else
182 mHaveRoot = true;
183 }
 // Non-root node: a rejected node invalidates the whole batch.
184 else if (!mMap->addKnownNode(d.first, d.second, &sf).isGood())
185 {
186 JLOG(journal_.warn()) << "TX acquire got bad non-root node";
187 return SHAMapAddNode::invalid();
188 }
189 }
190
 // Immediately request whatever is still missing from the same peer and
 // record that forward progress was made.
191 trigger(peer);
192 progress_ = true;
193 return SHAMapAddNode::useful();
194 }
195 catch (std::exception const& ex)
196 {
 // NOTE(review): `peer` is dereferenced here without a null check —
 // confirm callers never pass a null peer.
197 JLOG(journal_.error()) << "Peer " << peer->id() << " sent us junky transaction node data: " << ex.what();
198 return SHAMapAddNode::invalid();
199 }
200}
201
// Asks the peer set to add up to `limit` peers (presumably a maximum count —
// confirm against PeerSet::addPeers) that report having this transaction set,
// and calls trigger() for each peer passed to the second callback.
// NOTE(review): the signature line (source line 203) is missing from this
// extract.
202void
204{
205 mPeerSet->addPeers(
 // First lambda: filter on peers that have the set identified by hash_.
 // Second lambda: send a request to the given peer.
 // Both capture `this`; they must not outlive this object.
206 limit, [this](auto peer) { return peer->hasTxSet(hash_); }, [this](auto peer) { trigger(peer); });
207}
208
// Starts the acquisition: adds `numPeers` peers, then arms the timer.
// NOTE(review): the signature line (source line 210) and source line 212
// (which presumably declares the lock `sl` used below) are missing from this
// extract; the function's name is not visible here.
209void
211{
213
 // Recruit the initial peers; addPeers() triggers a request to each of them.
214 addPeers(numPeers);
215
 // Schedule the timeout timer, passing the held lock.
216 setTimer(sl);
217}
218
219void
228
229} // namespace xrpl
Stream error() const
Definition Journal.h:318
Stream debug() const
Definition Journal.h:300
Stream info() const
Definition Journal.h:306
Stream trace() const
Severity stream access functions.
Definition Journal.h:294
Stream warn() const
Definition Journal.h:312
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:145
static SHAMapAddNode useful()
static SHAMapAddNode invalid()
Identifies a node inside a SHAMap.
std::string getRawString() const
virtual NodeCache & getTempNodeCache()=0
virtual JobQueue & getJobQueue()=0
This class is an "active" object.
std::recursive_mutex mtx_
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
bool progress_
Whether forward progress has been made.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
std::shared_ptr< SHAMap > mMap
void addPeers(std::size_t limit)
void trigger(std::shared_ptr< Peer > const &)
TransactionAcquire(Application &app, uint256 const &hash, std::unique_ptr< PeerSet > peerSet)
SHAMapAddNode takeNodes(std::vector< std::pair< SHAMapNodeID, Slice > > const &data, std::shared_ptr< Peer > const &)
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::unique_ptr< PeerSet > mPeerSet
iterator begin()
Definition base_uint.h:112
static constexpr std::size_t size()
Definition base_uint.h:494
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ jtTXN_DATA
Definition Job.h:48
auto constexpr TX_ACQUIRE_TIMEOUT
T what(T... args)