rippled
Loading...
Searching...
No Matches
InboundLedgers.cpp
1#include <xrpld/app/ledger/InboundLedgers.h>
2#include <xrpld/app/ledger/LedgerMaster.h>
3#include <xrpld/app/main/Application.h>
4#include <xrpld/app/misc/NetworkOPs.h>
5
6#include <xrpl/basics/DecayingSample.h>
7#include <xrpl/basics/Log.h>
8#include <xrpl/basics/scope.h>
9#include <xrpl/beast/container/aged_map.h>
10#include <xrpl/core/JobQueue.h>
11#include <xrpl/core/PerfLog.h>
12#include <xrpl/protocol/jss.h>
13
14#include <exception>
15#include <memory>
16#include <mutex>
17#include <vector>
18
19namespace xrpl {
20
22{
23private:
26 // measures ledgers per second, constants are important
29
30public:
31 // How long before we try again to acquire the same ledger
32 static constexpr std::chrono::minutes const kReacquireInterval{5};
33
// app: kept as app_ and used to obtain the "InboundLedger" journal.
35 Application& app,
// clock: kept as m_clock; its current time seeds fetchRate_ and the
// clock is also handed to the mRecentFailures container.
36 clock_type& clock,
// collector: used once, to create the "ledger_fetches" counter.
37 beast::insight::Collector::ptr const& collector,
39 : app_(app)
40 , fetchRate_(clock.now())
41 , j_(app.journal("InboundLedger"))
42 , m_clock(clock)
43 , mRecentFailures(clock)
44 , mCounter(collector->make_counter("ledger_fetches"))
// peerSetBuilder is moved into the member rather than copied.
45 , mPeerSetBuilder(std::move(peerSetBuilder))
46 {
// No further work is done in the constructor body.
47 }
48
52 uint256 const& hash,
53 std::uint32_t seq,
54 InboundLedger::Reason reason) override
55 {
// doAcquire does the real work. `hash` is captured by reference, while
// `seq` and `reason` are captured by copy. It yields an empty pointer
// unless the acquisition for `hash` is already complete.
56 auto doAcquire = [&, seq, reason]() -> std::shared_ptr<Ledger const> {
57 XRPL_ASSERT(
58 hash.isNonZero(),
59 "xrpl::InboundLedgersImp::acquire::doAcquire : nonzero hash");
60
61 // probably not the right rule
// NOTE(review): the condition guarding this early return is not
// visible in this listing - confirm against the full source.
65 return {};
66
67 bool isNew = true;
69 {
// Once stop() has set stopping_, no acquisition is looked up or
// created any more.
71 if (stopping_)
72 {
73 return {};
74 }
75
76 auto it = mLedgers.find(hash);
77 if (it != mLedgers.end())
78 {
// An acquisition for this hash already exists: reuse it.
79 isNew = false;
80 inbound = it->second;
81 }
82 else
83 {
// No acquisition yet: a new one is built from the arguments below
// (the construction call itself is not visible here), then
// registered in mLedgers, initialized, and counted in mCounter.
85 app_,
86 hash,
87 seq,
88 reason,
90 mPeerSetBuilder->build());
91 mLedgers.emplace(hash, inbound);
92 inbound->init(sl);
93 ++mCounter;
94 }
95 }
96
// A failed acquisition is reported as "no ledger".
97 if (inbound->isFailed())
98 return {};
99
// Only a previously existing acquisition is updated with `seq`; a
// freshly created one already received it at construction.
100 if (!isNew)
101 inbound->update(seq);
102
// Still in progress: report "no ledger yet" with an empty pointer.
103 if (!inbound->isComplete())
104 return {};
105
106 return inbound->getLedger();
107 };
108 using namespace std::chrono_literals;
// NOTE(review): these look like the arguments of a timing wrapper
// (presumably measureDurationAndLog with a 500ms threshold) whose call
// line, and the declaration of `ledger`, are not visible - confirm.
110 doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
111
112 return ledger;
113 }
114
// Runs acquire() for `hash` unless a request for the same hash is
// already pending. The ledger returned by acquire() is discarded, and
// exceptions thrown inside the try block are logged at warn level and
// swallowed.
115 void
117 uint256 const& hash,
118 std::uint32_t seq,
119 InboundLedger::Reason reason) override
120 {
122 try
123 {
// Another request for this hash is already pending. Returning here
// leaves the function entirely, so the erase at the bottom is skipped
// and the existing pending entry stays in place.
124 if (pendingAcquires_.contains(hash))
125 return;
126 pendingAcquires_.insert(hash);
// scope_unlock releases `lock` while acquire() runs and re-locks it
// when this scope ends.
// NOTE(review): `lock` is declared on a line not visible here.
127 scope_unlock unlock(lock);
128 acquire(hash, seq, reason);
129 }
130 catch (std::exception const& e)
131 {
132 JLOG(j_.warn())
133 << "Exception thrown for acquiring new inbound ledger " << hash
134 << ": " << e.what();
135 }
136 catch (...)
137 {
138 JLOG(j_.warn())
139 << "Unknown exception thrown for acquiring new inbound ledger "
140 << hash;
141 }
// Reached after both normal completion and a caught exception: the
// hash is no longer pending.
142 pendingAcquires_.erase(hash);
143 }
144
// Looks up the acquisition stored in mLedgers for `hash`.
// `hash` must be non-zero (asserted below).
// NOTE(review): `ret` is declared on a line not visible in this listing;
// presumably it is empty when no entry matches - confirm.
146 find(uint256 const& hash) override
147 {
148 XRPL_ASSERT(
149 hash.isNonZero(), "xrpl::InboundLedgersImp::find : nonzero input");
150
152
153 {
155
156 auto it = mLedgers.find(hash);
157 if (it != mLedgers.end())
158 {
159 ret = it->second;
160 }
161 }
162
163 return ret;
164 }
165
166 /*
167 This gets called when
168 "We got some data from an inbound ledger"
169
170 inboundLedgerTrigger:
171 "What do we do with this partial data?"
172 Figures out what to do with the responses to our requests for information.
173
174 */
175 // means "We got some data from an inbound ledger"
176
177 // VFALCO TODO Remove the dependency on the Peer object.
// Handles ledger data received from a peer.
// Returns true when find(hash) yields an acquisition, false otherwise.
180 bool
182 LedgerHash const& hash,
185 {
186 if (auto ledger = find(hash))
187 {
188 JLOG(j_.trace()) << "Got data (" << packet->nodes().size()
189 << ") for acquiring ledger: " << hash;
190
191 // Stash the data for later processing and see if we need to
192 // dispatch
// The peer is handed over as a weak_ptr. When gotData() returns true,
// a "processLedgerData" job that calls runData() is set up.
// NOTE(review): the call that queues the job is not visible here.
193 if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
195 jtLEDGER_DATA, "processLedgerData", [ledger]() {
196 ledger->runData();
197 });
198
199 return true;
200 }
201
202 JLOG(j_.trace()) << "Got data for ledger " << hash
203 << " which we're no longer acquiring";
204
205 // If it's state node data, stash it because it still might be
206 // useful.
207 if (packet->type() == protocol::liAS_NODE)
208 {
// The job captures `this` and the packet and forwards to
// gotStaleData().
// NOTE(review): the call that queues the job is not visible here.
210 jtLEDGER_DATA, "gotStaleData", [this, packet]() {
211 gotStaleData(packet);
212 });
213 }
214
215 return false;
216 }
217
// Records a failed acquisition by adding the pair (h, seq) to
// mRecentFailures.
// NOTE(review): one line of this body is not visible in this listing.
218 void
219 logFailure(uint256 const& h, std::uint32_t seq) override
220 {
222
223 mRecentFailures.emplace(h, seq);
224 }
225
// Returns true when `h` currently has an entry in mRecentFailures.
// NOTE(review): two lines of this body are not visible in this listing,
// so any work done before the lookup cannot be described here.
226 bool
227 isFailure(uint256 const& h) override
228 {
230
232 return mRecentFailures.find(h) != mRecentFailures.end();
233 }
234
// Processes data received for a ledger that is no longer being
// acquired: each node in the packet is decoded and re-serialized with
// its prefix.
241 void
243 {
// One Serializer is reused for all nodes; it is erased before each use.
244 Serializer s;
245 try
246 {
247 for (int i = 0; i < packet_ptr->nodes().size(); ++i)
248 {
249 auto const& node = packet_ptr->nodes(i);
250
// A node lacking an id or data ends processing of the whole packet
// (return, not continue), so the remaining nodes are not examined.
251 if (!node.has_nodeid() || !node.has_nodedata())
252 return;
253
254 auto newNode =
255 SHAMapTreeNode::makeFromWire(makeSlice(node.nodedata()));
256
// Likewise, processing stops when makeFromWire yields no node.
257 if (!newNode)
258 return;
259
260 s.erase();
261 newNode->serializeWithPrefix(s);
262
// NOTE(review): the call receiving this argument is not visible here;
// presumably the serialized node is stored under its hash - confirm.
264 newNode->getHash().as_uint256(),
266 }
267 }
268 catch (std::exception const&)
269 {
// Any std::exception raised above is swallowed without logging.
270 }
271 }
272
// Empties mRecentFailures. Despite the name, mLedgers is emptied too.
// NOTE(review): one line of this body is not visible in this listing.
273 void
274 clearFailures() override
275 {
277
278 mRecentFailures.clear();
279 mLedgers.clear();
280 }
281
// Returns 60 times the current value of fetchRate_, sampled at
// m_clock.now().
// NOTE(review): presumably this turns a per-second rate into a
// per-minute one - confirm against the fetchRate_ declaration.
283 fetchRate() override
284 {
286 return 60 * fetchRate_.value(m_clock.now());
287 }
288
// Called when a complete ledger is obtained.
289 // Should only be called with an inboundledger that has
290 // a reason of history
291 void
293 {
// NOTE(review): the statements of this body are not visible in this
// listing.
296 }
297
// Builds a JSON summary of the current acquisitions and of the recent
// failures.
// NOTE(review): the declarations of `ret` and `acqs` are not visible in
// this listing.
299 getInfo() override
300 {
302
304
305 {
307
// Copy the (hash, acquisition) pairs so they can be turned into JSON
// after this inner scope has ended.
308 acqs.reserve(mLedgers.size());
309 for (auto const& it : mLedgers)
310 {
311 XRPL_ASSERT(
312 it.second,
313 "xrpl::InboundLedgersImp::getInfo : non-null ledger");
314 acqs.push_back(it);
315 }
// Each recent failure is flagged as failed, keyed by its sequence
// number when that is greater than 1 and by its hash otherwise.
316 for (auto const& it : mRecentFailures)
317 {
318 if (it.second > 1)
319 ret[std::to_string(it.second)][jss::failed] = true;
320 else
321 ret[to_string(it.first)][jss::failed] = true;
322 }
323 }
324
// Same keying rule as for the failures, this time storing the JSON
// of each copied acquisition.
325 for (auto const& it : acqs)
326 {
327 // getJson is expensive, so call without the lock
328 std::uint32_t seq = it.second->getSeq();
329 if (seq > 1)
330 ret[std::to_string(seq)] = it.second->getJson(0);
331 else
332 ret[to_string(it.first)] = it.second->getJson(0);
333 }
334
335 return ret;
336 }
337
// Calls checkLocal() on every acquisition currently held in mLedgers.
// NOTE(review): the declaration of `acquires` is not visible in this
// listing.
338 void
339 gotFetchPack() override
340 {
342 {
344
// Collect the acquisitions first; checkLocal() is only called after
// this inner scope has ended.
345 acquires.reserve(mLedgers.size());
346 for (auto const& it : mLedgers)
347 {
348 XRPL_ASSERT(
349 it.second,
350 "xrpl::InboundLedgersImp::gotFetchPack : non-null "
351 "ledger");
352 acquires.push_back(it.second);
353 }
354 }
355
356 for (auto const& acquire : acquires)
357 {
358 acquire->checkLocal();
359 }
360 }
361
// Removes acquisitions from mLedgers whose last action lies more than
// one minute before the start of this sweep, then logs how many were
// removed and how long the sweep took.
362 void
363 sweep() override
364 {
365 auto const start = m_clock.now();
366
367 // Make a list of things to sweep, while holding the lock
// NOTE(review): the declaration of `stuffToSweep` is not visible in
// this listing.
369 std::size_t total;
370
371 {
373 MapType::iterator it(mLedgers.begin());
// `total` is the map size before anything is erased; it is only used
// in the log message below.
374 total = mLedgers.size();
375
376 stuffToSweep.reserve(total);
377
378 while (it != mLedgers.end())
379 {
380 auto const la = it->second->getLastAction();
381
// Last action is later than the sweep start: the entry is kept and
// touch() is called on it.
382 if (la > start)
383 {
384 it->second->touch();
385 ++it;
386 }
// Last action is more than one minute before the sweep start: move
// the entry to stuffToSweep and erase it from the map.
387 else if ((la + std::chrono::minutes(1)) < start)
388 {
389 stuffToSweep.push_back(it->second);
390 // shouldn't cause the actual final delete
391 // since we are holding a reference in the vector.
392 it = mLedgers.erase(it);
393 }
// Anything in between is left untouched.
394 else
395 {
396 ++it;
397 }
398 }
399
401 }
402
403 JLOG(j_.debug())
404 << "Swept " << stuffToSweep.size() << " out of " << total
405 << " inbound ledgers. Duration: "
406 << std::chrono::duration_cast<std::chrono::milliseconds>(
407 m_clock.now() - start)
408 .count()
409 << "ms";
410 }
411
// Shuts this object down: while holding mLock, sets stopping_ and
// empties both mLedgers and mRecentFailures.
412 void
413 stop() override
414 {
415 ScopedLockType lock(mLock);
// acquire() checks this flag and returns an empty pointer once it is
// set.
416 stopping_ = true;
417 mLedgers.clear();
418 mRecentFailures.clear();
419 }
420
// Returns the number of entries currently in mLedgers, read while
// holding mLock.
422 cacheSize() override
423 {
424 ScopedLockType lock(mLock);
425 return mLedgers.size();
426 }
427
428private:
430
433
434 bool stopping_ = false;
437
439
441
443
446};
447
448//------------------------------------------------------------------------------
449
452 Application& app,
454 beast::insight::Collector::ptr const& collector)
455{
// The arguments are forwarded together with a PeerSetBuilder obtained
// from make_PeerSetBuilder(app).
// NOTE(review): the construction call receiving them is not visible in
// this listing.
457 app, clock, collector, make_PeerSetBuilder(app));
458}
459
460} // namespace xrpl
T begin(T... args)
Represents a JSON value.
Definition json_value.h:131
A generic endpoint for log messages.
Definition Journal.h:41
Stream debug() const
Definition Journal.h:309
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
virtual time_point now() const =0
Returns the current time.
Associative container where each element is also indexed by time.
A metric for measuring an integral value.
Definition Counter.h:20
virtual LedgerMaster & getLedgerMaster()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
Sampling function using exponential decay to provide a continuous value.
void add(double value, time_point now)
double value(time_point now)
InboundLedgersImp(Application &app, clock_type &clock, beast::insight::Collector::ptr const &collector, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
beast::aged_map< uint256, std::uint32_t > mRecentFailures
Json::Value getInfo() override
void onLedgerFetched() override
Called when a complete ledger is obtained.
static constexpr std::chrono::minutes const kReacquireInterval
std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
std::recursive_mutex mLock
std::unique_ptr< PeerSetBuilder > mPeerSetBuilder
DecayWindow< 30, clock_type > fetchRate_
void acquireAsync(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
std::size_t fetchRate() override
Returns the rate of historical ledger fetches per minute.
std::set< uint256 > pendingAcquires_
bool gotLedgerData(LedgerHash const &hash, std::shared_ptr< Peer > peer, std::shared_ptr< protocol::TMLedgerData > packet) override
We received a TMLedgerData from a peer.
beast::Journal const j_
std::size_t cacheSize() override
beast::insight::Counter mCounter
void clearFailures() override
bool isFailure(uint256 const &h) override
void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet_ptr) override
We got some data for a ledger we are no longer acquiring. Since we paid the price to receive it,...
void logFailure(uint256 const &h, std::uint32_t seq) override
std::shared_ptr< InboundLedger > find(uint256 const &hash) override
Manages the lifetime of inbound ledgers.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:148
void addFetchPack(uint256 const &hash, std::shared_ptr< Blob > data)
virtual bool isNeedNetworkLedger()=0
static intr_ptr::SharedPtr< SHAMapTreeNode > makeFromWire(Slice rawNode)
Blob::iterator begin()
Definition Serializer.h:233
Blob::iterator end()
Definition Serializer.h:238
bool isNonZero() const
Definition base_uint.h:526
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:212
T clear(T... args)
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T is_same_v
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
STL namespace.
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
Definition PerfLog.h:167
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
std::unique_ptr< InboundLedgers > make_InboundLedgers(Application &app, InboundLedgers::clock_type &clock, beast::insight::Collector::ptr const &collector)
@ jtLEDGER_DATA
Definition Job.h:46
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:225
std::unique_ptr< PeerSetBuilder > make_PeerSetBuilder(Application &app)
Definition PeerSet.cpp:126
T push_back(T... args)
T ref(T... args)
T reserve(T... args)
T size(T... args)
T to_string(T... args)
T what(T... args)