rippled
Loading...
Searching...
No Matches
LedgerReplayer.cpp
1#include <xrpld/app/ledger/LedgerReplayer.h>
2#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
3#include <xrpld/app/ledger/detail/SkipListAcquire.h>
4
5namespace ripple {
6
// Constructor: stores references to the Application and the InboundLedgers
// manager, takes ownership of the PeerSetBuilder (moved into peerSetBuilder_),
// and obtains the journal named "LedgerReplayer" from the application.
// NOTE(review): original lines 7 and 10 (the constructor name and the third
// parameter) are absent from this listing; the class declaration names that
// parameter `std::unique_ptr<PeerSetBuilder> peerSetBuilder`.
8 Application& app,
9 InboundLedgers& inboundLedgers,
11 : app_(app)
12 , inboundLedgers_(inboundLedgers)
13 , peerSetBuilder_(std::move(peerSetBuilder))
14 , j_(app.journal("LedgerReplayer"))
15{
16}
17
23
// Replay a range of ledgers that ends at the ledger identified by
// finishLedgerHash.
//
// Parameters (per the declaration
// `replay(InboundLedger::Reason r, uint256 const&, std::uint32_t)`):
//   r                - reason forwarded into the task parameter
//   finishLedgerHash - hash of the last ledger of the range; must be non-zero
//   totalNumLedgers  - number of ledgers to replay; must be greater than zero
//
// No task is created when the application is stopping, when the
// "Too many replay tasks" branch is taken, or when the request can be merged
// into a task already held in tasks_. Otherwise the function reuses (or
// creates and registers) the SkipListAcquire keyed by the finish hash, appends
// a new task to tasks_, and initialises the new objects.
// NOTE(review): original lines 25-26 (function name and first parameter) are
// absent from this listing.
24void
27 uint256 const& finishLedgerHash,
28 std::uint32_t totalNumLedgers)
29{
30 XRPL_ASSERT(
31 finishLedgerHash.isNonZero() && totalNumLedgers > 0 &&
// NOTE(review): original line 32 (the last assert operand) is missing here;
// presumably an upper bound on totalNumLedgers (MAX_TASK_SIZE?) - confirm.
33 "ripple::LedgerReplayer::replay : valid inputs");
34
// NOTE(review): line 35 is missing; it evidently declares `parameter`, which
// is built from (r, finishLedgerHash, totalNumLedgers).
36 r, finishLedgerHash, totalNumLedgers);
37
// NOTE(review): lines 38-39 are missing; they presumably declare the `task`
// and `skipList` shared pointers used below - confirm.
40 bool newSkipList = false;
41 {
// NOTE(review): line 42 is missing; presumably a lock is acquired for this
// scope - confirm against the real file.
43 if (app_.isStopping())
44 return;
// NOTE(review): line 45 (the condition guarding this drop branch) is missing;
// presumably it compares tasks_.size() with MAX_TASKS - confirm.
46 {
47 JLOG(j_.info()) << "Too many replay tasks, dropping new task "
48 << parameter.finishHash_;
49 return;
50 }
51
// An existing task that already covers this request absorbs it; nothing new
// is created in that case.
52 for (auto const& t : tasks_)
53 {
54 if (parameter.canMergeInto(t->getTaskParameter()))
55 {
56 JLOG(j_.info()) << "Task " << parameter.finishHash_ << " with "
57 << totalNumLedgers
58 << " ledgers merged into an existing task.";
59 return;
60 }
61 }
62 JLOG(j_.info()) << "Replay " << totalNumLedgers
63 << " ledgers. Finish ledger hash "
64 << parameter.finishHash_;
65
// skipLists_ holds weak_ptrs, so an entry may exist yet be expired; lock()
// then yields an empty pointer and a fresh acquire is created below.
66 auto i = skipLists_.find(parameter.finishHash_);
67 if (i != skipLists_.end())
68 skipList = i->second.lock();
69
70 if (!skipList) // cannot find, or found but cannot lock
71 {
// NOTE(review): lines 72 and 74 are missing; the new skip-list acquire is
// constructed here from app_, the finish hash and a freshly built peer set.
73 app_,
75 parameter.finishHash_,
76 peerSetBuilder_->build());
77 skipLists_[parameter.finishHash_] = skipList;
78 newSkipList = true;
79 }
80
// NOTE(review): line 81 is missing; it evidently creates `task`. Note that
// `parameter` is moved from on the next line and must not be read afterwards.
82 app_, inboundLedgers_, *this, skipList, std::move(parameter));
83 tasks_.push_back(task);
84 }
85
// Initialisation is performed after the scope above has closed.
86 if (newSkipList)
87 skipList->init(1);
88 // task init after skipList init, could save a timeout
89 task->init();
90}
91
// Create the LedgerDeltaAcquire subtasks for a LedgerReplayTask
// (declared as `void createDeltas(std::shared_ptr<LedgerReplayTask> task)`).
//
// For every ledger after the task's start ledger, up to finishSeq_ or the end
// of the skip list (whichever comes first), the matching delta is looked up in
// deltas_ by ledger hash; a live entry is reused, otherwise a new one is
// created, registered, and initialised. Each delta is attached to the task
// with addDelta().
// Returns early, with an error log, if startHash_ is not in the task's skip
// list or is its last element. Also returns early if the application is
// stopping.
// NOTE(review): original line 93 (the function name/parameter line) is absent
// from this listing.
92void
94{
95 {
96 // TODO for use cases like Consensus (i.e. totalLedgers = 1 or small):
97 // check if the last closed or validated ledger l the local node has
98 // is in the skip list and is an ancestor of parameter.startLedger
99 // that has to be downloaded, if so expand the task to start with l.
100 }
101
102 auto const& parameter = task->getTaskParameter();
103 JLOG(j_.trace()) << "Creating " << parameter.totalLedgers_ - 1 << " deltas";
104 if (parameter.totalLedgers_ > 1)
105 {
// Locate the start ledger's hash in the skip list.
106 auto skipListItem = std::find(
107 parameter.skipList_.begin(),
108 parameter.skipList_.end(),
109 parameter.startHash_);
// The pre-increment in the second operand advances skipListItem to the entry
// after startHash_; the loop below relies on that side effect.
110 if (skipListItem == parameter.skipList_.end() ||
111 ++skipListItem == parameter.skipList_.end())
112 {
113 JLOG(j_.error()) << "Task parameter error when creating deltas "
114 << parameter.finishHash_;
115 return;
116 }
117
// seq and skipListItem advance in lock step: *skipListItem is used as the hash
// of the ledger with sequence number seq.
118 for (std::uint32_t seq = parameter.startSeq_ + 1;
119 seq <= parameter.finishSeq_ &&
120 skipListItem != parameter.skipList_.end();
121 ++seq, ++skipListItem)
122 {
// NOTE(review): line 123 is missing; presumably it declares the `delta`
// shared pointer - confirm.
124 bool newDelta = false;
125 {
// NOTE(review): line 126 is missing; presumably a lock is acquired for this
// scope - confirm.
127 if (app_.isStopping())
128 return;
129 auto i = deltas_.find(*skipListItem);
130 if (i != deltas_.end())
131 delta = i->second.lock();
132
133 if (!delta) // cannot find, or found but cannot lock
134 {
// NOTE(review): lines 135 and 137 are missing; the new delta acquire is
// constructed here from app_, the ledger hash, seq and a freshly built peer
// set.
136 app_,
138 *skipListItem,
139 seq,
140 peerSetBuilder_->build());
141 deltas_[*skipListItem] = delta;
142 newDelta = true;
143 }
144 }
145
// Attaching to the task and initialising happen after the inner scope closes.
146 task->addDelta(delta);
147 if (newDelta)
148 delta->init(1);
149 }
150 }
151}
152
// Process a skip list (extracted from a TMProofPathResponse message).
//
// Looks up the SkipListAcquire registered under info.hash. If there is no
// entry the data is ignored; if the entry's weak_ptr has expired the entry is
// erased and the data is ignored. Otherwise the data is forwarded to the
// acquire object via processData(info.seq, item).
// NOTE(review): original line 154 (the function name line) is absent from this
// listing.
153void
155 LedgerInfo const& info,
156 boost::intrusive_ptr<SHAMapItem const> const& item)
157{
// NOTE(review): lines 158 and 160 are missing; presumably they declare
// `skipList` and acquire a lock for the scope below - confirm.
159 {
161 auto i = skipLists_.find(info.hash);
162 if (i == skipLists_.end())
163 return;
164 skipList = i->second.lock();
165 if (!skipList)
166 {
// Expired weak_ptr: drop the stale map entry.
167 skipLists_.erase(i);
168 return;
169 }
170 }
171
// processData is invoked after the scope above has closed.
172 if (skipList)
173 skipList->processData(info.seq, item);
174}
175
// Process a ledger delta (extracted from a TMReplayDeltaResponse message).
//
// Looks up the LedgerDeltaAcquire registered under info.hash. If there is no
// entry the data is ignored; if the entry's weak_ptr has expired the entry is
// erased and the data is ignored. Otherwise the transactions are moved into
// the acquire object via processData(info, std::move(txns)).
// NOTE(review): original lines 177 and 179 (the function name and the `txns`
// parameter) are absent from this listing; the declaration gives the parameter
// as `std::map<std::uint32_t, std::shared_ptr<STTx const>>&& txns`.
176void
178 LedgerInfo const& info,
180{
// NOTE(review): lines 181 and 183 are missing; presumably they declare `delta`
// and acquire a lock for the scope below - confirm.
182 {
184 auto i = deltas_.find(info.hash);
185 if (i == deltas_.end())
186 return;
187 delta = i->second.lock();
188 if (!delta)
189 {
// Expired weak_ptr: drop the stale map entry.
190 deltas_.erase(i);
191 return;
192 }
193 }
194
// processData is invoked after the scope above has closed.
195 if (delta)
196 delta->processData(info, std::move(txns));
197}
198
// Remove completed tasks.
//
// Logs the current container sizes, erases every task whose finished() returns
// true, removes entries of skipLists_ and deltas_ whose weak_ptr can no longer
// be locked, and finally logs a duration in milliseconds measured from `start`.
// NOTE(review): original line 200 (the function name line) is absent from this
// listing.
199void
201{
202 auto const start = std::chrono::steady_clock::now();
203 {
// NOTE(review): line 204 is missing; presumably a lock is acquired for this
// scope (the final log message speaks of a "lock duration") - confirm.
205 JLOG(j_.debug()) << "Sweeping, LedgerReplayer has " << tasks_.size()
206 << " tasks, " << skipLists_.size()
207 << " skipLists, and " << deltas_.size() << " deltas.";
208
209 tasks_.erase(
// NOTE(review): line 210 is missing; presumably `std::remove_if(` - i.e. the
// erase/remove_if idiom with the predicate below - confirm.
211 tasks_.begin(),
212 tasks_.end(),
213 [this](auto const& t) -> bool {
214 if (t->finished())
215 {
216 JLOG(j_.debug()) << "Sweep task "
217 << t->getTaskParameter().finishHash_;
218 return true;
219 }
220 return false;
221 }),
222 tasks_.end());
223
// Generic helper for both weak_ptr maps: erase entries whose target is gone.
// erase() returns the next iterator, so the loop only increments on the
// keep path.
224 auto removeCannotLocked = [](auto& subTasks) {
225 for (auto it = subTasks.begin(); it != subTasks.end();)
226 {
227 if (auto item = it->second.lock(); !item)
228 {
229 it = subTasks.erase(it);
230 }
231 else
232 ++it;
233 }
234 };
235 removeCannotLocked(skipLists_);
236 removeCannotLocked(deltas_);
237 }
238 JLOG(j_.debug()) << " LedgerReplayer sweep lock duration "
239 << std::chrono::duration_cast<std::chrono::milliseconds>(
// NOTE(review): line 240 is missing; presumably the elapsed time since `start`
// (steady_clock::now() - start) - confirm.
241 .count()
242 << "ms";
243}
244
// Stop the replayer: cancel every task in tasks_ and clear the list, then
// cancel every skip-list and delta acquire that is still alive (its weak_ptr
// can be locked) and clear both maps. Logs "Stopping..." on entry and
// "Stopped" on exit.
245void
246LedgerReplayer::stop()
247{
248 JLOG(j_.info()) << "Stopping...";
249 {
// NOTE(review): lines 250-251 are missing from this listing; presumably a lock
// acquisition followed by `std::for_each(` over tasks_ - confirm.
252 tasks_.begin(), tasks_.end(), [](auto& i) { i->cancel(); });
253 tasks_.clear();
// Shared by both weak_ptr maps: cancel only the entries that are still alive.
254 auto lockAndCancel = [](auto& i) {
255 if (auto sptr = i.second.lock(); sptr)
256 {
257 sptr->cancel();
258 }
259 };
260 std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
261 skipLists_.clear();
262 std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
263 deltas_.clear();
264 }
265
266 JLOG(j_.info()) << "Stopped";
267}
268
269} // namespace ripple
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
virtual bool isStopping() const =0
Manages the lifetime of inbound ledgers.
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
void gotReplayDelta(LedgerInfo const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message)
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
void sweep()
Remove completed tasks.
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
void gotSkipList(LedgerInfo const &info, boost::intrusive_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message)
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
InboundLedgers & inboundLedgers_
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
bool isNonZero() const
Definition base_uint.h:526
T find(T... args)
T for_each(T... args)
T is_same_v
std::uint32_t constexpr MAX_TASK_SIZE
std::uint32_t constexpr MAX_TASKS
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
STL namespace.
T remove_if(T... args)
Information about the notional ledger backing the view.