Clio develop
The XRP Ledger API server.
CassandraBackend.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "data/Types.hpp"
25#include "data/cassandra/Concepts.hpp"
26#include "data/cassandra/Handle.hpp"
27#include "data/cassandra/Schema.hpp"
28#include "data/cassandra/SettingsProvider.hpp"
29#include "data/cassandra/Types.hpp"
30#include "data/cassandra/impl/ExecutionStrategy.hpp"
31#include "util/Assert.hpp"
32#include "util/LedgerUtils.hpp"
33#include "util/Profiler.hpp"
34#include "util/log/Logger.hpp"
35
36#include <boost/asio/spawn.hpp>
37#include <boost/json/object.hpp>
38#include <cassandra.h>
39#include <xrpl/basics/Blob.h>
40#include <xrpl/basics/base_uint.h>
41#include <xrpl/basics/strHex.h>
42#include <xrpl/protocol/AccountID.h>
43#include <xrpl/protocol/Indexes.h>
44#include <xrpl/protocol/LedgerHeader.h>
45#include <xrpl/protocol/nft.h>
46
47#include <atomic>
48#include <chrono>
49#include <cstddef>
50#include <cstdint>
51#include <iterator>
52#include <limits>
53#include <optional>
54#include <stdexcept>
55#include <string>
56#include <tuple>
57#include <utility>
58#include <vector>
59
60namespace data::cassandra {
61
70template <SomeSettingsProvider SettingsProviderType, SomeExecutionStrategy ExecutionStrategyType>
72 util::Logger log_{"Backend"};
73
74 SettingsProviderType settingsProvider_;
76
77 std::atomic_uint32_t ledgerSequence_ = 0u;
78
79protected:
80 Handle handle_;
81
 82 // Has to be mutable because of BackendInterface constness :(
83 mutable ExecutionStrategyType executor_;
84
85public:
92 BasicCassandraBackend(SettingsProviderType settingsProvider, bool readOnly)
93 : settingsProvider_{std::move(settingsProvider)}
94 , schema_{settingsProvider_}
95 , handle_{settingsProvider_.getSettings()}
96 , executor_{settingsProvider_.getSettings(), handle_}
97 {
98 if (auto const res = handle_.connect(); not res)
99 throw std::runtime_error("Could not connect to database: " + res.error());
100
101 if (not readOnly) {
102 if (auto const res = handle_.execute(schema_.createKeyspace); not res) {
 103 // On DataStax, keyspace creation can be configured to be allowed only through the admin
 104 // interface. This does not mean that the keyspace does not already exist, though.
105 if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED)
106 throw std::runtime_error("Could not create keyspace: " + res.error());
107 }
108
109 if (auto const res = handle_.executeEach(schema_.createSchema); not res)
110 throw std::runtime_error("Could not create schema: " + res.error());
111 }
112
113 try {
114 schema_.prepareStatements(handle_);
115 } catch (std::runtime_error const& ex) {
116 LOG(log_.error()) << "Failed to prepare the statements: " << ex.what() << "; readOnly: " << readOnly;
117 throw;
118 }
119
120 LOG(log_.info()) << "Created (revamped) CassandraBackend";
121 }
122
125 ripple::AccountID const& account,
126 std::uint32_t const limit,
127 bool forward,
128 std::optional<TransactionsCursor> const& cursorIn,
129 boost::asio::yield_context yield
130 ) const override
131 {
132 auto rng = fetchLedgerRange();
133 if (!rng)
134 return {.txns = {}, .cursor = {}};
135
136 Statement const statement = [this, forward, &account]() {
137 if (forward)
138 return schema_->selectAccountTxForward.bind(account);
139
140 return schema_->selectAccountTx.bind(account);
141 }();
142
143 auto cursor = cursorIn;
144 if (cursor) {
145 statement.bindAt(1, cursor->asTuple());
146 LOG(log_.debug()) << "account = " << ripple::strHex(account) << " tuple = " << cursor->ledgerSequence
147 << cursor->transactionIndex;
148 } else {
149 auto const seq = forward ? rng->minSequence : rng->maxSequence;
150 auto const placeHolder = forward ? 0u : std::numeric_limits<std::uint32_t>::max();
151
152 statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
153 LOG(log_.debug()) << "account = " << ripple::strHex(account) << " idx = " << seq
154 << " tuple = " << placeHolder;
155 }
156
157 // FIXME: Limit is a hack to support uint32_t properly for the time
158 // being. Should be removed later and schema updated to use proper
159 // types.
160 statement.bindAt(2, Limit{limit});
161 auto const res = executor_.read(yield, statement);
162 auto const& results = res.value();
163 if (not results.hasRows()) {
164 LOG(log_.debug()) << "No rows returned";
165 return {};
166 }
167
168 std::vector<ripple::uint256> hashes = {};
169 auto numRows = results.numRows();
170 LOG(log_.info()) << "num_rows = " << numRows;
171
172 for (auto [hash, data] : extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
173 hashes.push_back(hash);
174 if (--numRows == 0) {
175 LOG(log_.debug()) << "Setting cursor";
176 cursor = data;
177 }
178 }
179
180 auto const txns = fetchTransactions(hashes, yield);
181 LOG(log_.debug()) << "Txns = " << txns.size();
182
183 if (txns.size() == limit) {
184 LOG(log_.debug()) << "Returning cursor";
185 return {txns, cursor};
186 }
187
188 return {txns, {}};
189 }
190
191 void
193 {
194 executor_.sync();
195 }
196
197 bool
198 doFinishWrites() override
199 {
201
202 if (!range_) {
203 executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);
204 }
205
206 if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
207 LOG(log_.warn()) << "Update failed for ledger " << ledgerSequence_;
208 return false;
209 }
210
211 LOG(log_.info()) << "Committed ledger " << ledgerSequence_;
212 return true;
213 }
214
215 void
216 writeLedger(ripple::LedgerHeader const& ledgerHeader, std::string&& blob) override
217 {
218 executor_.write(schema_->insertLedgerHeader, ledgerHeader.seq, std::move(blob));
219
220 executor_.write(schema_->insertLedgerHash, ledgerHeader.hash, ledgerHeader.seq);
221
222 ledgerSequence_ = ledgerHeader.seq;
223 }
224
225 std::optional<std::uint32_t>
226 fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
227 {
228 if (auto const res = executor_.read(yield, schema_->selectLatestLedger); res) {
229 if (auto const& result = res.value(); result) {
230 if (auto const maybeValue = result.template get<uint32_t>(); maybeValue)
231 return maybeValue;
232
233 LOG(log_.error()) << "Could not fetch latest ledger - no rows";
234 return std::nullopt;
235 }
236
237 LOG(log_.error()) << "Could not fetch latest ledger - no result";
238 } else {
239 LOG(log_.error()) << "Could not fetch latest ledger: " << res.error();
240 }
241
242 return std::nullopt;
243 }
244
245 std::optional<ripple::LedgerHeader>
246 fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override
247 {
248 auto const res = executor_.read(yield, schema_->selectLedgerBySeq, sequence);
249 if (res) {
250 if (auto const& result = res.value(); result) {
251 if (auto const maybeValue = result.template get<std::vector<unsigned char>>(); maybeValue) {
252 return util::deserializeHeader(ripple::makeSlice(*maybeValue));
253 }
254
255 LOG(log_.error()) << "Could not fetch ledger by sequence - no rows";
256 return std::nullopt;
257 }
258
259 LOG(log_.error()) << "Could not fetch ledger by sequence - no result";
260 } else {
261 LOG(log_.error()) << "Could not fetch ledger by sequence: " << res.error();
262 }
263
264 return std::nullopt;
265 }
266
267 std::optional<ripple::LedgerHeader>
268 fetchLedgerByHash(ripple::uint256 const& hash, boost::asio::yield_context yield) const override
269 {
270 if (auto const res = executor_.read(yield, schema_->selectLedgerByHash, hash); res) {
271 if (auto const& result = res.value(); result) {
272 if (auto const maybeValue = result.template get<uint32_t>(); maybeValue)
273 return fetchLedgerBySequence(*maybeValue, yield);
274
275 LOG(log_.error()) << "Could not fetch ledger by hash - no rows";
276 return std::nullopt;
277 }
278
279 LOG(log_.error()) << "Could not fetch ledger by hash - no result";
280 } else {
281 LOG(log_.error()) << "Could not fetch ledger by hash: " << res.error();
282 }
283
284 return std::nullopt;
285 }
286
287 std::optional<LedgerRange>
288 hardFetchLedgerRange(boost::asio::yield_context yield) const override
289 {
290 auto const res = executor_.read(yield, schema_->selectLedgerRange);
291 if (res) {
292 auto const& results = res.value();
293 if (not results.hasRows()) {
294 LOG(log_.debug()) << "Could not fetch ledger range - no rows";
295 return std::nullopt;
296 }
297
 298 // TODO: this is probably a good place to use a user-defined type in
 299 // Cassandra instead of having two rows with a bool flag, or maybe at
 300 // least use tuple<int, int>?
301 LedgerRange range;
302 std::size_t idx = 0;
303 for (auto [seq] : extract<uint32_t>(results)) {
304 if (idx == 0) {
305 range.maxSequence = range.minSequence = seq;
306 } else if (idx == 1) {
307 range.maxSequence = seq;
308 }
309
310 ++idx;
311 }
312
313 if (range.minSequence > range.maxSequence)
314 std::swap(range.minSequence, range.maxSequence);
315
316 LOG(log_.debug()) << "After hardFetchLedgerRange range is " << range.minSequence << ":"
317 << range.maxSequence;
318 return range;
319 }
320 LOG(log_.error()) << "Could not fetch ledger range: " << res.error();
321
322 return std::nullopt;
323 }
324
325 std::vector<TransactionAndMetadata>
326 fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
327 {
328 auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence, yield);
329 return fetchTransactions(hashes, yield);
330 }
331
332 std::vector<ripple::uint256>
333 fetchAllTransactionHashesInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield)
334 const override
335 {
336 auto start = std::chrono::system_clock::now();
337 auto const res = executor_.read(yield, schema_->selectAllTransactionHashesInLedger, ledgerSequence);
338
339 if (not res) {
340 LOG(log_.error()) << "Could not fetch all transaction hashes: " << res.error();
341 return {};
342 }
343
344 auto const& result = res.value();
345 if (not result.hasRows()) {
346 LOG(log_.warn()) << "Could not fetch all transaction hashes - no rows; ledger = "
347 << std::to_string(ledgerSequence);
348 return {};
349 }
350
351 std::vector<ripple::uint256> hashes;
352 for (auto [hash] : extract<ripple::uint256>(result))
353 hashes.push_back(std::move(hash));
354
355 auto end = std::chrono::system_clock::now();
356 LOG(log_.debug()) << "Fetched " << hashes.size() << " transaction hashes from database in "
357 << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
358 << " milliseconds";
359
360 return hashes;
361 }
362
363 std::optional<NFT>
364 fetchNFT(ripple::uint256 const& tokenID, std::uint32_t const ledgerSequence, boost::asio::yield_context yield)
365 const override
366 {
367 auto const res = executor_.read(yield, schema_->selectNFT, tokenID, ledgerSequence);
368 if (not res)
369 return std::nullopt;
370
371 if (auto const maybeRow = res->template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
372 auto [seq, owner, isBurned] = *maybeRow;
373 auto result = std::make_optional<NFT>(tokenID, seq, owner, isBurned);
374
 375 // Now fetch the URI. Usually we will have the URI even for burned NFTs,
 376 // but if the first ledger on this Clio instance included NFTokenBurn
 377 // transactions we will not have the URIs for any of those tokens.
 378 // In any other case, not having the URI indicates something went
 379 // wrong with our data.
380 //
 381 // TODO: in the future it would be great if any handlers that use
 382 // this could inject a warning in this case (the case of not having
 383 // a URI because the token was burned in the first ledger) to indicate that
 384 // even though we are returning a blank URI, the NFT might have had
 385 // one.
386 auto uriRes = executor_.read(yield, schema_->selectNFTURI, tokenID, ledgerSequence);
387 if (uriRes) {
388 if (auto const maybeUri = uriRes->template get<ripple::Blob>(); maybeUri)
389 result->uri = *maybeUri;
390 }
391
392 return result;
393 }
394
395 LOG(log_.error()) << "Could not fetch NFT - no rows";
396 return std::nullopt;
397 }
398
401 ripple::uint256 const& tokenID,
402 std::uint32_t const limit,
403 bool const forward,
404 std::optional<TransactionsCursor> const& cursorIn,
405 boost::asio::yield_context yield
406 ) const override
407 {
408 auto rng = fetchLedgerRange();
409 if (!rng)
410 return {.txns = {}, .cursor = {}};
411
412 Statement const statement = [this, forward, &tokenID]() {
413 if (forward)
414 return schema_->selectNFTTxForward.bind(tokenID);
415
416 return schema_->selectNFTTx.bind(tokenID);
417 }();
418
419 auto cursor = cursorIn;
420 if (cursor) {
421 statement.bindAt(1, cursor->asTuple());
422 LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " tuple = " << cursor->ledgerSequence
423 << cursor->transactionIndex;
424 } else {
425 auto const seq = forward ? rng->minSequence : rng->maxSequence;
426 auto const placeHolder = forward ? 0 : std::numeric_limits<std::uint32_t>::max();
427
428 statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
429 LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq
430 << " tuple = " << placeHolder;
431 }
432
433 statement.bindAt(2, Limit{limit});
434
435 auto const res = executor_.read(yield, statement);
436 auto const& results = res.value();
437 if (not results.hasRows()) {
438 LOG(log_.debug()) << "No rows returned";
439 return {};
440 }
441
442 std::vector<ripple::uint256> hashes = {};
443 auto numRows = results.numRows();
444 LOG(log_.info()) << "num_rows = " << numRows;
445
446 for (auto [hash, data] : extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
447 hashes.push_back(hash);
448 if (--numRows == 0) {
449 LOG(log_.debug()) << "Setting cursor";
450 cursor = data;
451
 452 // Forward queries page by ledger/tx sequence using `>=`,
 453 // so we have to advance the index by one.
454 if (forward)
455 ++cursor->transactionIndex;
456 }
457 }
458
459 auto const txns = fetchTransactions(hashes, yield);
460 LOG(log_.debug()) << "NFT Txns = " << txns.size();
461
462 if (txns.size() == limit) {
463 LOG(log_.debug()) << "Returning cursor";
464 return {txns, cursor};
465 }
466
467 return {txns, {}};
468 }
469
472 ripple::AccountID const& issuer,
473 std::optional<std::uint32_t> const& taxon,
474 std::uint32_t const ledgerSequence,
475 std::uint32_t const limit,
476 std::optional<ripple::uint256> const& cursorIn,
477 boost::asio::yield_context yield
478 ) const override
479 {
480 NFTsAndCursor ret;
481
482 Statement const idQueryStatement = [&taxon, &issuer, &cursorIn, &limit, this]() {
483 if (taxon.has_value()) {
484 auto r = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
485 r.bindAt(1, *taxon);
486 r.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
487 r.bindAt(3, Limit{limit});
488 return r;
489 }
490
491 auto r = schema_->selectNFTIDsByIssuer.bind(issuer);
492 r.bindAt(
493 1,
494 std::make_tuple(
495 cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
496 cursorIn.value_or(ripple::uint256(0))
497 )
498 );
499 r.bindAt(2, Limit{limit});
500 return r;
501 }();
502
503 // Query for all the NFTs issued by the account, potentially filtered by the taxon
504 auto const res = executor_.read(yield, idQueryStatement);
505
506 auto const& idQueryResults = res.value();
507 if (not idQueryResults.hasRows()) {
508 LOG(log_.debug()) << "No rows returned";
509 return {};
510 }
511
512 std::vector<ripple::uint256> nftIDs;
513 for (auto const [nftID] : extract<ripple::uint256>(idQueryResults))
514 nftIDs.push_back(nftID);
515
516 if (nftIDs.empty())
517 return ret;
518
519 if (nftIDs.size() == limit)
520 ret.cursor = nftIDs.back();
521
522 std::vector<Statement> selectNFTStatements;
523 selectNFTStatements.reserve(nftIDs.size());
524
525 std::transform(
526 std::cbegin(nftIDs),
527 std::cend(nftIDs),
528 std::back_inserter(selectNFTStatements),
529 [&](auto const& nftID) { return schema_->selectNFT.bind(nftID, ledgerSequence); }
530 );
531
532 auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
533
534 std::vector<Statement> selectNFTURIStatements;
535 selectNFTURIStatements.reserve(nftIDs.size());
536
537 std::transform(
538 std::cbegin(nftIDs),
539 std::cend(nftIDs),
540 std::back_inserter(selectNFTURIStatements),
541 [&](auto const& nftID) { return schema_->selectNFTURI.bind(nftID, ledgerSequence); }
542 );
543
544 auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
545
546 for (auto i = 0u; i < nftIDs.size(); i++) {
547 if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
548 auto [seq, owner, isBurned] = *maybeRow;
549 NFT nft(nftIDs[i], seq, owner, isBurned);
550 if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
551 nft.uri = *maybeUri;
552 ret.nfts.push_back(nft);
553 }
554 }
555 return ret;
556 }
557
560 ripple::uint192 const& mptID,
561 std::uint32_t const limit,
562 std::optional<ripple::AccountID> const& cursorIn,
563 std::uint32_t const ledgerSequence,
564 boost::asio::yield_context yield
565 ) const override
566 {
567 auto const holderEntries = executor_.read(
568 yield, schema_->selectMPTHolders, mptID, cursorIn.value_or(ripple::AccountID(0)), Limit{limit}
569 );
570
571 auto const& holderResults = holderEntries.value();
572 if (not holderResults.hasRows()) {
573 LOG(log_.debug()) << "No rows returned";
574 return {};
575 }
576
577 std::vector<ripple::uint256> mptKeys;
578 std::optional<ripple::AccountID> cursor;
579 for (auto const [holder] : extract<ripple::AccountID>(holderResults)) {
580 mptKeys.push_back(ripple::keylet::mptoken(mptID, holder).key);
581 cursor = holder;
582 }
583
584 auto mptObjects = doFetchLedgerObjects(mptKeys, ledgerSequence, yield);
585
586 auto it = std::remove_if(mptObjects.begin(), mptObjects.end(), [](Blob const& mpt) { return mpt.empty(); });
587
588 mptObjects.erase(it, mptObjects.end());
589
590 ASSERT(mptKeys.size() <= limit, "Number of keys can't exceed the limit");
591 if (mptKeys.size() == limit)
592 return {mptObjects, cursor};
593
594 return {mptObjects, {}};
595 }
596
597 std::optional<Blob>
598 doFetchLedgerObject(ripple::uint256 const& key, std::uint32_t const sequence, boost::asio::yield_context yield)
599 const override
600 {
601 LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key);
602 if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
603 if (auto const result = res->template get<Blob>(); result) {
604 if (result->size())
605 return result;
606 } else {
607 LOG(log_.debug()) << "Could not fetch ledger object - no rows";
608 }
609 } else {
610 LOG(log_.error()) << "Could not fetch ledger object: " << res.error();
611 }
612
613 return std::nullopt;
614 }
615
616 std::optional<std::uint32_t>
617 doFetchLedgerObjectSeq(ripple::uint256 const& key, std::uint32_t const sequence, boost::asio::yield_context yield)
618 const override
619 {
620 LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key);
621 if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
622 if (auto const result = res->template get<Blob, std::uint32_t>(); result) {
623 auto [_, seq] = result.value();
624 return seq;
625 }
626 LOG(log_.debug()) << "Could not fetch ledger object sequence - no rows";
627 } else {
628 LOG(log_.error()) << "Could not fetch ledger object sequence: " << res.error();
629 }
630
631 return std::nullopt;
632 }
633
634 std::optional<TransactionAndMetadata>
635 fetchTransaction(ripple::uint256 const& hash, boost::asio::yield_context yield) const override
636 {
637 if (auto const res = executor_.read(yield, schema_->selectTransaction, hash); res) {
638 if (auto const maybeValue = res->template get<Blob, Blob, uint32_t, uint32_t>(); maybeValue) {
639 auto [transaction, meta, seq, date] = *maybeValue;
640 return std::make_optional<TransactionAndMetadata>(transaction, meta, seq, date);
641 }
642
643 LOG(log_.debug()) << "Could not fetch transaction - no rows";
644 } else {
645 LOG(log_.error()) << "Could not fetch transaction: " << res.error();
646 }
647
648 return std::nullopt;
649 }
650
651 std::optional<ripple::uint256>
652 doFetchSuccessorKey(ripple::uint256 key, std::uint32_t const ledgerSequence, boost::asio::yield_context yield)
653 const override
654 {
655 if (auto const res = executor_.read(yield, schema_->selectSuccessor, key, ledgerSequence); res) {
656 if (auto const result = res->template get<ripple::uint256>(); result) {
657 if (*result == kLAST_KEY)
658 return std::nullopt;
659 return result;
660 }
661
662 LOG(log_.debug()) << "Could not fetch successor - no rows";
663 } else {
664 LOG(log_.error()) << "Could not fetch successor: " << res.error();
665 }
666
667 return std::nullopt;
668 }
669
670 std::vector<TransactionAndMetadata>
671 fetchTransactions(std::vector<ripple::uint256> const& hashes, boost::asio::yield_context yield) const override
672 {
673 if (hashes.empty())
674 return {};
675
676 auto const numHashes = hashes.size();
677 std::vector<TransactionAndMetadata> results;
678 results.reserve(numHashes);
679
680 std::vector<Statement> statements;
681 statements.reserve(numHashes);
682
683 auto const timeDiff = util::timed([this, yield, &results, &hashes, &statements]() {
684 // TODO: seems like a job for "hash IN (list of hashes)" instead?
685 std::transform(
686 std::cbegin(hashes),
687 std::cend(hashes),
688 std::back_inserter(statements),
689 [this](auto const& hash) { return schema_->selectTransaction.bind(hash); }
690 );
691
692 auto const entries = executor_.readEach(yield, statements);
693 std::transform(
694 std::cbegin(entries),
695 std::cend(entries),
696 std::back_inserter(results),
697 [](auto const& res) -> TransactionAndMetadata {
698 if (auto const maybeRow = res.template get<Blob, Blob, uint32_t, uint32_t>(); maybeRow)
699 return *maybeRow;
700
701 return {};
702 }
703 );
704 });
705
706 ASSERT(numHashes == results.size(), "Number of hashes and results must match");
707 LOG(log_.debug()) << "Fetched " << numHashes << " transactions from database in " << timeDiff
708 << " milliseconds";
709 return results;
710 }
711
712 std::vector<Blob>
714 std::vector<ripple::uint256> const& keys,
715 std::uint32_t const sequence,
716 boost::asio::yield_context yield
717 ) const override
718 {
719 if (keys.empty())
720 return {};
721
722 auto const numKeys = keys.size();
723 LOG(log_.trace()) << "Fetching " << numKeys << " objects";
724
725 std::vector<Blob> results;
726 results.reserve(numKeys);
727
728 std::vector<Statement> statements;
729 statements.reserve(numKeys);
730
731 // TODO: seems like a job for "key IN (list of keys)" instead?
732 std::transform(
733 std::cbegin(keys),
734 std::cend(keys),
735 std::back_inserter(statements),
736 [this, &sequence](auto const& key) { return schema_->selectObject.bind(key, sequence); }
737 );
738
739 auto const entries = executor_.readEach(yield, statements);
740 std::transform(
741 std::cbegin(entries),
742 std::cend(entries),
743 std::back_inserter(results),
744 [](auto const& res) -> Blob {
745 if (auto const maybeValue = res.template get<Blob>(); maybeValue)
746 return *maybeValue;
747
748 return {};
749 }
750 );
751
752 LOG(log_.trace()) << "Fetched " << numKeys << " objects";
753 return results;
754 }
755
756 std::vector<ripple::uint256>
757 fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield)
758 const override
759 {
760 std::vector<ripple::uint256> liveAccounts;
761 std::optional<ripple::AccountID> lastItem;
762
763 while (liveAccounts.size() < number) {
764 Statement const statement = lastItem ? schema_->selectAccountFromToken.bind(*lastItem, Limit{pageSize})
765 : schema_->selectAccountFromBegining.bind(Limit{pageSize});
766
767 auto const res = executor_.read(yield, statement);
768 if (res) {
769 auto const& results = res.value();
770 if (not results.hasRows()) {
771 LOG(log_.debug()) << "No rows returned";
772 break;
773 }
 774 // The results should not contain duplicates; we just filter out deleted accounts
775 std::vector<ripple::uint256> fullAccounts;
776 for (auto [account] : extract<ripple::AccountID>(results)) {
777 fullAccounts.push_back(ripple::keylet::account(account).key);
778 lastItem = account;
779 }
780 auto const objs = doFetchLedgerObjects(fullAccounts, seq, yield);
781
782 for (auto i = 0u; i < fullAccounts.size(); i++) {
783 if (not objs[i].empty()) {
784 if (liveAccounts.size() < number) {
785 liveAccounts.push_back(fullAccounts[i]);
786 } else {
787 break;
788 }
789 }
790 }
791 } else {
792 LOG(log_.error()) << "Could not fetch account from account_tx: " << res.error();
793 break;
794 }
795 }
796
797 return liveAccounts;
798 }
799
800 std::vector<LedgerObject>
801 fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
802 {
803 auto const [keys, timeDiff] = util::timed([this, &ledgerSequence, yield]() -> std::vector<ripple::uint256> {
804 auto const res = executor_.read(yield, schema_->selectDiff, ledgerSequence);
805 if (not res) {
806 LOG(log_.error()) << "Could not fetch ledger diff: " << res.error() << "; ledger = " << ledgerSequence;
807 return {};
808 }
809
810 auto const& results = res.value();
811 if (not results) {
812 LOG(log_.error()) << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence;
813 return {};
814 }
815
816 std::vector<ripple::uint256> resultKeys;
817 for (auto [key] : extract<ripple::uint256>(results))
818 resultKeys.push_back(key);
819
820 return resultKeys;
821 });
822
823 // one of the above errors must have happened
824 if (keys.empty())
825 return {};
826
827 LOG(log_.debug()) << "Fetched " << keys.size() << " diff hashes from database in " << timeDiff
828 << " milliseconds";
829
830 auto const objs = fetchLedgerObjects(keys, ledgerSequence, yield);
831 std::vector<LedgerObject> results;
832 results.reserve(keys.size());
833
834 std::transform(
835 std::cbegin(keys),
836 std::cend(keys),
837 std::cbegin(objs),
838 std::back_inserter(results),
839 [](auto const& key, auto const& obj) { return LedgerObject{key, obj}; }
840 );
841
842 return results;
843 }
844
845 std::optional<std::string>
846 fetchMigratorStatus(std::string const& migratorName, boost::asio::yield_context yield) const override
847 {
848 auto const res = executor_.read(yield, schema_->selectMigratorStatus, Text(migratorName));
849 if (not res) {
850 LOG(log_.error()) << "Could not fetch migrator status: " << res.error();
851 return {};
852 }
853
854 auto const& results = res.value();
855 if (not results) {
856 return {};
857 }
858
859 for (auto [statusString] : extract<std::string>(results))
860 return statusString;
861
862 return {};
863 }
864
865 void
866 doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override
867 {
868 LOG(log_.trace()) << " Writing ledger object " << key.size() << ":" << seq << " [" << blob.size() << " bytes]";
869
870 if (range_)
871 executor_.write(schema_->insertDiff, seq, key);
872
873 executor_.write(schema_->insertObject, std::move(key), seq, std::move(blob));
874 }
875
876 void
877 writeSuccessor(std::string&& key, std::uint32_t const seq, std::string&& successor) override
878 {
879 LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. "
880 << " seq = " << std::to_string(seq) << " successor = " << successor.size() << " bytes.";
881 ASSERT(!key.empty(), "Key must not be empty");
882 ASSERT(!successor.empty(), "Successor must not be empty");
883
884 executor_.write(schema_->insertSuccessor, std::move(key), seq, std::move(successor));
885 }
886
887 void
888 writeAccountTransactions(std::vector<AccountTransactionsData> data) override
889 {
890 std::vector<Statement> statements;
891 statements.reserve(data.size() * 10); // assume 10 transactions avg
892
893 for (auto& record : data) {
894 std::transform(
895 std::begin(record.accounts),
896 std::end(record.accounts),
897 std::back_inserter(statements),
898 [this, &record](auto&& account) {
899 return schema_->insertAccountTx.bind(
900 std::forward<decltype(account)>(account),
901 std::make_tuple(record.ledgerSequence, record.transactionIndex),
902 record.txHash
903 );
904 }
905 );
906 }
907
908 executor_.write(std::move(statements));
909 }
910
911 void
912 writeNFTTransactions(std::vector<NFTTransactionsData> const& data) override
913 {
914 std::vector<Statement> statements;
915 statements.reserve(data.size());
916
917 std::transform(std::cbegin(data), std::cend(data), std::back_inserter(statements), [this](auto const& record) {
918 return schema_->insertNFTTx.bind(
919 record.tokenID, std::make_tuple(record.ledgerSequence, record.transactionIndex), record.txHash
920 );
921 });
922
923 executor_.write(std::move(statements));
924 }
925
926 void
928 std::string&& hash,
929 std::uint32_t const seq,
930 std::uint32_t const date,
931 std::string&& transaction,
932 std::string&& metadata
933 ) override
934 {
935 LOG(log_.trace()) << "Writing txn to database";
936
937 executor_.write(schema_->insertLedgerTransaction, seq, hash);
938 executor_.write(
939 schema_->insertTransaction, std::move(hash), seq, date, std::move(transaction), std::move(metadata)
940 );
941 }
942
943 void
944 writeNFTs(std::vector<NFTsData> const& data) override
945 {
946 std::vector<Statement> statements;
947 statements.reserve(data.size() * 3);
948
949 for (NFTsData const& record : data) {
950 if (!record.onlyUriChanged) {
951 statements.push_back(
952 schema_->insertNFT.bind(record.tokenID, record.ledgerSequence, record.owner, record.isBurned)
953 );
954
 955 // If `uri` is set (and it can be set to an empty URI), we know this
 956 // is a net-new NFT. That is, this NFT has not been seen before by
 957 // us _OR_ it is the extreme edge case of a re-minted NFT with
 958 // the same NFT ID as an already-burned token. In either case, we need
 959 // to record the URI and link to the issuer_nf_tokens table.
960 if (record.uri) {
961 statements.push_back(schema_->insertIssuerNFT.bind(
962 ripple::nft::getIssuer(record.tokenID),
963 static_cast<uint32_t>(ripple::nft::getTaxon(record.tokenID)),
964 record.tokenID
965 ));
966 statements.push_back(
967 schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
968 );
969 }
970 } else {
 971 // Only the URI changed, so we update the URI table only
972 statements.push_back(
973 schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
974 );
975 }
976 }
977
978 executor_.writeEach(std::move(statements));
979 }
980
981 void
982 writeMPTHolders(std::vector<MPTHolderData> const& data) override
983 {
984 std::vector<Statement> statements;
985 statements.reserve(data.size());
986 for (auto [mptId, holder] : data)
987 statements.push_back(schema_->insertMPTHolder.bind(std::move(mptId), std::move(holder)));
988
989 executor_.write(std::move(statements));
990 }
991
992 void
993 startWrites() const override
994 {
 995 // Note: this was a no-op in the original implementation too;
 996 // it was probably used in PostgreSQL to start a transaction or something similar.
997 }
998
999 void
1000 writeMigratorStatus(std::string const& migratorName, std::string const& status) override
1001 {
1002 executor_.writeSync(
1003 schema_->insertMigratorStatus, data::cassandra::Text{migratorName}, data::cassandra::Text(status)
1004 );
1005 }
1006
1007 bool
1008 isTooBusy() const override
1009 {
1010 return executor_.isTooBusy();
1011 }
1012
1013 boost::json::object
1014 stats() const override
1015 {
1016 return executor_.stats();
1017 }
1018
1019private:
1020 bool
1021 executeSyncUpdate(Statement statement)
1022 {
1023 auto const res = executor_.writeSync(statement);
1024 auto maybeSuccess = res->template get<bool>();
1025 if (not maybeSuccess) {
1026 LOG(log_.error()) << "executeSyncUpdate - error getting result - no row";
1027 return false;
1028 }
1029
1030 if (not maybeSuccess.value()) {
1031 LOG(log_.warn()) << "Update failed. Checking if DB state is what we expect";
1032
 1033 // The error may indicate that another writer wrote something.
 1034 // In this case, let's just compare the current state of things
 1035 // against what we were trying to write in the first place and
 1036 // use that as the source of truth for the result.
1037 auto rng = hardFetchLedgerRangeNoThrow();
1038 return rng && rng->maxSequence == ledgerSequence_;
1039 }
1040
1041 return true;
1042 }
1043};
1044
1045using CassandraBackend = BasicCassandraBackend<SettingsProvider, impl::DefaultExecutionStrategy<>>;
1046
1047} // namespace data::cassandra
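
A minimal usage sketch, not part of the header above: it shows how the CassandraBackend alias and the coroutine-driven read API from the listing might be wired together. Only the constructor and fetchLatestLedgerSequence signatures come from the listing; the include path, the way the SettingsProvider is obtained, and the helper name runExample are assumptions.

#include "data/CassandraBackend.hpp"  // include path is an assumption; adjust to the build layout

#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>

#include <iostream>
#include <utility>

// Hypothetical helper: the SettingsProvider is assumed to be constructed elsewhere
// (e.g. from the parsed clio config) and passed in here.
void
runExample(data::cassandra::SettingsProvider settingsProvider)
{
    // readOnly = true skips keyspace/schema creation; the backend only connects
    // and prepares statements, throwing on failure as shown in the constructor above.
    data::cassandra::CassandraBackend backend{std::move(settingsProvider), /* readOnly = */ true};

    boost::asio::io_context ioc;
    boost::asio::spawn(
        ioc,
        [&backend](boost::asio::yield_context yield) {
            // Every fetch* method runs inside a boost::asio coroutine and takes a yield_context.
            if (auto const seq = backend.fetchLatestLedgerSequence(yield); seq)
                std::cout << "latest ledger sequence: " << *seq << '\n';
        },
        boost::asio::detached
    );
    ioc.run();
}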
The interface to the database used by Clio.
Definition BackendInterface.hpp:138
std::vector< Blob > fetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t sequence, boost::asio::yield_context yield) const
Fetches all ledger objects by their keys.
Definition BackendInterface.cpp:119
std::optional< LedgerRange > hardFetchLedgerRangeNoThrow() const
Fetches the ledger range from DB retrying until no DatabaseTimeout is thrown.
Definition BackendInterface.cpp:77
std::optional< LedgerRange > fetchLedgerRange() const
Fetch the current ledger range.
Definition BackendInterface.cpp:267
Implements BackendInterface for Cassandra/ScyllaDB.
Definition CassandraBackend.hpp:71
std::vector< TransactionAndMetadata > fetchTransactions(std::vector< ripple::uint256 > const &hashes, boost::asio::yield_context yield) const override
Fetches multiple transactions.
Definition CassandraBackend.hpp:671
MPTHoldersAndCursor fetchMPTHolders(ripple::uint192 const &mptID, std::uint32_t const limit, std::optional< ripple::AccountID > const &cursorIn, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all holders' balances for a MPTIssuanceID.
Definition CassandraBackend.hpp:559
void writeTransaction(std::string &&hash, std::uint32_t const seq, std::uint32_t const date, std::string &&transaction, std::string &&metadata) override
Writes a new transaction.
Definition CassandraBackend.hpp:927
void writeNFTTransactions(std::vector< NFTTransactionsData > const &data) override
Write NFTs transactions.
Definition CassandraBackend.hpp:912
std::vector< ripple::uint256 > fetchAllTransactionHashesInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transaction hashes from a specific ledger.
Definition CassandraBackend.hpp:333
std::optional< ripple::uint256 > doFetchSuccessorKey(ripple::uint256 key, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Database-specific implementation of fetching the successor key.
Definition CassandraBackend.hpp:652
std::optional< std::uint32_t > doFetchLedgerObjectSeq(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object sequence.
Definition CassandraBackend.hpp:617
bool isTooBusy() const override
Definition CassandraBackend.hpp:1008
boost::json::object stats() const override
Definition CassandraBackend.hpp:1014
std::optional< std::string > fetchMigratorStatus(std::string const &migratorName, boost::asio::yield_context yield) const override
Fetches the status of migrator by name.
Definition CassandraBackend.hpp:846
std::optional< NFT > fetchNFT(ripple::uint256 const &tokenID, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches a specific NFT.
Definition CassandraBackend.hpp:364
void startWrites() const override
Starts a write transaction with the DB. No-op for cassandra.
Definition CassandraBackend.hpp:993
void writeLedger(ripple::LedgerHeader const &ledgerHeader, std::string &&blob) override
Writes to a specific ledger.
Definition CassandraBackend.hpp:216
void waitForWritesToFinish() override
Wait for all pending writes to finish.
Definition CassandraBackend.hpp:192
void writeMPTHolders(std::vector< MPTHolderData > const &data) override
Write accounts that started holding onto a MPT.
Definition CassandraBackend.hpp:982
void writeNFTs(std::vector< NFTsData > const &data) override
Writes NFTs to the database.
Definition CassandraBackend.hpp:944
void writeAccountTransactions(std::vector< AccountTransactionsData > data) override
Write a new set of account transactions.
Definition CassandraBackend.hpp:888
std::optional< std::uint32_t > fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
Fetches the latest ledger sequence.
Definition CassandraBackend.hpp:226
TransactionsAndCursor fetchAccountTransactions(ripple::AccountID const &account, std::uint32_t const limit, bool forward, std::optional< TransactionsCursor > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all transactions for a specific account.
Definition CassandraBackend.hpp:124
void doWriteLedgerObject(std::string &&key, std::uint32_t const seq, std::string &&blob) override
Writes a ledger object to the database.
Definition CassandraBackend.hpp:866
NFTsAndCursor fetchNFTsByIssuer(ripple::AccountID const &issuer, std::optional< std::uint32_t > const &taxon, std::uint32_t const ledgerSequence, std::uint32_t const limit, std::optional< ripple::uint256 > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all NFTs issued by a given address.
Definition CassandraBackend.hpp:471
void writeSuccessor(std::string &&key, std::uint32_t const seq, std::string &&successor) override
Write a new successor.
Definition CassandraBackend.hpp:877
BasicCassandraBackend(SettingsProviderType settingsProvider, bool readOnly)
Create a new cassandra/scylla backend instance.
Definition CassandraBackend.hpp:92
std::optional< TransactionAndMetadata > fetchTransaction(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific transaction.
Definition CassandraBackend.hpp:635
std::vector< TransactionAndMetadata > fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transactions from a specific ledger.
Definition CassandraBackend.hpp:326
std::optional< Blob > doFetchLedgerObject(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object.
Definition CassandraBackend.hpp:598
bool doFinishWrites() override
The implementation should wait for all pending writes to finish.
Definition CassandraBackend.hpp:198
std::vector< ripple::uint256 > fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield) const override
Fetch the specified number of account root object indexes by page, the accounts need to exist for seq...
Definition CassandraBackend.hpp:757
std::vector< LedgerObject > fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Returns the difference between ledgers.
Definition CassandraBackend.hpp:801
void writeMigratorStatus(std::string const &migratorName, std::string const &status) override
Mark the migration status of a migrator as Migrated in the database.
Definition CassandraBackend.hpp:1000
TransactionsAndCursor fetchNFTTransactions(ripple::uint256 const &tokenID, std::uint32_t const limit, bool const forward, std::optional< TransactionsCursor > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all transactions for a specific NFT.
Definition CassandraBackend.hpp:400
std::optional< ripple::LedgerHeader > fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override
Fetches a specific ledger by sequence number.
Definition CassandraBackend.hpp:246
std::vector< Blob > doFetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching ledger objects.
Definition CassandraBackend.hpp:713
std::optional< LedgerRange > hardFetchLedgerRange(boost::asio::yield_context yield) const override
Fetches the ledger range from DB.
Definition CassandraBackend.hpp:288
std::optional< ripple::LedgerHeader > fetchLedgerByHash(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific ledger by hash.
Definition CassandraBackend.hpp:268
Represents a handle to the cassandra database cluster.
Definition Handle.hpp:46
MaybeErrorType connect() const
Synchronous version of the above.
Definition Handle.cpp:55
MaybeErrorType executeEach(std::vector< StatementType > const &statements) const
Synchronous version of the above.
Definition Handle.cpp:109
ResultOrErrorType execute(std::string_view query, Args &&... args) const
Synchronous version of the above.
Definition Handle.hpp:185
Manages the DB schema and provides access to prepared statements.
Definition Schema.hpp:55
void prepareStatements(Handle const &handle)
Recreates the prepared statements.
Definition Schema.hpp:814
Definition Statement.hpp:45
void bindAt(std::size_t const idx, Type &&value) const
Binds an argument to a specific index.
Definition Statement.hpp:91
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump warn(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::WRN severity.
Definition Logger.cpp:210
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:215
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:195
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
This namespace implements a wrapper for the Cassandra C++ driver.
Definition Concepts.hpp:37
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into series of std::tuple<Types...> by creating a simple wrapper with an STL inp...
Definition Handle.hpp:329
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:203
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:103
Represents an object in the ledger.
Definition Types.hpp:41
Stores a range of sequences as a min and max pair.
Definition Types.hpp:247
Represents an array of MPTokens.
Definition Types.hpp:239
Represents a NFToken.
Definition Types.hpp:172
Represents a bundle of NFTs with a cursor to the next page.
Definition Types.hpp:231
Represents a transaction and its metadata bundled together.
Definition Types.hpp:68
Represents a bundle of transactions with metadata and a cursor to the next page.
Definition Types.hpp:164
A strong type wrapper for int32_t.
Definition Types.hpp:56
A strong type wrapper for string.
Definition Types.hpp:67