Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerLoader.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "data/Types.hpp"
25#include "etl/MPTHelpers.hpp"
26#include "etl/NFTHelpers.hpp"
27#include "etl/SystemState.hpp"
28#include "etl/impl/LedgerFetcher.hpp"
30#include "util/Assert.hpp"
31#include "util/LedgerUtils.hpp"
32#include "util/Profiler.hpp"
33#include "util/log/Logger.hpp"
34
35#include <xrpl/basics/base_uint.h>
36#include <xrpl/basics/strHex.h>
37#include <xrpl/beast/core/CurrentThreadName.h>
38#include <xrpl/protocol/LedgerHeader.h>
39#include <xrpl/protocol/STTx.h>
40#include <xrpl/protocol/Serializer.h>
41#include <xrpl/protocol/TxMeta.h>
42
43#include <chrono>
44#include <cstddef>
45#include <cstdint>
46#include <functional>
47#include <memory>
48#include <optional>
49#include <string>
50#include <utility>
51#include <vector>
52
57 std::vector<AccountTransactionsData> accountTxData;
58 std::vector<NFTTransactionsData> nfTokenTxData;
59 std::vector<NFTsData> nfTokensData;
60 std::vector<MPTHolderData> mptHoldersData;
61 std::vector<NFTsData> nfTokenURIChanges;
62};
63
64namespace etl::impl {
65
69template <typename LedgerFetcherType>
71public:
72 using GetLedgerResponseType = etlng::LoadBalancerInterface::GetLedgerResponseType;
73 using OptionalGetLedgerResponseType = etlng::LoadBalancerInterface::OptionalGetLedgerResponseType;
74 using RawLedgerObjectType = etlng::LoadBalancerInterface::RawLedgerObjectType;
75
76private:
77 util::Logger log_{"ETL"};
78
79 std::shared_ptr<BackendInterface> backend_;
80 std::shared_ptr<etlng::LoadBalancerInterface> loadBalancer_;
81 std::reference_wrapper<LedgerFetcherType> fetcher_;
82 std::reference_wrapper<SystemState const> state_; // shared state for ETL
83
84public:
89 std::shared_ptr<BackendInterface> backend,
90 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
91 LedgerFetcherType& fetcher,
92 SystemState const& state
93 )
94 : backend_{std::move(backend)}
95 , loadBalancer_{std::move(balancer)}
96 , fetcher_{std::ref(fetcher)}
97 , state_{std::cref(state)}
98 {
99 }
100
112 insertTransactions(ripple::LedgerHeader const& ledger, GetLedgerResponseType& data)
113 {
115
116 std::vector<NFTsData> nfTokenURIChanges;
117 for (auto& txn : *(data.mutable_transactions_list()->mutable_transactions())) {
118 std::string* raw = txn.mutable_transaction_blob();
119
120 ripple::SerialIter it{raw->data(), raw->size()};
121 ripple::STTx const sttx{it};
122
123 LOG(log_.trace()) << "Inserting transaction = " << sttx.getTransactionID();
124
125 ripple::TxMeta const txMeta{sttx.getTransactionID(), ledger.seq, txn.metadata_blob()};
126
127 auto const [nftTxs, maybeNFT] = getNFTDataFromTx(txMeta, sttx);
128 result.nfTokenTxData.insert(result.nfTokenTxData.end(), nftTxs.begin(), nftTxs.end());
129
130 // We need to unique the URI changes separately, in case the URI changes are discarded
131 if (maybeNFT) {
132 if (maybeNFT->onlyUriChanged) {
133 nfTokenURIChanges.push_back(*maybeNFT);
134 } else {
135 result.nfTokensData.push_back(*maybeNFT);
136 }
137 }
138
139 auto const maybeMPTHolder = getMPTHolderFromTx(txMeta, sttx);
140 if (maybeMPTHolder)
141 result.mptHoldersData.push_back(*maybeMPTHolder);
142
143 result.accountTxData.emplace_back(txMeta, sttx.getTransactionID());
144 static constexpr std::size_t kEY_SIZE = 32;
145 std::string keyStr{reinterpret_cast<char const*>(sttx.getTransactionID().data()), kEY_SIZE};
146 backend_->writeTransaction(
147 std::move(keyStr),
148 ledger.seq,
149 ledger.closeTime.time_since_epoch().count(),
150 std::move(*raw),
151 std::move(*txn.mutable_metadata_blob())
152 );
153 }
154
155 result.nfTokensData = getUniqueNFTsDatas(result.nfTokensData);
156 nfTokenURIChanges = getUniqueNFTsDatas(nfTokenURIChanges);
157
158 // Put uri change at the end to ensure the uri not overwritten
159 result.nfTokensData.insert(result.nfTokensData.end(), nfTokenURIChanges.begin(), nfTokenURIChanges.end());
160 return result;
161 }
162
171 std::optional<ripple::LedgerHeader>
172 loadInitialLedger(uint32_t sequence)
173 {
174 // check that database is actually empty
175 auto rng = backend_->hardFetchLedgerRangeNoThrow();
176 if (rng) {
177 ASSERT(false, "Database is not empty");
178 return {};
179 }
180
181 // Fetch the ledger from the network. This function will not return until either the fetch is successful, or the
182 // server is being shutdown. This only fetches the ledger header and the transactions+metadata
183 OptionalGetLedgerResponseType ledgerData{fetcher_.get().fetchData(sequence)};
184 if (!ledgerData)
185 return {};
186
187 ripple::LedgerHeader lgrInfo = ::util::deserializeHeader(ripple::makeSlice(ledgerData->ledger_header()));
188
189 LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(lgrInfo);
190
191 auto timeDiff = ::util::timed<std::chrono::duration<double>>([this, sequence, &lgrInfo, &ledgerData]() {
192 backend_->startWrites();
193
194 LOG(log_.debug()) << "Started writes";
195
196 backend_->writeLedger(lgrInfo, std::move(*ledgerData->mutable_ledger_header()));
197
198 LOG(log_.debug()) << "Wrote ledger";
199 FormattedTransactionsData insertTxResult = insertTransactions(lgrInfo, *ledgerData);
200 LOG(log_.debug()) << "Inserted txns";
201
202 // download the full account state map. This function downloads full
203 // ledger data and pushes the downloaded data into the writeQueue.
204 // asyncWriter consumes from the queue and inserts the data into the
205 // Ledger object. Once the below call returns, all data has been pushed
206 // into the queue
207 auto edgeKeys = loadBalancer_->loadInitialLedger(sequence);
208
209 size_t numWrites = 0;
210 backend_->cache().setFull();
211
212 auto seconds = ::util::timed<std::chrono::seconds>([this, keys = std::move(edgeKeys), sequence, &numWrites](
213 ) mutable {
214 for (auto& key : keys) {
215 LOG(log_.debug()) << "Writing edge key = " << ripple::strHex(key);
216 auto succ = backend_->cache().getSuccessor(*ripple::uint256::fromVoidChecked(key), sequence);
217 if (succ)
218 backend_->writeSuccessor(std::move(key), sequence, uint256ToString(succ->key));
219 }
220
221 ripple::uint256 prev = data::kFIRST_KEY;
222 while (auto cur = backend_->cache().getSuccessor(prev, sequence)) {
223 ASSERT(cur.has_value(), "Successor for key {} must exist", ripple::strHex(prev));
224 if (prev == data::kFIRST_KEY)
225 backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(cur->key));
226
227 if (isBookDir(cur->key, cur->blob)) {
228 auto base = getBookBase(cur->key);
229 // make sure the base is not an actual object
230 if (!backend_->cache().get(base, sequence)) {
231 auto succ = backend_->cache().getSuccessor(base, sequence);
232 ASSERT(succ.has_value(), "Book base {} must have a successor", ripple::strHex(base));
233 if (succ->key == cur->key) {
234 LOG(log_.debug()) << "Writing book successor = " << ripple::strHex(base) << " - "
235 << ripple::strHex(cur->key);
236
237 backend_->writeSuccessor(uint256ToString(base), sequence, uint256ToString(cur->key));
238 }
239 }
240
241 ++numWrites;
242 }
243
244 prev = cur->key;
245 static constexpr std::size_t kLOG_INTERVAL = 100000;
246 if (numWrites % kLOG_INTERVAL == 0 && numWrites != 0)
247 LOG(log_.info()) << "Wrote " << numWrites << " book successors";
248 }
249
250 backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(data::kLAST_KEY));
251 ++numWrites;
252 });
253
254 LOG(log_.info()) << "Looping through cache and submitting all writes took " << seconds
255 << " seconds. numWrites = " << std::to_string(numWrites);
256
257 LOG(log_.debug()) << "Loaded initial ledger";
258
259 if (not state_.get().isStopping) {
260 backend_->writeAccountTransactions(std::move(insertTxResult.accountTxData));
261 backend_->writeNFTs(insertTxResult.nfTokensData);
262 backend_->writeNFTTransactions(insertTxResult.nfTokenTxData);
263 backend_->writeMPTHolders(insertTxResult.mptHoldersData);
264 }
265
266 backend_->finishWrites(sequence);
267 });
268
269 LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
270 return lgrInfo;
271 }
272};
273
274} // namespace etl::impl
ripple::uint256 getBookBase(T const &key)
Get the book base.
Definition DBHelpers.hpp:292
std::string uint256ToString(ripple::uint256 const &input)
Stringify a ripple::uint256.
Definition DBHelpers.hpp:312
bool isBookDir(T const &key, R const &object)
Check whether the supplied object is a book dir.
Definition DBHelpers.hpp:258
Loads ledger data into the DB.
Definition LedgerLoader.hpp:70
FormattedTransactionsData insertTransactions(ripple::LedgerHeader const &ledger, GetLedgerResponseType &data)
Insert extracted transaction into the ledger.
Definition LedgerLoader.hpp:112
LedgerLoader(std::shared_ptr< BackendInterface > backend, std::shared_ptr< etlng::LoadBalancerInterface > balancer, LedgerFetcherType &fetcher, SystemState const &state)
Create an instance of the loader.
Definition LedgerLoader.hpp:88
std::optional< ripple::LedgerHeader > loadInitialLedger(uint32_t sequence)
Download a ledger with specified sequence in full.
Definition LedgerLoader.hpp:172
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:209
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
std::optional< MPTHolderData > getMPTHolderFromTx(ripple::TxMeta const &txMeta, ripple::STTx const &sttx)
Pull MPT data from TX via ETLService.
Definition MPTHelpers.cpp:63
std::vector< NFTsData > getUniqueNFTsDatas(std::vector< NFTsData > const &nfts)
Get the unique NFTs data from a vector of NFTsData happening in the same ledger. For example,...
Definition NFTHelpers.cpp:376
std::pair< std::vector< NFTTransactionsData >, std::optional< NFTsData > > getNFTDataFromTx(ripple::TxMeta const &txMeta, ripple::STTx const &sttx)
Pull NFT data from TX via ETLService.
Definition NFTHelpers.cpp:329
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:204
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
std::string toString(ripple::LedgerHeader const &info)
A helper function that converts a ripple::LedgerHeader to a string representation.
Definition LedgerUtils.hpp:216
Account transactions, NFT transactions and NFT data bundled togeher.
Definition LedgerLoader.hpp:56
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33