rippled
Loading...
Searching...
No Matches
SkipListAcquire.cpp
1#include <xrpld/app/ledger/InboundLedger.h>
2#include <xrpld/app/ledger/LedgerReplayer.h>
3#include <xrpld/app/ledger/detail/SkipListAcquire.h>
4#include <xrpld/app/main/Application.h>
5#include <xrpld/overlay/PeerSet.h>
6
7namespace ripple {
8
10 Application& app,
11 InboundLedgers& inboundLedgers,
12 uint256 const& ledgerHash,
15 app,
16 ledgerHash,
17 LedgerReplayParameters::SUB_TASK_TIMEOUT,
19 "SkipListAcquire",
21 app.journal("LedgerReplaySkipList"))
22 , inboundLedgers_(inboundLedgers)
23 , peerSet_(std::move(peerSet))
24{
25 JLOG(journal_.trace()) << "Create " << hash_;
26}
27
29{
30 JLOG(journal_.trace()) << "Destroy " << hash_;
31}
32
33void
35{
37 if (!isDone())
38 {
39 trigger(numPeers, sl);
40 setTimer(sl);
41 }
42}
43
44void
46{
47 if (auto const l = app_.getLedgerMaster().getLedgerByHash(hash_); l)
48 {
49 JLOG(journal_.trace()) << "existing ledger " << hash_;
50 retrieveSkipList(l, sl);
51 return;
52 }
53
54 if (!fallBack_)
55 {
56 peerSet_->addPeers(
57 limit,
58 [this](auto peer) {
59 return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
60 peer->hasLedger(hash_, 0);
61 },
62 [this](auto peer) {
63 if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
64 {
65 JLOG(journal_.trace())
66 << "Add a peer " << peer->id() << " for " << hash_;
67 protocol::TMProofPathRequest request;
68 request.set_ledgerhash(hash_.data(), hash_.size());
69 request.set_key(
70 keylet::skip().key.data(), keylet::skip().key.size());
71 request.set_type(
72 protocol::TMLedgerMapType::lmACCOUNT_STATE);
73 peerSet_->sendRequest(request, peer);
74 }
75 else
76 {
77 JLOG(journal_.trace()) << "Add a no feature peer "
78 << peer->id() << " for " << hash_;
79 if (++noFeaturePeerCount_ >=
81 {
82 JLOG(journal_.debug()) << "Fall back for " << hash_;
85 fallBack_ = true;
86 }
87 }
88 });
89 }
90
91 if (fallBack_)
93}
94
95void
97{
98 JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
100 {
101 failed_ = true;
102 JLOG(journal_.debug()) << "too many timeouts " << hash_;
103 notify(sl);
104 }
105 else
106 {
107 trigger(1, sl);
108 }
109}
110
116
117void
119 std::uint32_t ledgerSeq,
120 boost::intrusive_ptr<SHAMapItem const> const& item)
121{
122 XRPL_ASSERT(
123 ledgerSeq != 0 && item,
124 "ripple::SkipListAcquire::processData : valid inputs");
126 if (isDone())
127 return;
128
129 JLOG(journal_.trace()) << "got data for " << hash_;
130 try
131 {
132 if (auto sle =
133 std::make_shared<SLE>(SerialIter{item->slice()}, item->key());
134 sle)
135 {
136 if (auto const& skipList = sle->getFieldV256(sfHashes).value();
137 !skipList.empty())
138 onSkipListAcquired(skipList, ledgerSeq, sl);
139 return;
140 }
141 }
142 catch (...)
143 {
144 }
145
146 failed_ = true;
147 JLOG(journal_.error()) << "failed to retrieve Skip list from verified data "
148 << hash_;
149 notify(sl);
150}
151
152void
154{
156 dataReadyCallbacks_.emplace_back(std::move(cb));
157 if (isDone())
158 {
159 JLOG(journal_.debug())
160 << "task added to a finished SkipListAcquire " << hash_;
161 notify(sl);
162 }
163}
164
167{
169 return data_;
170}
171
172void
174 std::shared_ptr<Ledger const> const& ledger,
175 ScopedLockType& sl)
176{
177 if (auto const hashIndex = ledger->read(keylet::skip());
178 hashIndex && hashIndex->isFieldPresent(sfHashes))
179 {
180 auto const& slist = hashIndex->getFieldV256(sfHashes).value();
181 if (!slist.empty())
182 {
183 onSkipListAcquired(slist, ledger->seq(), sl);
184 return;
185 }
186 }
187
188 failed_ = true;
189 JLOG(journal_.error()) << "failed to retrieve Skip list from a ledger "
190 << hash_;
191 notify(sl);
192}
193
194void
196 std::vector<uint256> const& skipList,
197 std::uint32_t ledgerSeq,
198 ScopedLockType& sl)
199{
200 complete_ = true;
201 data_ = std::make_shared<SkipListData>(ledgerSeq, skipList);
202 JLOG(journal_.debug()) << "Skip list acquired " << hash_;
203 notify(sl);
204}
205
206void
208{
209 XRPL_ASSERT(isDone(), "ripple::SkipListAcquire::notify : is done");
212 auto const good = !failed_;
213 sl.unlock();
214
215 for (auto& cb : toCall)
216 {
217 cb(good, hash_);
218 }
219
220 sl.lock();
221}
222
223} // namespace ripple
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
virtual LedgerMaster & getLedgerMaster()=0
Manages the lifetime of inbound ledgers.
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
void init(int numPeers)
Start the SkipListAcquire task.
void addDataCallback(OnSkipListDataCB &&cb)
Add a callback that will be called when the skipList is ready or failed.
InboundLedgers & inboundLedgers_
std::shared_ptr< SkipListData const > getData() const
SkipListAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::unique_ptr< PeerSet > peerSet)
Constructor.
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
void notify(ScopedLockType &sl)
Call the OnSkipListDataCB callbacks.
void retrieveSkipList(std::shared_ptr< Ledger const > const &ledger, ScopedLockType &sl)
Retrieve the skip list from the ledger.
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
std::shared_ptr< SkipListData const > data_
void processData(std::uint32_t ledgerSeq, boost::intrusive_ptr< SHAMapItem const > const &item)
Process the data extracted from a peer's reply.
void onSkipListAcquired(std::vector< uint256 > const &skipList, std::uint32_t ledgerSeq, ScopedLockType &sl)
Process the skip list.
std::vector< OnSkipListDataCB > dataReadyCallbacks_
std::unique_ptr< PeerSet > peerSet_
std::uint32_t noFeaturePeerCount_
This class is an "active" object.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
static constexpr std::size_t size()
Definition base_uint.h:507
T is_same_v
T lock(T... args)
auto constexpr SUB_TASK_FALLBACK_TIMEOUT
std::uint32_t constexpr MAX_QUEUED_TASKS
std::uint32_t constexpr SUB_TASK_MAX_TIMEOUTS
auto constexpr MAX_NO_FEATURE_PEER_COUNT
Keylet const & skip() noexcept
The index of the "short" skip list.
Definition Indexes.cpp:177
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
@ jtREPLAY_TASK
Definition Job.h:42
uint256 key
Definition Keylet.h:21
T swap(T... args)
T unlock(T... args)