xrpld
Loading...
Searching...
No Matches
SkipListAcquire.cpp
1#include <xrpld/app/ledger/detail/SkipListAcquire.h>
2
3#include <xrpld/app/ledger/InboundLedger.h>
4#include <xrpld/app/ledger/LedgerReplayer.h>
5#include <xrpld/app/ledger/detail/TimeoutCounter.h>
6#include <xrpld/app/main/Application.h>
7#include <xrpld/overlay/Peer.h>
8#include <xrpld/overlay/PeerSet.h>
9
10#include <xrpl/basics/Log.h>
11#include <xrpl/basics/base_uint.h>
12#include <xrpl/beast/utility/instrumentation.h>
13#include <xrpl/core/Job.h>
14#include <xrpl/protocol/Indexes.h>
15#include <xrpl/protocol/SField.h>
16#include <xrpl/shamap/SHAMapItem.h>
17
18#include <boost/smart_ptr/intrusive_ptr.hpp>
19
20#include <xrpl.pb.h>
21
22#include <cstddef>
23#include <cstdint>
24#include <memory>
25#include <utility>
26#include <vector>
27
28namespace xrpl {
29
31 Application& app,
32 InboundLedgers& inboundLedgers,
33 uint256 const& ledgerHash,
36 app,
37 ledgerHash,
38 LedgerReplayParameters::kSubTaskTimeout,
39 {.jobType = JtReplayTask,
40 .jobName = "SkipListAcq",
42 app.getJournal("LedgerReplaySkipList"))
43 , inboundLedgers_(inboundLedgers)
44 , peerSet_(std::move(peerSet))
45{
46 JLOG(journal_.trace()) << "Create " << hash_;
47}
48
50{
51 JLOG(journal_.trace()) << "Destroy " << hash_;
52}
53
54void
56{
58 if (!isDone())
59 {
60 trigger(numPeers, sl);
61 setTimer(sl);
62 }
63}
64
65void
67{
68 if (auto const l = app_.getLedgerMaster().getLedgerByHash(hash_); l)
69 {
70 JLOG(journal_.trace()) << "existing ledger " << hash_;
71 retrieveSkipList(l, sl);
72 return;
73 }
74
75 if (!fallBack_)
76 {
77 peerSet_->addPeers(
78 limit,
79 [this](auto peer) {
80 return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
81 peer->hasLedger(hash_, 0);
82 },
83 [this](auto peer) {
84 if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
85 {
86 JLOG(journal_.trace()) << "Add a peer " << peer->id() << " for " << hash_;
87 protocol::TMProofPathRequest request;
88 request.set_ledgerhash(hash_.data(), hash_.size());
89 request.set_key(keylet::skip().key.data(), keylet::skip().key.size());
90 request.set_type(protocol::TMLedgerMapType::lmACCOUNT_STATE);
91 peerSet_->sendRequest(request, peer);
92 }
93 else
94 {
95 JLOG(journal_.trace())
96 << "Add a no feature peer " << peer->id() << " for " << hash_;
98 {
99 JLOG(journal_.debug()) << "Fall back for " << hash_;
101 fallBack_ = true;
102 }
103 }
104 });
105 }
106
107 if (fallBack_)
109}
110
111void
113{
114 JLOG(journal_.trace()) << "timeouts_=" << timeouts_ << " for " << hash_;
116 {
117 failed_ = true;
118 JLOG(journal_.debug()) << "too many timeouts " << hash_;
119 notify(sl);
120 }
121 else
122 {
123 trigger(1, sl);
124 }
125}
126
132
133void
135 std::uint32_t ledgerSeq,
136 boost::intrusive_ptr<SHAMapItem const> const& item)
137{
138 XRPL_ASSERT(ledgerSeq != 0 && item, "xrpl::SkipListAcquire::processData : valid inputs");
140 if (isDone())
141 return;
142
143 JLOG(journal_.trace()) << "got data for " << hash_;
144 try
145 {
146 if (auto sle = std::make_shared<SLE>(SerialIter{item->slice()}, item->key()); sle)
147 {
148 if (auto const& skipList = sle->getFieldV256(sfHashes).value(); !skipList.empty())
149 onSkipListAcquired(skipList, ledgerSeq, sl);
150 return;
151 }
152 }
153 catch (...) // NOLINT(bugprone-empty-catch)
154 {
155 }
156
157 failed_ = true;
158 JLOG(journal_.error()) << "failed to retrieve Skip list from verified data " << hash_;
159 notify(sl);
160}
161
162void
164{
166 dataReadyCallbacks_.emplace_back(std::move(cb));
167 if (isDone())
168 {
169 JLOG(journal_.debug()) << "task added to a finished SkipListAcquire " << hash_;
170 notify(sl);
171 }
172}
173
176{
177 ScopedLockType const sl(mtx_);
178 return data_;
179}
180
181void
183{
184 if (auto const hashIndex = ledger->read(keylet::skip());
185 hashIndex && hashIndex->isFieldPresent(sfHashes))
186 {
187 auto const& slist = hashIndex->getFieldV256(sfHashes).value();
188 if (!slist.empty())
189 {
190 onSkipListAcquired(slist, ledger->seq(), sl);
191 return;
192 }
193 }
194
195 failed_ = true;
196 JLOG(journal_.error()) << "failed to retrieve Skip list from a ledger " << hash_;
197 notify(sl);
198}
199
200void
202 std::vector<uint256> const& skipList,
203 std::uint32_t ledgerSeq,
204 ScopedLockType& sl)
205{
206 complete_ = true;
207 data_ = std::make_shared<SkipListData>(ledgerSeq, skipList);
208 JLOG(journal_.debug()) << "Skip list acquired " << hash_;
209 notify(sl);
210}
211
212void
214{
215 XRPL_ASSERT(isDone(), "xrpl::SkipListAcquire::notify : is done");
218 auto const good = !failed_;
219 sl.unlock();
220
221 for (auto& cb : toCall)
222 {
223 cb(good, hash_);
224 }
225
226 sl.lock();
227}
228
229} // namespace xrpl
Manages the lifetime of inbound ledgers.
void addDataCallback(OnSkipListDataCB &&cb)
Add a callback that will be called when the skipList is ready or failed.
std::uint32_t noFeaturePeerCount_
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::unique_ptr< PeerSet > peerSet_
std::shared_ptr< SkipListData const > data_
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
void notify(ScopedLockType &sl)
Call the OnSkipListDataCB callbacks.
std::vector< OnSkipListDataCB > dataReadyCallbacks_
std::shared_ptr< SkipListData const > getData() const
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
InboundLedgers & inboundLedgers_
std::function< void(bool successful, uint256 const &hash)> OnSkipListDataCB
A callback used to notify that the SkipList is ready or failed.
void onSkipListAcquired(std::vector< uint256 > const &skipList, std::uint32_t ledgerSeq, ScopedLockType &sl)
Process the skip list.
void retrieveSkipList(std::shared_ptr< Ledger const > const &ledger, ScopedLockType &sl)
Retrieve the skip list from the ledger.
void processData(std::uint32_t ledgerSeq, boost::intrusive_ptr< SHAMapItem const > const &item)
Process the data extracted from a peer's reply.
SkipListAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::unique_ptr< PeerSet > peerSet)
Constructor.
void init(int numPeers)
Start the SkipListAcquire task.
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
std::recursive_mutex mtx_
std::unique_lock< std::recursive_mutex > ScopedLockType
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
T lock(T... args)
T make_shared(T... args)
constexpr auto kSubTaskFallbackTimeout
constexpr auto kMaxNoFeaturePeerCount
constexpr std::uint32_t kSubTaskMaxTimeouts
constexpr std::uint32_t kMaxQueuedTasks
Keylet const & skip() noexcept
The index of the "short" skip list.
Definition Indexes.cpp:198
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ JtReplayTask
Definition Job.h:42
BaseUInt< 256 > uint256
Definition base_uint.h:562
T swap(T... args)
T unlock(T... args)