1#include <xrpld/app/ledger/InboundLedgers.h>
3#include <xrpld/app/ledger/InboundLedger.h>
4#include <xrpld/app/ledger/LedgerMaster.h>
5#include <xrpld/app/main/Application.h>
6#include <xrpld/overlay/PeerSet.h>
8#include <xrpl/basics/Blob.h>
9#include <xrpl/basics/DecayingSample.h>
10#include <xrpl/basics/Log.h>
11#include <xrpl/basics/Slice.h>
12#include <xrpl/basics/UnorderedContainers.h>
13#include <xrpl/basics/base_uint.h>
14#include <xrpl/basics/scope.h>
15#include <xrpl/beast/container/aged_map.h>
16#include <xrpl/beast/container/detail/aged_ordered_container.h>
17#include <xrpl/beast/insight/Collector.h>
18#include <xrpl/beast/utility/instrumentation.h>
19#include <xrpl/core/Job.h>
20#include <xrpl/core/JobQueue.h>
21#include <xrpl/core/PerfLog.h>
22#include <xrpl/json/json_value.h>
23#include <xrpl/protocol/RippleLedgerHash.h>
24#include <xrpl/protocol/Serializer.h>
25#include <xrpl/protocol/jss.h>
26#include <xrpl/server/NetworkOPs.h>
27#include <xrpl/shamap/SHAMapTreeNode.h>
65 ,
j_(app.getJournal(
"InboundLedger"))
68 ,
counter_(collector->makeCounter(
"ledger_fetches"))
79 hash.
isNonZero(),
"xrpl::InboundLedgersImp::acquire::doAcquire : nonzero hash");
111 if (inbound->isFailed())
115 inbound->update(seq);
117 if (!inbound->isComplete())
120 return inbound->getLedger();
122 using namespace std::chrono_literals;
143 JLOG(
j_.warn()) <<
"Exception thrown for acquiring new inbound ledger " << hash <<
": "
148 JLOG(
j_.warn()) <<
"Unknown exception thrown for acquiring new inbound ledger " << hash;
156 XRPL_ASSERT(hash.
isNonZero(),
"xrpl::InboundLedgersImp::find : nonzero input");
193 if (
auto ledger =
find(hash))
195 JLOG(
j_.trace()) <<
"Got data (" << packet->nodes().size()
196 <<
") for acquiring ledger: " << hash;
202 app_.getJobQueue().addJob(
203 JtLedgerData,
"ProcessLData", [ledger]() { ledger->runData(); });
209 JLOG(
j_.trace()) <<
"Got data for ledger " << hash <<
" which we're no longer acquiring";
213 if (packet->type() == protocol::liAS_NODE)
215 app_.getJobQueue().addJob(
251 for (
int i = 0; i < packetPtr->nodes().size(); ++i)
253 auto const& node = packetPtr->nodes(i);
255 if (!node.has_nodeid() || !node.has_nodedata())
264 newNode->serializeWithPrefix(s);
266 app_.getLedgerMaster().addFetchPack(
313 XRPL_ASSERT(it.second,
"xrpl::InboundLedgersImp::getInfo : non-null ledger");
324 ret[
to_string(it.first)][jss::failed] =
true;
329 for (
auto const& it : acqs)
339 ret[
to_string(it.first)] = it.second->getJson(0);
358 "xrpl::InboundLedgersImp::gotFetchPack : non-null "
364 for (
auto const&
acquire : acquires)
373 auto const start =
clock_.now();
381 MapType::iterator it(
ledgers_.begin());
388 auto const la = it->second->getLastAction();
412 <<
"Swept " << stuffToSweep.
size() <<
" out of " << total
413 <<
" inbound ledgers. Duration: "
A generic endpoint for log messages.
std::shared_ptr< Collector > ptr
A metric for measuring an integral value.
Sampling function using exponential decay to provide a continuous value.
InboundLedgersImp(Application &app, clock_type &clock, beast::insight::Collector::ptr const &collector, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packetPtr) override
We got some data for a ledger we are no longer acquiring Since we paid the price to receive it,...
std::recursive_mutex lock_
static constexpr std::chrono::minutes kReacquireInterval
void onLedgerFetched() override
Called when a complete ledger is obtained.
std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
DecayWindow< 30, clock_type > fetchRate_
hash_map< uint256, std::shared_ptr< InboundLedger > > MapType
std::unique_lock< std::recursive_mutex > ScopedLockType
void acquireAsync(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
std::size_t fetchRate() override
Returns the rate of historical ledger fetches per minute.
void gotFetchPack() override
std::set< uint256 > pendingAcquires_
beast::aged_map< uint256, std::uint32_t > recentFailures_
bool gotLedgerData(LedgerHash const &hash, std::shared_ptr< Peer > peer, std::shared_ptr< protocol::TMLedgerData > packet) override
We received a TMLedgerData from a peer.
json::Value getInfo() override
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
beast::insight::Counter counter_
std::size_t cacheSize() override
std::mutex fetchRateMutex_
void clearFailures() override
bool isFailure(uint256 const &h) override
void logFailure(uint256 const &h, std::uint32_t seq) override
std::shared_ptr< InboundLedger > find(uint256 const &hash) override
std::mutex acquiresMutex_
Manages the lifetime of inbound ledgers.
beast::AbstractClock< std::chrono::steady_clock > clock_type
static SHAMapTreeNodePtr makeFromWire(Slice rawNode)
Automatically unlocks and re-locks a unique_lock object.
T duration_cast(T... args)
T emplace_back(T... args)
std::enable_if_t< IsAgedContainer< AgedContainer >::value, std::size_t > expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
detail::AgedOrderedContainer< false, true, Key, T, Clock, Compare, Allocator > aged_map
@ Object
object value (collection of name/value pairs).
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::string to_string(BaseUInt< Bits, Tag > const &a)
std::unique_ptr< PeerSetBuilder > makePeerSetBuilder(Application &app)
std::unique_ptr< InboundLedgers > makeInboundLedgers(Application &app, InboundLedgers::clock_type &clock, beast::insight::Collector::ptr const &collector)
std::unordered_map< Key, Value, Hash, Pred, Allocator > hash_map
std::enable_if_t< std::is_same_v< T, char >||std::is_same_v< T, unsigned char >, Slice > makeSlice(std::array< T, N > const &a)