rippled
Loading...
Searching...
No Matches
LedgerReplayer.cpp
1#include <xrpld/app/ledger/LedgerReplayer.h>
2#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
3#include <xrpld/app/ledger/detail/SkipListAcquire.h>
4
5namespace xrpl {
6
// LedgerReplayer constructor: parameter list, member initializers, empty body.
// NOTE(review): this listing lacks the line that opens the definition and the
// line declaring the peerSetBuilder parameter used below - confirm against the
// real file.
8 Application& app,
9 InboundLedgers& inboundLedgers,
    // app and inboundLedgers are bound to members as given; peerSetBuilder is
    // moved into its member.
11 : app_(app)
12 , inboundLedgers_(inboundLedgers)
13 , peerSetBuilder_(std::move(peerSetBuilder))
    // The journal is requested from the application under the name
    // "LedgerReplayer".
14 , j_(app.getJournal("LedgerReplayer"))
15{
16}
17
23
// replay(): register a task that replays totalNumLedgers ledgers ending at
// finishLedgerHash, unless it is rejected or merged into an existing task.
// NOTE(review): several lines are absent from this listing (the function name
// and first parameter, part of the assertion, the declarations of task and
// skipList, the statement opening the guarded scope, the task-limit condition
// and the first line of both construction calls). The comments below describe
// only what is visible - confirm the rest against the real file.
24void
27 uint256 const& finishLedgerHash,
28 std::uint32_t totalNumLedgers)
29{
    // Precondition: a non-zero finish hash and at least one ledger (the final
    // clause of the condition is not shown in this listing).
30 XRPL_ASSERT(
31 finishLedgerHash.isNonZero() && totalNumLedgers > 0 &&
33 "xrpl::LedgerReplayer::replay : valid inputs");
34
35 // NOLINTNEXTLINE(misc-const-correctness)
36 LedgerReplayTask::TaskParameter parameter(r, finishLedgerHash, totalNumLedgers);
37
    // Records whether this call created the skip list sub-task, so that it is
    // initialized only once, after the scope below ends.
40 bool newSkipList = false;
41 {
    // No new work is accepted while the application is stopping.
43 if (app_.isStopping())
44 return;
    // Per the log message, the request is dropped when too many replay tasks
    // exist (the condition line itself is not shown in this listing).
46 {
47 JLOG(j_.info()) << "Too many replay tasks, dropping new task " << parameter.finishHash_;
48 return;
49 }
50
    // If any existing task reports that this request can merge into it,
    // nothing new is created.
51 for (auto const& t : tasks_)
52 {
53 if (parameter.canMergeInto(t->getTaskParameter()))
54 {
55 JLOG(j_.info()) << "Task " << parameter.finishHash_ << " with " << totalNumLedgers
56 << " ledgers merged into an existing task.";
57 return;
58 }
59 }
60 JLOG(j_.info()) << "Replay " << totalNumLedgers << " ledgers. Finish ledger hash "
61 << parameter.finishHash_;
62
    // Reuse the skip list sub-task registered under this finish hash if its
    // weak reference can still be locked.
63 auto i = skipLists_.find(parameter.finishHash_);
64 if (i != skipLists_.end())
65 skipList = i->second.lock();
66
67 if (!skipList) // cannot find, or found but cannot lock
68 {
    // A new skip list sub-task is built (first line of the call not shown),
    // registered under the finish hash, and flagged for initialization.
70 app_, inboundLedgers_, parameter.finishHash_, peerSetBuilder_->build());
71 skipLists_[parameter.finishHash_] = skipList;
72 newSkipList = true;
73 }
74
    // The task is built (first line of the call not shown) with the skip list
    // and the parameter moved in, then appended to tasks_.
76 app_, inboundLedgers_, *this, skipList, std::move(parameter));
77 tasks_.push_back(task);
78 }
79
    // Initialization happens after the scope above has ended; only a skip list
    // created by this call is initialized here.
