Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerLoader.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "data/Types.hpp"
25#include "etl/MPTHelpers.hpp"
26#include "etl/NFTHelpers.hpp"
27#include "etl/SystemState.hpp"
28#include "etl/impl/LedgerFetcher.hpp"
29#include "util/Assert.hpp"
30#include "util/LedgerUtils.hpp"
31#include "util/Profiler.hpp"
32#include "util/log/Logger.hpp"
33
34#include <xrpl/basics/base_uint.h>
35#include <xrpl/basics/strHex.h>
36#include <xrpl/beast/core/CurrentThreadName.h>
37#include <xrpl/protocol/LedgerHeader.h>
38#include <xrpl/protocol/STTx.h>
39#include <xrpl/protocol/Serializer.h>
40#include <xrpl/protocol/TxMeta.h>
41
42#include <chrono>
43#include <cstddef>
44#include <cstdint>
45#include <functional>
46#include <memory>
47#include <optional>
48#include <string>
49#include <utility>
50#include <vector>
51
56 std::vector<AccountTransactionsData> accountTxData;
57 std::vector<NFTTransactionsData> nfTokenTxData;
58 std::vector<NFTsData> nfTokensData;
59 std::vector<MPTHolderData> mptHoldersData;
60 std::vector<NFTsData> nfTokenURIChanges;
61};
62
63namespace etl::impl {
64
68template <typename LoadBalancerType, typename LedgerFetcherType>
70public:
71 using GetLedgerResponseType = typename LoadBalancerType::GetLedgerResponseType;
72 using OptionalGetLedgerResponseType = typename LoadBalancerType::OptionalGetLedgerResponseType;
73 using RawLedgerObjectType = typename LoadBalancerType::RawLedgerObjectType;
74
75private:
76 util::Logger log_{"ETL"};
77
78 std::shared_ptr<BackendInterface> backend_;
79 std::shared_ptr<LoadBalancerType> loadBalancer_;
80 std::reference_wrapper<LedgerFetcherType> fetcher_;
81 std::reference_wrapper<SystemState const> state_; // shared state for ETL
82
83public:
88 std::shared_ptr<BackendInterface> backend,
89 std::shared_ptr<LoadBalancerType> balancer,
90 LedgerFetcherType& fetcher,
91 SystemState const& state
92 )
93 : backend_{std::move(backend)}
94 , loadBalancer_{std::move(balancer)}
95 , fetcher_{std::ref(fetcher)}
96 , state_{std::cref(state)}
97 {
98 }
99
111 insertTransactions(ripple::LedgerHeader const& ledger, GetLedgerResponseType& data)
112 {
114
115 std::vector<NFTsData> nfTokenURIChanges;
116 for (auto& txn : *(data.mutable_transactions_list()->mutable_transactions())) {
117 std::string* raw = txn.mutable_transaction_blob();
118
119 ripple::SerialIter it{raw->data(), raw->size()};
120 ripple::STTx const sttx{it};
121
122 LOG(log_.trace()) << "Inserting transaction = " << sttx.getTransactionID();
123
124 ripple::TxMeta txMeta{sttx.getTransactionID(), ledger.seq, txn.metadata_blob()};
125
126 auto const [nftTxs, maybeNFT] = getNFTDataFromTx(txMeta, sttx);
127 result.nfTokenTxData.insert(result.nfTokenTxData.end(), nftTxs.begin(), nftTxs.end());
128
129 // We need to unique the URI changes separately, in case the URI changes are discarded
130 if (maybeNFT) {
131 if (maybeNFT->onlyUriChanged) {
132 nfTokenURIChanges.push_back(*maybeNFT);
133 } else {
134 result.nfTokensData.push_back(*maybeNFT);
135 }
136 }
137
138 auto const maybeMPTHolder = getMPTHolderFromTx(txMeta, sttx);
139 if (maybeMPTHolder)
140 result.mptHoldersData.push_back(*maybeMPTHolder);
141
142 result.accountTxData.emplace_back(txMeta, sttx.getTransactionID());
143 static constexpr std::size_t kEY_SIZE = 32;
144 std::string keyStr{reinterpret_cast<char const*>(sttx.getTransactionID().data()), kEY_SIZE};
145 backend_->writeTransaction(
146 std::move(keyStr),
147 ledger.seq,
148 ledger.closeTime.time_since_epoch().count(),
149 std::move(*raw),
150 std::move(*txn.mutable_metadata_blob())
151 );
152 }
153
154 result.nfTokensData = getUniqueNFTsDatas(result.nfTokensData);
155 nfTokenURIChanges = getUniqueNFTsDatas(nfTokenURIChanges);
156
157 // Put uri change at the end to ensure the uri not overwritten
158 result.nfTokensData.insert(result.nfTokensData.end(), nfTokenURIChanges.begin(), nfTokenURIChanges.end());
159 return result;
160 }
161
170 std::optional<ripple::LedgerHeader>
171 loadInitialLedger(uint32_t sequence)
172 {
173 // check that database is actually empty
174 auto rng = backend_->hardFetchLedgerRangeNoThrow();
175 if (rng) {
176 ASSERT(false, "Database is not empty");
177 return {};
178 }
179
180 // Fetch the ledger from the network. This function will not return until either the fetch is successful, or the
181 // server is being shutdown. This only fetches the ledger header and the transactions+metadata
182 OptionalGetLedgerResponseType ledgerData{fetcher_.get().fetchData(sequence)};
183 if (!ledgerData)
184 return {};
185
186 ripple::LedgerHeader lgrInfo = ::util::deserializeHeader(ripple::makeSlice(ledgerData->ledger_header()));
187
188 LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(lgrInfo);
189
190 auto timeDiff = ::util::timed<std::chrono::duration<double>>([this, sequence, &lgrInfo, &ledgerData]() {
191 backend_->startWrites();
192
193 LOG(log_.debug()) << "Started writes";
194
195 backend_->writeLedger(lgrInfo, std::move(*ledgerData->mutable_ledger_header()));
196
197 LOG(log_.debug()) << "Wrote ledger";
198 FormattedTransactionsData insertTxResult = insertTransactions(lgrInfo, *ledgerData);
199 LOG(log_.debug()) << "Inserted txns";
200
201 // download the full account state map. This function downloads full
202 // ledger data and pushes the downloaded data into the writeQueue.
203 // asyncWriter consumes from the queue and inserts the data into the
204 // Ledger object. Once the below call returns, all data has been pushed
205 // into the queue
206 auto edgeKeys = loadBalancer_->loadInitialLedger(sequence);
207
208 size_t numWrites = 0;
209 backend_->cache().setFull();
210
211 auto seconds = ::util::timed<std::chrono::seconds>([this, keys = std::move(edgeKeys), sequence, &numWrites](
212 ) mutable {
213 for (auto& key : keys) {
214 LOG(log_.debug()) << "Writing edge key = " << ripple::strHex(key);
215 auto succ = backend_->cache().getSuccessor(*ripple::uint256::fromVoidChecked(key), sequence);
216 if (succ)
217 backend_->writeSuccessor(std::move(key), sequence, uint256ToString(succ->key));
218 }
219
220 ripple::uint256 prev = data::kFIRST_KEY;
221 while (auto cur = backend_->cache().getSuccessor(prev, sequence)) {
222 ASSERT(cur.has_value(), "Succesor for key {} must exist", ripple::strHex(prev));
223 if (prev == data::kFIRST_KEY)
224 backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(cur->key));
225
226 if (isBookDir(cur->key, cur->blob)) {
227 auto base = getBookBase(cur->key);
228 // make sure the base is not an actual object
229 if (!backend_->cache().get(base, sequence)) {
230 auto succ = backend_->cache().getSuccessor(base, sequence);
231 ASSERT(succ.has_value(), "Book base {} must have a successor", ripple::strHex(base));
232 if (succ->key == cur->key) {
233 LOG(log_.debug()) << "Writing book successor = " << ripple::strHex(base) << " - "
234 << ripple::strHex(cur->key);
235
236 backend_->writeSuccessor(uint256ToString(base), sequence, uint256ToString(cur->key));
237 }
238 }
239
240 ++numWrites;
241 }
242
243 prev = cur->key;
244 static constexpr std::size_t kLOG_INTERVAL = 100000;
245 if (numWrites % kLOG_INTERVAL == 0 && numWrites != 0)
246 LOG(log_.info()) << "Wrote " << numWrites << " book successors";
247 }
248
249 backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(data::kLAST_KEY));
250 ++numWrites;
251 });
252
253 LOG(log_.info()) << "Looping through cache and submitting all writes took " << seconds
254 << " seconds. numWrites = " << std::to_string(numWrites);
255
256 LOG(log_.debug()) << "Loaded initial ledger";
257
258 if (not state_.get().isStopping) {
259 backend_->writeAccountTransactions(std::move(insertTxResult.accountTxData));
260 backend_->writeNFTs(insertTxResult.nfTokensData);
261 backend_->writeNFTTransactions(insertTxResult.nfTokenTxData);
262 backend_->writeMPTHolders(insertTxResult.mptHoldersData);
263 }
264
265 backend_->finishWrites(sequence);
266 });
267
268 LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
269 return lgrInfo;
270 }
271};
272
273} // namespace etl::impl
ripple::uint256 getBookBase(T const &key)
Get the book base.
Definition DBHelpers.hpp:292
std::string uint256ToString(ripple::uint256 const &input)
Stringify a ripple::uint256.
Definition DBHelpers.hpp:312
bool isBookDir(T const &key, R const &object)
Check whether the supplied object is a book dir.
Definition DBHelpers.hpp:258
Loads ledger data into the DB.
Definition LedgerLoader.hpp:69
FormattedTransactionsData insertTransactions(ripple::LedgerHeader const &ledger, GetLedgerResponseType &data)
Insert extracted transaction into the ledger.
Definition LedgerLoader.hpp:111
std::optional< ripple::LedgerHeader > loadInitialLedger(uint32_t sequence)
Download a ledger with specified sequence in full.
Definition LedgerLoader.hpp:171
LedgerLoader(std::shared_ptr< BackendInterface > backend, std::shared_ptr< LoadBalancerType > balancer, LedgerFetcherType &fetcher, SystemState const &state)
Create an instance of the loader.
Definition LedgerLoader.hpp:87
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:195
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
std::optional< MPTHolderData > getMPTHolderFromTx(ripple::TxMeta const &txMeta, ripple::STTx const &sttx)
Pull MPT data from TX via ETLService.
Definition MPTHelpers.cpp:63
std::vector< NFTsData > getUniqueNFTsDatas(std::vector< NFTsData > const &nfts)
Get the unique NFTs data from a vector of NFTsData happening in the same ledger. For example,...
Definition NFTHelpers.cpp:376
std::pair< std::vector< NFTTransactionsData >, std::optional< NFTsData > > getNFTDataFromTx(ripple::TxMeta const &txMeta, ripple::STTx const &sttx)
Pull NFT data from TX via ETLService.
Definition NFTHelpers.cpp:329
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:203
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
std::string toString(ripple::LedgerHeader const &info)
A helper function that converts a ripple::LedgerHeader to a string representation.
Definition LedgerUtils.hpp:215
Account transactions, NFT transactions and NFT data bundled together.
Definition LedgerLoader.hpp:55
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33