rippled
Loading...
Searching...
No Matches
LedgerDeltaAcquire.cpp
1#include <xrpld/app/ledger/BuildLedger.h>
2#include <xrpld/app/ledger/InboundLedger.h>
3#include <xrpld/app/ledger/LedgerReplay.h>
4#include <xrpld/app/ledger/LedgerReplayer.h>
5#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
6#include <xrpld/app/main/Application.h>
7#include <xrpld/overlay/PeerSet.h>
8
9#include <xrpl/core/JobQueue.h>
10
11namespace xrpl {
12
// Constructor of LedgerDeltaAcquire: parameter list, initializers and body.
// NOTE(review): this listing skips its lines 13, 18-19 and 23, so the
// constructor name, the last parameter (peerSet, which is moved into peerSet_
// below), the opening of the base-class initializer and one of that
// initializer's arguments are not visible here.
14 Application& app,
15 InboundLedgers& inboundLedgers,
16 uint256 const& ledgerHash,
17 std::uint32_t ledgerSeq,
// Arguments forwarded to the base-class initializer (presumably
// TimeoutCounter — confirm): the application, the hash being acquired, the
// sub-task timeout, and a journal named "LedgerReplayDelta".
20 app,
21 ledgerHash,
22 LedgerReplayParameters::SUB_TASK_TIMEOUT,
24 app.getJournal("LedgerReplayDelta"))
25 , inboundLedgers_(inboundLedgers)
26 , ledgerSeq_(ledgerSeq)
// Takes over the peerSet argument passed by the caller.
27 , peerSet_(std::move(peerSet))
28{
// The body only emits a trace line with the hash and sequence.
29 JLOG(journal_.trace()) << "Create " << hash_ << " Seq " << ledgerSeq;
30}
31
// Body of a function whose signature (listing line 32) is missing here —
// presumably the destructor, given the "Destroy" trace message; confirm.
// It does nothing except log the hash at trace level.
33{
34 JLOG(journal_.trace()) << "Destroy " << hash_;
35}
36
// Start the LedgerDeltaAcquire task: init(int numPeers).
// NOTE(review): the signature (listing line 38) and line 40 are missing from
// this listing; `sl` is presumably a ScopedLockType taken on line 40 — confirm.
37void
39{
// Nothing to do when the task has already finished.
41 if (!isDone())
42 {
// Run a first round with up to numPeers peers, then arm the timer.
43 trigger(numPeers, sl);
44 setTimer(sl);
45 }
46}
47
// Trigger another acquisition round: trigger(std::size_t limit,
// ScopedLockType& sl). The signature line (listing line 49) is missing here.
48void
50{
// NOTE(review): listing line 51 is missing; fullLedger_ is presumably
// refreshed there (a lookup by hash_?) before being tested — confirm.
52 if (fullLedger_)
53 {
// The ledger already exists: mark the task complete and run the callbacks.
54 complete_ = true;
55 JLOG(journal_.trace()) << "existing ledger " << hash_;
56 notify(sl);
57 return;
58 }
59
60 if (!fallBack_)
61 {
62 peerSet_->addPeers(
63 limit,
// First callable: predicate accepting peers that support the LedgerReplay
// feature and report having this ledger (hash and sequence).
64 [this](auto peer) {
65 return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
66 peer->hasLedger(hash_, ledgerSeq_);
67 },
// Second callable: invoked with a peer (presumably each peer that was added —
// confirm against PeerSet::addPeers).
68 [this](auto peer) {
69 if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
70 {
// Ask this peer for the replay delta of the ledger identified by hash_.
71 JLOG(journal_.trace()) << "Add a peer " << peer->id() << " for " << hash_;
72 protocol::TMReplayDeltaRequest request;
73 request.set_ledgerhash(hash_.data(), hash_.size());
74 peerSet_->sendRequest(request, peer);
75 }
76 else
77 {
// NOTE(review): listing lines 78 and 81 are missing. Line 78 presumably holds
// the condition guarding this inner block (a count of peers lacking the
// feature?) and line 81 presumably adjusts the timer interval — confirm.
// NOTE(review): if addPeers only passes peers accepted by the predicate above,
// this else branch would never run — confirm.
79 {
80 JLOG(journal_.debug()) << "Fall back for " << hash_;
82 fallBack_ = true;
83 }
84 }
85 });
86 }
87
// NOTE(review): the statement controlled by this `if` (listing line 89) is
// missing; presumably the fallback acquisition through inboundLedgers_ —
// confirm.
88 if (fallBack_)
90}
91
// Timer hook: onTimer(bool progress, ScopedLockType&). The signature line
// (listing line 93) is missing here; the visible body never reads `progress`.
92void
94{
95 JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
// NOTE(review): listing line 96 is missing; it holds the condition for the
// failure branch below (presumably timeouts_ compared against a maximum such
// as SUB_TASK_MAX_TIMEOUTS — confirm).
97 {
// Give up: mark the task failed and run the callbacks.
98 failed_ = true;
99 JLOG(journal_.debug()) << "too many timeouts " << hash_;
100 notify(sl);
101 }
102 else
103 {
// Otherwise start another round limited to a single peer.
104 trigger(1, sl);
105 }
106}
107
113
// Process the data extracted from a peer's reply:
// processData(LedgerHeader const& info,
//             std::map<std::uint32_t, std::shared_ptr<STTx const>>&& orderedTxns).
// NOTE(review): listing lines 115, 117 and 119 are missing (function name,
// second parameter, and presumably the lock that declares `sl` — confirm).
114void
116 LedgerHeader const& info,
118{
120 JLOG(journal_.trace()) << "got data for " << hash_;
// Ignore replies that arrive after the task has finished.
121 if (isDone())
122 return;
123
// Only a header whose sequence matches the expected one is accepted.
124 if (info.seq == ledgerSeq_)
125 {
126 // create a temporary ledger for building a LedgerReplay object later
127 Rules const rules{app_.config().features};
// NOTE(review): listing line 128 is missing; replayTemp_ is presumably
// assigned there from `info` and `rules` — confirm.
129 if (replayTemp_)
130 {
// Success: keep the transactions for a later tryBuild() and run the callbacks.
131 complete_ = true;
132 orderedTxns_ = std::move(orderedTxns);
133 JLOG(journal_.debug()) << "ready to replay " << hash_;
134 notify(sl);
135 return;
136 }
137 }
138
// Reached on a sequence mismatch or when replayTemp_ is empty: mark the task
// failed and run the callbacks.
139 failed_ = true;
140 JLOG(journal_.error()) << "failed to create a (info only) ledger from verified data " << hash_;
141 notify(sl);
142}
143
// Add a reason and a callback to this subtask:
// addDataCallback(InboundLedger::Reason reason, OnDeltaDataCB&& cb).
// NOTE(review): listing lines 145 and 147 are missing (the signature, and
// presumably the lock that declares `sl` — confirm).
144void
146{
// The callback is always queued, whether or not the reason is new.
148 dataReadyCallbacks_.emplace_back(std::move(cb));
149 if (!reasons_.contains(reason))
150 {
151 reasons_.emplace(reason);
// A reason added after the ledger was already built is handled right away.
152 if (fullLedger_)
153 onLedgerBuilt(sl, reason);
154 }
155
// If the task is already finished, run the queued callbacks now.
156 if (isDone())
157 {
158 JLOG(journal_.debug()) << "task added to a finished LedgerDeltaAcquire " << hash_;
159 notify(sl);
160 }
161}
162
// Try to build the ledger if not already built:
// std::shared_ptr<Ledger const> tryBuild(std::shared_ptr<Ledger const> const& parent).
// Returns the full ledger, returns an empty pointer when the data is not
// ready, and throws std::runtime_error when the build does not yield hash_.
// NOTE(review): listing lines 163-164 and 166 are missing (return type,
// signature, and presumably the lock that declares `sl` — confirm).
165{
167
// Already built: hand back the existing ledger.
168 if (fullLedger_)
169 return fullLedger_;
170
// Not buildable: failed, data not yet complete, or no temporary ledger.
171 if (failed_ || !complete_ || !replayTemp_)
172 return {};
173
// The supplied parent must be the direct predecessor of the ledger to build.
174 XRPL_ASSERT(
175 parent->seq() + 1 == replayTemp_->seq(),
176 "xrpl::LedgerDeltaAcquire::tryBuild : parent sequence match");
177 XRPL_ASSERT(
178 parent->header().hash == replayTemp_->header().parentHash,
179 "xrpl::LedgerDeltaAcquire::tryBuild : parent hash match");
180 // build ledger
// orderedTxns_ is moved from here and should not be relied on afterwards.
181 LedgerReplay const replayData(parent, replayTemp_, std::move(orderedTxns_));
// NOTE(review): listing line 182 is missing; fullLedger_ is presumably
// assigned there from a build call using replayData — confirm.
183 if (fullLedger_ && fullLedger_->header().hash == hash_)
184 {
185 JLOG(journal_.info()) << "Built " << hash_;
186 onLedgerBuilt(sl);
187 return fullLedger_;
188 }
189
// No ledger, or its hash differs from hash_: record the failure and throw.
190 failed_ = true;
191 complete_ = false;
192 JLOG(journal_.error()) << "tryBuild failed " << hash_ << " with parent "
193 << parent->header().hash;
194 Throw<std::runtime_error>("Cannot replay ledger");
195}
196
// Process a newly built ledger, such as storing it:
// onLedgerBuilt(ScopedLockType& sl, std::optional<InboundLedger::Reason> reason = {}).
// The signature line (listing line 198) is missing here.
197void
199{
200 JLOG(journal_.debug()) << "onLedgerBuilt " << hash_ << (reason ? " for a new reason" : "");
201
// NOTE(review): listing line 202 is missing; the local `reasons` container is
// presumably declared there as a copy of reasons_ — confirm.
203 bool firstTime = true;
204 if (reason) // small chance
205 {
// Called for a single newly added reason: process only that reason and skip
// the tryAdvance() call in the job below.
206 reasons.clear();
207 reasons.push_back(*reason);
208 firstTime = false;
209 }
// NOTE(review): listing line 210 is missing; it presumably opens the call that
// queues the lambda below as a jtREPLAY_TASK job — confirm.
// The lambda copies its captures (`=`), holds the ledger pointer by value and
// the application by reference.
211 jtREPLAY_TASK, "OnLedBuilt", [=, ledger = this->fullLedger_, &app = this->app_]() {
212 for (auto reason : reasons)
213 {
214 switch (reason)
215 {
// NOTE(review): the case label (listing line 216) is missing, so which reason
// triggers storeLedger cannot be seen here.
217 app.getLedgerMaster().storeLedger(ledger);
218 break;
219 default:
220 // TODO for other use cases
221 break;
222 }
223 }
224
225 if (firstTime)
226 app.getLedgerMaster().tryAdvance();
227 });
228}
229
// Call the OnDeltaDataCB callbacks: notify(ScopedLockType& sl).
// The signature line (listing line 231) is missing here.
230void
232{
233 XRPL_ASSERT(isDone(), "xrpl::LedgerDeltaAcquire::notify : is done");
// NOTE(review): listing lines 234-235 are missing; `toCall` is presumably
// declared there and filled from dataReadyCallbacks_ — confirm.
// The outcome is captured before the lock is released.
236 auto const good = !failed_;
// The callbacks run with the lock released; it is re-acquired before returning.
237 sl.unlock();
238
239 for (auto& cb : toCall)
240 {
241 cb(good, hash_);
242 }
243
244 sl.lock();
245}
246
247} // namespace xrpl
T begin(T... args)
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
virtual Config & config()=0
std::unordered_set< uint256, beast::uhash<> > features
Definition Config.h:261
Manages the lifetime of inbound ledgers.
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:147
std::uint32_t const ledgerSeq_
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
void init(int numPeers)
Start the LedgerDeltaAcquire task.
std::shared_ptr< Ledger const > fullLedger_
void onLedgerBuilt(ScopedLockType &sl, std::optional< InboundLedger::Reason > reason={})
Process a newly built ledger, such as store it.
LedgerDeltaAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::uint32_t ledgerSeq, std::unique_ptr< PeerSet > peerSet)
Constructor.
std::vector< OnDeltaDataCB > dataReadyCallbacks_
void processData(LedgerHeader const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&orderedTxns)
Process the data extracted from a peer's reply.
std::unique_ptr< PeerSet > peerSet_
std::set< InboundLedger::Reason > reasons_
void notify(ScopedLockType &sl)
Call the OnDeltaDataCB callbacks.
std::shared_ptr< Ledger const > replayTemp_
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::shared_ptr< Ledger const > tryBuild(std::shared_ptr< Ledger const > const &parent)
Try to build the ledger if not already.
std::map< std::uint32_t, std::shared_ptr< STTx const > > orderedTxns_
void addDataCallback(InboundLedger::Reason reason, OnDeltaDataCB &&cb)
Add a reason and a callback to the LedgerDeltaAcquire subtask.
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
Rules controlling protocol behavior.
Definition Rules.h:18
virtual JobQueue & getJobQueue()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual Family & getNodeFamily()=0
This class is an "active" object.
std::recursive_mutex mtx_
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
pointer data()
Definition base_uint.h:101
static constexpr std::size_t size()
Definition base_uint.h:499
T contains(T... args)
T emplace(T... args)
T end(T... args)
T is_same_v
T lock(T... args)
auto constexpr SUB_TASK_FALLBACK_TIMEOUT
auto constexpr MAX_NO_FEATURE_PEER_COUNT
std::uint32_t constexpr MAX_QUEUED_TASKS
std::uint32_t constexpr SUB_TASK_MAX_TIMEOUTS
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ jtREPLAY_TASK
Definition Job.h:40
std::shared_ptr< Ledger > buildLedger(std::shared_ptr< Ledger const > const &parent, NetClock::time_point closeTime, bool const closeTimeCorrect, NetClock::duration closeResolution, Application &app, CanonicalTXSet &txns, std::set< TxID > &failedTxs, beast::Journal j)
Build a new ledger by applying consensus transactions.
@ tapNONE
Definition ApplyView.h:11
Information about the notional ledger backing the view.
T swap(T... args)
T unlock(T... args)