80 if (newSkipList)
81 skipList->init(1);
82 // task init after skipList init, could save a timeout
83 task->init();
84}
85
// createDeltas(): attach one delta sub-task to `task` for each ledger after
// the task's start ledger, following the hashes in the task's skip list.
// NOTE(review): the function-name line, the declaration of `delta`, the
// statement opening the inner guarded scope and the first line of the delta
// construction call are absent from this listing - confirm against the real
// file.
86void
88{
89 {
90 // TODO for use cases like Consensus (i.e. totalLedgers = 1 or small):
91 // check if the last closed or validated ledger l the local node has
92 // is in the skip list and is an ancestor of parameter.startLedger
93 // that has to be downloaded, if so expand the task to start with l.
94 }
95
96 auto const& parameter = task->getTaskParameter();
97 JLOG(j_.trace()) << "Creating " << parameter.totalLedgers_ - 1 << " deltas";
    // A task of a single ledger needs no deltas.
98 if (parameter.totalLedgers_ > 1)
99 {
    // Find the start hash in the skip list and step to the entry after it.
100 auto skipListItem =
101 std::find(parameter.skipList_.begin(), parameter.skipList_.end(), parameter.startHash_);
102 auto const wasLast = skipListItem == parameter.skipList_.end();
103 if (not wasLast)
104 ++skipListItem;
105 auto const isLast = skipListItem == parameter.skipList_.end();
106
    // wasLast: the start hash was not found. isLast: it was found but nothing
    // follows it. Either way there is no hash to request a delta for.
107 if (wasLast || isLast)
108 {
109 JLOG(j_.error()) << "Task parameter error when creating deltas "
110 << parameter.finishHash_;
111 return;
112 }
113
    // Advance the sequence number and the skip list position together, from
    // startSeq_ + 1 up to finishSeq_ or until the skip list is exhausted.
114 for (std::uint32_t seq = parameter.startSeq_ + 1;
115 seq <= parameter.finishSeq_ && skipListItem != parameter.skipList_.end();
116 ++seq, ++skipListItem)
117 {
119 bool newDelta = false;
120 {
    // Stop creating sub-tasks once the application is stopping.
122 if (app_.isStopping())
123 return;
    // Reuse the delta sub-task registered under this hash if its weak
    // reference can still be locked.
124 auto i = deltas_.find(*skipListItem);
125 if (i != deltas_.end())
126 delta = i->second.lock();
127
128 if (!delta) // cannot find, or found but cannot lock
129 {
    // A new delta sub-task is built (first line of the call not shown),
    // registered under its hash, and flagged for initialization.
131 app_, inboundLedgers_, *skipListItem, seq, peerSetBuilder_->build());
132 deltas_[*skipListItem] = delta;
133 newDelta = true;
134 }
135 }
136
    // The delta is attached to the task after the inner scope has ended; only
    // a delta created in this iteration is initialized.
137 task->addDelta(delta);
138 if (newDelta)
139 delta->init(1);
140 }
141 }
142}
143
// gotSkipList(): hand received skip list data to the skip list sub-task
// registered under info.hash, if one is still alive.
// NOTE(review): the function-name line, the declaration of `skipList` and the
// first statement of the inner scope are absent from this listing - confirm
// against the real file.
144void
146 LedgerHeader const& info,
147 boost::intrusive_ptr<SHAMapItem const> const& item)
148{
150 {
    // Data for a hash with no registered sub-task is ignored.
152 auto i = skipLists_.find(info.hash);
153 if (i == skipLists_.end())
154 return;
155 skipList = i->second.lock();
    // The entry exists but its weak reference can no longer be locked: remove
    // the stale entry and ignore the data.
156 if (!skipList)
157 {
158 skipLists_.erase(i);
159 return;
160 }
161 }
162
    // Both failure paths above have already returned; the data is forwarded
    // after the inner scope has ended.
163 if (skipList)
164 skipList->processData(info.seq, item);
165}
166
// gotReplayDelta(): hand a received ledger delta to the delta sub-task
// registered under info.hash, if one is still alive.
// NOTE(review): the function-name line, the line declaring the `txns`
// parameter, the declaration of `delta` and the first statement of the inner
// scope are absent from this listing - confirm against the real file.
167void
169 LedgerHeader const& info,
171{
173 {
    // A delta for a hash with no registered sub-task is ignored.
175 auto i = deltas_.find(info.hash);
176 if (i == deltas_.end())
177 return;
178 delta = i->second.lock();
    // The entry exists but its weak reference can no longer be locked: remove
    // the stale entry and ignore the delta.
179 if (!delta)
180 {
181 deltas_.erase(i);
182 return;
183 }
184 }
185
    // Both failure paths above have already returned; the transactions are
    // moved into the sub-task after the inner scope has ended.
186 if (delta)
187 delta->processData(info, std::move(txns));
188}
189
// sweep(): drop finished tasks and map entries whose sub-task no longer
// exists, then log how long the sweep scope took.
// NOTE(review): the function-name line, the first statement of the inner
// scope, the inner call of the erase idiom and part of the duration
// expression are absent from this listing - confirm against the real file.
190void
192{
    // Start time, used by the duration log at the end.
193 auto const start = std::chrono::steady_clock::now();
194 {
196 JLOG(j_.debug()) << "Sweeping, LedgerReplayer has " << tasks_.size() << " tasks, "
197 << skipLists_.size() << " skipLists, and " << deltas_.size() << " deltas.";
198
    // Erase every task whose finished() returns true, logging each one.
    // NOTE(review): the call wrapping the predicate is not shown - presumably
    // std::remove_if; confirm.
199 tasks_.erase(
201 tasks_.begin(),
202 tasks_.end(),
203 [this](auto const& t) -> bool {
204 if (t->finished())
205 {
206 JLOG(j_.debug()) << "Sweep task " << t->getTaskParameter().finishHash_;
207 return true;
208 }
209 return false;
210 }),
211 tasks_.end());
212
    // Remove map entries whose weak reference can no longer be locked. erase()
    // returns the next iterator, so the loop advances only when nothing was
    // erased.
213 auto removeCannotLocked = [](auto& subTasks) {
214 for (auto it = subTasks.begin(); it != subTasks.end();)
215 {
216 if (auto item = it->second.lock(); !item)
217 {
218 it = subTasks.erase(it);
219 }
220 else
221 {
222 ++it;
223 }
224 }
225 };
226 removeCannotLocked(skipLists_);
227 removeCannotLocked(deltas_);
228 }
    // Logs a duration in milliseconds (the operand of the duration_cast is not
    // shown in this listing).
