rippled
Loading...
Searching...
No Matches
TransactionAcquire.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/ledger/ConsensusTransSetSF.h>
21#include <xrpld/app/ledger/InboundLedgers.h>
22#include <xrpld/app/ledger/InboundTransactions.h>
23#include <xrpld/app/ledger/detail/TransactionAcquire.h>
24#include <xrpld/app/main/Application.h>
25#include <xrpld/app/misc/NetworkOPs.h>
26
27#include <memory>
28
29namespace ripple {
30
31using namespace std::chrono_literals;
32
33// Timeout interval: a std::chrono duration of 250 milliseconds (the `ms`
// suffix is available through the std::chrono_literals using-directive above).
// NOTE(review): presumably the timer period used while acquiring a transaction
// set — confirm where it is passed to the base class.
34auto constexpr TX_ACQUIRE_TIMEOUT = 250ms;
35
36enum {
39};
40
42 Application& app,
43 uint256 const& hash,
46 app,
47 hash,
// NOTE(review): presumably the job type and job name used when this object
// queues its own work — confirm against the base-class constructor.
49 {jtTXN_DATA, "TransactionAcquire", {}},
// Log output for this object goes to the "TransactionAcquire" journal.
50 app.journal("TransactionAcquire"))
// No root node has been accepted yet; takeNodes() flips this to true once
// addRootNode() reports success.
51 , mHaveRoot(false)
// Takes ownership of the caller-supplied peer set.
52 , mPeerSet(std::move(peerSet))
53{
55 SHAMapType::TRANSACTION, hash, app_.getNodeFamily());
// NOTE(review): presumably marks the map as not backed by persistent node
// storage — confirm against SHAMap::setUnbacked().
56 mMap->setUnbacked();
57}
58
// Reports the outcome of the acquisition.
//
// If failed_ is set, only a debug message is logged. Otherwise the map is made
// immutable and handed to getInboundTransactions().giveSet() from inside the
// "completeAcquire" job callback, so that work does not run in this call.
59void
61{
62 // We hold a PeerSet lock and so cannot do real work here
63
64 if (failed_)
65 {
66 JLOG(journal_.debug()) << "Failed to acquire TX set " << hash_;
67 }
68 else
69 {
70 JLOG(journal_.debug()) << "Acquired TX set " << hash_;
71 mMap->setImmutable();
72
// Local aliases for the lambda's capture list. Capturing `hash` and `map`
// by value copies the hash and the shared_ptr into the closure; the
// application is captured as a raw pointer. The closure does not capture
// `this`.
73 uint256 const& hash(hash_);
74 std::shared_ptr<SHAMap> const& map(mMap);
75 auto const pap = &app_;
76 // Note that, when we're in the process of shutting down, addJob()
77 // may reject the request. If that happens then giveSet() will
78 // not be called. That's fine. According to David the giveSet() call
79 // just updates the consensus and related structures when we acquire
80 // a transaction set. No need to update them if we're shutting down.
82 jtTXN_DATA, "completeAcquire", [pap, hash, map]() {
83 pap->getInboundTransactions().giveSet(hash, map, true);
84 });
85 }
86}
87
// Timer-driven retry logic.
// NOTE(review): this appears to be the onTimer() hook; the conditions guarding
// the give-up branch and the trigger() call should be confirmed against the
// full source.
88void
90{
92 {
// Give up: mark the acquisition as failed and report the outcome.
93 failed_ = true;
94 done();
95 return;
96 }
97

// Issue a request that is not tied to a particular peer.
99 trigger(nullptr);
100

// Ask the peer set for one more peer (the argument is the limit).
101 addPeers(1);
102}
103
109
// Sends the next TMGetLedger request for this transaction set, or finishes the
// acquisition when there is nothing left to ask for.
//
// `peer` may be null (the trace message distinguishes "havePeer"/"noPeer"); it
// is forwarded unchanged to mPeerSet->sendRequest().
//
// Three cases after the complete_/failed_ early exits:
//   1. no root yet         -> request a single node id with query depth 3;
//   2. map is not valid    -> mark failed and call done();
//   3. otherwise           -> request up to 256 missing nodes, or finish if
//                             none are missing.
110void
112{
// Nothing to do once the acquisition has finished either way.
113 if (complete_)
114 {
115 JLOG(journal_.info()) << "trigger after complete";
116 return;
117 }
118 if (failed_)
119 {
120 JLOG(journal_.info()) << "trigger after fail";
121 return;
122 }
123
124 if (!mHaveRoot)
125 {
126 JLOG(journal_.trace()) << "TransactionAcquire::trigger "
127 << (peer ? "havePeer" : "noPeer") << " no root";
128 protocol::TMGetLedger tmGL;
129 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
130 tmGL.set_itype(protocol::liTS_CANDIDATE);
131 tmGL.set_querydepth(3); // We probably need the whole thing
132
// Once timeouts_ is non-zero, the request is flagged as qtINDIRECT.
133 if (timeouts_ != 0)
134 tmGL.set_querytype(protocol::qtINDIRECT);
135
// Request the node identified by a default-constructed SHAMapNodeID.
// NOTE(review): presumably this id denotes the root node — confirm.
136 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
137 mPeerSet->sendRequest(tmGL, peer);
138 }
139 else if (!mMap->isValid())
140 {
// We have a root but the map is unusable: fail the acquisition.
141 failed_ = true;
142 done();
143 }
144 else
145 {
// Ask the map for at most 256 nodes that are still missing.
147 auto nodes = mMap->getMissingNodes(256, &sf);
148
149 if (nodes.empty())
150 {
// Nothing is missing: the result depends only on map validity.
151 if (mMap->isValid())
152 complete_ = true;
153 else
154 failed_ = true;
155
156 done();
157 return;
158 }
159
160 protocol::TMGetLedger tmGL;
161 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
162 tmGL.set_itype(protocol::liTS_CANDIDATE);
163
164 if (timeouts_ != 0)
165 tmGL.set_querytype(protocol::qtINDIRECT);
166
// One node id per missing node; only the first element of each pair is
// used for the request.
167 for (auto const& node : nodes)
168 {
169 *tmGL.add_nodeids() = node.first.getRawString();
170 }
171 mPeerSet->sendRequest(tmGL, peer);
172 }
173}
174
178 std::shared_ptr<Peer> const& peer)
179{
181
// Data arriving after the acquisition finished is ignored; a
// default-constructed SHAMapAddNode is returned.
182 if (complete_)
183 {
184 JLOG(journal_.trace()) << "TX set complete";
185 return SHAMapAddNode();
186 }
187
188 if (failed_)
189 {
190 JLOG(journal_.trace()) << "TX set failed";
191 return SHAMapAddNode();
192 }
193
194 try
195 {
// An empty reply is reported as invalid.
196 if (data.empty())
197 return SHAMapAddNode::invalid();
198
200
// Each entry is a (node id, payload) pair.
201 for (auto const& d : data)
202 {
203 if (d.first.isRoot())
204 {
// A duplicate root is only logged. A root that fails addRootNode()
// is also only logged: processing continues with the next entry
// and mHaveRoot stays false.
205 if (mHaveRoot)
206 JLOG(journal_.debug())
207 << "Got root TXS node, already have it";
208 else if (!mMap->addRootNode(
209 SHAMapHash{hash_}, d.second, nullptr)
210 .isGood())
211 {
212 JLOG(journal_.warn()) << "TX acquire got bad root node";
213 }
214 else
215 mHaveRoot = true;
216 }
// Unlike a bad root, a bad non-root node aborts the whole batch.
217 else if (!mMap->addKnownNode(d.first, d.second, &sf).isGood())
218 {
219 JLOG(journal_.warn()) << "TX acquire got bad non-root node";
220 return SHAMapAddNode::invalid();
221 }
222 }
223
// Batch processed: ask the same peer for whatever is needed next and
// record that forward progress was made.
224 trigger(peer);
225 progress_ = true;
226 return SHAMapAddNode::useful();
227 }
228 catch (std::exception const& ex)
229 {
// Any std::exception thrown while processing is reported as invalid data.
// NOTE(review): peer is dereferenced here; this assumes callers never pass
// a null peer — confirm.
230 JLOG(journal_.error())
231 << "Peer " << peer->id()
232 << " sent us junky transaction node data: " << ex.what();
233 return SHAMapAddNode::invalid();
234 }
235}
236
// Forwards to mPeerSet->addPeers() with `limit` and two callbacks that capture
// `this`.
// NOTE(review): presumably PeerSet uses the first callback to select peers and
// invokes the second for each peer it adds — confirm against PeerSet::addPeers.
237void
239{
240 mPeerSet->addPeers(
241 limit,
// Returns whether the peer reports having the transaction set hash_.
242 [this](auto peer) { return peer->hasTxSet(hash_); },
// Sends a request to that peer.
243 [this](auto peer) { trigger(peer); });
244}
245
// Starts the acquisition: adds peers first, then arms the timer.
246void
248{
250

// numPeers is passed through as the limit for addPeers().
251 addPeers(numPeers);
252

// Schedules the timer-driven job; setTimer() takes the held lock by reference.
253 setTimer(sl);
254}
255
256void
265
266} // namespace ripple
Stream error() const
Definition Journal.h:346
Stream debug() const
Definition Journal.h:328
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
Stream warn() const
Definition Journal.h:340
virtual NodeCache & getTempNodeCache()=0
virtual JobQueue & getJobQueue()=0
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:168
static SHAMapAddNode invalid()
static SHAMapAddNode useful()
Identifies a node inside a SHAMap.
std::string getRawString() const
This class is an "active" object.
bool progress_
Whether forward progress has been made.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
void trigger(std::shared_ptr< Peer > const &)
TransactionAcquire(Application &app, uint256 const &hash, std::unique_ptr< PeerSet > peerSet)
std::shared_ptr< SHAMap > mMap
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
void addPeers(std::size_t limit)
std::unique_ptr< PeerSet > mPeerSet
SHAMapAddNode takeNodes(std::vector< std::pair< SHAMapNodeID, Slice > > const &data, std::shared_ptr< Peer > const &)
iterator begin()
Definition base_uint.h:136
static constexpr std::size_t size()
Definition base_uint.h:526
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
auto constexpr TX_ACQUIRE_TIMEOUT
@ jtTXN_DATA
Definition Job.h:69
T what(T... args)