xrpld
Loading...
Searching...
No Matches
TransactionAcquire.cpp
1#include <xrpld/app/ledger/detail/TransactionAcquire.h>
2
3#include <xrpld/app/ledger/ConsensusTransSetSF.h>
4#include <xrpld/app/ledger/InboundTransactions.h>
5#include <xrpld/app/ledger/detail/TimeoutCounter.h>
6#include <xrpld/app/main/Application.h>
7#include <xrpld/overlay/PeerSet.h>
8
9#include <xrpl/basics/Log.h>
10#include <xrpl/basics/Slice.h>
11#include <xrpl/basics/base_uint.h>
12#include <xrpl/core/Job.h>
13#include <xrpl/server/NetworkOPs.h>
14#include <xrpl/shamap/SHAMap.h>
15#include <xrpl/shamap/SHAMapAddNode.h>
16#include <xrpl/shamap/SHAMapMissingNode.h>
17
18#include <xrpl.pb.h>
19
20#include <algorithm>
21#include <cstddef>
22#include <exception>
23#include <memory>
24#include <utility>
25#include <vector>
26
27namespace xrpl {
28
// Brings the chrono literal suffixes (e.g. `ms`) into scope for the timeout
// constant defined below.
29using namespace std::chrono_literals;
30
31// Timeout interval in milliseconds
32constexpr auto kTxAcquireTimeout = 250ms;
33
// Timeout-count thresholds.
// NOTE(review): the statements that compare against these two constants are
// not visible in this extract; presumably the timer handler uses
// kNormTimeouts as the count after which extra effort is made and
// kMaxTimeouts as the count after which the acquisition is abandoned -
// confirm against the full file.
34static constexpr auto kNormTimeouts = 4;
35static constexpr auto kMaxTimeouts = 20;
36
// TransactionAcquire constructor.
// NOTE(review): the declarator line, the third parameter line and the opening
// of the first member initializer are missing from this extract; the
// initializer that receives app/hash/job parameters/journal is presumably the
// TimeoutCounter base class - confirm against the full file.
//
// What the visible code does:
//  - passes `app`, `hash`, a job descriptor (type JtTxnData, name "TxAcq",
//    default job limit) and the "TransactionAcquire" journal to that
//    initializer;
//  - takes ownership of `peerSet` by moving it into peerSet_;
//  - creates a new SHAMap of type TRANSACTION for `hash` and calls
//    setUnbacked() on it.
38 Application& app,
39 uint256 const& hash,
42 app,
43 hash,
45 {.jobType = JtTxnData, .jobName = "TxAcq", .jobLimit = {}},
46 app.getJournal("TransactionAcquire"))
47 , peerSet_(std::move(peerSet))
48{
49 map_ = std::make_shared<SHAMap>(SHAMapType::TRANSACTION, hash, app_.getNodeFamily());
50 map_->setUnbacked();
51}
52
// Completion handler for the acquisition.
// NOTE(review): the declarator line is missing from this extract; presumably
// this is TransactionAcquire::done(), which the other methods call after
// setting complete_ or failed_ - confirm against the full file.
//
// If failed_ is set, only a debug message is logged. Otherwise the map is
// made immutable and a job is queued that hands the finished set to
// InboundTransactions::giveSet().
53void
55{
56 // We hold a PeerSet lock and so cannot do real work here
57
58 if (failed_)
59 {
60 JLOG(journal_.debug()) << "Failed to acquire TX set " << hash_;
61 }
62 else
63 {
64 JLOG(journal_.debug()) << "Acquired TX set " << hash_;
65 map_->setImmutable();
66
    // The lambda below captures hash, map and pap by copy, so the queued job
    // holds its own hash value, its own shared_ptr to the map and a raw
    // pointer to the application rather than references to members.
67 uint256 const& hash(hash_);
68 std::shared_ptr<SHAMap> const& map(map_);
69 auto const pap = &app_;
70 // Note that, when we're in the process of shutting down, addJob()
71 // may reject the request. If that happens then giveSet() will
72 // not be called. That's fine. According to David the giveSet() call
73 // just updates the consensus and related structures when we acquire
74 // a transaction set. No need to update them if we're shutting down.
75 app_.getJobQueue().addJob(JtTxnData, "ComplAcquire", [pap, hash, map]() {
76 pap->getInboundTransactions().giveSet(hash, map, true);
77 });
78 }
79}
80
// Timer handler.
// NOTE(review): the declarator line (original line 82) and two condition
// lines (original lines 84 and 91) are missing from this extract. Presumably
// this is the onTimer() hook and the missing conditions compare timeouts_
// against kMaxTimeouts and kNormTimeouts respectively - confirm against the
// full file.
//
// Visible behaviour: under the first (missing) condition the acquisition is
// marked failed, done() is called and the function returns. Otherwise
// trigger(nullptr) is called (possibly only under the second missing
// condition) and one more peer is requested via addPeers(1).
81void
83{
85 {
86 failed_ = true;
87 done();
88 return;
89 }
90
92 trigger(nullptr);
93
94 addPeers(1);
95}
96
102
// TransactionAcquire::trigger - sends the next request for missing data.
// NOTE(review): the declarator line is missing from this extract; the body
// uses a parameter named `peer`, which is forwarded to
// peerSet_->sendRequest() and may be null (it is tested in the trace log).
//
// Behaviour, in order:
//  - already complete or failed: log and return without sending anything;
//  - root node not yet held: request the node identified by a
//    default-constructed SHAMapNodeID with query depth 3;
//  - map not valid: mark the acquisition failed and call done();
//  - otherwise: ask the map for its missing nodes. If there are none, mark
//    the acquisition complete (or failed if the map is not valid) and call
//    done(); if there are some, request them.
// Both requests use query type qtINDIRECT once timeouts_ is non-zero.
103void
105{
106 if (complete_)
107 {
108 JLOG(journal_.info()) << "trigger after complete";
109 return;
110 }
111 if (failed_)
112 {
113 JLOG(journal_.info()) << "trigger after fail";
114 return;
115 }
116
117 if (!haveRoot_)
118 {
119 JLOG(journal_.trace()) << "TransactionAcquire::trigger " << (peer ? "havePeer" : "noPeer")
120 << " no root";
121 protocol::TMGetLedger tmGL;
122 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
123 tmGL.set_itype(protocol::liTS_CANDIDATE);
124 tmGL.set_querydepth(3); // We probably need the whole thing
125
126 if (timeouts_ != 0)
127 tmGL.set_querytype(protocol::qtINDIRECT);
128
    // A default-constructed SHAMapNodeID is used as the only requested node;
    // presumably it denotes the root of the tree - confirm.
129 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
130 peerSet_->sendRequest(tmGL, peer);
131 }
132 else if (!map_->isValid())
133 {
134 failed_ = true;
135 done();
136 }
137 else
138 {
139 ConsensusTransSetSF sf(app_, app_.getTempNodeCache());
    // 256 is presumably the maximum number of missing nodes to report per
    // call - confirm against SHAMap::getMissingNodes.
140 auto nodes = map_->getMissingNodes(256, &sf);
141
142 if (nodes.empty())
143 {
144 if (map_->isValid())
145 {
146 complete_ = true;
147 }
148 else
149 {
150 failed_ = true;
151 }
152
153 done();
154 return;
155 }
156
157 protocol::TMGetLedger tmGL;
158 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
159 tmGL.set_itype(protocol::liTS_CANDIDATE);
160
161 if (timeouts_ != 0)
162 tmGL.set_querytype(protocol::qtINDIRECT);
163
    // Add the raw node ID (the first member of each returned pair) of every
    // missing node to the request.
164 for (auto const& node : nodes)
165 {
166 *tmGL.add_nodeids() = node.first.getRawString();
167 }
168 peerSet_->sendRequest(tmGL, peer);
169 }
170}
171
// TransactionAcquire::takeNodes - adds node data received from a peer to the
// map being acquired.
// NOTE(review): the first lines of the declarator (return type, function name
// and the `data` parameter) are missing from this extract; the body iterates
// `data` as a sequence of (node ID, payload) pairs.
//
// Returns:
//  - a default-constructed SHAMapAddNode if the set is already complete or
//    has already failed;
//  - SHAMapAddNode::invalid() if `data` is empty, if a non-root node cannot
//    be added, or if an exception derived from std::exception is thrown;
//  - SHAMapAddNode::useful() otherwise.
// A root node that fails to add only produces a warning; processing of the
// remaining entries continues.
175 std::shared_ptr<Peer> const& peer)
176{
    // Holds mtx_ for the whole call.
177 ScopedLockType const sl(mtx_);
178
179 if (complete_)
180 {
181 JLOG(journal_.trace()) << "TX set complete";
182 return SHAMapAddNode();
183 }
184
185 if (failed_)
186 {
187 JLOG(journal_.trace()) << "TX set failed";
188 return SHAMapAddNode();
189 }
190
191 try
192 {
193 if (data.empty())
194 return SHAMapAddNode::invalid();
195
196 ConsensusTransSetSF sf(app_, app_.getTempNodeCache());
197
198 for (auto const& d : data)
199 {
200 if (d.first.isRoot())
201 {
202 if (haveRoot_)
203 {
204 JLOG(journal_.debug()) << "Got root TXS node, already have it";
205 }
206 else if (!map_->addRootNode(SHAMapHash{hash_}, d.second, nullptr).isGood())
207 {
208 JLOG(journal_.warn()) << "TX acquire got bad root node";
209 }
210 else
211 {
212 haveRoot_ = true;
213 }
214 }
215 else if (!map_->addKnownNode(d.first, d.second, &sf).isGood())
216 {
217 JLOG(journal_.warn()) << "TX acquire got bad non-root node";
218 return SHAMapAddNode::invalid();
219 }
220 }
221
    // Request whatever is still missing from the same peer, then record that
    // forward progress was made.
222 trigger(peer);
223 progress_ = true;
224 return SHAMapAddNode::useful();
225 }
226 catch (std::exception const& ex)
227 {
    // NOTE(review): `peer` is dereferenced here without a null check; this
    // assumes callers always pass a non-null peer - confirm.
228 JLOG(journal_.error()) << "Peer " << peer->id()
229 << " sent us junky transaction node data: " << ex.what();
230 return SHAMapAddNode::invalid();
231 }
232}
233
// Asks peerSet_ to add peers for this acquisition.
// NOTE(review): the declarator line is missing from this extract; the body
// uses a parameter named `limit`.
//
// Forwards `limit` together with two callbacks: one that returns whether a
// peer reports having this transaction set (hasTxSet(hash_)), and one that
// calls trigger(peer). Presumably the first is used as a filter and the
// second is invoked for each peer that gets added - confirm against
// PeerSet::addPeers.
234void
236{
237 peerSet_->addPeers(
238 limit,
239 [this](auto peer) { return peer->hasTxSet(hash_); },
240 [this](auto peer) { trigger(peer); });
241}
242
// Start-up routine: adds the initial peers and arms the timer.
// NOTE(review): the declarator line (original line 244) and the line that
// declares `sl` (original line 246) are missing from this extract. The body
// uses a parameter named `numPeers`; `sl` is presumably a ScopedLockType
// holding mtx_, since setTimer() takes a ScopedLockType& - confirm against
// the full file.
243void
245{
247
248 addPeers(numPeers);
249
250 setTimer(sl);
251}
252
253void
261
262} // namespace xrpl
static SHAMapAddNode useful()
static SHAMapAddNode invalid()
Identifies a node inside a SHAMap.
std::string getRawString() const
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
std::recursive_mutex mtx_
std::unique_lock< std::recursive_mutex > ScopedLockType
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
bool progress_
Whether forward progress has been made.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
std::shared_ptr< SHAMap > map_
void addPeers(std::size_t limit)
void trigger(std::shared_ptr< Peer > const &)
TransactionAcquire(Application &app, uint256 const &hash, std::unique_ptr< PeerSet > peerSet)
SHAMapAddNode takeNodes(std::vector< std::pair< SHAMapNodeID, Slice > > const &data, std::shared_ptr< Peer > const &)
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::unique_ptr< PeerSet > peerSet_
T make_shared(T... args)
T min(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ JtTxnData
Definition Job.h:50
static constexpr auto kNormTimeouts
static constexpr auto kMaxTimeouts
constexpr auto kTxAcquireTimeout
BaseUInt< 256 > uint256
Definition base_uint.h:562
T what(T... args)