229 JLOG(j_.debug()) << " LedgerReplayer sweep lock duration "
230 << std::chrono::duration_cast<std::chrono::milliseconds>(
232 .count()
233 << "ms";
234}
235
236void
237LedgerReplayer::stop()
238{
239 JLOG(j_.info()) << "Stopping...";
240 {
241 std::lock_guard<std::mutex> const lock(mtx_);
242 std::for_each(tasks_.begin(), tasks_.end(), [](auto& i) { i->cancel(); });
243 tasks_.clear();
244 auto lockAndCancel = [](auto& i) {
245 if (auto sptr = i.second.lock(); sptr)
246 {
247 sptr->cancel();
248 }
249 };
250 std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
251 skipLists_.clear();
252 std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
253 deltas_.clear();
254 }
255
256 JLOG(j_.info()) << "Stopped";
257}
258
259} // namespace xrpl
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Manages the lifetime of inbound ledgers.
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
InboundLedgers & inboundLedgers_
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
void gotReplayDelta(LedgerHeader const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message)
void gotSkipList(LedgerHeader const &info, boost::intrusive_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message)
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
void sweep()
Remove completed tasks.
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
virtual bool isStopping() const =0
bool isNonZero() const
Definition base_uint.h:518
T find(T... args)
T for_each(T... args)
T is_same_v
STL namespace.
std::uint32_t constexpr MAX_TASKS
std::uint32_t constexpr MAX_TASK_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
T remove_if(T... args)
Information about the notional ledger backing the view.