xrpld
Loading...
Searching...
No Matches
LedgerReplayTask.cpp
1#include <xrpld/app/ledger/LedgerReplayTask.h>
2
3#include <xrpld/app/ledger/InboundLedger.h>
4#include <xrpld/app/ledger/InboundLedgers.h>
5#include <xrpld/app/ledger/LedgerReplayer.h>
6#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
7#include <xrpld/app/ledger/detail/SkipListAcquire.h>
8#include <xrpld/app/ledger/detail/TimeoutCounter.h>
9#include <xrpld/app/main/Application.h>
10
11#include <xrpl/basics/Log.h>
12#include <xrpl/basics/base_uint.h>
13#include <xrpl/beast/utility/instrumentation.h>
14#include <xrpl/core/Job.h>
15
16#include <algorithm>
17#include <cstdint>
18#include <memory>
19#include <stdexcept>
20#include <vector>
21
22namespace xrpl {
23
// Remainder of the TaskParameter constructor. The opening of its signature
// (including the declaration of `r`) is not part of this listing.
// Copies r, finishLedgerHash and totalNumLedgers into the members reason,
// finishHash and totalLedgers.
26 uint256 const& finishLedgerHash,
27 std::uint32_t totalNumLedgers)
28 : reason(r), finishHash(finishLedgerHash), totalLedgers(totalNumLedgers)
29{
// Precondition check: the finish hash must be non-zero and at least one
// ledger must be requested.
30 XRPL_ASSERT(
31 finishLedgerHash.isNonZero() && totalNumLedgers > 0,
32 "xrpl::LedgerReplayTask::TaskParameter::TaskParameter : valid "
33 "inputs");
34}
35
// Fills in the fields that were left unset at construction, from the
// supplied skip list.
//
// hash  - compared against finishHash; a mismatch rejects the update.
// seq   - stored as finishSeq.
// sList - copied into skipList, after which finishHash is appended.
//
// Returns false without modifying the object when the hash does not match,
// when sList.size() + 1 is smaller than totalLedgers, or when the parameter
// is already marked full. Returns true after setting full.
//
// NOTE(review): the signature line and listing lines 48 and 52 are absent
// from this view. Presumably startHash (asserted non-zero below) is assigned
// on one of them - confirm against the real source.
36bool
38 uint256 const& hash,
39 std::uint32_t seq,
40 std::vector<uint256> const& sList)
41{
42 if (finishHash != hash || sList.size() + 1 < totalLedgers || full)
43 return false;
44
45 finishSeq = seq;
46 skipList = sList;
// finishHash becomes the last entry of skipList.
47 skipList.emplace_back(finishHash);
49 XRPL_ASSERT(
50 startHash.isNonZero(),
51 "xrpl::LedgerReplayTask::TaskParameter::update : nonzero start hash");
// Mark the parameter complete so that a second update() is rejected.
53 full = true;
54 return true;
55}
56
// Checks whether this task can be merged into existingTask.
// (The signature line is absent from this listing.)
//
// Tasks with different reasons never merge. With equal reasons the result is
// true when either
//  - both have the same finishHash and this totalLedgers does not exceed
//    existingTask.totalLedgers, or
//  - existingTask is full, this finishHash occurs in existingTask.skipList,
//    and existingTask.totalLedgers is large enough (see the comment below).
// In every other case the result is false.
57bool
59{
60 if (reason == existingTask.reason)
61 {
62 if (finishHash == existingTask.finishHash && totalLedgers <= existingTask.totalLedgers)
63 {
64 return true;
65 }
66
// The skip list of existingTask is only consulted once it has been filled.
67 if (existingTask.full)
68 {
69 auto const& exList = existingTask.skipList;
70 if (auto i = std::ranges::find(exList, finishHash); i != exList.end())
71 {
// (exList.end() - i) - 1 is the number of entries after the match, so
// existingTask must span this task's totalLedgers plus those trailing
// entries.
72 return existingTask.totalLedgers >= totalLedgers + (exList.end() - i) - 1;
73 }
74 }
75 }
76
77 return false;
78}
79
// LedgerReplayTask constructor (parameter list and initializers).
//
// NOTE(review): listing lines 80, 86, 92 and 98-100 are absent from this
// view: the start of the signature, the head of the first initializer, one
// field of the braced job parameter, and the argument of maxTimeouts_.
// The first initializer presumably forwards to the TimeoutCounter base with
// parameter.finishHash as the target hash - confirm against the real source.
81 Application& app,
82 InboundLedgers& inboundLedgers,
83 LedgerReplayer& replayer,
84 std::shared_ptr<SkipListAcquire>& skipListAcquirer,
85 TaskParameter const& parameter)
87 app,
88 parameter.finishHash,
89 LedgerReplayParameters::kTaskTimeout,
90 {.jobType = JtReplayTask,
91 .jobName = "LedReplTask",
93 app.getJournal("LedgerReplayTask"))
// The remaining members are initialized from the constructor arguments.
94 , inboundLedgers_(inboundLedgers)
95 , replayer_(replayer)
96 , parameter_(parameter)
97 , maxTimeouts_(
101 , skipListAcquirer_(skipListAcquirer)
102{
103 JLOG(journal_.trace()) << "Create " << hash_;
104}
105
// Body of what is presumably the destructor (its signature line is absent
// from this listing). Only emits a trace-level log entry with hash_.
107{
108 JLOG(journal_.trace()) << "Destroy " << hash_;
109}
110
// Starts the task: subscribes to the skip-list acquirer, then triggers a
// first round and arms the timer unless the task is already done.
//
// NOTE(review): the signature line and listing lines 116 and 132 are absent
// from this view. `wptr` and `sl` are declared there; presumably wptr is a
// weak pointer to this task and sl a lock on mtx_ - confirm.
111void
113{
114 JLOG(journal_.debug()) << "Task start " << hash_;
115
// The callback captures wptr and does nothing if the task can no longer be
// locked.
117 skipListAcquirer_->addDataCallback([wptr](bool good, uint256 const& hash) {
118 if (auto sptr = wptr.lock(); sptr)
119 {
120 if (!good)
121 {
// The acquirer reported failure: cancel this task.
122 sptr->cancel();
123 }
124 else
125 {
// Success: read the acquired data and pass it to updateSkipList().
126 auto const skipListData = sptr->skipListAcquirer_->getData();
127 sptr->updateSkipList(hash, skipListData->ledgerSeq, skipListData->skipList);
128 }
129 }
130 });
131
133 if (!isDone())
134 {
135 trigger(sl);
136 setTimer(sl);
137 }
138}
139
// Runs another round of the task: resolves the start (parent) ledger if it
// is not known yet, then calls tryAdvance().
// Does nothing until parameter_ has been filled (parameter_.full).
//
// NOTE(review): the signature line and listing line 153 (the arguments of
// inboundLedgers_.acquire) are absent from this view.
140void
142{
143 JLOG(journal_.trace()) << "trigger " << hash_;
144 if (!parameter_.full)
145 return;
146
147 if (!parent_)
148 {
// First ask LedgerMaster for the ledger with parameter_.startHash.
149 parent_ = app_.getLedgerMaster().getLedgerByHash(parameter_.startHash);
150 if (!parent_)
151 {
// Not found there: fall back to inboundLedgers_.acquire(). parent_ may
// still be null afterwards, as the check below shows.
152 parent_ = inboundLedgers_.acquire(
154 }
155 if (parent_)
156 {
157 JLOG(journal_.trace())
158 << "Got start ledger " << parameter_.startHash << " for task " << hash_;
159 }
160 }
161
162 tryAdvance(sl);
163}
164
// Notification that the delta identified by deltaHash is ready; calls
// tryAdvance() unless the task is already done.
//
// NOTE(review): the signature line and listing line 169 (where `sl` is
// declared, presumably a lock on mtx_) are absent from this view.
165void
167{
168 JLOG(journal_.trace()) << "Delta " << deltaHash << " ready for task " << hash_;
170 if (!isDone())
171 tryAdvance(sl);
172}
173
// Tries to build as many ledgers as possible, in order, on top of parent_.
// (The signature line is absent from this listing.)
//
// Building starts only when the parent ledger is known, parameter_ is full,
// and deltas_ holds exactly totalLedgers - 1 entries. Each ledger built
// becomes the new parent_. When every delta has been applied, complete_ is
// set. A std::runtime_error raised while building sets failed_.
174void
176{
177 JLOG(journal_.trace()) << "tryAdvance task " << hash_
178 << (parameter_.full ? ", full parameter" : ", waiting to fill parameter")
179 << ", deltaIndex=" << deltaToBuild_ << ", totalDeltas=" << deltas_.size()
180 << ", parent " << (parent_ ? parent_->header().hash : uint256());
181
182 bool const shouldTry =
183 parent_ && parameter_.full && parameter_.totalLedgers - 1 == deltas_.size();
184 if (!shouldTry)
185 return;
186
187 try
188 {
// deltaToBuild_ is a member, so a later call resumes at the first delta
// that has not been built yet.
189 for (; deltaToBuild_ < deltas_.size(); ++deltaToBuild_)
190 {
191 auto& delta = deltas_[deltaToBuild_];
192 XRPL_ASSERT(
193 parent_->seq() + 1 == delta->ledgerSeq_,
194 "xrpl::LedgerReplayTask::tryAdvance : consecutive sequence");
195 if (auto l = delta->tryBuild(parent_); l)
196 {
197 JLOG(journal_.debug())
198 << "Task " << hash_ << " got ledger " << l->header().hash
199 << " deltaIndex=" << deltaToBuild_ << " totalDeltas=" << deltas_.size();
200 parent_ = l;
201 }
202 else
203 {
// tryBuild() produced no ledger: stop here without advancing
// deltaToBuild_.
204 return;
205 }
206 }
207
208 complete_ = true;
209 JLOG(journal_.info()) << "Completed " << hash_;
210 }
211 catch (std::runtime_error const&)
212 {
213 failed_ = true;
214 }
215}
216
// Applies an acquired skip list to this task: updates parameter_, asks the
// replayer to create the deltas, then triggers another round.
// If parameter_.update() rejects the data, the task is marked failed.
//
// NOTE(review): the signature line and listing line 236 are absent from
// this view. The `sl` passed to trigger() is declared on line 236,
// presumably as a fresh lock on mtx_ - confirm.
217void
219 uint256 const& hash,
220 std::uint32_t seq,
221 std::vector<uint256> const& sList)
222{
// The lock is confined to this inner scope, so it has been released by the
// time createDeltas() is called below.
223 {
224 ScopedLockType const sl(mtx_);
225 if (isDone())
226 return;
227 if (!parameter_.update(hash, seq, sList))
228 {
229 JLOG(journal_.error()) << "Parameter update failed " << hash_;
230 failed_ = true;
231 return;
232 }
233 }
234
235 replayer_.createDeltas(shared_from_this());
237 if (!isDone())
238 trigger(sl);
239}
240
// Timer handler: either gives up on the task or triggers another round.
//
// NOTE(review): the signature line and listing line 245 (the `if` condition
// guarding the failure branch) are absent from this view. Going by the log
// message, the condition presumably tests whether timeouts_ has exceeded
// the allowed maximum - confirm.
241void
243{
244 JLOG(journal_.trace()) << "timeouts_=" << timeouts_ << " for " << hash_;
246 {
247 failed_ = true;
248 JLOG(journal_.debug()) << "LedgerReplayTask Failed, too many timeouts " << hash_;
249 }
250 else
251 {
252 trigger(sl);
253 }
254}
255
261
// Registers a new delta subtask: subscribes to its completion and, unless
// the task is already done, appends it to deltas_.
//
// NOTE(review): the signature line and listing line 265 (where `wptr` is
// declared, presumably a weak pointer to this task) are absent from this
// view.
262void
264{
// The callback is registered before mtx_ is taken. It does nothing if the
// task can no longer be locked.
266 delta->addDataCallback(parameter_.reason, [wptr](bool good, uint256 const& hash) {
267 if (auto sptr = wptr.lock(); sptr)
268 {
269 if (!good)
270 {
271 sptr->cancel();
272 }
273 else
274 {
275 sptr->deltaReady(hash);
276 }
277 }
278 });
279
280 ScopedLockType const sl(mtx_);
281 if (!isDone())
282 {
283 JLOG(journal_.trace()) << "addDelta task " << hash_ << " deltaIndex=" << deltaToBuild_
284 << " totalDeltas=" << deltas_.size();
// Deltas must be added in ledger-sequence order with no gaps.
285 XRPL_ASSERT(
286 deltas_.empty() || deltas_.back()->ledgerSeq_ + 1 == delta->ledgerSeq_,
287 "xrpl::LedgerReplayTask::addDelta : no deltas or consecutive "
288 "sequence");
289 deltas_.push_back(delta);
290 }
291}
292
// Returns the result of isDone(), evaluated while holding mtx_.
// (The signature line is absent from this listing.)
293bool
295{
296 ScopedLockType const sl(mtx_);
297 return isDone();
298}
299
300} // namespace xrpl
bool isNonZero() const
Definition base_uint.h:549
Manages the lifetime of inbound ledgers.
TaskParameter(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
constructor
bool update(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
fill all the fields that were not filled during construction
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
void trigger(ScopedLockType &sl)
Trigger another round.
void onTimer(bool progress, ScopedLockType &sl) override
Hook called from invokeOnTimer().
LedgerReplayer & replayer_
LedgerReplayTask(Application &app, InboundLedgers &inboundLedgers, LedgerReplayer &replayer, std::shared_ptr< SkipListAcquire > &skipListAcquirer, TaskParameter const &parameter)
Constructor.
InboundLedgers & inboundLedgers_
void updateSkipList(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
Update this task (by a SkipListAcquire subtask) when skip list is ready.
void tryAdvance(ScopedLockType &sl)
Try to build more ledgers.
void deltaReady(uint256 const &deltaHash)
Notify this task (by a LedgerDeltaAcquire subtask) that a delta is ready.
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::vector< std::shared_ptr< LedgerDeltaAcquire > > deltas_
bool finished() const
return whether the task is finished
void addDelta(std::shared_ptr< LedgerDeltaAcquire > const &delta)
add a new LedgerDeltaAcquire subtask
std::shared_ptr< Ledger const > parent_
std::shared_ptr< SkipListAcquire > skipListAcquirer_
void init()
Start the task.
Manages the lifetime of ledger replay tasks.
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
std::recursive_mutex mtx_
std::unique_lock< std::recursive_mutex > ScopedLockType
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
T find(T... args)
T lock(T... args)
T max(T... args)
constexpr std::uint32_t kTaskMaxTimeoutsMinimum
constexpr std::uint32_t kTaskMaxTimeoutsMultiplier
constexpr std::uint32_t kMaxQueuedTasks
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ JtReplayTask
Definition Job.h:42
BaseUInt< 256 > uint256
Definition base_uint.h:562
T size(T... args)