rippled
Loading...
Searching...
No Matches
LedgerDeltaAcquire.cpp
1#include <xrpld/app/ledger/BuildLedger.h>
2#include <xrpld/app/ledger/InboundLedger.h>
3#include <xrpld/app/ledger/LedgerReplay.h>
4#include <xrpld/app/ledger/LedgerReplayer.h>
5#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
6#include <xrpld/app/main/Application.h>
7#include <xrpld/overlay/PeerSet.h>
8
9#include <xrpl/core/JobQueue.h>
10
11namespace xrpl {
12
// Constructor.
// NOTE(review): the opening line of this definition and the `peerSet`
// parameter line are missing from this extract. The first group of
// initializer arguments below (app, ledgerHash, SUB_TASK_TIMEOUT, the
// "LedgerReplayDelta" name and a journal of the same name) is presumably
// forwarded to the base-class constructor -- confirm against the full file.
14 Application& app,
15 InboundLedgers& inboundLedgers,
16 uint256 const& ledgerHash,
17 std::uint32_t ledgerSeq,
20 app,
21 ledgerHash,
22 LedgerReplayParameters::SUB_TASK_TIMEOUT,
24 "LedgerReplayDelta",
26 app.journal("LedgerReplayDelta"))
// Own members: remember inboundLedgers, record the target ledger sequence,
// and move the caller-supplied peerSet into peerSet_.
27 , inboundLedgers_(inboundLedgers)
28 , ledgerSeq_(ledgerSeq)
29 , peerSet_(std::move(peerSet))
30{
// Trace-level log only; no other work is visible in the constructor body.
31 JLOG(journal_.trace()) << "Create " << hash_ << " Seq " << ledgerSeq;
32}
33
// NOTE(review): the signature line is missing from this extract; the
// "Destroy" log message suggests this is the destructor body -- confirm.
// The only visible action is a trace-level log of the hash.
35{
36 JLOG(journal_.trace()) << "Destroy " << hash_;
37}
38
// Start the LedgerDeltaAcquire task (init(int numPeers) per the member
// summary in this extract).
// NOTE(review): the signature line and the line that declares `sl` are
// missing here; `sl` is presumably a ScopedLockType -- confirm.
39void
41{
// Nothing to do if the task has already finished; otherwise run a first
// round with numPeers as the limit and arm the timer.
43 if (!isDone())
44 {
45 trigger(numPeers, sl);
46 setTimer(sl);
47 }
48}
49
// Trigger another round (trigger(std::size_t limit, ScopedLockType& sl) per
// the member summary in this extract).
// NOTE(review): the signature line and several statements of this function
// are missing from this extract; the comments below describe only the lines
// that are visible.
50void
52{
// The full ledger is already available: mark the task complete, notify the
// registered callbacks and stop.
54 if (fullLedger_)
55 {
56 complete_ = true;
57 JLOG(journal_.trace()) << "existing ledger " << hash_;
58 notify(sl);
59 return;
60 }
61
// Not in fallback mode: add peers through the peer set, forwarding `limit`.
62 if (!fallBack_)
63 {
64 peerSet_->addPeers(
65 limit,
// First callback: selects peers that both support the LedgerReplay
// protocol feature and report having this ledger (hash_, ledgerSeq_).
66 [this](auto peer) {
67 return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
68 peer->hasLedger(hash_, ledgerSeq_);
69 },
// Second callback: for a peer that supports LedgerReplay, send it a
// TMReplayDeltaRequest carrying hash_; otherwise count the peer and
// possibly switch to fallback mode.
70 [this](auto peer) {
71 if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
72 {
73 JLOG(journal_.trace())
74 << "Add a peer " << peer->id() << " for " << hash_;
75 protocol::TMReplayDeltaRequest request;
76 request.set_ledgerhash(hash_.data(), hash_.size());
77 peerSet_->sendRequest(request, peer);
78 }
79 else
80 {
// NOTE(review): the right-hand operand of this comparison (the threshold)
// is on a line missing from this extract.
81 if (++noFeaturePeerCount >=
83 {
84 JLOG(journal_.debug()) << "Fall back for " << hash_;
87 fallBack_ = true;
88 }
89 }
90 });
91 }
92
// NOTE(review): the statement guarded by this check is missing from this
// extract, so what the fallback path does cannot be documented from here.
93 if (fallBack_)
96}
97
// Timer hook (onTimer, "Hook called from invokeOnTimer()" per the member
// summary in this extract). Logs the current timeout count, then either
// fails the task or triggers another round.
// NOTE(review): the signature line is missing from this extract.
98void
100{
101 JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
// NOTE(review): the `if (...)` condition that guards this branch is missing
// from this extract; the log text indicates a too-many-timeouts check.
// On that branch the task is marked failed and the callbacks are notified.
103 {
104 failed_ = true;
105 JLOG(journal_.debug()) << "too many timeouts " << hash_;
106 notify(sl);
107 }
108 else
109 {
// Otherwise try again with a limit of 1.
110 trigger(1, sl);
111 }
112}
113
119
// Process the data extracted from a peer's reply: a ledger header (`info`)
// and the ordered transactions (`orderedTxns`), per the member summary in
// this extract.
// NOTE(review): the signature line, the orderedTxns parameter line, the
// lock declaration and the statement that creates replayTemp_ are missing
// from this extract.
120void
122 LedgerHeader const& info,
124{
126 JLOG(journal_.trace()) << "got data for " << hash_;
// Ignore data once the task has already finished.
127 if (isDone())
128 return;
129
// Only a header whose sequence matches the expected one is accepted.
130 if (info.seq == ledgerSeq_)
131 {
132 // create a temporary ledger for building a LedgerReplay object later
// Success path: with replayTemp_ set, mark complete, keep the transactions
// for the later replay, and notify the callbacks.
135 if (replayTemp_)
136 {
137 complete_ = true;
138 orderedTxns_ = std::move(orderedTxns);
139 JLOG(journal_.debug()) << "ready to replay " << hash_;
140 notify(sl);
141 return;
142 }
143 }
144
// Reached on a sequence mismatch or when replayTemp_ is not set: mark the
// task failed and notify the callbacks.
145 failed_ = true;
146 JLOG(journal_.error())
147 << "failed to create a (info only) ledger from verified data " << hash_;
148 notify(sl);
149}
150
// Add a reason and a callback to the LedgerDeltaAcquire subtask
// (addDataCallback(InboundLedger::Reason reason, OnDeltaDataCB&& cb) per the
// member summary in this extract).
// NOTE(review): the signature line, the `reason` parameter line and the
// line that declares `sl` are missing from this extract.
151void
154 OnDeltaDataCB&& cb)
155{
// The callback is always queued, even if the reason is already known.
157 dataReadyCallbacks_.emplace_back(std::move(cb));
// A reason seen for the first time is recorded; if the full ledger already
// exists, onLedgerBuilt is invoked for that reason right away.
158 if (reasons_.count(reason) == 0)
159 {
160 reasons_.emplace(reason);
161 if (fullLedger_)
162 onLedgerBuilt(sl, reason);
163 }
164
// If the task has already finished, notify immediately so the callback just
// added is not left waiting.
165 if (isDone())
166 {
167 JLOG(journal_.debug())
168 << "task added to a finished LedgerDeltaAcquire " << hash_;
169 notify(sl);
170 }
171}
172
// Try to build the ledger if not already built (tryBuild(parent), returning
// std::shared_ptr<Ledger const>, per the member summary in this extract).
// NOTE(review): the signature lines, the lock declaration and the statement
// that assigns fullLedger_ from the replay are missing from this extract.
175{
177
// Already built: return the cached ledger.
178 if (fullLedger_)
179 return fullLedger_;
180
// Not buildable yet (or ever): failed, data not complete, or no temporary
// ledger -- return an empty pointer.
181 if (failed_ || !complete_ || !replayTemp_)
182 return {};
183
// The supplied parent must be the direct predecessor of the ledger to build,
// both by sequence and by hash.
184 XRPL_ASSERT(
185 parent->seq() + 1 == replayTemp_->seq(),
186 "xrpl::LedgerDeltaAcquire::tryBuild : parent sequence match");
187 XRPL_ASSERT(
188 parent->header().hash == replayTemp_->header().parentHash,
189 "xrpl::LedgerDeltaAcquire::tryBuild : parent hash match");
190 // build ledger
// orderedTxns_ is moved from here, so it is left in a moved-from state.
191 LedgerReplay replayData(parent, replayTemp_, std::move(orderedTxns_));
// Accept the result only if its hash equals the hash this task was created
// for.
193 if (fullLedger_ && fullLedger_->header().hash == hash_)
194 {
195 JLOG(journal_.info()) << "Built " << hash_;
196 onLedgerBuilt(sl);
197 return fullLedger_;
198 }
199 else
200 {
// Build failed or produced a different hash: mark the task failed, log the
// parent hash, and throw std::runtime_error.
201 failed_ = true;
202 complete_ = false;
203 JLOG(journal_.error()) << "tryBuild failed " << hash_ << " with parent "
204 << parent->header().hash;
205 Throw<std::runtime_error>("Cannot replay ledger");
206 }
207}
208
// Process a newly built ledger, such as storing it (onLedgerBuilt(sl,
// optional reason) per the member summary in this extract).
// NOTE(review): the signature line, the `reason` parameter line, the
// declaration of `reasons`, the call that receives the job name and lambda
// below, and the `case` label inside the switch are all missing from this
// extract.
209void
211 ScopedLockType& sl,
213{
214 JLOG(journal_.debug()) << "onLedgerBuilt " << hash_
215 << (reason ? " for a new reason" : "");
216
// firstTime stays true unless a specific reason was passed in; in that case
// only that single reason is processed.
219 bool firstTime = true;
220 if (reason) // small chance
221 {
222 reasons.clear();
223 reasons.push_back(*reason);
224 firstTime = false;
225 }
// The lambda copies the values it uses (including the ledger pointer) and
// holds the application by reference.
228 "onLedgerBuilt",
229 [=, ledger = this->fullLedger_, &app = this->app_]() {
230 for (auto reason : reasons)
231 {
232 switch (reason)
233 {
235 app.getLedgerMaster().storeLedger(ledger);
236 break;
237 default:
238 // TODO for other use cases
239 break;
240 }
241 }
242
// tryAdvance is requested only when no specific reason was passed in.
243 if (firstTime)
244 app.getLedgerMaster().tryAdvance();
245 });
246}
247
// Call the OnDeltaDataCB callbacks (notify(ScopedLockType& sl) per the
// member summary in this extract). Must only be called once the task is
// done, as the assertion below checks.
// NOTE(review): the signature line and the lines that declare and fill
// `toCall` are missing from this extract.
248void
250{
251 XRPL_ASSERT(isDone(), "xrpl::LedgerDeltaAcquire::notify : is done");
// Read the outcome before unlocking; the callbacks then run with `sl`
// released.
254 auto const good = !failed_;
255 sl.unlock();
256
257 for (auto& cb : toCall)
258 {
259 cb(good, hash_);
260 }
261
// Re-acquire the lock before returning so the caller still holds it.
262 sl.lock();
263}
264
265} // namespace xrpl
T begin(T... args)
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
virtual Config & config()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual Family & getNodeFamily()=0
virtual JobQueue & getJobQueue()=0
Manages the lifetime of inbound ledgers.
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:148
std::uint32_t const ledgerSeq_
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
void init(int numPeers)
Start the LedgerDeltaAcquire task.
std::shared_ptr< Ledger const > fullLedger_
void onLedgerBuilt(ScopedLockType &sl, std::optional< InboundLedger::Reason > reason={})
Process a newly built ledger, such as store it.
LedgerDeltaAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::uint32_t ledgerSeq, std::unique_ptr< PeerSet > peerSet)
Constructor.
std::vector< OnDeltaDataCB > dataReadyCallbacks_
void processData(LedgerHeader const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&orderedTxns)
Process the data extracted from a peer's reply.
std::unique_ptr< PeerSet > peerSet_
std::set< InboundLedger::Reason > reasons_
void notify(ScopedLockType &sl)
Call the OnDeltaDataCB callbacks.
std::shared_ptr< Ledger const > replayTemp_
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::shared_ptr< Ledger const > tryBuild(std::shared_ptr< Ledger const > const &parent)
Try to build the ledger if not already.
std::map< std::uint32_t, std::shared_ptr< STTx const > > orderedTxns_
void addDataCallback(InboundLedger::Reason reason, OnDeltaDataCB &&cb)
Add a reason and a callback to the LedgerDeltaAcquire subtask.
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
This class is an "active" object.
std::recursive_mutex mtx_
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
pointer data()
Definition base_uint.h:106
static constexpr std::size_t size()
Definition base_uint.h:507
T count(T... args)
T emplace(T... args)
T end(T... args)
T is_same_v
T lock(T... args)
auto constexpr SUB_TASK_FALLBACK_TIMEOUT
auto constexpr MAX_NO_FEATURE_PEER_COUNT
std::uint32_t constexpr MAX_QUEUED_TASKS
std::uint32_t constexpr SUB_TASK_MAX_TIMEOUTS
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
@ jtREPLAY_TASK
Definition Job.h:41
std::shared_ptr< Ledger > buildLedger(std::shared_ptr< Ledger const > const &parent, NetClock::time_point closeTime, bool const closeTimeCorrect, NetClock::duration closeResolution, Application &app, CanonicalTXSet &txns, std::set< TxID > &failedTxs, beast::Journal j)
Build a new ledger by applying consensus transactions.
@ tapNONE
Definition ApplyView.h:12
Information about the notional ledger backing the view.
T swap(T... args)
T unlock(T... args)