1#include <xrpld/app/ledger/LedgerReplayer.h>
2#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
3#include <xrpld/app/ledger/detail/SkipListAcquire.h>
12 , inboundLedgers_(inboundLedgers)
13 , peerSetBuilder_(
std::move(peerSetBuilder))
14 , j_(app.getJournal(
"LedgerReplayer"))
27 uint256 const& finishLedgerHash,
31 finishLedgerHash.
isNonZero() && totalNumLedgers > 0 &&
33 "xrpl::LedgerReplayer::replay : valid inputs");
40 bool newSkipList =
false;
47 JLOG(
j_.
info()) <<
"Too many replay tasks, dropping new task " << parameter.
finishHash_;
51 for (
auto const& t :
tasks_)
55 JLOG(
j_.
info()) <<
"Task " << parameter.
finishHash_ <<
" with " << totalNumLedgers
56 <<
" ledgers merged into an existing task.";
60 JLOG(
j_.
info()) <<
"Replay " << totalNumLedgers <<
" ledgers. Finish ledger hash "
65 skipList = i->second.lock();
96 auto const& parameter = task->getTaskParameter();
97 JLOG(
j_.
trace()) <<
"Creating " << parameter.totalLedgers_ - 1 <<
" deltas";
98 if (parameter.totalLedgers_ > 1)
101 std::find(parameter.skipList_.begin(), parameter.skipList_.end(), parameter.startHash_);
102 auto const wasLast = skipListItem == parameter.skipList_.end();
105 auto const isLast = skipListItem == parameter.skipList_.end();
107 if (wasLast || isLast)
109 JLOG(
j_.
error()) <<
"Task parameter error when creating deltas "
110 << parameter.finishHash_;
115 seq <= parameter.finishSeq_ && skipListItem != parameter.skipList_.end();
116 ++seq, ++skipListItem)
119 bool newDelta =
false;
124 auto i =
deltas_.find(*skipListItem);
126 delta = i->second.lock();
132 deltas_[*skipListItem] = delta;
137 task->addDelta(delta);
147 boost::intrusive_ptr<SHAMapItem const>
const& item)
155 skipList = i->second.lock();
164 skipList->processData(info.
seq, item);
178 delta = i->second.lock();
187 delta->processData(info, std::move(txns));
196 JLOG(
j_.
debug()) <<
"Sweeping, LedgerReplayer has " <<
tasks_.size() <<
" tasks, "
203 [
this](
auto const& t) ->
bool {
206 JLOG(j_.debug()) <<
"Sweep task " << t->getTaskParameter().finishHash_;
213 auto removeCannotLocked = [](
auto& subTasks) {
214 for (
auto it = subTasks.begin(); it != subTasks.end();)
216 if (
auto item = it->second.lock(); !item)
218 it = subTasks.erase(it);
229 JLOG(j_.
debug()) <<
" LedgerReplayer sweep lock duration "
230 << std::chrono::duration_cast<std::chrono::milliseconds>(
237LedgerReplayer::stop()
239 JLOG(j_.
info()) <<
"Stopping...";
242 std::for_each(tasks_.begin(), tasks_.end(), [](
auto& i) { i->cancel(); });
244 auto lockAndCancel = [](
auto& i) {
245 if (
auto sptr = i.second.lock(); sptr)
250 std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
252 std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
256 JLOG(j_.
info()) <<
"Stopped";
Stream trace() const
Severity stream access functions.
Manages the lifetime of inbound ledgers.
bool canMergeInto(TaskParameter const &existingTask) const
Check whether this task can be merged into an existing task.
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
InboundLedgers & inboundLedgers_
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
void gotReplayDelta(LedgerHeader const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message).
void gotSkipList(LedgerHeader const &info, boost::intrusive_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message).
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
void sweep()
Remove completed tasks.
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
virtual bool isStopping() const =0
std::uint32_t constexpr MAX_TASKS
std::uint32_t constexpr MAX_TASK_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.