rippled
Loading...
Searching...
No Matches
InboundLedgers.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/ledger/InboundLedgers.h>
21#include <xrpld/app/ledger/LedgerMaster.h>
22#include <xrpld/app/main/Application.h>
23#include <xrpld/app/misc/NetworkOPs.h>
24#include <xrpld/core/JobQueue.h>
25#include <xrpld/perflog/PerfLog.h>
26
27#include <xrpl/basics/DecayingSample.h>
28#include <xrpl/basics/Log.h>
29#include <xrpl/basics/scope.h>
30#include <xrpl/beast/container/aged_map.h>
31#include <xrpl/protocol/jss.h>
32
33#include <exception>
34#include <memory>
35#include <mutex>
36#include <vector>
37
38namespace ripple {
39
41{
42private:
45 // measures ledgers per second, constants are important
48
49public:
50 // How long before we try again to acquire the same ledger
51 static constexpr std::chrono::minutes const kReacquireInterval{5};
52
54 Application& app,
55 clock_type& clock,
56 beast::insight::Collector::ptr const& collector,
58 : app_(app)
59 , fetchRate_(clock.now())
60 , j_(app.journal("InboundLedger"))
61 , m_clock(clock)
62 , mRecentFailures(clock)
63 , mCounter(collector->make_counter("ledger_fetches"))
64 , mPeerSetBuilder(std::move(peerSetBuilder))
65 {
66 }
67
71 uint256 const& hash,
72 std::uint32_t seq,
73 InboundLedger::Reason reason) override
74 {
75 auto doAcquire = [&, seq, reason]() -> std::shared_ptr<Ledger const> {
76 XRPL_ASSERT(
77 hash.isNonZero(),
78 "ripple::InboundLedgersImp::acquire::doAcquire : nonzero hash");
79
80 // probably not the right rule
84 return {};
85
86 bool isNew = true;
88 {
90 if (stopping_)
91 {
92 return {};
93 }
94
95 auto it = mLedgers.find(hash);
96 if (it != mLedgers.end())
97 {
98 isNew = false;
99 inbound = it->second;
100 }
101 else
102 {
104 app_,
105 hash,
106 seq,
107 reason,
109 mPeerSetBuilder->build());
110 mLedgers.emplace(hash, inbound);
111 inbound->init(sl);
112 ++mCounter;
113 }
114 }
115
116 if (inbound->isFailed())
117 return {};
118
119 if (!isNew)
120 inbound->update(seq);
121
122 if (!inbound->isComplete())
123 return {};
124
125 return inbound->getLedger();
126 };
127 using namespace std::chrono_literals;
129 doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
130
131 return ledger;
132 }
133
134 void
136 uint256 const& hash,
137 std::uint32_t seq,
138 InboundLedger::Reason reason) override
139 {
141 try
142 {
143 if (pendingAcquires_.contains(hash))
144 return;
145 pendingAcquires_.insert(hash);
146 scope_unlock unlock(lock);
147 acquire(hash, seq, reason);
148 }
149 catch (std::exception const& e)
150 {
151 JLOG(j_.warn())
152 << "Exception thrown for acquiring new inbound ledger " << hash
153 << ": " << e.what();
154 }
155 catch (...)
156 {
157 JLOG(j_.warn())
158 << "Unknown exception thrown for acquiring new inbound ledger "
159 << hash;
160 }
161 pendingAcquires_.erase(hash);
162 }
163
165 find(uint256 const& hash) override
166 {
167 XRPL_ASSERT(
168 hash.isNonZero(),
169 "ripple::InboundLedgersImp::find : nonzero input");
170
172
173 {
175
176 auto it = mLedgers.find(hash);
177 if (it != mLedgers.end())
178 {
179 ret = it->second;
180 }
181 }
182
183 return ret;
184 }
185
186 /*
187 This gets called when
188 "We got some data from an inbound ledger"
189
190 inboundLedgerTrigger:
191 "What do we do with this partial data?"
192 Figures out what to do with the responses to our requests for information.
193
194 */
195 // means "We got some data from an inbound ledger"
196
197 // VFALCO TODO Remove the dependency on the Peer object.
200 bool
202 LedgerHash const& hash,
205 {
206 if (auto ledger = find(hash))
207 {
208 JLOG(j_.trace()) << "Got data (" << packet->nodes().size()
209 << ") for acquiring ledger: " << hash;
210
211 // Stash the data for later processing and see if we need to
212 // dispatch
213 if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
215 jtLEDGER_DATA, "processLedgerData", [ledger]() {
216 ledger->runData();
217 });
218
219 return true;
220 }
221
222 JLOG(j_.trace()) << "Got data for ledger " << hash
223 << " which we're no longer acquiring";
224
225 // If it's state node data, stash it because it still might be
226 // useful.
227 if (packet->type() == protocol::liAS_NODE)
228 {
230 jtLEDGER_DATA, "gotStaleData", [this, packet]() {
231 gotStaleData(packet);
232 });
233 }
234
235 return false;
236 }
237
238 void
239 logFailure(uint256 const& h, std::uint32_t seq) override
240 {
242
243 mRecentFailures.emplace(h, seq);
244 }
245
246 bool
247 isFailure(uint256 const& h) override
248 {
250
252 return mRecentFailures.find(h) != mRecentFailures.end();
253 }
254
261 void
263 {
264 Serializer s;
265 try
266 {
267 for (int i = 0; i < packet_ptr->nodes().size(); ++i)
268 {
269 auto const& node = packet_ptr->nodes(i);
270
271 if (!node.has_nodeid() || !node.has_nodedata())
272 return;
273
274 auto newNode =
275 SHAMapTreeNode::makeFromWire(makeSlice(node.nodedata()));
276
277 if (!newNode)
278 return;
279
280 s.erase();
281 newNode->serializeWithPrefix(s);
282
284 newNode->getHash().as_uint256(),
286 }
287 }
288 catch (std::exception const&)
289 {
290 }
291 }
292
293 void
294 clearFailures() override
295 {
297
298 mRecentFailures.clear();
299 mLedgers.clear();
300 }
301
303 fetchRate() override
304 {
306 return 60 * fetchRate_.value(m_clock.now());
307 }
308
309 // Should only be called with an inboundledger that has
310 // a reason of history
311 void
313 {
316 }
317
319 getInfo() override
320 {
322
324
325 {
327
328 acqs.reserve(mLedgers.size());
329 for (auto const& it : mLedgers)
330 {
331 XRPL_ASSERT(
332 it.second,
333 "ripple::InboundLedgersImp::getInfo : non-null ledger");
334 acqs.push_back(it);
335 }
336 for (auto const& it : mRecentFailures)
337 {
338 if (it.second > 1)
339 ret[std::to_string(it.second)][jss::failed] = true;
340 else
341 ret[to_string(it.first)][jss::failed] = true;
342 }
343 }
344
345 for (auto const& it : acqs)
346 {
347 // getJson is expensive, so call without the lock
348 std::uint32_t seq = it.second->getSeq();
349 if (seq > 1)
350 ret[std::to_string(seq)] = it.second->getJson(0);
351 else
352 ret[to_string(it.first)] = it.second->getJson(0);
353 }
354
355 return ret;
356 }
357
358 void
359 gotFetchPack() override
360 {
362 {
364
365 acquires.reserve(mLedgers.size());
366 for (auto const& it : mLedgers)
367 {
368 XRPL_ASSERT(
369 it.second,
370 "ripple::InboundLedgersImp::gotFetchPack : non-null "
371 "ledger");
372 acquires.push_back(it.second);
373 }
374 }
375
376 for (auto const& acquire : acquires)
377 {
378 acquire->checkLocal();
379 }
380 }
381
382 void
383 sweep() override
384 {
385 auto const start = m_clock.now();
386
387 // Make a list of things to sweep, while holding the lock
389 std::size_t total;
390
391 {
393 MapType::iterator it(mLedgers.begin());
394 total = mLedgers.size();
395
396 stuffToSweep.reserve(total);
397
398 while (it != mLedgers.end())
399 {
400 auto const la = it->second->getLastAction();
401
402 if (la > start)
403 {
404 it->second->touch();
405 ++it;
406 }
407 else if ((la + std::chrono::minutes(1)) < start)
408 {
409 stuffToSweep.push_back(it->second);
410 // shouldn't cause the actual final delete
411 // since we are holding a reference in the vector.
412 it = mLedgers.erase(it);
413 }
414 else
415 {
416 ++it;
417 }
418 }
419
421 }
422
423 JLOG(j_.debug())
424 << "Swept " << stuffToSweep.size() << " out of " << total
425 << " inbound ledgers. Duration: "
426 << std::chrono::duration_cast<std::chrono::milliseconds>(
427 m_clock.now() - start)
428 .count()
429 << "ms";
430 }
431
432 void
433 stop() override
434 {
435 ScopedLockType lock(mLock);
436 stopping_ = true;
437 mLedgers.clear();
438 mRecentFailures.clear();
439 }
440
442 cacheSize() override
443 {
444 ScopedLockType lock(mLock);
445 return mLedgers.size();
446 }
447
448private:
450
453
454 bool stopping_ = false;
457
459
461
463
466};
467
468//------------------------------------------------------------------------------
469
472 Application& app,
474 beast::insight::Collector::ptr const& collector)
475{
477 app, clock, collector, make_PeerSetBuilder(app));
478}
479
480} // namespace ripple
T begin(T... args)
Represents a JSON value.
Definition json_value.h:149
A generic endpoint for log messages.
Definition Journal.h:60
Stream debug() const
Definition Journal.h:328
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
Stream warn() const
Definition Journal.h:340
virtual time_point now() const =0
Returns the current time.
Associative container where each element is also indexed by time.
A metric for measuring an integral value.
Definition Counter.h:39
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual LedgerMaster & getLedgerMaster()=0
Sampling function using exponential decay to provide a continuous value.
double value(time_point now)
void add(double value, time_point now)
DecayWindow< 30, clock_type > fetchRate_
std::size_t cacheSize() override
std::recursive_mutex mLock
Json::Value getInfo() override
beast::Journal const j_
void logFailure(uint256 const &h, std::uint32_t seq) override
void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet_ptr) override
We got some data for a ledger we are no longer acquiring Since we paid the price to receive it,...
beast::aged_map< uint256, std::uint32_t > mRecentFailures
std::set< uint256 > pendingAcquires_
beast::insight::Counter mCounter
std::unique_ptr< PeerSetBuilder > mPeerSetBuilder
std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
void acquireAsync(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
InboundLedgersImp(Application &app, clock_type &clock, beast::insight::Collector::ptr const &collector, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
static constexpr std::chrono::minutes const kReacquireInterval
std::size_t fetchRate() override
Returns the rate of historical ledger fetches per minute.
std::shared_ptr< InboundLedger > find(uint256 const &hash) override
bool gotLedgerData(LedgerHash const &hash, std::shared_ptr< Peer > peer, std::shared_ptr< protocol::TMLedgerData > packet) override
We received a TMLedgerData from a peer.
void onLedgerFetched() override
Called when a complete ledger is obtained.
bool isFailure(uint256 const &h) override
Manages the lifetime of inbound ledgers.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:168
void addFetchPack(uint256 const &hash, std::shared_ptr< Blob > data)
virtual bool isNeedNetworkLedger()=0
static intr_ptr::SharedPtr< SHAMapTreeNode > makeFromWire(Slice rawNode)
Blob::iterator begin()
Definition Serializer.h:252
Blob::iterator end()
Definition Serializer.h:257
bool isNonZero() const
Definition base_uint.h:545
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:231
T clear(T... args)
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T is_same_v
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:45
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
Definition PerfLog.h:187
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
std::unique_ptr< PeerSetBuilder > make_PeerSetBuilder(Application &app)
Definition PeerSet.cpp:144
std::unique_ptr< InboundLedgers > make_InboundLedgers(Application &app, InboundLedgers::clock_type &clock, beast::insight::Collector::ptr const &collector)
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:244
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:630
@ jtLEDGER_DATA
Definition Job.h:66
STL namespace.
T push_back(T... args)
T ref(T... args)
T reserve(T... args)
T size(T... args)
T to_string(T... args)
T what(T... args)