rippled
Loading...
Searching...
No Matches
SkipListAcquire.cpp
1#include <xrpld/app/ledger/InboundLedger.h>
2#include <xrpld/app/ledger/LedgerReplayer.h>
3#include <xrpld/app/ledger/detail/SkipListAcquire.h>
4#include <xrpld/app/main/Application.h>
5#include <xrpld/overlay/PeerSet.h>
6
7namespace xrpl {
8
10 Application& app,
11 InboundLedgers& inboundLedgers,
12 uint256 const& ledgerHash,
15 app,
16 ledgerHash,
17 LedgerReplayParameters::SUB_TASK_TIMEOUT,
19 app.getJournal("LedgerReplaySkipList"))
20 , inboundLedgers_(inboundLedgers)
21 , peerSet_(std::move(peerSet))
22{
23 JLOG(journal_.trace()) << "Create " << hash_;
24}
25
27{
28 JLOG(journal_.trace()) << "Destroy " << hash_;
29}
30
31void
33{
35 if (!isDone())
36 {
37 trigger(numPeers, sl);
38 setTimer(sl);
39 }
40}
41
42void
44{
45 if (auto const l = app_.getLedgerMaster().getLedgerByHash(hash_); l)
46 {
47 JLOG(journal_.trace()) << "existing ledger " << hash_;
48 retrieveSkipList(l, sl);
49 return;
50 }
51
52 if (!fallBack_)
53 {
54 peerSet_->addPeers(
55 limit,
56 [this](auto peer) {
57 return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
58 peer->hasLedger(hash_, 0);
59 },
60 [this](auto peer) {
61 if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
62 {
63 JLOG(journal_.trace()) << "Add a peer " << peer->id() << " for " << hash_;
64 protocol::TMProofPathRequest request;
65 request.set_ledgerhash(hash_.data(), hash_.size());
66 request.set_key(keylet::skip().key.data(), keylet::skip().key.size());
67 request.set_type(protocol::TMLedgerMapType::lmACCOUNT_STATE);
68 peerSet_->sendRequest(request, peer);
69 }
70 else
71 {
72 JLOG(journal_.trace())
73 << "Add a no feature peer " << peer->id() << " for " << hash_;
75 {
76 JLOG(journal_.debug()) << "Fall back for " << hash_;
78 fallBack_ = true;
79 }
80 }
81 });
82 }
83
84 if (fallBack_)
86}
87
88void
90{
91 JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
93 {
94 failed_ = true;
95 JLOG(journal_.debug()) << "too many timeouts " << hash_;
96 notify(sl);
97 }
98 else
99 {
100 trigger(1, sl);
101 }
102}
103
109
110void
112 std::uint32_t ledgerSeq,
113 boost::intrusive_ptr<SHAMapItem const> const& item)
114{
115 XRPL_ASSERT(ledgerSeq != 0 && item, "xrpl::SkipListAcquire::processData : valid inputs");
117 if (isDone())
118 return;
119
120 JLOG(journal_.trace()) << "got data for " << hash_;
121 try
122 {
123 if (auto sle = std::make_shared<SLE>(SerialIter{item->slice()}, item->key()); sle)
124 {
125 if (auto const& skipList = sle->getFieldV256(sfHashes).value(); !skipList.empty())
126 onSkipListAcquired(skipList, ledgerSeq, sl);
127 return;
128 }
129 }
130 catch (...) // NOLINT(bugprone-empty-catch)
131 {
132 }
133
134 failed_ = true;
135 JLOG(journal_.error()) << "failed to retrieve Skip list from verified data " << hash_;
136 notify(sl);
137}
138
139void
141{
143 dataReadyCallbacks_.emplace_back(std::move(cb));
144 if (isDone())
145 {
146 JLOG(journal_.debug()) << "task added to a finished SkipListAcquire " << hash_;
147 notify(sl);
148 }
149}
150
153{
154 ScopedLockType const sl(mtx_);
155 return data_;
156}
157
158void
160{
161 if (auto const hashIndex = ledger->read(keylet::skip());
162 hashIndex && hashIndex->isFieldPresent(sfHashes))
163 {
164 auto const& slist = hashIndex->getFieldV256(sfHashes).value();
165 if (!slist.empty())
166 {
167 onSkipListAcquired(slist, ledger->seq(), sl);
168 return;
169 }
170 }
171
172 failed_ = true;
173 JLOG(journal_.error()) << "failed to retrieve Skip list from a ledger " << hash_;
174 notify(sl);
175}
176
177void
179 std::vector<uint256> const& skipList,
180 std::uint32_t ledgerSeq,
181 ScopedLockType& sl)
182{
183 complete_ = true;
184 data_ = std::make_shared<SkipListData>(ledgerSeq, skipList);
185 JLOG(journal_.debug()) << "Skip list acquired " << hash_;
186 notify(sl);
187}
188
189void
191{
192 XRPL_ASSERT(isDone(), "xrpl::SkipListAcquire::notify : is done");
195 auto const good = !failed_;
196 sl.unlock();
197
198 for (auto& cb : toCall)
199 {
200 cb(good, hash_);
201 }
202
203 sl.lock();
204}
205
206} // namespace xrpl
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Manages the lifetime of inbound ledgers.
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
virtual LedgerMaster & getLedgerMaster()=0
void addDataCallback(OnSkipListDataCB &&cb)
Add a callback that will be called when the skipList is ready or failed.
std::uint32_t noFeaturePeerCount_
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::unique_ptr< PeerSet > peerSet_
std::shared_ptr< SkipListData const > data_
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
void notify(ScopedLockType &sl)
Call the OnSkipListDataCB callbacks.
std::vector< OnSkipListDataCB > dataReadyCallbacks_
std::shared_ptr< SkipListData const > getData() const
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
InboundLedgers & inboundLedgers_
void onSkipListAcquired(std::vector< uint256 > const &skipList, std::uint32_t ledgerSeq, ScopedLockType &sl)
Process the skip list.
void retrieveSkipList(std::shared_ptr< Ledger const > const &ledger, ScopedLockType &sl)
Retrieve the skip list from the ledger.
void processData(std::uint32_t ledgerSeq, boost::intrusive_ptr< SHAMapItem const > const &item)
Process the data extracted from a peer's reply.
SkipListAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::unique_ptr< PeerSet > peerSet)
Constructor.
void init(int numPeers)
Start the SkipListAcquire task.
This class is an "active" object.
std::recursive_mutex mtx_
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
pointer data()
Definition base_uint.h:101
static constexpr std::size_t size()
Definition base_uint.h:499
T is_same_v
T lock(T... args)
auto constexpr SUB_TASK_FALLBACK_TIMEOUT
auto constexpr MAX_NO_FEATURE_PEER_COUNT
std::uint32_t constexpr MAX_QUEUED_TASKS
std::uint32_t constexpr SUB_TASK_MAX_TIMEOUTS
Keylet const & skip() noexcept
The index of the "short" skip list.
Definition Indexes.cpp:177
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ jtREPLAY_TASK
Definition Job.h:40
uint256 key
Definition Keylet.h:20
T swap(T... args)
T unlock(T... args)