1#include <xrpld/app/ledger/LedgerReplayTask.h>
3#include <xrpld/app/ledger/InboundLedger.h>
4#include <xrpld/app/ledger/InboundLedgers.h>
5#include <xrpld/app/ledger/LedgerReplayer.h>
6#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
7#include <xrpld/app/ledger/detail/SkipListAcquire.h>
8#include <xrpld/app/ledger/detail/TimeoutCounter.h>
9#include <xrpld/app/main/Application.h>
11#include <xrpl/basics/Log.h>
12#include <xrpl/basics/base_uint.h>
13#include <xrpl/beast/utility/instrumentation.h>
14#include <xrpl/core/Job.h>
26 uint256 const& finishLedgerHash,
31 finishLedgerHash.
isNonZero() && totalNumLedgers > 0,
32 "xrpl::LedgerReplayTask::TaskParameter::TaskParameter : valid "
51 "xrpl::LedgerReplayTask::TaskParameter::update : nonzero start hash");
67 if (existingTask.
full)
69 auto const& exList = existingTask.
skipList;
91 .jobName =
"LedReplTask",
93 app.getJournal(
"LedgerReplayTask"))
94 , inboundLedgers_(inboundLedgers)
96 , parameter_(parameter)
101 , skipListAcquirer_(skipListAcquirer)
103 JLOG(journal_.trace()) <<
"Create " << hash_;
118 if (
auto sptr = wptr.
lock(); sptr)
126 auto const skipListData = sptr->skipListAcquirer_->getData();
127 sptr->updateSkipList(hash, skipListData->ledgerSeq, skipListData->skipList);
158 <<
"Got start ledger " <<
parameter_.startHash <<
" for task " <<
hash_;
168 JLOG(
journal_.trace()) <<
"Delta " << deltaHash <<
" ready for task " <<
hash_;
178 << (
parameter_.full ?
", full parameter" :
", waiting to fill parameter")
182 bool const shouldTry =
193 parent_->seq() + 1 == delta->ledgerSeq_,
194 "xrpl::LedgerReplayTask::tryAdvance : consecutive sequence");
195 if (
auto l = delta->tryBuild(
parent_); l)
198 <<
"Task " <<
hash_ <<
" got ledger " << l->header().hash
229 JLOG(
journal_.error()) <<
"Parameter update failed " <<
hash_;
248 JLOG(
journal_.debug()) <<
"LedgerReplayTask Failed, too many timeouts " <<
hash_;
266 delta->addDataCallback(
parameter_.reason, [wptr](
bool good,
uint256 const& hash) {
267 if (auto sptr = wptr.lock(); sptr)
275 sptr->deltaReady(hash);
280 ScopedLockType
const sl(mtx_);
283 JLOG(journal_.trace()) <<
"addDelta task " << hash_ <<
" deltaIndex=" << deltaToBuild_
284 <<
" totalDeltas=" << deltas_.size();
286 deltas_.empty() || deltas_.back()->ledgerSeq_ + 1 == delta->ledgerSeq_,
287 "xrpl::LedgerReplayTask::addDelta : no deltas or consecutive "
289 deltas_.push_back(delta);
Manages the lifetime of inbound ledgers.
InboundLedger::Reason reason
std::uint32_t totalLedgers
TaskParameter(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
constructor
std::vector< uint256 > skipList
bool update(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
fill all the fields that were not filled during construction
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
void trigger(ScopedLockType &sl)
Trigger another round.
void onTimer(bool progress, ScopedLockType &sl) override
Hook called from invokeOnTimer().
LedgerReplayer & replayer_
~LedgerReplayTask() override
LedgerReplayTask(Application &app, InboundLedgers &inboundLedgers, LedgerReplayer &replayer, std::shared_ptr< SkipListAcquire > &skipListAcquirer, TaskParameter const &parameter)
Constructor.
InboundLedgers & inboundLedgers_
void updateSkipList(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
Update this task (by a SkipListAcquire subtask) when skip list is ready.
void tryAdvance(ScopedLockType &sl)
Try to build more ledgers.
void deltaReady(uint256 const &deltaHash)
Notify this task (by a LedgerDeltaAcquire subtask) that a delta is ready.
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::vector< std::shared_ptr< LedgerDeltaAcquire > > deltas_
bool finished() const
return whether the task is finished
void addDelta(std::shared_ptr< LedgerDeltaAcquire > const &delta)
add a new LedgerDeltaAcquire subtask
std::shared_ptr< Ledger const > parent_
std::shared_ptr< SkipListAcquire > skipListAcquirer_
void init()
Start the task.
Manages the lifetime of ledger replay tasks.
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
std::recursive_mutex mtx_
std::unique_lock< std::recursive_mutex > ScopedLockType
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
constexpr std::uint32_t kTaskMaxTimeoutsMinimum
constexpr std::uint32_t kTaskMaxTimeoutsMultiplier
constexpr std::uint32_t kMaxQueuedTasks
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
T shared_from_this(T... args)