1#include <xrpld/app/ledger/LedgerReplayer.h>
3#include <xrpld/app/ledger/InboundLedger.h>
4#include <xrpld/app/ledger/LedgerReplayTask.h>
5#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
6#include <xrpld/app/ledger/detail/SkipListAcquire.h>
7#include <xrpld/app/main/Application.h>
8#include <xrpld/overlay/PeerSet.h>
10#include <xrpl/basics/Log.h>
11#include <xrpl/basics/base_uint.h>
12#include <xrpl/beast/utility/instrumentation.h>
13#include <xrpl/protocol/LedgerHeader.h>
14#include <xrpl/protocol/STTx.h>
15#include <xrpl/shamap/SHAMapItem.h>
17#include <boost/smart_ptr/intrusive_ptr.hpp>
36 ,
j_(app.getJournal(
"LedgerReplayer"))
49 uint256 const& finishLedgerHash,
53 finishLedgerHash.
isNonZero() && totalNumLedgers > 0 &&
55 "xrpl::LedgerReplayer::replay : valid inputs");
62 bool newSkipList =
false;
65 if (
app_.isStopping())
69 JLOG(
j_.info()) <<
"Too many replay tasks, dropping new task " << parameter.
finishHash;
73 for (
auto const& t :
tasks_)
77 JLOG(
j_.info()) <<
"Task " << parameter.
finishHash <<
" with " << totalNumLedgers
78 <<
" ledgers merged into an existing task.";
82 JLOG(
j_.info()) <<
"Replay " << totalNumLedgers <<
" ledgers. Finish ledger hash "
87 skipList = i->second.lock();
118 auto const& parameter = task->getTaskParameter();
119 JLOG(
j_.trace()) <<
"Creating " << parameter.totalLedgers - 1 <<
" deltas";
120 if (parameter.totalLedgers > 1)
123 auto const wasLast = skipListItem == parameter.skipList.end();
126 auto const isLast = skipListItem == parameter.skipList.end();
128 if (wasLast || isLast)
130 JLOG(
j_.error()) <<
"Task parameter error when creating deltas "
131 << parameter.finishHash;
136 seq <= parameter.finishSeq && skipListItem != parameter.skipList.end();
137 ++seq, ++skipListItem)
140 bool newDelta =
false;
143 if (
app_.isStopping())
145 auto i =
deltas_.find(*skipListItem);
147 delta = i->second.lock();
153 deltas_[*skipListItem] = delta;
158 task->addDelta(delta);
168 boost::intrusive_ptr<SHAMapItem const>
const& item)
176 skipList = i->second.lock();
185 skipList->processData(info.
seq, item);
199 delta = i->second.lock();
208 delta->processData(info, std::move(txns));
217 JLOG(
j_.debug()) <<
"Sweeping, LedgerReplayer has " <<
tasks_.size() <<
" tasks, "
224 [
this](
auto const& t) ->
bool {
227 JLOG(
j_.debug()) <<
"Sweep task " << t->getTaskParameter().finishHash;
235 auto removeCannotLocked = [](
auto& subTasks) {
236 for (
auto it = subTasks.begin(); it != subTasks.end();)
238 if (
auto item = it->second.lock(); !item)
240 it = subTasks.erase(it);
251 JLOG(
j_.debug()) <<
" LedgerReplayer sweep lock duration "
261 JLOG(
j_.info()) <<
"Stopping...";
266 auto lockAndCancel = [](
auto& i) {
267 if (
auto sptr = i.second.lock(); sptr)
278 JLOG(
j_.info()) <<
"Stopped";
Manages the lifetime of inbound ledgers.
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
InboundLedgers & inboundLedgers_
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
void gotReplayDelta(LedgerHeader const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message).
void gotSkipList(LedgerHeader const &info, boost::intrusive_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message).
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
void sweep()
Remove completed tasks.
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
T duration_cast(T... args)
constexpr std::uint32_t kMaxTasks
constexpr std::uint32_t kMaxTaskSize
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.