xrpld
Loading...
Searching...
No Matches
LedgerReplayer.cpp
1#include <xrpld/app/ledger/LedgerReplayer.h>
2
3#include <xrpld/app/ledger/InboundLedger.h>
4#include <xrpld/app/ledger/LedgerReplayTask.h>
5#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
6#include <xrpld/app/ledger/detail/SkipListAcquire.h>
7#include <xrpld/app/main/Application.h>
8#include <xrpld/overlay/PeerSet.h>
9
10#include <xrpl/basics/Log.h>
11#include <xrpl/basics/base_uint.h>
12#include <xrpl/beast/utility/instrumentation.h>
13#include <xrpl/protocol/LedgerHeader.h>
14#include <xrpl/protocol/STTx.h>
15#include <xrpl/shamap/SHAMapItem.h>
16
17#include <boost/smart_ptr/intrusive_ptr.hpp>
18
19#include <algorithm>
20#include <chrono>
21#include <cstdint>
22#include <map>
23#include <memory>
24#include <mutex>
25#include <utility>
26
27namespace xrpl {
28
30 Application& app,
31 InboundLedgers& inboundLedgers,
33 : app_(app)
34 , inboundLedgers_(inboundLedgers)
35 , peerSetBuilder_(std::move(peerSetBuilder))
36 , j_(app.getJournal("LedgerReplayer"))
37{
38}
39
41{
42 std::scoped_lock const lock(mtx_);
43 tasks_.clear();
44}
45
46void
49 uint256 const& finishLedgerHash,
50 std::uint32_t totalNumLedgers)
51{
52 XRPL_ASSERT(
53 finishLedgerHash.isNonZero() && totalNumLedgers > 0 &&
54 totalNumLedgers <= LedgerReplayParameters::kMaxTaskSize,
55 "xrpl::LedgerReplayer::replay : valid inputs");
56
57 // NOLINTNEXTLINE(misc-const-correctness)
58 LedgerReplayTask::TaskParameter parameter(r, finishLedgerHash, totalNumLedgers);
59
62 bool newSkipList = false;
63 {
64 std::scoped_lock const lock(mtx_);
65 if (app_.isStopping())
66 return;
68 {
69 JLOG(j_.info()) << "Too many replay tasks, dropping new task " << parameter.finishHash;
70 return;
71 }
72
73 for (auto const& t : tasks_)
74 {
75 if (parameter.canMergeInto(t->getTaskParameter()))
76 {
77 JLOG(j_.info()) << "Task " << parameter.finishHash << " with " << totalNumLedgers
78 << " ledgers merged into an existing task.";
79 return;
80 }
81 }
82 JLOG(j_.info()) << "Replay " << totalNumLedgers << " ledgers. Finish ledger hash "
83 << parameter.finishHash;
84
85 auto i = skipLists_.find(parameter.finishHash);
86 if (i != skipLists_.end())
87 skipList = i->second.lock();
88
89 if (!skipList) // cannot find, or found but cannot lock
90 {
92 app_, inboundLedgers_, parameter.finishHash, peerSetBuilder_->build());
93 skipLists_[parameter.finishHash] = skipList;
94 newSkipList = true;
95 }
96
98 app_, inboundLedgers_, *this, skipList, std::move(parameter));
99 tasks_.push_back(task);
100 }
101
102 if (newSkipList)
103 skipList->init(1);
104 // task init after skipList init, could save a timeout
105 task->init();
106}
107
108void
110{
111 {
112 // TODO for use cases like Consensus (i.e. totalLedgers = 1 or small):
113 // check if the last closed or validated ledger l the local node has
114 // is in the skip list and is an ancestor of parameter.startLedger
115 // that has to be downloaded, if so expand the task to start with l.
116 }
117
118 auto const& parameter = task->getTaskParameter();
119 JLOG(j_.trace()) << "Creating " << parameter.totalLedgers - 1 << " deltas";
120 if (parameter.totalLedgers > 1)
121 {
122 auto skipListItem = std::ranges::find(parameter.skipList, parameter.startHash);
123 auto const wasLast = skipListItem == parameter.skipList.end();
124 if (not wasLast)
125 ++skipListItem;
126 auto const isLast = skipListItem == parameter.skipList.end();
127
128 if (wasLast || isLast)
129 {
130 JLOG(j_.error()) << "Task parameter error when creating deltas "
131 << parameter.finishHash;
132 return;
133 }
134
135 for (std::uint32_t seq = parameter.startSeq + 1;
136 seq <= parameter.finishSeq && skipListItem != parameter.skipList.end();
137 ++seq, ++skipListItem)
138 {
140 bool newDelta = false;
141 {
142 std::scoped_lock const lock(mtx_);
143 if (app_.isStopping())
144 return;
145 auto i = deltas_.find(*skipListItem);
146 if (i != deltas_.end())
147 delta = i->second.lock();
148
149 if (!delta) // cannot find, or found but cannot lock
150 {
152 app_, inboundLedgers_, *skipListItem, seq, peerSetBuilder_->build());
153 deltas_[*skipListItem] = delta;
154 newDelta = true;
155 }
156 }
157
158 task->addDelta(delta);
159 if (newDelta)
160 delta->init(1);
161 }
162 }
163}
164
165void
167 LedgerHeader const& info,
168 boost::intrusive_ptr<SHAMapItem const> const& item)
169{
171 {
172 std::scoped_lock const lock(mtx_);
173 auto i = skipLists_.find(info.hash);
174 if (i == skipLists_.end())
175 return;
176 skipList = i->second.lock();
177 if (!skipList)
178 {
179 skipLists_.erase(i);
180 return;
181 }
182 }
183
184 if (skipList)
185 skipList->processData(info.seq, item);
186}
187
188void
190 LedgerHeader const& info,
192{
194 {
195 std::scoped_lock const lock(mtx_);
196 auto i = deltas_.find(info.hash);
197 if (i == deltas_.end())
198 return;
199 delta = i->second.lock();
200 if (!delta)
201 {
202 deltas_.erase(i);
203 return;
204 }
205 }
206
207 if (delta)
208 delta->processData(info, std::move(txns));
209}
210
211void
213{
214 auto const start = std::chrono::steady_clock::now();
215 {
216 std::scoped_lock const lock(mtx_);
217 JLOG(j_.debug()) << "Sweeping, LedgerReplayer has " << tasks_.size() << " tasks, "
218 << skipLists_.size() << " skipLists, and " << deltas_.size() << " deltas.";
219
220 tasks_.erase(
222 tasks_,
223
224 [this](auto const& t) -> bool {
225 if (t->finished())
226 {
227 JLOG(j_.debug()) << "Sweep task " << t->getTaskParameter().finishHash;
228 return true;
229 }
230 return false;
231 })
232 .begin(),
233 tasks_.end());
234
235 auto removeCannotLocked = [](auto& subTasks) {
236 for (auto it = subTasks.begin(); it != subTasks.end();)
237 {
238 if (auto item = it->second.lock(); !item)
239 {
240 it = subTasks.erase(it);
241 }
242 else
243 {
244 ++it;
245 }
246 }
247 };
248 removeCannotLocked(skipLists_);
249 removeCannotLocked(deltas_);
250 }
251 JLOG(j_.debug()) << " LedgerReplayer sweep lock duration "
254 .count()
255 << "ms";
256}
257
258void
260{
261 JLOG(j_.info()) << "Stopping...";
262 {
263 std::scoped_lock const lock(mtx_);
264 std::ranges::for_each(tasks_, [](auto& i) { i->cancel(); });
265 tasks_.clear();
266 auto lockAndCancel = [](auto& i) {
267 if (auto sptr = i.second.lock(); sptr)
268 {
269 sptr->cancel();
270 }
271 };
272 std::ranges::for_each(skipLists_, lockAndCancel);
273 skipLists_.clear();
274 std::ranges::for_each(deltas_, lockAndCancel);
275 deltas_.clear();
276 }
277
278 JLOG(j_.info()) << "Stopped";
279}
280
281} // namespace xrpl
bool isNonZero() const
Definition base_uint.h:549
Manages the lifetime of inbound ledgers.
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
InboundLedgers & inboundLedgers_
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
void gotReplayDelta(LedgerHeader const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message).
void gotSkipList(LedgerHeader const &info, boost::intrusive_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message).
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
void sweep()
Remove completed tasks.
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
T duration_cast(T... args)
T find(T... args)
T for_each(T... args)
T make_shared(T... args)
STL namespace.
constexpr std::uint32_t kMaxTasks
constexpr std::uint32_t kMaxTaskSize
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
BaseUInt< 256 > uint256
Definition base_uint.h:562
T remove_if(T... args)
Information about the notional ledger backing the view.