// Short aliases for the nested types published by LedgerLoaderType, so the rest
// of the class can name them without the `typename LedgerLoaderType::` prefix.
71 using GetLedgerResponseType =
typename LedgerLoaderType::GetLedgerResponseType;
72 using RawLedgerObjectType =
typename LedgerLoaderType::RawLedgerObjectType;
// Collaborators held through std::reference_wrapper are non-owning: this class
// does not control their lifetime (presumably they must outlive it - confirm with the owner).
// Source of fetched ledger data; read with popNext() in the worker loop.
76 std::reference_wrapper<DataPipeType> pipe_;
// Shared-ownership handle to the storage backend and its cache.
77 std::shared_ptr<BackendInterface> backend_;
// Used to insert a ledger's transactions (insertTransactions()).
78 std::reference_wrapper<LedgerLoaderType> loader_;
// Receives the ledger header after the ledger data has been written.
79 std::reference_wrapper<LedgerPublisherType> publisher_;
// Notified when building a ledger fails with std::runtime_error.
80 std::reference_wrapper<AmendmentBlockHandlerType> amendmentBlockHandler_;
// First sequence the worker loop asks the pipe for.
82 uint32_t startSequence_;
// Shared state; this class reads isStopping and reads/writes writeConflict on it.
83 std::reference_wrapper<SystemState> state_;
// backend is taken by value so it can be moved into backend_; the other
// collaborators are taken by reference and only wrapped, never copied.
96 std::shared_ptr<BackendInterface> backend,
97 LedgerLoaderType& loader,
98 LedgerPublisherType& publisher,
99 AmendmentBlockHandlerType& amendmentBlockHandler,
100 uint32_t startSequence,
// Member initializers: wrap each reference with std::ref and take ownership of
// the moved-in backend pointer.
103 : pipe_{std::ref(pipe)}
104 , backend_{std::move(backend)}
105 , loader_{std::ref(loader)}
106 , publisher_{std::ref(publisher)}
107 , amendmentBlockHandler_{std::ref(amendmentBlockHandler)}
108 , startSequence_{startSequence}
109 , state_{std::ref(state)}
// Start the worker thread immediately; it runs process() on this object.
// The lambda captures `this`, so the object must stay alive (and at the same
// address) for as long as the thread is running.
111 thread_ = std::thread([
this]() { process(); });
// Only act on the worker thread while it is still joinable
// (presumably this guards a thread_.join() - confirm).
119 if (thread_.joinable())
// Precondition check: the worker thread is expected to be joinable here.
129 ASSERT(thread_.joinable(),
"Transformer thread must be joinable");
// Worker loop (presumably the body of process(), which the constructor's thread
// runs): names the thread, then repeatedly pops fetched ledger data from the
// pipe, writes it with buildNextLedger(), logs load statistics and publishes
// the resulting ledger header.
137 beast::setCurrentThreadName(
"ETLService transform");
// Sequence requested from the pipe; starts at the configured start sequence.
138 uint32_t currentSequence = startSequence_;
// Loop until a write conflict has been recorded in the shared state.
140 while (not hasWriteConflict()) {
141 auto fetchResponse = pipe_.get().popNext(currentSequence);
// Time the build/write step so load throughput can be reported.
152 auto const start = std::chrono::system_clock::now();
153 auto [lgrInfo, success] = buildNextLedger(*fetchResponse);
156 auto const numTxns = fetchResponse->transactions_list().transactions_size();
157 auto const numObjects = fetchResponse->ledger_objects().objects_size();
158 auto const end = std::chrono::system_clock::now();
// NOTE(review): dividing the raw tick count by 1e9 yields seconds only if
// system_clock ticks in nanoseconds; the tick period is implementation-defined.
// std::chrono::duration<double>(end - start).count() would be portable.
// system_clock is also not monotonic, so steady_clock is the usual choice for
// measuring intervals.
159 auto const duration = ((end - start).count()) / 1000000000.0;
// The per-second rates below divide by `duration`; a zero duration produces
// non-finite values in the log line.
161 LOG(log_.
info()) <<
"Load phase of ETL. Successfully wrote ledger! Ledger info: "
163 <<
". object count = " << numObjects <<
". load time = " << duration
164 <<
". load txns per second = " << numTxns / duration
165 <<
". load objs per second = " << numObjects / duration;
// Hand the header of the ledger that was just written to the publisher.
168 publisher_.get().publish(lgrInfo);
// A failed build is recorded as a write conflict, which ends the loop at the
// next evaluation of its condition.
173 setWriteConflict(not success);
// Writes one ledger's raw data to the backend: header, successors, ledger
// objects (via updateCache) and transactions.
//
// rawData is mutated: several of its fields are moved out while writing.
// Returns {ledger header, success flag}. If the guarded section throws
// std::runtime_error, the amendment-block handler is notified and
// {default-constructed header, false} is returned.
184 std::pair<ripple::LedgerHeader, bool>
185 buildNextLedger(GetLedgerResponseType& rawData)
187 LOG(log_.
debug()) <<
"Beginning ledger update";
191 backend_->startWrites();
// The serialized header is moved out of rawData; rawData's ledger_header is
// left in a moved-from state afterwards.
192 backend_->writeLedger(lgrInfo, std::move(*rawData.mutable_ledger_header()));
194 writeSuccessors(lgrInfo, rawData);
// Filled by insertTransactions() below; declared up front so it remains usable
// after the guarded section.
195 std::optional<FormattedTransactionsData> insertTxResultOp;
197 updateCache(lgrInfo, rawData);
199 LOG(log_.
debug()) <<
"Inserted/modified/deleted all objects. Number of objects = "
200 << rawData.ledger_objects().objects_size();
202 insertTxResultOp.emplace(loader_.get().insertTransactions(lgrInfo, rawData));
203 }
catch (std::runtime_error
const& e) {
// Only std::runtime_error is handled here.
// NOTE(review): updateCache() throws std::logic_error, which is not derived
// from std::runtime_error and would not be caught by this handler - confirm
// that this is intended.
204 LOG(log_.
fatal()) <<
"Failed to build next ledger: " << e.what();
206 amendmentBlockHandler_.get().notifyAmendmentBlocked();
207 return {ripple::LedgerHeader{},
false};
210 LOG(log_.
debug()) <<
"Inserted all transactions. Number of transactions = "
211 << rawData.transactions_list().transactions_size();
// Persist the data derived from the inserted transactions. accountTxData is
// moved into the backend call; the other collections are passed as they are.
213 backend_->writeAccountTransactions(std::move(insertTxResultOp->accountTxData));
214 backend_->writeNFTs(insertTxResultOp->nfTokensData);
215 backend_->writeNFTTransactions(insertTxResultOp->nfTokenTxData);
216 backend_->writeMPTHolders(insertTxResultOp->mptHoldersData);
// success and duration are logged and returned below.
218 auto [success, duration] =
221 LOG(log_.
debug()) <<
"Finished writes. Total time: " << std::to_string(duration);
224 return {lgrInfo, success};
// Writes every ledger object in rawData to the backend and applies the same
// changes to the backend cache. When rawData does not carry object neighbors,
// the cache is additionally used to work out predecessor/successor and
// book-base successor information.
//
// lgrInfo: header of the ledger being written; lgrInfo.seq is the sequence
//          used for all writes and cache lookups.
// rawData: fetched ledger data; object keys and data are moved out of it.
// Throws std::logic_error when neighbors are not included and the cache is not
// full or is not at the expected sequence.
234 updateCache(ripple::LedgerHeader
const& lgrInfo, GetLedgerResponseType& rawData)
// One cache entry per ledger object; reserved up front to avoid regrowth.
236 std::vector<data::LedgerObject> cacheUpdates;
237 cacheUpdates.reserve(rawData.ledger_objects().objects_size());
// Book bases whose successor must be recomputed once the cache is updated.
240 std::set<ripple::uint256> bookSuccessorsToCalculate;
// Keys of objects whose mod_type is MODIFIED.
241 std::set<ripple::uint256> modified;
243 for (
auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) {
244 auto key = ripple::uint256::fromVoidChecked(obj.key());
245 ASSERT(key.has_value(),
"Failed to deserialize key from void");
// The blob is copied into the cache entry (iterator-range construction),
// because obj's data is moved into the backend write at the end of this loop body.
247 cacheUpdates.push_back({*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}});
248 LOG(log_.
debug()) <<
"key = " << ripple::strHex(*key) <<
" - mod type = " << obj.mod_type();
// Objects that are not MODIFIED need neighbor bookkeeping; without neighbors
// in rawData, the cache state of the previous ledger is consulted.
250 if (obj.mod_type() != RawLedgerObjectType::MODIFIED && !rawData.object_neighbors_included()) {
251 LOG(log_.
debug()) <<
"object neighbors not included. using cache";
// The cache must be full and positioned at the previous sequence.
// NOTE(review): the exception message mentions only "not full", but this
// condition also fires on a sequence mismatch.
253 if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq - 1)
254 throw std::logic_error(
"Cache is not full, but object neighbors were not included");
256 auto const blob = obj.mutable_data();
257 auto checkBookBase =
false;
// An empty blob marks the object as deleted.
258 auto const isDeleted = (blob->size() == 0);
// Previous version of the object as of lgrInfo.seq - 1; per the assertion
// message, a deleted object is expected to be present in the cache.
261 auto const old = backend_->cache().get(*key, lgrInfo.seq - 1);
262 ASSERT(old.has_value(),
"Deleted object {} must be in cache", ripple::strHex(*key));
269 LOG(log_.
debug()) <<
"Is book dir. Key = " << ripple::strHex(*key);
// First directory that followed the book base in the previous ledger.
272 auto const oldFirstDir = backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1);
274 oldFirstDir.has_value(),
275 "Book base must have a successor for lgrInfo.seq - 1 = {}",
// Recalculate when the deleted object was the book's first directory, or when
// a non-deleted object sorts before the current first directory.
281 if ((isDeleted && key == oldFirstDir->key) || (!isDeleted && key < oldFirstDir->key)) {
283 <<
"Need to recalculate book base successor. base = " << ripple::strHex(bookBase)
284 <<
" - key = " << ripple::strHex(*key) <<
" - isDeleted = " << isDeleted
285 <<
" - seq = " << lgrInfo.seq;
286 bookSuccessorsToCalculate.insert(bookBase);
// Remember modified keys; the successor pass below checks this set.
291 if (obj.mod_type() == RawLedgerObjectType::MODIFIED)
292 modified.insert(*key);
// Key and data are moved into the backend write; obj's key and data are in a
// moved-from state after this call.
294 backend_->writeLedgerObject(std::move(*obj.mutable_key()), lgrInfo.seq, std::move(*obj.mutable_data()));
// Bring the cache up to lgrInfo.seq before neighbors are queried from it.
297 backend_->cache().update(cacheUpdates, lgrInfo.seq);
// Second pass: derive successor information from the freshly updated cache.
300 if (!rawData.object_neighbors_included()) {
301 LOG(log_.
debug()) <<
"object neighbors not included. using cache";
// Same requirement as above, now against the current sequence.
302 if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq)
303 throw std::logic_error(
"Cache is not full, but object neighbors were not included");
305 for (
auto const& obj : cacheUpdates) {
// Modified objects are singled out here (presumably skipped - confirm).
306 if (modified.contains(obj.key))
// Nearest neighbors of the key in the cache as of lgrInfo.seq.
309 auto lb = backend_->cache().getPredecessor(obj.key, lgrInfo.seq);
// Fallback lower bound: data::kFIRST_KEY with an empty blob.
311 lb = {.key = data::kFIRST_KEY, .blob = {}};
313 auto ub = backend_->cache().getSuccessor(obj.key, lgrInfo.seq);
// Fallback upper bound: data::kLAST_KEY with an empty blob.
315 ub = {.key = data::kLAST_KEY, .blob = {}};
// An empty blob in the cache update means the object was deleted.
317 if (obj.blob.empty()) {
318 LOG(log_.
debug()) <<
"writing successor for deleted object " << ripple::strHex(obj.key) <<
" - "
319 << ripple::strHex(lb->key) <<
" - " << ripple::strHex(ub->key);
326 LOG(log_.
debug()) <<
"writing successor for new object " << ripple::strHex(lb->key) <<
" - "
327 << ripple::strHex(obj.key) <<
" - " << ripple::strHex(ub->key);
// Recompute the successor of every book base flagged during the first pass.
331 for (
auto const& base : bookSuccessorsToCalculate) {
332 auto succ = backend_->cache().getSuccessor(base, lgrInfo.seq);
336 LOG(log_.
debug()) <<
"Updating book successor " << ripple::strHex(base) <<
" - "
337 << ripple::strHex(succ->key);
341 LOG(log_.
debug()) <<
"Updating book successor " << ripple::strHex(base) <<
" - "
342 << ripple::strHex(data::kLAST_KEY);
// Writes successor links to the backend using the neighbor data carried in
// rawData. The visible work happens only when rawData reports that object
// neighbors are included.
//
// lgrInfo: header of the ledger being written; lgrInfo.seq is used for every write.
// rawData: fetched ledger data; book bases, first books, predecessors and
//          successors are moved out of it.
355 writeSuccessors(ripple::LedgerHeader
const& lgrInfo, GetLedgerResponseType& rawData)
358 if (rawData.object_neighbors_included()) {
359 LOG(log_.
debug()) <<
"object neighbors included";
// Book successors: one link from each book base to its first book.
361 for (
auto& obj : *(rawData.mutable_book_successors())) {
// first_book is moved out of the message into a local string.
362 auto firstBook = std::move(*obj.mutable_first_book());
// An empty first book gets special handling.
363 if (!firstBook.size())
// book_base is logged before it is moved into the write below.
365 LOG(log_.
debug()) <<
"writing book successor " << ripple::strHex(obj.book_base()) <<
" - "
366 << ripple::strHex(firstBook);
368 backend_->writeSuccessor(std::move(*obj.mutable_book_base()), lgrInfo.seq, std::move(firstBook));
// Object successors: objects that are not MODIFIED change the key ordering and
// need their neighbor links rewritten.
371 for (
auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) {
372 if (obj.mod_type() != RawLedgerObjectType::MODIFIED) {
// Empty predecessor/successor strings get special handling.
373 std::string* predPtr = obj.mutable_predecessor();
374 if (predPtr->empty())
376 std::string* succPtr = obj.mutable_successor();
377 if (succPtr->empty())
// Deleted object: link the predecessor directly to the successor.
380 if (obj.mod_type() == RawLedgerObjectType::DELETED) {
381 LOG(log_.
debug()) <<
"Modifying successors for deleted object " << ripple::strHex(obj.key())
382 <<
" - " << ripple::strHex(*predPtr) <<
" - " << ripple::strHex(*succPtr);
384 backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::move(*succPtr));
// Otherwise (new object): insert the key between its neighbors with two links,
// predecessor -> key and key -> successor.
386 LOG(log_.
debug()) <<
"adding successor for new object " << ripple::strHex(obj.key()) <<
" - "
387 << ripple::strHex(*predPtr) <<
" - " << ripple::strHex(*succPtr);
// The key is copied for each call (std::string{obj.key()}) rather than moved,
// so obj.key() stays intact for later use.
389 backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::string{obj.key()});
390 backend_->writeSuccessor(std::string{obj.key()}, lgrInfo.seq, std::move(*succPtr));
// Modified objects only produce a log line here.
393 LOG(log_.
debug()) <<
"object modified " << ripple::strHex(obj.key());
// Reports the isStopping flag held in the shared system state.
402 return state_.get().isStopping;
// Returns the writeConflict flag held in the shared system state; the worker
// loop keeps running only while this is false.
407 hasWriteConflict()
const
409 return state_.get().writeConflict;
// Stores `conflict` into the writeConflict flag of the shared system state.
// The worker loop calls this with `not success` after each ledger build.
418 setWriteConflict(
bool conflict)
420 state_.get().writeConflict = conflict;