22#include "data/BackendInterface.hpp"
24#include "data/Types.hpp"
27#include "etl/SystemState.hpp"
28#include "etl/impl/LedgerFetcher.hpp"
29#include "util/Assert.hpp"
30#include "util/LedgerUtils.hpp"
31#include "util/Profiler.hpp"
32#include "util/log/Logger.hpp"
34#include <xrpl/basics/base_uint.h>
35#include <xrpl/basics/strHex.h>
36#include <xrpl/beast/core/CurrentThreadName.h>
37#include <xrpl/protocol/LedgerHeader.h>
38#include <xrpl/protocol/STTx.h>
39#include <xrpl/protocol/Serializer.h>
40#include <xrpl/protocol/TxMeta.h>
56 std::vector<AccountTransactionsData> accountTxData;
57 std::vector<NFTTransactionsData> nfTokenTxData;
58 std::vector<NFTsData> nfTokensData;
59 std::vector<MPTHolderData> mptHoldersData;
60 std::vector<NFTsData> nfTokenURIChanges;
68template <
typename LoadBalancerType,
typename LedgerFetcherType>
71 using GetLedgerResponseType =
typename LoadBalancerType::GetLedgerResponseType;
72 using OptionalGetLedgerResponseType =
typename LoadBalancerType::OptionalGetLedgerResponseType;
73 using RawLedgerObjectType =
typename LoadBalancerType::RawLedgerObjectType;
78 std::shared_ptr<BackendInterface> backend_;
79 std::shared_ptr<LoadBalancerType> loadBalancer_;
80 std::reference_wrapper<LedgerFetcherType> fetcher_;
81 std::reference_wrapper<SystemState const> state_;
88 std::shared_ptr<BackendInterface> backend,
89 std::shared_ptr<LoadBalancerType> balancer,
90 LedgerFetcherType& fetcher,
93 : backend_{std::move(backend)}
94 , loadBalancer_{std::move(balancer)}
95 , fetcher_{std::ref(fetcher)}
96 , state_{std::cref(state)}
115 std::vector<NFTsData> nfTokenURIChanges;
116 for (
auto& txn : *(
data.mutable_transactions_list()->mutable_transactions())) {
117 std::string* raw = txn.mutable_transaction_blob();
119 ripple::SerialIter it{raw->data(), raw->size()};
120 ripple::STTx
const sttx{it};
122 LOG(log_.
trace()) <<
"Inserting transaction = " << sttx.getTransactionID();
124 ripple::TxMeta txMeta{sttx.getTransactionID(), ledger.seq, txn.metadata_blob()};
127 result.nfTokenTxData.insert(result.nfTokenTxData.end(), nftTxs.begin(), nftTxs.end());
131 if (maybeNFT->onlyUriChanged) {
132 nfTokenURIChanges.push_back(*maybeNFT);
134 result.nfTokensData.push_back(*maybeNFT);
140 result.mptHoldersData.push_back(*maybeMPTHolder);
142 result.accountTxData.emplace_back(txMeta, sttx.getTransactionID());
143 static constexpr std::size_t kEY_SIZE = 32;
144 std::string keyStr{
reinterpret_cast<char const*
>(sttx.getTransactionID().
data()), kEY_SIZE};
145 backend_->writeTransaction(
148 ledger.closeTime.time_since_epoch().count(),
150 std::move(*txn.mutable_metadata_blob())
158 result.nfTokensData.insert(result.nfTokensData.end(), nfTokenURIChanges.begin(), nfTokenURIChanges.end());
170 std::optional<ripple::LedgerHeader>
174 auto rng = backend_->hardFetchLedgerRangeNoThrow();
176 ASSERT(
false,
"Database is not empty");
182 OptionalGetLedgerResponseType ledgerData{fetcher_.get().fetchData(sequence)};
191 backend_->startWrites();
193 LOG(log_.
debug()) <<
"Started writes";
195 backend_->writeLedger(lgrInfo, std::move(*ledgerData->mutable_ledger_header()));
197 LOG(log_.
debug()) <<
"Wrote ledger";
199 LOG(log_.
debug()) <<
"Inserted txns";
206 auto edgeKeys = loadBalancer_->loadInitialLedger(sequence);
208 size_t numWrites = 0;
209 backend_->cache().setFull();
213 for (
auto& key : keys) {
214 LOG(log_.
debug()) <<
"Writing edge key = " << ripple::strHex(key);
215 auto succ = backend_->cache().getSuccessor(*ripple::uint256::fromVoidChecked(key), sequence);
217 backend_->writeSuccessor(std::move(key), sequence,
uint256ToString(succ->key));
220 ripple::uint256 prev = data::kFIRST_KEY;
221 while (
auto cur = backend_->cache().getSuccessor(prev, sequence)) {
222 ASSERT(cur.has_value(),
"Succesor for key {} must exist", ripple::strHex(prev));
223 if (prev == data::kFIRST_KEY)
229 if (!backend_->cache().get(base, sequence)) {
230 auto succ = backend_->cache().getSuccessor(base, sequence);
231 ASSERT(succ.has_value(),
"Book base {} must have a successor", ripple::strHex(base));
232 if (succ->key == cur->key) {
233 LOG(log_.
debug()) <<
"Writing book successor = " << ripple::strHex(base) <<
" - "
234 << ripple::strHex(cur->key);
244 static constexpr std::size_t kLOG_INTERVAL = 100000;
245 if (numWrites % kLOG_INTERVAL == 0 && numWrites != 0)
246 LOG(log_.
info()) <<
"Wrote " << numWrites <<
" book successors";
253 LOG(log_.
info()) <<
"Looping through cache and submitting all writes took " << seconds
254 <<
" seconds. numWrites = " << std::to_string(numWrites);
256 LOG(log_.
debug()) <<
"Loaded initial ledger";
258 if (not state_.get().isStopping) {
259 backend_->writeAccountTransactions(std::move(insertTxResult.accountTxData));
260 backend_->writeNFTs(insertTxResult.nfTokensData);
261 backend_->writeNFTTransactions(insertTxResult.nfTokenTxData);
262 backend_->writeMPTHolders(insertTxResult.mptHoldersData);
265 backend_->finishWrites(sequence);
268 LOG(log_.
debug()) <<
"Time to download and store ledger = " << timeDiff;
ripple::uint256 getBookBase(T const &key)
Get the book base.
Definition DBHelpers.hpp:292
std::string uint256ToString(ripple::uint256 const &input)
Stringify a ripple::uint256.
Definition DBHelpers.hpp:312
bool isBookDir(T const &key, R const &object)
Check whether the supplied object is a book dir.
Definition DBHelpers.hpp:258
Loads ledger data into the DB.
Definition LedgerLoader.hpp:69
FormattedTransactionsData insertTransactions(ripple::LedgerHeader const &ledger, GetLedgerResponseType &data)
Insert extracted transaction into the ledger.
Definition LedgerLoader.hpp:111
std::optional< ripple::LedgerHeader > loadInitialLedger(uint32_t sequence)
Download a ledger with specified sequence in full.
Definition LedgerLoader.hpp:171
LedgerLoader(std::shared_ptr< BackendInterface > backend, std::shared_ptr< LoadBalancerType > balancer, LedgerFetcherType &fetcher, SystemState const &state)
Create an instance of the loader.
Definition LedgerLoader.hpp:87
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:195
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
std::optional< MPTHolderData > getMPTHolderFromTx(ripple::TxMeta const &txMeta, ripple::STTx const &sttx)
Pull MPT data from TX via ETLService.
Definition MPTHelpers.cpp:63
std::vector< NFTsData > getUniqueNFTsDatas(std::vector< NFTsData > const &nfts)
Get the unique NFTs data from a vector of NFTsData happening in the same ledger. For example,...
Definition NFTHelpers.cpp:376
std::pair< std::vector< NFTTransactionsData >, std::optional< NFTsData > > getNFTDataFromTx(ripple::TxMeta const &txMeta, ripple::STTx const &sttx)
Pull NFT data from TX via ETLService.
Definition NFTHelpers.cpp:329
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:203
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
std::string toString(ripple::LedgerHeader const &info)
A helper function that converts a ripple::LedgerHeader to a string representation.
Definition LedgerUtils.hpp:215
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33