rippled
Loading...
Searching...
No Matches
InboundLedgers.cpp
1#include <xrpld/app/ledger/InboundLedgers.h>
2#include <xrpld/app/ledger/LedgerMaster.h>
3#include <xrpld/app/main/Application.h>
4
5#include <xrpl/basics/DecayingSample.h>
6#include <xrpl/basics/scope.h>
7#include <xrpl/beast/container/aged_map.h>
8#include <xrpl/core/JobQueue.h>
9#include <xrpl/core/PerfLog.h>
10#include <xrpl/protocol/jss.h>
11#include <xrpl/server/NetworkOPs.h>
12
13#include <exception>
14#include <memory>
15#include <mutex>
16#include <vector>
17
18namespace xrpl {
19
21{
22private:
25 // measures ledgers per second, constants are important
28
29public:
30 // How long before we try again to acquire the same ledger
31 static constexpr std::chrono::minutes const kReacquireInterval{5};
32
34 Application& app,
35 clock_type& clock,
36 beast::insight::Collector::ptr const& collector,
38 : app_(app)
39 , fetchRate_(clock.now())
40 , j_(app.getJournal("InboundLedger"))
41 , m_clock(clock)
42 , mRecentFailures(clock)
43 , mCounter(collector->make_counter("ledger_fetches"))
44 , mPeerSetBuilder(std::move(peerSetBuilder))
45 {
46 }
47
50 acquire(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
51 {
52 auto doAcquire = [&, seq, reason]() -> std::shared_ptr<Ledger const> {
53 XRPL_ASSERT(
54 hash.isNonZero(), "xrpl::InboundLedgersImp::acquire::doAcquire : nonzero hash");
55
56 // probably not the right rule
59 return {};
60
61 bool isNew = true;
63 {
65 if (stopping_)
66 {
67 return {};
68 }
69
70 auto it = mLedgers.find(hash);
71 if (it != mLedgers.end())
72 {
73 isNew = false;
74 inbound = it->second;
75 }
76 else
77 {
79 app_, hash, seq, reason, std::ref(m_clock), mPeerSetBuilder->build());
80 mLedgers.emplace(hash, inbound);
81 inbound->init(sl);
82 ++mCounter;
83 }
84 }
85
86 if (inbound->isFailed())
87 return {};
88
89 if (!isNew)
90 inbound->update(seq);
91
92 if (!inbound->isComplete())
93 return {};
94
95 return inbound->getLedger();
96 };
97 using namespace std::chrono_literals;
99 perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
100
101 return ledger;
102 }
103
104 void
105 acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
106 {
108 try
109 {
110 if (pendingAcquires_.contains(hash))
111 return;
112 pendingAcquires_.insert(hash);
113 scope_unlock const unlock(lock);
114 acquire(hash, seq, reason);
115 }
116 catch (std::exception const& e)
117 {
118 JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": "
119 << e.what();
120 }
121 catch (...)
122 {
123 JLOG(j_.warn()) << "Unknown exception thrown for acquiring new inbound ledger " << hash;
124 }
125 pendingAcquires_.erase(hash);
126 }
127
129 find(uint256 const& hash) override
130 {
131 XRPL_ASSERT(hash.isNonZero(), "xrpl::InboundLedgersImp::find : nonzero input");
132
134
135 {
136 ScopedLockType const sl(mLock);
137
138 auto it = mLedgers.find(hash);
139 if (it != mLedgers.end())
140 {
141 ret = it->second;
142 }
143 }
144
145 return ret;
146 }
147
148 /*
149 This gets called when
150 "We got some data from an inbound ledger"
151
152 inboundLedgerTrigger:
153 "What do we do with this partial data?"
154 Figures out what to do with the responses to our requests for information.
155
156 */
157 // means "We got some data from an inbound ledger"
158
159 // VFALCO TODO Remove the dependency on the Peer object.
162 bool
164 LedgerHash const& hash,
167 {
168 if (auto ledger = find(hash))
169 {
170 JLOG(j_.trace()) << "Got data (" << packet->nodes().size()
171 << ") for acquiring ledger: " << hash;
172
173 // Stash the data for later processing and see if we need to
174 // dispatch
175 if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
176 {
178 jtLEDGER_DATA, "ProcessLData", [ledger]() { ledger->runData(); });
179 }
180
181 return true;
182 }
183
184 JLOG(j_.trace()) << "Got data for ledger " << hash << " which we're no longer acquiring";
185
186 // If it's state node data, stash it because it still might be
187 // useful.
188 if (packet->type() == protocol::liAS_NODE)
189 {
191 jtLEDGER_DATA, "GotStaleData", [this, packet]() { gotStaleData(packet); });
192 }
193
194 return false;
195 }
196
197 void
198 logFailure(uint256 const& h, std::uint32_t seq) override
199 {
200 ScopedLockType const sl(mLock);
201
202 mRecentFailures.emplace(h, seq);
203 }
204
205 bool
206 isFailure(uint256 const& h) override
207 {
208 ScopedLockType const sl(mLock);
209
211 return mRecentFailures.find(h) != mRecentFailures.end();
212 }
213
220 void
222 {
223 Serializer s;
224 try
225 {
226 for (int i = 0; i < packet_ptr->nodes().size(); ++i)
227 {
228 auto const& node = packet_ptr->nodes(i);
229
230 if (!node.has_nodeid() || !node.has_nodedata())
231 return;
232
233 auto newNode = SHAMapTreeNode::makeFromWire(makeSlice(node.nodedata()));
234
235 if (!newNode)
236 return;
237
238 s.erase();
239 newNode->serializeWithPrefix(s);
240
242 newNode->getHash().as_uint256(), std::make_shared<Blob>(s.begin(), s.end()));
243 }
244 }
245 catch (std::exception const&) // NOLINT(bugprone-empty-catch)
246 {
247 }
248 }
249
250 void
251 clearFailures() override
252 {
253 ScopedLockType const sl(mLock);
254
255 mRecentFailures.clear();
256 mLedgers.clear();
257 }
258
260 fetchRate() override
261 {
263 return 60 * fetchRate_.value(m_clock.now());
264 }
265
266 // Should only be called with an inboundledger that has
267 // a reason of history
268 void
270 {
273 }
274
276 getInfo() override
277 {
279
281
282 {
283 ScopedLockType const sl(mLock);
284
285 acqs.reserve(mLedgers.size());
286 for (auto const& it : mLedgers)
287 {
288 XRPL_ASSERT(it.second, "xrpl::InboundLedgersImp::getInfo : non-null ledger");
289 acqs.push_back(it);
290 }
291 for (auto const& it : mRecentFailures)
292 {
293 if (it.second > 1)
294 {
295 ret[std::to_string(it.second)][jss::failed] = true;
296 }
297 else
298 {
299 ret[to_string(it.first)][jss::failed] = true;
300 }
301 }
302 }
303
304 for (auto const& it : acqs)
305 {
306 // getJson is expensive, so call without the lock
307 std::uint32_t const seq = it.second->getSeq();
308 if (seq > 1)
309 {
310 ret[std::to_string(seq)] = it.second->getJson(0);
311 }
312 else
313 {
314 ret[to_string(it.first)] = it.second->getJson(0);
315 }
316 }
317
318 return ret;
319 }
320
321 void
322 gotFetchPack() override
323 {
325 {
326 ScopedLockType const sl(mLock);
327
328 acquires.reserve(mLedgers.size());
329 for (auto const& it : mLedgers)
330 {
331 XRPL_ASSERT(
332 it.second,
333 "xrpl::InboundLedgersImp::gotFetchPack : non-null "
334 "ledger");
335 acquires.push_back(it.second);
336 }
337 }
338
339 for (auto const& acquire : acquires)
340 {
341 acquire->checkLocal();
342 }
343 }
344
345 void
346 sweep() override
347 {
348 auto const start = m_clock.now();
349
350 // Make a list of things to sweep, while holding the lock
352 std::size_t total = 0;
353
354 {
355 ScopedLockType const sl(mLock);
356 MapType::iterator it(mLedgers.begin());
357 total = mLedgers.size();
358
359 stuffToSweep.reserve(total);
360
361 while (it != mLedgers.end())
362 {
363 auto const la = it->second->getLastAction();
364
365 if (la > start)
366 {
367 it->second->touch();
368 ++it;
369 }
370 else if ((la + std::chrono::minutes(1)) < start)
371 {
372 stuffToSweep.push_back(it->second);
373 // shouldn't cause the actual final delete
374 // since we are holding a reference in the vector.
375 it = mLedgers.erase(it);
376 }
377 else
378 {
379 ++it;
380 }
381 }
382
384 }
385
386 JLOG(j_.debug())
387 << "Swept " << stuffToSweep.size() << " out of " << total
388 << " inbound ledgers. Duration: "
389 << std::chrono::duration_cast<std::chrono::milliseconds>(m_clock.now() - start).count()
390 << "ms";
391 }
392
393 void
394 stop() override
395 {
396 ScopedLockType const lock(mLock);
397 stopping_ = true;
398 mLedgers.clear();
399 mRecentFailures.clear();
400 }
401
403 cacheSize() override
404 {
405 ScopedLockType const lock(mLock);
406 return mLedgers.size();
407 }
408
409private:
411
414
415 bool stopping_ = false;
418
420
422
424
427};
428
429//------------------------------------------------------------------------------
430
433 Application& app,
435 beast::insight::Collector::ptr const& collector)
436{
437 return std::make_unique<InboundLedgersImp>(app, clock, collector, make_PeerSetBuilder(app));
438}
439
440} // namespace xrpl
T begin(T... args)
Represents a JSON value.
Definition json_value.h:130
A generic endpoint for log messages.
Definition Journal.h:40
Stream debug() const
Definition Journal.h:301
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
virtual time_point now() const =0
Returns the current time.
Associative container where each element is also indexed by time.
A metric for measuring an integral value.
Definition Counter.h:19
Sampling function using exponential decay to provide a continuous value.
void add(double value, time_point now)
double value(time_point now)
InboundLedgersImp(Application &app, clock_type &clock, beast::insight::Collector::ptr const &collector, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
beast::aged_map< uint256, std::uint32_t > mRecentFailures
Json::Value getInfo() override
void onLedgerFetched() override
Called when a complete ledger is obtained.
static constexpr std::chrono::minutes const kReacquireInterval
std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
std::recursive_mutex mLock
std::unique_ptr< PeerSetBuilder > mPeerSetBuilder
DecayWindow< 30, clock_type > fetchRate_
void acquireAsync(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
std::size_t fetchRate() override
Returns the rate of historical ledger fetches per minute.
std::set< uint256 > pendingAcquires_
bool gotLedgerData(LedgerHash const &hash, std::shared_ptr< Peer > peer, std::shared_ptr< protocol::TMLedgerData > packet) override
We received a TMLedgerData from a peer.
beast::Journal const j_
std::size_t cacheSize() override
beast::insight::Counter mCounter
void clearFailures() override
bool isFailure(uint256 const &h) override
void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet_ptr) override
We got some data for a ledger we are no longer acquiring Since we paid the price to receive it,...
void logFailure(uint256 const &h, std::uint32_t seq) override
std::shared_ptr< InboundLedger > find(uint256 const &hash) override
Manages the lifetime of inbound ledgers.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:147
void addFetchPack(uint256 const &hash, std::shared_ptr< Blob > data)
virtual bool isNeedNetworkLedger()=0
static intr_ptr::SharedPtr< SHAMapTreeNode > makeFromWire(Slice rawNode)
Blob::iterator begin()
Definition Serializer.h:226
Blob::iterator end()
Definition Serializer.h:231
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual LedgerMaster & getLedgerMaster()=0
bool isNonZero() const
Definition base_uint.h:518
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:202
T clear(T... args)
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T is_same_v
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
STL namespace.
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
Definition PerfLog.h:162
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
std::unique_ptr< InboundLedgers > make_InboundLedgers(Application &app, InboundLedgers::clock_type &clock, beast::insight::Collector::ptr const &collector)
@ jtLEDGER_DATA
Definition Job.h:45
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:215
std::unique_ptr< PeerSetBuilder > make_PeerSetBuilder(Application &app)
Definition PeerSet.cpp:122
T push_back(T... args)
T ref(T... args)
T reserve(T... args)
T size(T... args)
T to_string(T... args)
T what(T... args)