rippled
Loading...
Searching...
No Matches
LedgerDeltaAcquire.cpp
1#include <xrpld/app/ledger/BuildLedger.h>
2#include <xrpld/app/ledger/InboundLedger.h>
3#include <xrpld/app/ledger/LedgerReplay.h>
4#include <xrpld/app/ledger/LedgerReplayer.h>
5#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
6#include <xrpld/app/main/Application.h>
7#include <xrpld/core/JobQueue.h>
8#include <xrpld/overlay/PeerSet.h>
9
10namespace ripple {
11
13 Application& app,
14 InboundLedgers& inboundLedgers,
15 uint256 const& ledgerHash,
16 std::uint32_t ledgerSeq,
19 app,
20 ledgerHash,
21 LedgerReplayParameters::SUB_TASK_TIMEOUT,
23 "LedgerReplayDelta",
25 app.journal("LedgerReplayDelta"))
26 , inboundLedgers_(inboundLedgers)
27 , ledgerSeq_(ledgerSeq)
28 , peerSet_(std::move(peerSet))
29{
30 JLOG(journal_.trace()) << "Create " << hash_ << " Seq " << ledgerSeq;
31}
32
34{
35 JLOG(journal_.trace()) << "Destroy " << hash_;
36}
37
38void
40{
42 if (!isDone())
43 {
44 trigger(numPeers, sl);
45 setTimer(sl);
46 }
47}
48
49void
51{
53 if (fullLedger_)
54 {
55 complete_ = true;
56 JLOG(journal_.trace()) << "existing ledger " << hash_;
57 notify(sl);
58 return;
59 }
60
61 if (!fallBack_)
62 {
63 peerSet_->addPeers(
64 limit,
65 [this](auto peer) {
66 return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
67 peer->hasLedger(hash_, ledgerSeq_);
68 },
69 [this](auto peer) {
70 if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
71 {
72 JLOG(journal_.trace())
73 << "Add a peer " << peer->id() << " for " << hash_;
74 protocol::TMReplayDeltaRequest request;
75 request.set_ledgerhash(hash_.data(), hash_.size());
76 peerSet_->sendRequest(request, peer);
77 }
78 else
79 {
80 if (++noFeaturePeerCount >=
82 {
83 JLOG(journal_.debug()) << "Fall back for " << hash_;
86 fallBack_ = true;
87 }
88 }
89 });
90 }
91
92 if (fallBack_)
95}
96
97void
99{
100 JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
102 {
103 failed_ = true;
104 JLOG(journal_.debug()) << "too many timeouts " << hash_;
105 notify(sl);
106 }
107 else
108 {
109 trigger(1, sl);
110 }
111}
112
118
119void
121 LedgerInfo const& info,
123{
125 JLOG(journal_.trace()) << "got data for " << hash_;
126 if (isDone())
127 return;
128
129 if (info.seq == ledgerSeq_)
130 {
131 // create a temporary ledger for building a LedgerReplay object later
134 if (replayTemp_)
135 {
136 complete_ = true;
137 orderedTxns_ = std::move(orderedTxns);
138 JLOG(journal_.debug()) << "ready to replay " << hash_;
139 notify(sl);
140 return;
141 }
142 }
143
144 failed_ = true;
145 JLOG(journal_.error())
146 << "failed to create a (info only) ledger from verified data " << hash_;
147 notify(sl);
148}
149
150void
153 OnDeltaDataCB&& cb)
154{
156 dataReadyCallbacks_.emplace_back(std::move(cb));
157 if (reasons_.count(reason) == 0)
158 {
159 reasons_.emplace(reason);
160 if (fullLedger_)
161 onLedgerBuilt(sl, reason);
162 }
163
164 if (isDone())
165 {
166 JLOG(journal_.debug())
167 << "task added to a finished LedgerDeltaAcquire " << hash_;
168 notify(sl);
169 }
170}
171
174{
176
177 if (fullLedger_)
178 return fullLedger_;
179
180 if (failed_ || !complete_ || !replayTemp_)
181 return {};
182
183 XRPL_ASSERT(
184 parent->seq() + 1 == replayTemp_->seq(),
185 "ripple::LedgerDeltaAcquire::tryBuild : parent sequence match");
186 XRPL_ASSERT(
187 parent->info().hash == replayTemp_->info().parentHash,
188 "ripple::LedgerDeltaAcquire::tryBuild : parent hash match");
189 // build ledger
190 LedgerReplay replayData(parent, replayTemp_, std::move(orderedTxns_));
192 if (fullLedger_ && fullLedger_->info().hash == hash_)
193 {
194 JLOG(journal_.info()) << "Built " << hash_;
195 onLedgerBuilt(sl);
196 return fullLedger_;
197 }
198 else
199 {
200 failed_ = true;
201 complete_ = false;
202 JLOG(journal_.error()) << "tryBuild failed " << hash_ << " with parent "
203 << parent->info().hash;
204 Throw<std::runtime_error>("Cannot replay ledger");
205 }
206}
207
208void
210 ScopedLockType& sl,
212{
213 JLOG(journal_.debug()) << "onLedgerBuilt " << hash_
214 << (reason ? " for a new reason" : "");
215
218 bool firstTime = true;
219 if (reason) // small chance
220 {
221 reasons.clear();
222 reasons.push_back(*reason);
223 firstTime = false;
224 }
227 "onLedgerBuilt",
228 [=, ledger = this->fullLedger_, &app = this->app_]() {
229 for (auto reason : reasons)
230 {
231 switch (reason)
232 {
234 app.getLedgerMaster().storeLedger(ledger);
235 break;
236 default:
237 // TODO for other use cases
238 break;
239 }
240 }
241
242 if (firstTime)
243 app.getLedgerMaster().tryAdvance();
244 });
245}
246
247void
249{
250 XRPL_ASSERT(isDone(), "ripple::LedgerDeltaAcquire::notify : is done");
253 auto const good = !failed_;
254 sl.unlock();
255
256 for (auto& cb : toCall)
257 {
258 cb(good, hash_);
259 }
260
261 sl.lock();
262}
263
264} // namespace ripple
T begin(T... args)
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
virtual Config & config()=0
virtual JobQueue & getJobQueue()=0
virtual Family & getNodeFamily()=0
virtual LedgerMaster & getLedgerMaster()=0
Manages the lifetime of inbound ledgers.
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:149
std::shared_ptr< Ledger const > fullLedger_
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
std::set< InboundLedger::Reason > reasons_
LedgerDeltaAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::uint32_t ledgerSeq, std::unique_ptr< PeerSet > peerSet)
Constructor.
void processData(LedgerInfo const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&orderedTxns)
Process the data extracted from a peer's reply.
std::map< std::uint32_t, std::shared_ptr< STTx const > > orderedTxns_
std::shared_ptr< Ledger const > tryBuild(std::shared_ptr< Ledger const > const &parent)
Try to build the ledger if not already.
std::shared_ptr< Ledger const > replayTemp_
void init(int numPeers)
Start the LedgerDeltaAcquire task.
std::vector< OnDeltaDataCB > dataReadyCallbacks_
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
std::unique_ptr< PeerSet > peerSet_
void notify(ScopedLockType &sl)
Call the OnDeltaDataCB callbacks.
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
void addDataCallback(InboundLedger::Reason reason, OnDeltaDataCB &&cb)
Add a reason and a callback to the LedgerDeltaAcquire subtask.
void onLedgerBuilt(ScopedLockType &sl, std::optional< InboundLedger::Reason > reason={})
Process a newly built ledger, such as store it.
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
This class is an "active" object.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
static constexpr std::size_t size()
Definition base_uint.h:507
T count(T... args)
T emplace(T... args)
T end(T... args)
T is_same_v
T lock(T... args)
auto constexpr SUB_TASK_FALLBACK_TIMEOUT
std::uint32_t constexpr MAX_QUEUED_TASKS
std::uint32_t constexpr SUB_TASK_MAX_TIMEOUTS
auto constexpr MAX_NO_FEATURE_PEER_COUNT
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::shared_ptr< Ledger > buildLedger(std::shared_ptr< Ledger const > const &parent, NetClock::time_point closeTime, bool const closeTimeCorrect, NetClock::duration closeResolution, Application &app, CanonicalTXSet &txns, std::set< TxID > &failedTxs, beast::Journal j)
Build a new ledger by applying consensus transactions.
@ tapNONE
Definition ApplyView.h:12
@ jtREPLAY_TASK
Definition Job.h:42
Information about the notional ledger backing the view.
T swap(T... args)
T unlock(T... args)