Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Transformer.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "data/Types.hpp"
25#include "etl/SystemState.hpp"
26#include "etl/impl/AmendmentBlockHandler.hpp"
27#include "etl/impl/LedgerLoader.hpp"
28#include "util/Assert.hpp"
29#include "util/LedgerUtils.hpp"
30#include "util/Profiler.hpp"
31#include "util/log/Logger.hpp"
32
33#include <grpcpp/grpcpp.h>
34#include <xrpl/basics/base_uint.h>
35#include <xrpl/basics/strHex.h>
36#include <xrpl/beast/core/CurrentThreadName.h>
37#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
38#include <xrpl/protocol/LedgerHeader.h>
39
40#include <chrono>
41#include <cstdint>
42#include <functional>
43#include <memory>
44#include <optional>
45#include <set>
46#include <stdexcept>
47#include <string>
48#include <thread>
49#include <utility>
50#include <vector>
51
52namespace etl::impl {
53
54/*
55 * TODO:
56 *
57 * 1) loading of data into db should not really be part of transform right?
58 * 2) can we just prepare the data and give it to the loader afterwards?
59 * 3) how to deal with cache update that is needed to write successors if neighbours not included?
60 */
61
65template <
66 typename DataPipeType,
67 typename LedgerLoaderType,
68 typename LedgerPublisherType,
69 typename AmendmentBlockHandlerType>
71 using GetLedgerResponseType = typename LedgerLoaderType::GetLedgerResponseType;
72 using RawLedgerObjectType = typename LedgerLoaderType::RawLedgerObjectType;
73
74 util::Logger log_{"ETL"};
75
76 std::reference_wrapper<DataPipeType> pipe_;
77 std::shared_ptr<BackendInterface> backend_;
78 std::reference_wrapper<LedgerLoaderType> loader_;
79 std::reference_wrapper<LedgerPublisherType> publisher_;
80 std::reference_wrapper<AmendmentBlockHandlerType> amendmentBlockHandler_;
81
82 uint32_t startSequence_;
83 std::reference_wrapper<SystemState> state_; // shared state for ETL
84
85 std::thread thread_;
86
87public:
95 DataPipeType& pipe,
96 std::shared_ptr<BackendInterface> backend,
97 LedgerLoaderType& loader,
98 LedgerPublisherType& publisher,
99 AmendmentBlockHandlerType& amendmentBlockHandler,
100 uint32_t startSequence,
101 SystemState& state
102 )
103 : pipe_{std::ref(pipe)}
104 , backend_{std::move(backend)}
105 , loader_{std::ref(loader)}
106 , publisher_{std::ref(publisher)}
107 , amendmentBlockHandler_{std::ref(amendmentBlockHandler)}
108 , startSequence_{startSequence}
109 , state_{std::ref(state)}
110 {
111 thread_ = std::thread([this]() { process(); });
112 }
113
118 {
119 if (thread_.joinable())
120 thread_.join();
121 }
122
126 void
128 {
129 ASSERT(thread_.joinable(), "Transformer thread must be joinable");
130 thread_.join();
131 }
132
133private:
134 void
135 process()
136 {
137 beast::setCurrentThreadName("ETLService transform");
138 uint32_t currentSequence = startSequence_;
139
140 while (not hasWriteConflict()) {
141 auto fetchResponse = pipe_.get().popNext(currentSequence);
142 ++currentSequence;
143
144 // if fetchResponse is an empty optional, the extracter thread has stopped and the transformer should
145 // stop as well
146 if (!fetchResponse)
147 break;
148
149 if (isStopping())
150 continue;
151
152 auto const start = std::chrono::system_clock::now();
153 auto [lgrInfo, success] = buildNextLedger(*fetchResponse);
154
155 if (success) {
156 auto const numTxns = fetchResponse->transactions_list().transactions_size();
157 auto const numObjects = fetchResponse->ledger_objects().objects_size();
158 auto const end = std::chrono::system_clock::now();
159 auto const duration = ((end - start).count()) / 1000000000.0;
160
161 LOG(log_.info()) << "Load phase of ETL. Successfully wrote ledger! Ledger info: "
162 << util::toString(lgrInfo) << ". txn count = " << numTxns
163 << ". object count = " << numObjects << ". load time = " << duration
164 << ". load txns per second = " << numTxns / duration
165 << ". load objs per second = " << numObjects / duration;
166
167 // success is false if the ledger was already written
168 publisher_.get().publish(lgrInfo);
169 } else {
170 LOG(log_.error()) << "Error writing ledger. " << util::toString(lgrInfo);
171 }
172
173 setWriteConflict(not success);
174 }
175 }
176
184 std::pair<ripple::LedgerHeader, bool>
185 buildNextLedger(GetLedgerResponseType& rawData)
186 {
187 LOG(log_.debug()) << "Beginning ledger update";
188 ripple::LedgerHeader lgrInfo = ::util::deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
189
190 LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(lgrInfo);
191 backend_->startWrites();
192 backend_->writeLedger(lgrInfo, std::move(*rawData.mutable_ledger_header()));
193
194 writeSuccessors(lgrInfo, rawData);
195 std::optional<FormattedTransactionsData> insertTxResultOp;
196 try {
197 updateCache(lgrInfo, rawData);
198
199 LOG(log_.debug()) << "Inserted/modified/deleted all objects. Number of objects = "
200 << rawData.ledger_objects().objects_size();
201
202 insertTxResultOp.emplace(loader_.get().insertTransactions(lgrInfo, rawData));
203 } catch (std::runtime_error const& e) {
204 LOG(log_.fatal()) << "Failed to build next ledger: " << e.what();
205
206 amendmentBlockHandler_.get().notifyAmendmentBlocked();
207 return {ripple::LedgerHeader{}, false};
208 }
209
210 LOG(log_.debug()) << "Inserted all transactions. Number of transactions = "
211 << rawData.transactions_list().transactions_size();
212
213 backend_->writeAccountTransactions(std::move(insertTxResultOp->accountTxData));
214 backend_->writeNFTs(insertTxResultOp->nfTokensData);
215 backend_->writeNFTTransactions(insertTxResultOp->nfTokenTxData);
216 backend_->writeMPTHolders(insertTxResultOp->mptHoldersData);
217
218 auto [success, duration] =
219 ::util::timed<std::chrono::duration<double>>([&]() { return backend_->finishWrites(lgrInfo.seq); });
220
221 LOG(log_.debug()) << "Finished writes. Total time: " << std::to_string(duration);
222 LOG(log_.debug()) << "Finished ledger update: " << ::util::toString(lgrInfo);
223
224 return {lgrInfo, success};
225 }
226
233 void
234 updateCache(ripple::LedgerHeader const& lgrInfo, GetLedgerResponseType& rawData)
235 {
236 std::vector<data::LedgerObject> cacheUpdates;
237 cacheUpdates.reserve(rawData.ledger_objects().objects_size());
238
239 // TODO change these to unordered_set
240 std::set<ripple::uint256> bookSuccessorsToCalculate;
241 std::set<ripple::uint256> modified;
242
243 for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) {
244 auto key = ripple::uint256::fromVoidChecked(obj.key());
245 ASSERT(key.has_value(), "Failed to deserialize key from void");
246
247 cacheUpdates.push_back({*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}});
248 LOG(log_.debug()) << "key = " << ripple::strHex(*key) << " - mod type = " << obj.mod_type();
249
250 if (obj.mod_type() != RawLedgerObjectType::MODIFIED && !rawData.object_neighbors_included()) {
251 LOG(log_.debug()) << "object neighbors not included. using cache";
252
253 if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq - 1)
254 throw std::logic_error("Cache is not full, but object neighbors were not included");
255
256 auto const blob = obj.mutable_data();
257 auto checkBookBase = false;
258 auto const isDeleted = (blob->size() == 0);
259
260 if (isDeleted) {
261 auto const old = backend_->cache().get(*key, lgrInfo.seq - 1);
262 ASSERT(old.has_value(), "Deleted object {} must be in cache", ripple::strHex(*key));
263 checkBookBase = isBookDir(*key, *old);
264 } else {
265 checkBookBase = isBookDir(*key, *blob);
266 }
267
268 if (checkBookBase) {
269 LOG(log_.debug()) << "Is book dir. Key = " << ripple::strHex(*key);
270
271 auto const bookBase = getBookBase(*key);
272 auto const oldFirstDir = backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1);
273 ASSERT(
274 oldFirstDir.has_value(),
275 "Book base must have a successor for lgrInfo.seq - 1 = {}",
276 lgrInfo.seq - 1
277 );
278
279 // We deleted the first directory, or we added a directory prior to the old first
280 // directory
281 if ((isDeleted && key == oldFirstDir->key) || (!isDeleted && key < oldFirstDir->key)) {
282 LOG(log_.debug())
283 << "Need to recalculate book base successor. base = " << ripple::strHex(bookBase)
284 << " - key = " << ripple::strHex(*key) << " - isDeleted = " << isDeleted
285 << " - seq = " << lgrInfo.seq;
286 bookSuccessorsToCalculate.insert(bookBase);
287 }
288 }
289 }
290
291 if (obj.mod_type() == RawLedgerObjectType::MODIFIED)
292 modified.insert(*key);
293
294 backend_->writeLedgerObject(std::move(*obj.mutable_key()), lgrInfo.seq, std::move(*obj.mutable_data()));
295 }
296
297 backend_->cache().update(cacheUpdates, lgrInfo.seq);
298
299 // rippled didn't send successor information, so use our cache
300 if (!rawData.object_neighbors_included()) {
301 LOG(log_.debug()) << "object neighbors not included. using cache";
302 if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq)
303 throw std::logic_error("Cache is not full, but object neighbors were not included");
304
305 for (auto const& obj : cacheUpdates) {
306 if (modified.contains(obj.key))
307 continue;
308
309 auto lb = backend_->cache().getPredecessor(obj.key, lgrInfo.seq);
310 if (!lb)
311 lb = {.key = data::kFIRST_KEY, .blob = {}};
312
313 auto ub = backend_->cache().getSuccessor(obj.key, lgrInfo.seq);
314 if (!ub)
315 ub = {.key = data::kLAST_KEY, .blob = {}};
316
317 if (obj.blob.empty()) {
318 LOG(log_.debug()) << "writing successor for deleted object " << ripple::strHex(obj.key) << " - "
319 << ripple::strHex(lb->key) << " - " << ripple::strHex(ub->key);
320
321 backend_->writeSuccessor(uint256ToString(lb->key), lgrInfo.seq, uint256ToString(ub->key));
322 } else {
323 backend_->writeSuccessor(uint256ToString(lb->key), lgrInfo.seq, uint256ToString(obj.key));
324 backend_->writeSuccessor(uint256ToString(obj.key), lgrInfo.seq, uint256ToString(ub->key));
325
326 LOG(log_.debug()) << "writing successor for new object " << ripple::strHex(lb->key) << " - "
327 << ripple::strHex(obj.key) << " - " << ripple::strHex(ub->key);
328 }
329 }
330
331 for (auto const& base : bookSuccessorsToCalculate) {
332 auto succ = backend_->cache().getSuccessor(base, lgrInfo.seq);
333 if (succ) {
334 backend_->writeSuccessor(uint256ToString(base), lgrInfo.seq, uint256ToString(succ->key));
335
336 LOG(log_.debug()) << "Updating book successor " << ripple::strHex(base) << " - "
337 << ripple::strHex(succ->key);
338 } else {
339 backend_->writeSuccessor(uint256ToString(base), lgrInfo.seq, uint256ToString(data::kLAST_KEY));
340
341 LOG(log_.debug()) << "Updating book successor " << ripple::strHex(base) << " - "
342 << ripple::strHex(data::kLAST_KEY);
343 }
344 }
345 }
346 }
347
354 void
355 writeSuccessors(ripple::LedgerHeader const& lgrInfo, GetLedgerResponseType& rawData)
356 {
357 // Write successor info, if included from rippled
358 if (rawData.object_neighbors_included()) {
359 LOG(log_.debug()) << "object neighbors included";
360
361 for (auto& obj : *(rawData.mutable_book_successors())) {
362 auto firstBook = std::move(*obj.mutable_first_book());
363 if (!firstBook.size())
364 firstBook = uint256ToString(data::kLAST_KEY);
365 LOG(log_.debug()) << "writing book successor " << ripple::strHex(obj.book_base()) << " - "
366 << ripple::strHex(firstBook);
367
368 backend_->writeSuccessor(std::move(*obj.mutable_book_base()), lgrInfo.seq, std::move(firstBook));
369 }
370
371 for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) {
372 if (obj.mod_type() != RawLedgerObjectType::MODIFIED) {
373 std::string* predPtr = obj.mutable_predecessor();
374 if (predPtr->empty())
375 *predPtr = uint256ToString(data::kFIRST_KEY);
376 std::string* succPtr = obj.mutable_successor();
377 if (succPtr->empty())
378 *succPtr = uint256ToString(data::kLAST_KEY);
379
380 if (obj.mod_type() == RawLedgerObjectType::DELETED) {
381 LOG(log_.debug()) << "Modifying successors for deleted object " << ripple::strHex(obj.key())
382 << " - " << ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr);
383
384 backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::move(*succPtr));
385 } else {
386 LOG(log_.debug()) << "adding successor for new object " << ripple::strHex(obj.key()) << " - "
387 << ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr);
388
389 backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::string{obj.key()});
390 backend_->writeSuccessor(std::string{obj.key()}, lgrInfo.seq, std::move(*succPtr));
391 }
392 } else
393 LOG(log_.debug()) << "object modified " << ripple::strHex(obj.key());
394 }
395 }
396 }
397
399 bool
400 isStopping() const
401 {
402 return state_.get().isStopping;
403 }
404
406 bool
407 hasWriteConflict() const
408 {
409 return state_.get().writeConflict;
410 }
411
417 void
418 setWriteConflict(bool conflict)
419 {
420 state_.get().writeConflict = conflict;
421 }
422};
423
424} // namespace etl::impl
ripple::uint256 getBookBase(T const &key)
Get the book base.
Definition DBHelpers.hpp:292
std::string uint256ToString(ripple::uint256 const &input)
Stringify a ripple::uint256.
Definition DBHelpers.hpp:312
bool isBookDir(T const &key, R const &object)
Check whether the supplied object is a book dir.
Definition DBHelpers.hpp:258
Transformer thread that prepares new ledger out of raw data from GRPC.
Definition Transformer.hpp:70
void waitTillFinished()
Block calling thread until transformer thread exits.
Definition Transformer.hpp:127
Transformer(DataPipeType &pipe, std::shared_ptr< BackendInterface > backend, LedgerLoaderType &loader, LedgerPublisherType &publisher, AmendmentBlockHandlerType &amendmentBlockHandler, uint32_t startSequence, SystemState &state)
Create an instance of the transformer.
Definition Transformer.hpp:94
~Transformer()
Joins the transformer thread.
Definition Transformer.hpp:117
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:215
Pump fatal(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::FTL severity.
Definition Logger.cpp:220
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:203
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
std::string toString(ripple::LedgerHeader const &info)
A helper function that converts a ripple::LedgerHeader to a string representation.
Definition LedgerUtils.hpp:215
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33