xrpld
Loading...
Searching...
No Matches
InboundLedgers.cpp
1#include <xrpld/app/ledger/InboundLedgers.h>
2
3#include <xrpld/app/ledger/InboundLedger.h>
4#include <xrpld/app/ledger/LedgerMaster.h>
5#include <xrpld/app/main/Application.h>
6#include <xrpld/overlay/PeerSet.h>
7
8#include <xrpl/basics/Blob.h>
9#include <xrpl/basics/DecayingSample.h>
10#include <xrpl/basics/Log.h>
11#include <xrpl/basics/Slice.h>
12#include <xrpl/basics/UnorderedContainers.h>
13#include <xrpl/basics/base_uint.h>
14#include <xrpl/basics/scope.h>
15#include <xrpl/beast/container/aged_map.h>
16#include <xrpl/beast/container/detail/aged_ordered_container.h>
17#include <xrpl/beast/insight/Collector.h>
18#include <xrpl/beast/utility/instrumentation.h>
19#include <xrpl/core/Job.h>
20#include <xrpl/core/JobQueue.h>
21#include <xrpl/core/PerfLog.h>
22#include <xrpl/json/json_value.h>
23#include <xrpl/protocol/RippleLedgerHash.h>
24#include <xrpl/protocol/Serializer.h>
25#include <xrpl/protocol/jss.h>
26#include <xrpl/server/NetworkOPs.h>
27#include <xrpl/shamap/SHAMapTreeNode.h>
28
29#include <xrpl.pb.h>
30
31#include <chrono>
32#include <cstddef>
33#include <cstdint>
34#include <exception>
35#include <functional>
36#include <memory>
37#include <mutex>
38#include <set>
39#include <string>
40#include <utility>
41#include <vector>
42
43namespace xrpl {
44
46{
47private:
50 // measures ledgers per second, constants are important
53
54public:
55 // How long before we try again to acquire the same ledger
57
59 Application& app,
60 clock_type& clock,
61 beast::insight::Collector::ptr const& collector,
63 : app_(app)
64 , fetchRate_(clock.now())
65 , j_(app.getJournal("InboundLedger"))
66 , clock_(clock)
67 , recentFailures_(clock)
68 , counter_(collector->makeCounter("ledger_fetches"))
69 , peerSetBuilder_(std::move(peerSetBuilder))
70 {
71 }
72
75 acquire(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
76 {
77 auto doAcquire = [&, seq, reason]() -> std::shared_ptr<Ledger const> {
78 XRPL_ASSERT(
79 hash.isNonZero(), "xrpl::InboundLedgersImp::acquire::doAcquire : nonzero hash");
80
81 // probably not the right rule
82 if (app_.getOPs().isNeedNetworkLedger() && (reason != InboundLedger::Reason::GENERIC) &&
84 return {};
85
86 bool isNew = true;
88 {
90 if (stopping_)
91 {
92 return {};
93 }
94
95 auto it = ledgers_.find(hash);
96 if (it != ledgers_.end())
97 {
98 isNew = false;
99 inbound = it->second;
100 }
101 else
102 {
104 app_, hash, seq, reason, std::ref(clock_), peerSetBuilder_->build());
105 ledgers_.emplace(hash, inbound);
106 inbound->init(sl);
107 ++counter_;
108 }
109 }
110
111 if (inbound->isFailed())
112 return {};
113
114 if (!isNew)
115 inbound->update(seq);
116
117 if (!inbound->isComplete())
118 return {};
119
120 return inbound->getLedger();
121 };
122 using namespace std::chrono_literals;
124 perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
125
126 return ledger;
127 }
128
129 void
130 acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
131 {
133 try
134 {
135 if (pendingAcquires_.contains(hash))
136 return;
137 pendingAcquires_.insert(hash);
138 ScopeUnlock const unlock(lock);
139 acquire(hash, seq, reason);
140 }
141 catch (std::exception const& e)
142 {
143 JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": "
144 << e.what();
145 }
146 catch (...)
147 {
148 JLOG(j_.warn()) << "Unknown exception thrown for acquiring new inbound ledger " << hash;
149 }
150 pendingAcquires_.erase(hash);
151 }
152
154 find(uint256 const& hash) override
155 {
156 XRPL_ASSERT(hash.isNonZero(), "xrpl::InboundLedgersImp::find : nonzero input");
157
159
160 {
161 ScopedLockType const sl(lock_);
162
163 auto it = ledgers_.find(hash);
164 if (it != ledgers_.end())
165 {
166 ret = it->second;
167 }
168 }
169
170 return ret;
171 }
172
173 /*
174 This gets called when
175 "We got some data from an inbound ledger"
176
177 inboundLedgerTrigger:
178 "What do we do with this partial data?"
179 Figures out what to do with the responses to our requests for information.
180
181 */
182 // means "We got some data from an inbound ledger"
183
184 // VFALCO TODO Remove the dependency on the Peer object.
187 bool
189 LedgerHash const& hash,
192 {
193 if (auto ledger = find(hash))
194 {
195 JLOG(j_.trace()) << "Got data (" << packet->nodes().size()
196 << ") for acquiring ledger: " << hash;
197
198 // Stash the data for later processing and see if we need to
199 // dispatch
200 if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
201 {
202 app_.getJobQueue().addJob(
203 JtLedgerData, "ProcessLData", [ledger]() { ledger->runData(); });
204 }
205
206 return true;
207 }
208
209 JLOG(j_.trace()) << "Got data for ledger " << hash << " which we're no longer acquiring";
210
211 // If it's state node data, stash it because it still might be
212 // useful.
213 if (packet->type() == protocol::liAS_NODE)
214 {
215 app_.getJobQueue().addJob(
216 JtLedgerData, "GotStaleData", [this, packet]() { gotStaleData(packet); });
217 }
218
219 return false;
220 }
221
222 void
223 logFailure(uint256 const& h, std::uint32_t seq) override
224 {
225 ScopedLockType const sl(lock_);
226
227 recentFailures_.emplace(h, seq);
228 }
229
230 bool
231 isFailure(uint256 const& h) override
232 {
233 ScopedLockType const sl(lock_);
234
236 return recentFailures_.find(h) != recentFailures_.end();
237 }
238
245 void
247 {
248 Serializer s;
249 try
250 {
251 for (int i = 0; i < packetPtr->nodes().size(); ++i)
252 {
253 auto const& node = packetPtr->nodes(i);
254
255 if (!node.has_nodeid() || !node.has_nodedata())
256 return;
257
258 auto newNode = SHAMapTreeNode::makeFromWire(makeSlice(node.nodedata()));
259
260 if (!newNode)
261 return;
262
263 s.erase();
264 newNode->serializeWithPrefix(s);
265
266 app_.getLedgerMaster().addFetchPack(
267 newNode->getHash().asUInt256(), std::make_shared<Blob>(s.begin(), s.end()));
268 }
269 }
270 catch (std::exception const&) // NOLINT(bugprone-empty-catch)
271 {
272 }
273 }
274
275 void
276 clearFailures() override
277 {
278 ScopedLockType const sl(lock_);
279
280 recentFailures_.clear();
281 ledgers_.clear();
282 }
283
285 fetchRate() override
286 {
288 return 60 * fetchRate_.value(clock_.now());
289 }
290
291 // Should only be called with an inboundledger that has
292 // a reason of history
293 void
295 {
297 fetchRate_.add(1, clock_.now());
298 }
299
301 getInfo() override
302 {
304
306
307 {
308 ScopedLockType const sl(lock_);
309
310 acqs.reserve(ledgers_.size());
311 for (auto const& it : ledgers_)
312 {
313 XRPL_ASSERT(it.second, "xrpl::InboundLedgersImp::getInfo : non-null ledger");
314 acqs.emplace_back(it);
315 }
316 for (auto const& it : recentFailures_)
317 {
318 if (it.second > 1)
319 {
320 ret[std::to_string(it.second)][jss::failed] = true;
321 }
322 else
323 {
324 ret[to_string(it.first)][jss::failed] = true;
325 }
326 }
327 }
328
329 for (auto const& it : acqs)
330 {
331 // getJson is expensive, so call without the lock
332 std::uint32_t const seq = it.second->getSeq();
333 if (seq > 1)
334 {
335 ret[std::to_string(seq)] = it.second->getJson(0);
336 }
337 else
338 {
339 ret[to_string(it.first)] = it.second->getJson(0);
340 }
341 }
342
343 return ret;
344 }
345
346 void
347 gotFetchPack() override
348 {
350 {
351 ScopedLockType const sl(lock_);
352
353 acquires.reserve(ledgers_.size());
354 for (auto const& it : ledgers_)
355 {
356 XRPL_ASSERT(
357 it.second,
358 "xrpl::InboundLedgersImp::gotFetchPack : non-null "
359 "ledger");
360 acquires.push_back(it.second);
361 }
362 }
363
364 for (auto const& acquire : acquires)
365 {
366 acquire->checkLocal();
367 }
368 }
369
370 void
371 sweep() override
372 {
373 auto const start = clock_.now();
374
375 // Make a list of things to sweep, while holding the lock
377 std::size_t total = 0;
378
379 {
380 ScopedLockType const sl(lock_);
381 MapType::iterator it(ledgers_.begin());
382 total = ledgers_.size();
383
384 stuffToSweep.reserve(total);
385
386 while (it != ledgers_.end())
387 {
388 auto const la = it->second->getLastAction();
389
390 if (la > start)
391 {
392 it->second->touch();
393 ++it;
394 }
395 else if ((la + std::chrono::minutes(1)) < start)
396 {
397 stuffToSweep.push_back(it->second);
398 // shouldn't cause the actual final delete
399 // since we are holding a reference in the vector.
400 it = ledgers_.erase(it);
401 }
402 else
403 {
404 ++it;
405 }
406 }
407
409 }
410
411 JLOG(j_.debug())
412 << "Swept " << stuffToSweep.size() << " out of " << total
413 << " inbound ledgers. Duration: "
415 << "ms";
416 }
417
418 void
419 stop() override
420 {
421 ScopedLockType const lock(lock_);
422 stopping_ = true;
423 ledgers_.clear();
424 recentFailures_.clear();
425 }
426
428 cacheSize() override
429 {
430 ScopedLockType const lock(lock_);
431 return ledgers_.size();
432 }
433
434private:
436
439
440 bool stopping_ = false;
443
445
447
449
452};
453
454//------------------------------------------------------------------------------
455
458 Application& app,
460 beast::insight::Collector::ptr const& collector)
461{
462 return std::make_unique<InboundLedgersImp>(app, clock, collector, makePeerSetBuilder(app));
463}
464
465} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:38
std::shared_ptr< Collector > ptr
Definition Collector.h:26
A metric for measuring an integral value.
Definition Counter.h:19
Represents a JSON value.
Definition json_value.h:130
bool isNonZero() const
Definition base_uint.h:549
Sampling function using exponential decay to provide a continuous value.
InboundLedgersImp(Application &app, clock_type &clock, beast::insight::Collector::ptr const &collector, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packetPtr) override
We got some data for a ledger we are no longer acquiring Since we paid the price to receive it,...
std::recursive_mutex lock_
static constexpr std::chrono::minutes kReacquireInterval
void onLedgerFetched() override
Called when a complete ledger is obtained.
std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
DecayWindow< 30, clock_type > fetchRate_
hash_map< uint256, std::shared_ptr< InboundLedger > > MapType
std::unique_lock< std::recursive_mutex > ScopedLockType
void acquireAsync(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
std::size_t fetchRate() override
Returns the rate of historical ledger fetches per minute.
std::set< uint256 > pendingAcquires_
beast::aged_map< uint256, std::uint32_t > recentFailures_
bool gotLedgerData(LedgerHash const &hash, std::shared_ptr< Peer > peer, std::shared_ptr< protocol::TMLedgerData > packet) override
We received a TMLedgerData from a peer.
beast::Journal const j_
json::Value getInfo() override
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
beast::insight::Counter counter_
std::size_t cacheSize() override
void clearFailures() override
bool isFailure(uint256 const &h) override
void logFailure(uint256 const &h, std::uint32_t seq) override
std::shared_ptr< InboundLedger > find(uint256 const &hash) override
Manages the lifetime of inbound ledgers.
beast::AbstractClock< std::chrono::steady_clock > clock_type
static SHAMapTreeNodePtr makeFromWire(Slice rawNode)
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:202
Blob::iterator begin()
Definition Serializer.h:226
Blob::iterator end()
Definition Serializer.h:231
T duration_cast(T... args)
T emplace_back(T... args)
T make_shared(T... args)
T make_unique(T... args)
std::enable_if_t< IsAgedContainer< AgedContainer >::value, std::size_t > expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
detail::AgedOrderedContainer< false, true, Key, T, Clock, Compare, Allocator > aged_map
Definition aged_map.h:17
@ Object
object value (collection of name/value pairs).
Definition json_value.h:26
STL namespace.
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
Definition PerfLog.h:162
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::string to_string(BaseUInt< Bits, Tag > const &a)
Definition base_uint.h:633
@ JtLedgerData
Definition Job.h:47
uint256 LedgerHash
std::unique_ptr< PeerSetBuilder > makePeerSetBuilder(Application &app)
Definition PeerSet.cpp:137
std::unique_ptr< InboundLedgers > makeInboundLedgers(Application &app, InboundLedgers::clock_type &clock, beast::insight::Collector::ptr const &collector)
std::unordered_map< Key, Value, Hash, Pred, Allocator > hash_map
BaseUInt< 256 > uint256
Definition base_uint.h:562
std::enable_if_t< std::is_same_v< T, char >||std::is_same_v< T, unsigned char >, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:215
T push_back(T... args)
T ref(T... args)
T reserve(T... args)
T size(T... args)
T to_string(T... args)
T what(T... args)