rippled
Loading...
Searching...
No Matches
TransactionAcquire.cpp
1#include <xrpld/app/ledger/ConsensusTransSetSF.h>
2#include <xrpld/app/ledger/InboundLedgers.h>
3#include <xrpld/app/ledger/InboundTransactions.h>
4#include <xrpld/app/ledger/detail/TransactionAcquire.h>
5#include <xrpld/app/main/Application.h>
6
7#include <xrpl/server/NetworkOPs.h>
8
9#include <algorithm>
10#include <memory>
11
12namespace xrpl {
13
14using namespace std::chrono_literals;
15
16// Timeout interval in milliseconds
17auto constexpr TX_ACQUIRE_TIMEOUT = 250ms;
18
19enum {
22};
23
25 Application& app,
26 uint256 const& hash,
29 app,
30 hash,
32 {jtTXN_DATA, "TxAcq", {}},
33 app.getJournal("TransactionAcquire"))
34 , mPeerSet(std::move(peerSet))
35{
36 mMap = std::make_shared<SHAMap>(SHAMapType::TRANSACTION, hash, app_.getNodeFamily());
37 mMap->setUnbacked();
38}
39
40void
42{
43 // We hold a PeerSet lock and so cannot do real work here
44
45 if (failed_)
46 {
47 JLOG(journal_.debug()) << "Failed to acquire TX set " << hash_;
48 }
49 else
50 {
51 JLOG(journal_.debug()) << "Acquired TX set " << hash_;
52 mMap->setImmutable();
53
54 uint256 const& hash(hash_);
55 std::shared_ptr<SHAMap> const& map(mMap);
56 auto const pap = &app_;
57 // Note that, when we're in the process of shutting down, addJob()
58 // may reject the request. If that happens then giveSet() will
59 // not be called. That's fine. According to David the giveSet() call
60 // just updates the consensus and related structures when we acquire
61 // a transaction set. No need to update them if we're shutting down.
62 app_.getJobQueue().addJob(jtTXN_DATA, "ComplAcquire", [pap, hash, map]() {
63 pap->getInboundTransactions().giveSet(hash, map, true);
64 });
65 }
66}
67
68void
70{
72 {
73 failed_ = true;
74 done();
75 return;
76 }
77
79 trigger(nullptr);
80
81 addPeers(1);
82}
83
89
90void
92{
93 if (complete_)
94 {
95 JLOG(journal_.info()) << "trigger after complete";
96 return;
97 }
98 if (failed_)
99 {
100 JLOG(journal_.info()) << "trigger after fail";
101 return;
102 }
103
104 if (!mHaveRoot)
105 {
106 JLOG(journal_.trace()) << "TransactionAcquire::trigger " << (peer ? "havePeer" : "noPeer")
107 << " no root";
108 protocol::TMGetLedger tmGL;
109 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
110 tmGL.set_itype(protocol::liTS_CANDIDATE);
111 tmGL.set_querydepth(3); // We probably need the whole thing
112
113 if (timeouts_ != 0)
114 tmGL.set_querytype(protocol::qtINDIRECT);
115
116 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
117 mPeerSet->sendRequest(tmGL, peer);
118 }
119 else if (!mMap->isValid())
120 {
121 failed_ = true;
122 done();
123 }
124 else
125 {
127 auto nodes = mMap->getMissingNodes(256, &sf);
128
129 if (nodes.empty())
130 {
131 if (mMap->isValid())
132 {
133 complete_ = true;
134 }
135 else
136 {
137 failed_ = true;
138 }
139
140 done();
141 return;
142 }
143
144 protocol::TMGetLedger tmGL;
145 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
146 tmGL.set_itype(protocol::liTS_CANDIDATE);
147
148 if (timeouts_ != 0)
149 tmGL.set_querytype(protocol::qtINDIRECT);
150
151 for (auto const& node : nodes)
152 {
153 *tmGL.add_nodeids() = node.first.getRawString();
154 }
155 mPeerSet->sendRequest(tmGL, peer);
156 }
157}
158
162 std::shared_ptr<Peer> const& peer)
163{
164 ScopedLockType const sl(mtx_);
165
166 if (complete_)
167 {
168 JLOG(journal_.trace()) << "TX set complete";
169 return SHAMapAddNode();
170 }
171
172 if (failed_)
173 {
174 JLOG(journal_.trace()) << "TX set failed";
175 return SHAMapAddNode();
176 }
177
178 try
179 {
180 if (data.empty())
181 return SHAMapAddNode::invalid();
182
184
185 for (auto const& d : data)
186 {
187 if (d.first.isRoot())
188 {
189 if (mHaveRoot)
190 {
191 JLOG(journal_.debug()) << "Got root TXS node, already have it";
192 }
193 else if (!mMap->addRootNode(SHAMapHash{hash_}, d.second, nullptr).isGood())
194 {
195 JLOG(journal_.warn()) << "TX acquire got bad root node";
196 }
197 else
198 {
199 mHaveRoot = true;
200 }
201 }
202 else if (!mMap->addKnownNode(d.first, d.second, &sf).isGood())
203 {
204 JLOG(journal_.warn()) << "TX acquire got bad non-root node";
205 return SHAMapAddNode::invalid();
206 }
207 }
208
209 trigger(peer);
210 progress_ = true;
211 return SHAMapAddNode::useful();
212 }
213 catch (std::exception const& ex)
214 {
215 JLOG(journal_.error()) << "Peer " << peer->id()
216 << " sent us junky transaction node data: " << ex.what();
217 return SHAMapAddNode::invalid();
218 }
219}
220
221void
223{
224 mPeerSet->addPeers(
225 limit,
226 [this](auto peer) { return peer->hasTxSet(hash_); },
227 [this](auto peer) { trigger(peer); });
228}
229
230void
232{
234
235 addPeers(numPeers);
236
237 setTimer(sl);
238}
239
240void
248
249} // namespace xrpl
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:147
static SHAMapAddNode useful()
static SHAMapAddNode invalid()
Identifies a node inside a SHAMap.
std::string getRawString() const
virtual NodeCache & getTempNodeCache()=0
virtual JobQueue & getJobQueue()=0
This class is an "active" object.
std::recursive_mutex mtx_
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
bool progress_
Whether forward progress has been made.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
std::shared_ptr< SHAMap > mMap
void addPeers(std::size_t limit)
void trigger(std::shared_ptr< Peer > const &)
TransactionAcquire(Application &app, uint256 const &hash, std::unique_ptr< PeerSet > peerSet)
SHAMapAddNode takeNodes(std::vector< std::pair< SHAMapNodeID, Slice > > const &data, std::shared_ptr< Peer > const &)
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::unique_ptr< PeerSet > mPeerSet
iterator begin()
Definition base_uint.h:112
static constexpr std::size_t size()
Definition base_uint.h:499
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ jtTXN_DATA
Definition Job.h:48
auto constexpr TX_ACQUIRE_TIMEOUT
T what(T... args)