1#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
3#include <xrpld/app/ledger/BuildLedger.h>
4#include <xrpld/app/ledger/InboundLedger.h>
5#include <xrpld/app/ledger/LedgerReplay.h>
6#include <xrpld/app/ledger/LedgerReplayer.h>
7#include <xrpld/app/ledger/detail/TimeoutCounter.h>
8#include <xrpld/app/main/Application.h>
9#include <xrpld/overlay/Peer.h>
10#include <xrpld/overlay/PeerSet.h>
12#include <xrpl/basics/Log.h>
13#include <xrpl/basics/base_uint.h>
14#include <xrpl/basics/contract.h>
15#include <xrpl/beast/utility/instrumentation.h>
16#include <xrpl/core/Job.h>
17#include <xrpl/core/JobQueue.h>
18#include <xrpl/ledger/ApplyView.h>
19#include <xrpl/protocol/LedgerHeader.h>
20#include <xrpl/protocol/Rules.h>
46 .jobName =
"LedReplDelta",
48 app.getJournal(
"LedgerReplayDelta"))
49 , inboundLedgers_(inboundLedgers)
50 , ledgerSeq_(ledgerSeq)
51 , peerSet_(std::move(peerSet))
53 JLOG(journal_.trace()) <<
"Create " << hash_ <<
" Seq " << ledgerSeq;
95 JLOG(
journal_.trace()) <<
"Add a peer " << peer->id() <<
" for " <<
hash_;
96 protocol::TMReplayDeltaRequest request;
97 request.set_ledgerhash(
hash_.data(),
hash_.size());
98 peerSet_->sendRequest(request, peer);
151 Rules const rules{
app_.config().features};
164 JLOG(
journal_.error()) <<
"failed to create a (info only) ledger from verified data " <<
hash_;
182 JLOG(
journal_.debug()) <<
"task added to a finished LedgerDeltaAcquire " <<
hash_;
200 "xrpl::LedgerDeltaAcquire::tryBuild : parent sequence match");
202 parent->header().hash ==
replayTemp_->header().parentHash,
203 "xrpl::LedgerDeltaAcquire::tryBuild : parent hash match");
216 JLOG(
journal_.error()) <<
"tryBuild failed " <<
hash_ <<
" with parent "
217 << parent->header().hash;
224 JLOG(
journal_.debug()) <<
"onLedgerBuilt " <<
hash_ << (reason ?
" for a new reason" :
"");
227 bool firstTime =
true;
231 reasons.push_back(*reason);
234 app_.getJobQueue().addJob(
236 for (
auto reason : reasons)
241 app.getLedgerMaster().storeLedger(ledger);
250 app.getLedgerMaster().tryAdvance();
257 XRPL_ASSERT(
isDone(),
"xrpl::LedgerDeltaAcquire::notify : is done");
263 for (
auto& cb : toCall)
Manages the lifetime of inbound ledgers.
std::uint32_t const ledgerSeq_
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
void init(int numPeers)
Start the LedgerDeltaAcquire task.
std::shared_ptr< Ledger const > fullLedger_
std::uint32_t noFeaturePeerCount_
void onLedgerBuilt(ScopedLockType &sl, std::optional< InboundLedger::Reason > reason={})
Process a newly built ledger, such as store it.
LedgerDeltaAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::uint32_t ledgerSeq, std::unique_ptr< PeerSet > peerSet)
Constructor.
InboundLedgers & inboundLedgers_
std::vector< OnDeltaDataCB > dataReadyCallbacks_
~LedgerDeltaAcquire() override
void processData(LedgerHeader const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&orderedTxns)
Process the data extracted from a peer's reply.
std::unique_ptr< PeerSet > peerSet_
std::set< InboundLedger::Reason > reasons_
void notify(ScopedLockType &sl)
Call the OnDeltaDataCB callbacks.
std::shared_ptr< Ledger const > replayTemp_
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::shared_ptr< Ledger const > tryBuild(std::shared_ptr< Ledger const > const &parent)
Try to build the ledger if not already.
std::map< std::uint32_t, std::shared_ptr< STTx const > > orderedTxns_
void addDataCallback(InboundLedger::Reason reason, OnDeltaDataCB &&cb)
Add a reason and a callback to the LedgerDeltaAcquire subtask.
std::function< void(bool successful, uint256 const &hash)> OnDeltaDataCB
A callback used to notify that the delta's data is ready or failed.
Rules controlling protocol behavior.
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
std::recursive_mutex mtx_
std::unique_lock< std::recursive_mutex > ScopedLockType
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
constexpr auto kSubTaskFallbackTimeout
constexpr auto kMaxNoFeaturePeerCount
constexpr std::uint32_t kSubTaskMaxTimeouts
constexpr std::uint32_t kMaxQueuedTasks
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::shared_ptr< Ledger > buildLedger(std::shared_ptr< Ledger const > const &parent, NetClock::time_point closeTime, bool const closeTimeCorrect, NetClock::duration closeResolution, Application &app, CanonicalTXSet &txns, std::set< TxID > &failedTxs, beast::Journal j)
Build a new ledger by applying consensus transactions.
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
T shared_from_this(T... args)