rippled
Loading...
Searching...
No Matches
LedgerReplayTask.cpp
1#include <xrpld/app/ledger/InboundLedgers.h>
2#include <xrpld/app/ledger/LedgerReplayTask.h>
3#include <xrpld/app/ledger/LedgerReplayer.h>
4#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
5#include <xrpld/app/ledger/detail/SkipListAcquire.h>
6
7namespace ripple {
8
// TaskParameter constructor; the opening lines of its signature are absent from
// this listing. Records the reason, the hash of the final ledger and the number
// of ledgers requested.
11 uint256 const& finishLedgerHash,
12 std::uint32_t totalNumLedgers)
13 : reason_(r), finishHash_(finishLedgerHash), totalLedgers_(totalNumLedgers)
14{
// Input check: the finish hash must be non-zero and at least one ledger requested.
15 XRPL_ASSERT(
16 finishLedgerHash.isNonZero() && totalNumLedgers > 0,
17 "ripple::LedgerReplayTask::TaskParameter::TaskParameter : valid "
18 "inputs");
19}
20
// TaskParameter::update (the line naming the function is absent from this
// listing): fills in the fields that were not set at construction, using the
// supplied hash, sequence and skip list. Returns true on success; returns false
// and changes nothing if the hash does not match finishHash_, the list is too
// short, or the parameter is already full.
21bool
23 uint256 const& hash,
24 std::uint32_t seq,
25 std::vector<uint256> const& sList)
26{
// sList plus the finish hash appended below must hold at least totalLedgers_ entries.
27 if (finishHash_ != hash || sList.size() + 1 < totalLedgers_ || full_)
28 return false;
29
30 finishSeq_ = seq;
31 skipList_ = sList;
// Append the finish hash so the stored list ends with the last ledger of the task.
32 skipList_.emplace_back(finishHash_);
// The start hash sits totalLedgers_ entries from the end of the stored list.
33 startHash_ = skipList_[skipList_.size() - totalLedgers_];
34 XRPL_ASSERT(
35 startHash_.isNonZero(),
36 "ripple::LedgerReplayTask::TaskParameter::update : nonzero start hash");
37 startSeq_ = finishSeq_ - totalLedgers_ + 1;
// Mark the parameter complete; a second update() call will now be rejected.
38 full_ = true;
39 return true;
40}
41
// TaskParameter::canMergeInto (the line naming the function is absent from this
// listing): checks whether this task can be merged into existingTask.
// Only tasks with the same reason are candidates; otherwise returns false.
42bool
44 TaskParameter const& existingTask) const
45{
46 if (reason_ == existingTask.reason_)
47 {
// Case 1: same finish hash, and this task asks for no more ledgers than the
// existing one.
48 if (finishHash_ == existingTask.finishHash_ &&
49 totalLedgers_ <= existingTask.totalLedgers_)
50 {
51 return true;
52 }
53
// Case 2: the existing task's skip list has been filled in; look for this
// task's finish hash inside it.
54 if (existingTask.full_)
55 {
56 auto const& exList = existingTask.skipList_;
57 if (auto i = std::find(exList.begin(), exList.end(), finishHash_);
58 i != exList.end())
59 {
// (exList.end() - i) - 1 is the number of entries after this finish hash in
// the existing list; the existing task must span those plus this task's ledgers.
60 return existingTask.totalLedgers_ >=
61 totalLedgers_ + (exList.end() - i) - 1;
62 }
63 }
64 }
65
66 return false;
67}
68
// LedgerReplayTask constructor. Several lines of the signature and of the
// member-initializer list (original lines 69, 75, 79, 81, 87, 89) are absent
// from this listing, so the initializer list below is not complete as shown.
70 Application& app,
71 InboundLedgers& inboundLedgers,
72 LedgerReplayer& replayer,
73 std::shared_ptr<SkipListAcquire>& skipListAcquirer,
74 TaskParameter&& parameter)
// NOTE(review): the arguments below presumably go to a base-class constructor
// whose name is on the missing line 75 -- confirm against the full source.
76 app,
77 parameter.finishHash_,
78 LedgerReplayParameters::TASK_TIMEOUT,
80 "LedgerReplayTask",
82 app.journal("LedgerReplayTask"))
83 , inboundLedgers_(inboundLedgers)
84 , replayer_(replayer)
85 , parameter_(parameter)
// maxTimeouts_ is the larger of two values; one operand is on a missing line,
// the other scales with parameter.totalLedgers_.
86 , maxTimeouts_(std::max(
88 parameter.totalLedgers_ *
90 , skipListAcquirer_(skipListAcquirer)
91{
92 JLOG(journal_.trace()) << "Create " << hash_;
93}
94
// Presumably the destructor body (its header line is absent from this listing);
// it only writes a trace-level log entry.
96{
97 JLOG(journal_.trace()) << "Destroy " << hash_;
98}
99
// Starts the task (the header line is absent from this listing; presumably
// LedgerReplayTask::init). Registers a callback on the skip-list acquirer and,
// if the task is not already done, triggers a first round and arms the timer.
// The declarations of wptr and sl are on lines missing from this listing.
100void
102{
103 JLOG(journal_.debug()) << "Task start " << hash_;
104
// The callback captures wptr and only acts if it can still be locked.
106 skipListAcquirer_->addDataCallback([wptr](bool good, uint256 const& hash) {
107 if (auto sptr = wptr.lock(); sptr)
108 {
// On failure cancel the task; on success pass the acquired sequence and skip
// list to updateSkipList().
109 if (!good)
110 {
111 sptr->cancel();
112 }
113 else
114 {
115 auto const skipListData = sptr->skipListAcquirer_->getData();
116 sptr->updateSkipList(
117 hash, skipListData->ledgerSeq, skipListData->skipList);
118 }
119 }
120 });
121
123 if (!isDone())
124 {
125 trigger(sl);
126 setTimer(sl);
127 }
128}
129
// Triggers another round of work (the header line is absent from this listing;
// presumably LedgerReplayTask::trigger). Does nothing until parameter_ is full;
// afterwards it handles the parent_ ledger if not yet set and calls tryAdvance().
130void
132{
133 JLOG(journal_.trace()) << "trigger " << hash_;
// Nothing can be done before the skip list has filled in the parameter.
134 if (!parameter_.full_)
135 return;
136
137 if (!parent_)
138 {
// NOTE(review): original lines 139 and 142-145 are missing from this listing;
// presumably they attempt to obtain the start ledger and assign parent_ --
// confirm against the full source.
140 if (!parent_)
141 {
146 }
147 if (parent_)
148 {
149 JLOG(journal_.trace())
150 << "Got start ledger " << parameter_.startHash_ << " for task "
151 << hash_;
152 }
153 }
154
155 tryAdvance(sl);
156}
157
// Notification that a delta is ready (the header line is absent from this
// listing; presumably LedgerReplayTask::deltaReady). Logs the delta hash and,
// unless the task is already done, tries to advance. The declaration of sl is
// on a line missing from this listing.
158void
160{
161 JLOG(journal_.trace()) << "Delta " << deltaHash << " ready for task "
162 << hash_;
164 if (!isDone())
165 tryAdvance(sl);
166}
167
// Tries to build more ledgers (the header line is absent from this listing;
// presumably LedgerReplayTask::tryAdvance). Work is attempted only when the
// parent ledger is available, the parameter is full and exactly
// totalLedgers_ - 1 deltas have been added.
168void
170{
171 JLOG(journal_.trace()) << "tryAdvance task " << hash_
172 << (parameter_.full_ ? ", full parameter"
173 : ", waiting to fill parameter")
174 << ", deltaIndex=" << deltaToBuild_
175 << ", totalDeltas=" << deltas_.size() << ", parent "
176 << (parent_ ? parent_->info().hash : uint256());
177
178 bool shouldTry = parent_ && parameter_.full_ &&
179 parameter_.totalLedgers_ - 1 == deltas_.size();
180 if (!shouldTry)
181 return;
182
183 try
184 {
// Resume from deltaToBuild_, so deltas already built are not rebuilt.
185 for (; deltaToBuild_ < deltas_.size(); ++deltaToBuild_)
186 {
187 auto& delta = deltas_[deltaToBuild_];
188 XRPL_ASSERT(
189 parent_->seq() + 1 == delta->ledgerSeq_,
190 "ripple::LedgerReplayTask::tryAdvance : consecutive sequence");
// A successfully built ledger becomes the parent for the next delta; if the
// build yields nothing, return and leave deltaToBuild_ pointing at this delta.
191 if (auto l = delta->tryBuild(parent_); l)
192 {
193 JLOG(journal_.debug())
194 << "Task " << hash_ << " got ledger " << l->info().hash
195 << " deltaIndex=" << deltaToBuild_
196 << " totalDeltas=" << deltas_.size();
197 parent_ = l;
198 }
199 else
200 return;
201 }
202
// Every delta has been built: the task is complete.
203 complete_ = true;
204 JLOG(journal_.info()) << "Completed " << hash_;
205 }
// A std::runtime_error thrown inside the try block marks the task as failed.
206 catch (std::runtime_error const&)
207 {
208 failed_ = true;
209 }
210}
211
// Updates the task once the skip list is ready (the line naming the function is
// absent from this listing; presumably LedgerReplayTask::updateSkipList).
// Original lines 219, 230 and 231 are also missing; the declaration of sl used
// at the end is presumably among them.
212void
214 uint256 const& hash,
215 std::uint32_t seq,
216 std::vector<uint256> const& sList)
217{
218 {
220 if (isDone())
221 return;
// A rejected parameter update marks the whole task as failed.
222 if (!parameter_.update(hash, seq, sList))
223 {
224 JLOG(journal_.error()) << "Parameter update failed " << hash_;
225 failed_ = true;
226 return;
227 }
228 }
229
232 if (!isDone())
233 trigger(sl);
234}
235
// Timer hook (the header line is absent from this listing; presumably
// LedgerReplayTask::onTimer). Either marks the task failed or triggers
// another round.
236void
238{
239 JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
// NOTE(review): the condition guarding this branch is on the missing line 240;
// judging by the log message it checks timeouts_ against a limit -- confirm.
241 {
242 failed_ = true;
243 JLOG(journal_.debug())
244 << "LedgerReplayTask Failed, too many timeouts " << hash_;
245 }
246 else
247 {
248 trigger(sl);
249 }
250}
251
257
// Adds a new delta subtask (the header line is absent from this listing;
// presumably LedgerReplayTask::addDelta). Registers a completion callback on
// the delta and, unless the task is already done, appends it to deltas_.
// The declaration of wptr is on a line missing from this listing.
258void
260{
// The callback only acts if wptr can still be locked: failure cancels the
// task, success reports the delta as ready.
262 delta->addDataCallback(
263 parameter_.reason_, [wptr](bool good, uint256 const& hash) {
264 if (auto sptr = wptr.lock(); sptr)
265 {
266 if (!good)
267 sptr->cancel();
268 else
269 sptr->deltaReady(hash);
270 }
271 });
272
273 ScopedLockType sl(mtx_);
274 if (!isDone())
275 {
276 JLOG(journal_.trace())
277 << "addDelta task " << hash_ << " deltaIndex=" << deltaToBuild_
278 << " totalDeltas=" << deltas_.size();
// Deltas are expected to arrive in consecutive ledger-sequence order.
// NOTE(review): the trailing comma before the closing parenthesis below looks
// unintentional -- confirm and remove if the macro does not need it.
279 XRPL_ASSERT(
280 deltas_.empty() ||
281 deltas_.back()->ledgerSeq_ + 1 == delta->ledgerSeq_,
282 "ripple::LedgerReplayTask::addDelta : no deltas or consecutive "
283 "sequence", );
284 deltas_.push_back(delta);
285 }
286}
287
288bool
289LedgerReplayTask::finished() const
290{
291 ScopedLockType sl(mtx_);
292 return isDone();
293}
294
295} // namespace ripple
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
virtual LedgerMaster & getLedgerMaster()=0
Manages the lifetime of inbound ledgers.
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
TaskParameter(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
constructor
bool update(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
fill all the fields that were not filled during construction
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
void trigger(ScopedLockType &sl)
Trigger another round.
std::vector< std::shared_ptr< LedgerDeltaAcquire > > deltas_
void addDelta(std::shared_ptr< LedgerDeltaAcquire > const &delta)
add a new LedgerDeltaAcquire subtask
void deltaReady(uint256 const &deltaHash)
Notify this task (by a LedgerDeltaAcquire subtask) that a delta is ready.
void init()
Start the task.
InboundLedgers & inboundLedgers_
std::shared_ptr< SkipListAcquire > skipListAcquirer_
void updateSkipList(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
Update this task (by a SkipListAcquire subtask) when skip list is ready.
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
LedgerReplayTask(Application &app, InboundLedgers &inboundLedgers, LedgerReplayer &replayer, std::shared_ptr< SkipListAcquire > &skipListAcquirer, TaskParameter &&parameter)
Constructor.
void tryAdvance(ScopedLockType &sl)
Try to build more ledgers.
std::shared_ptr< Ledger const > parent_
void onTimer(bool progress, ScopedLockType &sl) override
Hook called from invokeOnTimer().
Manages the lifetime of ledger replay tasks.
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
This class is an "active" object.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
static constexpr std::size_t size()
Definition base_uint.h:507
bool isNonZero() const
Definition base_uint.h:526
T emplace_back(T... args)
T find(T... args)
T lock(T... args)
T max(T... args)
std::uint32_t constexpr TASK_MAX_TIMEOUTS_MINIMUM
std::uint32_t constexpr MAX_QUEUED_TASKS
std::uint32_t constexpr TASK_MAX_TIMEOUTS_MULTIPLIER
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
base_uint< 256 > uint256
Definition base_uint.h:539
@ jtREPLAY_TASK
Definition Job.h:42
T size(T... args)