1#include <xrpld/app/ledger/LedgerReplayer.h> 
    2#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h> 
    3#include <xrpld/app/ledger/detail/SkipListAcquire.h> 
   12    , inboundLedgers_(inboundLedgers)
 
   13    , peerSetBuilder_(
std::move(peerSetBuilder))
 
   14    , j_(app.journal(
"LedgerReplayer"))
 
 
   27    uint256 const& finishLedgerHash,
 
   31        finishLedgerHash.
isNonZero() && totalNumLedgers > 0 &&
 
   33        "ripple::LedgerReplayer::replay : valid inputs");
 
   36        r, finishLedgerHash, totalNumLedgers);
 
   40    bool newSkipList = 
false;
 
   47            JLOG(
j_.
info()) << 
"Too many replay tasks, dropping new task " 
   52        for (
auto const& t : 
tasks_)
 
   58                                << 
" ledgers merged into an existing task.";
 
   62        JLOG(
j_.
info()) << 
"Replay " << totalNumLedgers
 
   63                        << 
" ledgers. Finish ledger hash " 
   68            skipList = i->second.lock();
 
 
  102    auto const& parameter = task->getTaskParameter();
 
  103    JLOG(
j_.
trace()) << 
"Creating " << parameter.totalLedgers_ - 1 << 
" deltas";
 
  104    if (parameter.totalLedgers_ > 1)
 
  107            parameter.skipList_.begin(),
 
  108            parameter.skipList_.end(),
 
  109            parameter.startHash_);
 
  110        if (skipListItem == parameter.skipList_.end() ||
 
  111            ++skipListItem == parameter.skipList_.end())
 
  113            JLOG(
j_.
error()) << 
"Task parameter error when creating deltas " 
  114                             << parameter.finishHash_;
 
  119             seq <= parameter.finishSeq_ &&
 
  120             skipListItem != parameter.skipList_.end();
 
  121             ++seq, ++skipListItem)
 
  124            bool newDelta = 
false;
 
  129                auto i = 
deltas_.find(*skipListItem);
 
  131                    delta = i->second.lock();
 
  141                    deltas_[*skipListItem] = delta;
 
  146            task->addDelta(delta);
 
 
  156    boost::intrusive_ptr<SHAMapItem const> 
const& item)
 
  164        skipList = i->second.lock();
 
  173        skipList->processData(info.
seq, item);
 
 
  187        delta = i->second.lock();
 
  196        delta->processData(info, std::move(txns));
 
 
  205        JLOG(
j_.
debug()) << 
"Sweeping, LedgerReplayer has " << 
tasks_.size()
 
  207                         << 
" skipLists, and " << 
deltas_.size() << 
" deltas.";
 
  213                [
this](
auto const& t) -> 
bool {
 
  216                        JLOG(j_.debug()) << 
"Sweep task " 
  217                                         << t->getTaskParameter().finishHash_;
 
  224        auto removeCannotLocked = [](
auto& subTasks) {
 
  225            for (
auto it = subTasks.begin(); it != subTasks.end();)
 
  227                if (
auto item = it->second.lock(); !item)
 
  229                    it = subTasks.erase(it);
 
  238    JLOG(j_.
debug()) << 
" LedgerReplayer sweep lock duration " 
  239                     << std::chrono::duration_cast<std::chrono::milliseconds>(
 
 
  246LedgerReplayer::stop()
 
  248    JLOG(j_.
info()) << 
"Stopping...";
 
  252            tasks_.begin(), tasks_.end(), [](
auto& i) { i->cancel(); });
 
  254        auto lockAndCancel = [](
auto& i) {
 
  255            if (
auto sptr = i.second.lock(); sptr)
 
  260        std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
 
  262        std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
 
  266    JLOG(j_.
info()) << 
"Stopped";
 
 
Stream trace() const
Severity stream access functions.
 
virtual bool isStopping() const =0
 
Manages the lifetime of inbound ledgers.
 
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
 
void gotReplayDelta(LedgerInfo const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message)
 
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
 
void sweep()
Remove completed tasks.
 
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
 
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
 
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
 
void gotSkipList(LedgerInfo const &info, boost::intrusive_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message)
 
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
 
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
 
InboundLedgers & inboundLedgers_
 
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
 
std::uint32_t constexpr MAX_TASK_SIZE
 
std::uint32_t constexpr MAX_TASKS
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.