1#include <xrpld/app/ledger/InboundLedgers.h> 
    2#include <xrpld/app/ledger/LedgerReplayTask.h> 
    3#include <xrpld/app/ledger/LedgerReplayer.h> 
    4#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h> 
    5#include <xrpld/app/ledger/detail/SkipListAcquire.h> 
   11    uint256 const& finishLedgerHash,
 
   13    : reason_(r), finishHash_(finishLedgerHash), totalLedgers_(totalNumLedgers)
 
   16        finishLedgerHash.
isNonZero() && totalNumLedgers > 0,
 
   17        "ripple::LedgerReplayTask::TaskParameter::TaskParameter : valid " 
 
   27    if (finishHash_ != hash || sList.
size() + 1 < totalLedgers_ || full_)
 
   33    startHash_ = skipList_[skipList_.size() - totalLedgers_];
 
   35        startHash_.isNonZero(),
 
   36        "ripple::LedgerReplayTask::TaskParameter::update : nonzero start hash");
 
   37    startSeq_ = finishSeq_ - totalLedgers_ + 1;
 
 
   46    if (reason_ == existingTask.
reason_)
 
   54        if (existingTask.
full_)
 
   56            auto const& exList = existingTask.
skipList_;
 
   57            if (
auto i = 
std::find(exList.begin(), exList.end(), finishHash_);
 
   61                    totalLedgers_ + (exList.end() - i) - 1;
 
 
   77          parameter.finishHash_,
 
   78          LedgerReplayParameters::TASK_TIMEOUT,
 
   82          app.journal(
"LedgerReplayTask"))
 
   83    , inboundLedgers_(inboundLedgers)
 
   85    , parameter_(parameter)
 
   88          parameter.totalLedgers_ *
 
   90    , skipListAcquirer_(skipListAcquirer)
 
   92    JLOG(journal_.trace()) << 
"Create " << hash_;
 
 
  107        if (
auto sptr = wptr.
lock(); sptr)
 
  115                auto const skipListData = sptr->skipListAcquirer_->getData();
 
  116                sptr->updateSkipList(
 
  117                    hash, skipListData->ledgerSeq, skipListData->skipList);
 
 
  161    JLOG(
journal_.
trace()) << 
"Delta " << deltaHash << 
" ready for task " 
 
  173                                                : 
", waiting to fill parameter")
 
  175                           << 
", totalDeltas=" << 
deltas_.
size() << 
", parent " 
  189                parent_->seq() + 1 == delta->ledgerSeq_,
 
  190                "ripple::LedgerReplayTask::tryAdvance : consecutive sequence");
 
  191            if (
auto l = delta->tryBuild(
parent_); l)
 
  194                    << 
"Task " << 
hash_ << 
" got ledger " << l->info().hash
 
 
  244            << 
"LedgerReplayTask Failed, too many timeouts " << 
hash_;
 
 
  262    delta->addDataCallback(
 
  264            if (auto sptr = wptr.lock(); sptr)
 
  269                    sptr->deltaReady(hash);
 
  273    ScopedLockType sl(mtx_);
 
  276        JLOG(journal_.trace())
 
  277            << 
"addDelta task " << hash_ << 
" deltaIndex=" << deltaToBuild_
 
  278            << 
" totalDeltas=" << deltas_.size();
 
  281                deltas_.back()->ledgerSeq_ + 1 == delta->ledgerSeq_,
 
  282            "ripple::LedgerReplayTask::addDelta : no deltas or consecutive " 
  284        deltas_.push_back(delta);
 
 
  289LedgerReplayTask::finished()
 const 
 
Stream trace() const
Severity stream access functions.
 
virtual LedgerMaster & getLedgerMaster()=0
 
Manages the lifetime of inbound ledgers.
 
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
 
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
 
TaskParameter(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
constructor
 
bool update(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
Fill in all the fields that were not filled during construction.
 
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
 
std::vector< uint256 > skipList_
 
InboundLedger::Reason reason_
 
std::uint32_t totalLedgers_
 
void trigger(ScopedLockType &sl)
Trigger another round.
 
std::vector< std::shared_ptr< LedgerDeltaAcquire > > deltas_
 
void addDelta(std::shared_ptr< LedgerDeltaAcquire > const &delta)
add a new LedgerDeltaAcquire subtask
 
void deltaReady(uint256 const &deltaHash)
Notify this task (by a LedgerDeltaAcquire subtask) that a delta is ready.
 
LedgerReplayer & replayer_
 
void init()
Start the task.
 
InboundLedgers & inboundLedgers_
 
std::shared_ptr< SkipListAcquire > skipListAcquirer_
 
void updateSkipList(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
Update this task (by a SkipListAcquire subtask) when skip list is ready.
 
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
 
LedgerReplayTask(Application &app, InboundLedgers &inboundLedgers, LedgerReplayer &replayer, std::shared_ptr< SkipListAcquire > &skipListAcquirer, TaskParameter &&parameter)
Constructor.
 
void tryAdvance(ScopedLockType &sl)
Try to build more ledgers.
 
std::shared_ptr< Ledger const  > parent_
 
void onTimer(bool progress, ScopedLockType &sl) override
Hook called from invokeOnTimer().
 
Manages the lifetime of ledger replay tasks.
 
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
 
This class is an "active" object.
 
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
 
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
 
std::recursive_mutex mtx_
 
static constexpr std::size_t size()
 
T emplace_back(T... args)
 
std::uint32_t constexpr TASK_MAX_TIMEOUTS_MINIMUM
 
std::uint32_t constexpr MAX_QUEUED_TASKS
 
std::uint32_t constexpr TASK_MAX_TIMEOUTS_MULTIPLIER
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
T shared_from_this(T... args)