xrpld
Loading...
Searching...
No Matches
LedgerDeltaAcquire.cpp
1#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
2
3#include <xrpld/app/ledger/BuildLedger.h>
4#include <xrpld/app/ledger/InboundLedger.h>
5#include <xrpld/app/ledger/LedgerReplay.h>
6#include <xrpld/app/ledger/LedgerReplayer.h>
7#include <xrpld/app/ledger/detail/TimeoutCounter.h>
8#include <xrpld/app/main/Application.h>
9#include <xrpld/overlay/Peer.h>
10#include <xrpld/overlay/PeerSet.h>
11
12#include <xrpl/basics/Log.h>
13#include <xrpl/basics/base_uint.h>
14#include <xrpl/basics/contract.h>
15#include <xrpl/beast/utility/instrumentation.h>
16#include <xrpl/core/Job.h>
17#include <xrpl/core/JobQueue.h>
18#include <xrpl/ledger/ApplyView.h>
19#include <xrpl/protocol/LedgerHeader.h>
20#include <xrpl/protocol/Rules.h>
21
22#include <xrpl.pb.h>
23
24#include <cstddef>
25#include <cstdint>
26#include <map>
27#include <memory>
28#include <optional>
29#include <stdexcept>
30#include <utility>
31#include <vector>
32
33namespace xrpl {
34
// Constructor parameters, initializer list and body of LedgerDeltaAcquire.
// NOTE(review): this listing has lost several lines here (the function name,
// the peerSet parameter, the start of the base-class initializer and one
// job-parameter field) -- confirm against the original file.
36 Application& app,
37 InboundLedgers& inboundLedgers,
38 uint256 const& ledgerHash,
39 std::uint32_t ledgerSeq,
// Presumably the arguments of the TimeoutCounter base initializer: the
// application, the target hash, the timer interval, the job parameters and
// the journal -- TODO confirm.
42 app,
43 ledgerHash,
44 LedgerReplayParameters::kSubTaskTimeout,
45 {.jobType = JtReplayTask,
46 .jobName = "LedReplDelta",
48 app.getJournal("LedgerReplayDelta"))
49 , inboundLedgers_(inboundLedgers)
50 , ledgerSeq_(ledgerSeq)
// The peer set is moved into the member, so this object owns it afterwards.
51 , peerSet_(std::move(peerSet))
52{
53 JLOG(journal_.trace()) << "Create " << hash_ << " Seq " << ledgerSeq;
54}
55
// Body that only emits a trace-level "Destroy" log entry for this hash.
// NOTE(review): the declaration line is missing from this listing; presumably
// this is the destructor -- confirm.
57{
58 JLOG(journal_.trace()) << "Destroy " << hash_;
59}
60
// Start the task: unless it is already done, run a first trigger round with
// numPeers as the peer limit and then arm the timer.
// NOTE(review): the signature line and the line that defines `sl` are missing
// from this listing; `sl` is presumably a ScopedLockType on mtx_ -- confirm.
61void
63{
65 if (!isDone())
66 {
67 trigger(numPeers, sl);
68 setTimer(sl);
69 }
70}
71
// Trigger another round of acquisition for the ledger identified by hash_.
// NOTE(review): the signature line is missing from this listing; the body
// uses the names `limit` and `sl`.
72void
74{
// If the ledger master already holds a ledger with this hash, the task is
// marked complete and the waiting callbacks are notified right away.
75 fullLedger_ = app_.getLedgerMaster().getLedgerByHash(hash_);
76 if (fullLedger_)
77 {
78 complete_ = true;
79 JLOG(journal_.trace()) << "existing ledger " << hash_;
80 notify(sl);
81 return;
82 }
83
// Replay path, used only while fallBack_ has not been set.
84 if (!fallBack_)
85 {
86 peerSet_->addPeers(
87 limit,
// First lambda: accepts a peer only if it supports the LedgerReplay feature
// and reports having this ledger.
88 [this](auto peer) {
89 return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
90 peer->hasLedger(hash_, ledgerSeq_);
91 },
// Second lambda: presumably invoked for each peer that was added -- TODO
// confirm against PeerSet::addPeers. It sends a replay-delta request carrying
// the ledger hash to peers that support the feature.
92 [this](auto peer) {
93 if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
94 {
95 JLOG(journal_.trace()) << "Add a peer " << peer->id() << " for " << hash_;
96 protocol::TMReplayDeltaRequest request;
97 request.set_ledgerhash(hash_.data(), hash_.size());
98 peerSet_->sendRequest(request, peer);
99 }
100 else
101 {
// NOTE(review): the condition guarding the block below is missing from this
// listing. When it holds, fallBack_ is set so later rounds skip the replay path.
103 {
104 JLOG(journal_.debug()) << "Fall back for " << hash_;
106 fallBack_ = true;
107 }
108 }
109 });
110 }
111
// NOTE(review): the statement controlled by this `if` is missing from this
// listing -- confirm what the fallback path does in the original file.
112 if (fallBack_)
114}
115
// Timer hook: either gives up on the task or triggers another round.
// NOTE(review): the signature line and the condition that selects the
// give-up branch are missing from this listing; the log text suggests the
// condition compares timeouts_ against a maximum -- confirm.
116void
118{
119 JLOG(journal_.trace()) << "timeouts_=" << timeouts_ << " for " << hash_;
121 {
// Give up: mark the task failed and notify the waiting callbacks.
122 failed_ = true;
123 JLOG(journal_.debug()) << "too many timeouts " << hash_;
124 notify(sl);
125 }
126 else
127 {
// Otherwise retry with a peer limit of 1.
128 trigger(1, sl);
129 }
130}
131
137
// Process the ledger header and ordered transactions taken from a reply.
// NOTE(review): the function-name line, the orderedTxns parameter line and
// the line that defines `sl` are missing from this listing.
138void
140 LedgerHeader const& info,
142{
144 JLOG(journal_.trace()) << "got data for " << hash_;
// Data arriving after the task has finished is ignored.
145 if (isDone())
146 return;
147
// Only a header whose sequence matches the expected one is accepted.
148 if (info.seq == ledgerSeq_)
149 {
150 // create a temporary ledger for building a LedgerReplay object later
151 Rules const rules{app_.config().features};
152 replayTemp_ = std::make_shared<Ledger>(info, rules, app_.getNodeFamily());
// std::make_shared reports failure by throwing rather than returning null,
// so this check is defensive.
153 if (replayTemp_)
154 {
155 complete_ = true;
156 orderedTxns_ = std::move(orderedTxns);
157 JLOG(journal_.debug()) << "ready to replay " << hash_;
158 notify(sl);
159 return;
160 }
161 }
162
// Reached on a sequence mismatch as well as on a null replayTemp_; both
// cases are reported with the same error message below.
163 failed_ = true;
164 JLOG(journal_.error()) << "failed to create a (info only) ledger from verified data " << hash_;
165 notify(sl);
166}
167
// Register a callback together with the reason the ledger is wanted.
// NOTE(review): the signature line and the line that defines `sl` are missing
// from this listing; the body uses the names `reason`, `cb` and `sl`.
168void
170{
172 dataReadyCallbacks_.emplace_back(std::move(cb));
// A reason seen for the first time is recorded; if the full ledger is
// already available, onLedgerBuilt is run for just that reason.
173 if (!reasons_.contains(reason))
174 {
175 reasons_.emplace(reason);
176 if (fullLedger_)
177 onLedgerBuilt(sl, reason);
178 }
179
// If the task has already finished, notify immediately so the callback that
// was just added is not left waiting.
180 if (isDone())
181 {
182 JLOG(journal_.debug()) << "task added to a finished LedgerDeltaAcquire " << hash_;
183 notify(sl);
184 }
185}
186
// Try to build the full ledger from `parent` and the acquired delta data.
// Returns the already-built ledger if there is one, and an empty result while
// the data is not ready or the task has failed. Throws std::runtime_error if
// the build does not produce a ledger with the expected hash.
// NOTE(review): the return-type/signature lines, the line that defines `sl`
// and the statement that assigns fullLedger_ from the replay are missing from
// this listing.
189{
191
192 if (fullLedger_)
193 return fullLedger_;
194
195 if (failed_ || !complete_ || !replayTemp_)
196 return {};
197
// Sanity checks: the supplied parent must directly precede the ledger
// described by the acquired header, by sequence and by hash.
198 XRPL_ASSERT(
199 parent->seq() + 1 == replayTemp_->seq(),
200 "xrpl::LedgerDeltaAcquire::tryBuild : parent sequence match");
201 XRPL_ASSERT(
202 parent->header().hash == replayTemp_->header().parentHash,
203 "xrpl::LedgerDeltaAcquire::tryBuild : parent hash match");
204 // build ledger
// orderedTxns_ is moved from here, so it must not be relied on afterwards.
205 LedgerReplay const replayData(parent, replayTemp_, std::move(orderedTxns_));
// Success requires a non-null result whose hash equals the requested hash.
207 if (fullLedger_ && fullLedger_->header().hash == hash_)
208 {
209 JLOG(journal_.info()) << "Built " << hash_;
210 onLedgerBuilt(sl);
211 return fullLedger_;
212 }
213
// Build failed: record the failure, clear the complete flag, then throw.
214 failed_ = true;
215 complete_ = false;
216 JLOG(journal_.error()) << "tryBuild failed " << hash_ << " with parent "
217 << parent->header().hash;
218 Throw<std::runtime_error>("Cannot replay ledger");
219}
220
// Queue a job that post-processes the newly built ledger for each reason.
// NOTE(review): the signature line, the declaration of the local `reasons`
// container and a `case` label inside the switch are missing from this
// listing.
221void
223{
224 JLOG(journal_.debug()) << "onLedgerBuilt " << hash_ << (reason ? " for a new reason" : "");
225
// When a specific reason is supplied, only that reason is processed and the
// job does not call tryAdvance().
227 bool firstTime = true;
228 if (reason) // small chance
229 {
230 reasons.clear();
231 reasons.push_back(*reason);
232 firstTime = false;
233 }
// The job captures the ledger pointer and other locals by value and the
// application by reference.
234 app_.getJobQueue().addJob(
235 JtReplayTask, "OnLedBuilt", [=, ledger = this->fullLedger_, &app = this->app_]() {
236 for (auto reason : reasons)
237 {
238 switch (reason)
239 {
// NOTE(review): the case label for the statement below is missing here.
241 app.getLedgerMaster().storeLedger(ledger);
242 break;
243 default:
244 // TODO for other use cases
245 break;
246 }
247 }
248
249 if (firstTime)
250 app.getLedgerMaster().tryAdvance();
251 });
252}
253
// Invoke the pending callbacks with the outcome of the task.
// NOTE(review): the signature line and the lines that set up `toCall` are
// missing from this listing; presumably the pending callbacks are moved out
// of dataReadyCallbacks_ -- confirm.
254void
256{
257 XRPL_ASSERT(isDone(), "xrpl::LedgerDeltaAcquire::notify : is done");
// Read the outcome before unlocking; the callbacks then run with `sl`
// unlocked.
260 auto const good = !failed_;
261 sl.unlock();
262
263 for (auto& cb : toCall)
264 {
265 cb(good, hash_);
266 }
267
// Re-acquire the lock so the caller sees `sl` locked again on return.
268 sl.lock();
269}
270
271} // namespace xrpl
Manages the lifetime of inbound ledgers.
std::uint32_t const ledgerSeq_
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
void init(int numPeers)
Start the LedgerDeltaAcquire task.
std::shared_ptr< Ledger const > fullLedger_
void onLedgerBuilt(ScopedLockType &sl, std::optional< InboundLedger::Reason > reason={})
Process a newly built ledger, e.g. by storing it.
LedgerDeltaAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::uint32_t ledgerSeq, std::unique_ptr< PeerSet > peerSet)
Constructor.
std::vector< OnDeltaDataCB > dataReadyCallbacks_
void processData(LedgerHeader const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&orderedTxns)
Process the data extracted from a peer's reply.
std::unique_ptr< PeerSet > peerSet_
std::set< InboundLedger::Reason > reasons_
void notify(ScopedLockType &sl)
Call the OnDeltaDataCB callbacks.
std::shared_ptr< Ledger const > replayTemp_
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::shared_ptr< Ledger const > tryBuild(std::shared_ptr< Ledger const > const &parent)
Try to build the ledger if not already.
std::map< std::uint32_t, std::shared_ptr< STTx const > > orderedTxns_
void addDataCallback(InboundLedger::Reason reason, OnDeltaDataCB &&cb)
Add a reason and a callback to the LedgerDeltaAcquire subtask.
std::function< void(bool successful, uint256 const &hash)> OnDeltaDataCB
A callback used to notify that the delta's data is ready or failed.
Rules controlling protocol behavior.
Definition Rules.h:33
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
std::recursive_mutex mtx_
std::unique_lock< std::recursive_mutex > ScopedLockType
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
T lock(T... args)
T make_shared(T... args)
constexpr auto kSubTaskFallbackTimeout
constexpr auto kMaxNoFeaturePeerCount
constexpr std::uint32_t kSubTaskMaxTimeouts
constexpr std::uint32_t kMaxQueuedTasks
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ JtReplayTask
Definition Job.h:42
std::shared_ptr< Ledger > buildLedger(std::shared_ptr< Ledger const > const &parent, NetClock::time_point closeTime, bool const closeTimeCorrect, NetClock::duration closeResolution, Application &app, CanonicalTXSet &txns, std::set< TxID > &failedTxs, beast::Journal j)
Build a new ledger by applying consensus transactions.
@ TapNone
Definition ApplyView.h:13
BaseUInt< 256 > uint256
Definition base_uint.h:562
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Definition contract.h:49
Information about the notional ledger backing the view.
T swap(T... args)
T unlock(T... args)