22#include "data/BackendInterface.hpp"
24#include "data/Types.hpp"
27#include "etl/SystemState.hpp"
28#include "etl/impl/LedgerFetcher.hpp"
30#include "util/Assert.hpp"
31#include "util/LedgerUtils.hpp"
32#include "util/Profiler.hpp"
33#include "util/log/Logger.hpp"
35#include <xrpl/basics/base_uint.h>
36#include <xrpl/basics/strHex.h>
37#include <xrpl/beast/core/CurrentThreadName.h>
38#include <xrpl/protocol/LedgerHeader.h>
39#include <xrpl/protocol/STTx.h>
40#include <xrpl/protocol/Serializer.h>
41#include <xrpl/protocol/TxMeta.h>
57 std::vector<AccountTransactionsData> accountTxData;
58 std::vector<NFTTransactionsData> nfTokenTxData;
59 std::vector<NFTsData> nfTokensData;
60 std::vector<MPTHolderData> mptHoldersData;
61 std::vector<NFTsData> nfTokenURIChanges;
69template <
typename LedgerFetcherType>
72 using GetLedgerResponseType = etlng::LoadBalancerInterface::GetLedgerResponseType;
73 using OptionalGetLedgerResponseType = etlng::LoadBalancerInterface::OptionalGetLedgerResponseType;
74 using RawLedgerObjectType = etlng::LoadBalancerInterface::RawLedgerObjectType;
79 std::shared_ptr<BackendInterface> backend_;
80 std::shared_ptr<etlng::LoadBalancerInterface> loadBalancer_;
81 std::reference_wrapper<LedgerFetcherType> fetcher_;
82 std::reference_wrapper<SystemState const> state_;
89 std::shared_ptr<BackendInterface> backend,
90 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
91 LedgerFetcherType& fetcher,
94 : backend_{std::move(backend)}
95 , loadBalancer_{std::move(balancer)}
96 , fetcher_{std::ref(fetcher)}
97 , state_{std::cref(state)}
116 std::vector<NFTsData> nfTokenURIChanges;
117 for (
auto& txn : *(
data.mutable_transactions_list()->mutable_transactions())) {
118 std::string* raw = txn.mutable_transaction_blob();
120 ripple::SerialIter it{raw->data(), raw->size()};
121 ripple::STTx
const sttx{it};
123 LOG(log_.
trace()) <<
"Inserting transaction = " << sttx.getTransactionID();
125 ripple::TxMeta
const txMeta{sttx.getTransactionID(), ledger.seq, txn.metadata_blob()};
128 result.nfTokenTxData.insert(result.nfTokenTxData.end(), nftTxs.begin(), nftTxs.end());
132 if (maybeNFT->onlyUriChanged) {
133 nfTokenURIChanges.push_back(*maybeNFT);
135 result.nfTokensData.push_back(*maybeNFT);
141 result.mptHoldersData.push_back(*maybeMPTHolder);
143 result.accountTxData.emplace_back(txMeta, sttx.getTransactionID());
144 static constexpr std::size_t kEY_SIZE = 32;
145 std::string keyStr{
reinterpret_cast<char const*
>(sttx.getTransactionID().
data()), kEY_SIZE};
146 backend_->writeTransaction(
149 ledger.closeTime.time_since_epoch().count(),
151 std::move(*txn.mutable_metadata_blob())
159 result.nfTokensData.insert(result.nfTokensData.end(), nfTokenURIChanges.begin(), nfTokenURIChanges.end());
171 std::optional<ripple::LedgerHeader>
175 auto rng = backend_->hardFetchLedgerRangeNoThrow();
177 ASSERT(
false,
"Database is not empty");
183 OptionalGetLedgerResponseType ledgerData{fetcher_.get().fetchData(sequence)};
192 backend_->startWrites();
194 LOG(log_.
debug()) <<
"Started writes";
196 backend_->writeLedger(lgrInfo, std::move(*ledgerData->mutable_ledger_header()));
198 LOG(log_.
debug()) <<
"Wrote ledger";
200 LOG(log_.
debug()) <<
"Inserted txns";
207 auto edgeKeys = loadBalancer_->loadInitialLedger(sequence);
209 size_t numWrites = 0;
210 backend_->cache().setFull();
214 for (
auto& key : keys) {
215 LOG(log_.
debug()) <<
"Writing edge key = " << ripple::strHex(key);
216 auto succ = backend_->cache().getSuccessor(*ripple::uint256::fromVoidChecked(key), sequence);
218 backend_->writeSuccessor(std::move(key), sequence,
uint256ToString(succ->key));
221 ripple::uint256 prev = data::kFIRST_KEY;
222 while (
auto cur = backend_->cache().getSuccessor(prev, sequence)) {
223 ASSERT(cur.has_value(),
"Successor for key {} must exist", ripple::strHex(prev));
224 if (prev == data::kFIRST_KEY)
230 if (!backend_->cache().get(base, sequence)) {
231 auto succ = backend_->cache().getSuccessor(base, sequence);
232 ASSERT(succ.has_value(),
"Book base {} must have a successor", ripple::strHex(base));
233 if (succ->key == cur->key) {
234 LOG(log_.
debug()) <<
"Writing book successor = " << ripple::strHex(base) <<
" - "
235 << ripple::strHex(cur->key);
245 static constexpr std::size_t kLOG_INTERVAL = 100000;
246 if (numWrites % kLOG_INTERVAL == 0 && numWrites != 0)
247 LOG(log_.
info()) <<
"Wrote " << numWrites <<
" book successors";
254 LOG(log_.
info()) <<
"Looping through cache and submitting all writes took " << seconds
255 <<
" seconds. numWrites = " << std::to_string(numWrites);
257 LOG(log_.
debug()) <<
"Loaded initial ledger";
259 if (not state_.get().isStopping) {
260 backend_->writeAccountTransactions(std::move(insertTxResult.accountTxData));
261 backend_->writeNFTs(insertTxResult.nfTokensData);
262 backend_->writeNFTTransactions(insertTxResult.nfTokenTxData);
263 backend_->writeMPTHolders(insertTxResult.mptHoldersData);
266 backend_->finishWrites(sequence);
269 LOG(log_.
debug()) <<
"Time to download and store ledger = " << timeDiff;
ripple::uint256 getBookBase(T const &key)
Get the book base.
Definition DBHelpers.hpp:292
std::string uint256ToString(ripple::uint256 const &input)
Stringify a ripple::uint256.
Definition DBHelpers.hpp:312
bool isBookDir(T const &key, R const &object)
Check whether the supplied object is a book dir.
Definition DBHelpers.hpp:258
Loads ledger data into the DB.
Definition LedgerLoader.hpp:70
FormattedTransactionsData insertTransactions(ripple::LedgerHeader const &ledger, GetLedgerResponseType &data)
Insert the extracted transactions into the ledger.
Definition LedgerLoader.hpp:112
LedgerLoader(std::shared_ptr< BackendInterface > backend, std::shared_ptr< etlng::LoadBalancerInterface > balancer, LedgerFetcherType &fetcher, SystemState const &state)
Create an instance of the loader.
Definition LedgerLoader.hpp:88
std::optional< ripple::LedgerHeader > loadInitialLedger(uint32_t sequence)
Download a ledger with specified sequence in full.
Definition LedgerLoader.hpp:172
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:209
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
std::optional< MPTHolderData > getMPTHolderFromTx(ripple::TxMeta const &txMeta, ripple::STTx const &sttx)
Pull MPT data from TX via ETLService.
Definition MPTHelpers.cpp:63
std::vector< NFTsData > getUniqueNFTsDatas(std::vector< NFTsData > const &nfts)
Get the unique NFTsData entries from a vector of NFTsData that were produced within the same ledger. For example,...
Definition NFTHelpers.cpp:376
std::pair< std::vector< NFTTransactionsData >, std::optional< NFTsData > > getNFTDataFromTx(ripple::TxMeta const &txMeta, ripple::STTx const &sttx)
Pull NFT data from TX via ETLService.
Definition NFTHelpers.cpp:329
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:204
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
std::string toString(ripple::LedgerHeader const &info)
A helper function that converts a ripple::LedgerHeader to a string representation.
Definition LedgerUtils.hpp:216
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33