22#include "data/BackendInterface.hpp"
24#include "data/Types.hpp"
25#include "data/cassandra/Concepts.hpp"
26#include "data/cassandra/Handle.hpp"
27#include "data/cassandra/Schema.hpp"
28#include "data/cassandra/SettingsProvider.hpp"
29#include "data/cassandra/Types.hpp"
30#include "data/cassandra/impl/ExecutionStrategy.hpp"
31#include "util/Assert.hpp"
32#include "util/LedgerUtils.hpp"
33#include "util/Profiler.hpp"
34#include "util/log/Logger.hpp"
36#include <boost/asio/spawn.hpp>
37#include <boost/json/object.hpp>
39#include <xrpl/basics/Blob.h>
40#include <xrpl/basics/base_uint.h>
41#include <xrpl/basics/strHex.h>
42#include <xrpl/protocol/AccountID.h>
43#include <xrpl/protocol/Indexes.h>
44#include <xrpl/protocol/LedgerHeader.h>
45#include <xrpl/protocol/nft.h>
70template <SomeSettingsProv
ider SettingsProv
iderType, SomeExecutionStrategy ExecutionStrategyType>
74 SettingsProviderType settingsProvider_;
77 std::atomic_uint32_t ledgerSequence_ = 0u;
83 mutable ExecutionStrategyType executor_;
93 : settingsProvider_{std::move(settingsProvider)}
94 , schema_{settingsProvider_}
95 , handle_{settingsProvider_.getSettings()}
96 , executor_{settingsProvider_.getSettings(), handle_}
98 if (
auto const res = handle_.
connect(); not res)
99 throw std::runtime_error(
"Could not connect to database: " + res.error());
102 if (
auto const res = handle_.
execute(schema_.createKeyspace); not res) {
105 if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED)
106 throw std::runtime_error(
"Could not create keyspace: " + res.error());
109 if (
auto const res = handle_.
executeEach(schema_.createSchema); not res)
110 throw std::runtime_error(
"Could not create schema: " + res.error());
115 }
catch (std::runtime_error
const& ex) {
116 LOG(log_.
error()) <<
"Failed to prepare the statements: " << ex.what() <<
"; readOnly: " << readOnly;
120 LOG(log_.
info()) <<
"Created (revamped) CassandraBackend";
125 ripple::AccountID
const& account,
126 std::uint32_t
const limit,
128 std::optional<TransactionsCursor>
const& cursorIn,
129 boost::asio::yield_context yield
134 return {.txns = {}, .cursor = {}};
136 Statement const statement = [
this, forward, &account]() {
138 return schema_->selectAccountTxForward.bind(account);
140 return schema_->selectAccountTx.bind(account);
143 auto cursor = cursorIn;
145 statement.
bindAt(1, cursor->asTuple());
146 LOG(log_.
debug()) <<
"account = " << ripple::strHex(account) <<
" tuple = " << cursor->ledgerSequence
147 << cursor->transactionIndex;
149 auto const seq = forward ? rng->minSequence : rng->maxSequence;
150 auto const placeHolder = forward ? 0u : std::numeric_limits<std::uint32_t>::max();
152 statement.
bindAt(1, std::make_tuple(placeHolder, placeHolder));
153 LOG(log_.
debug()) <<
"account = " << ripple::strHex(account) <<
" idx = " << seq
154 <<
" tuple = " << placeHolder;
161 auto const res = executor_.read(yield, statement);
162 auto const& results = res.value();
163 if (not results.hasRows()) {
164 LOG(log_.
debug()) <<
"No rows returned";
168 std::vector<ripple::uint256> hashes = {};
169 auto numRows = results.numRows();
170 LOG(log_.
info()) <<
"num_rows = " << numRows;
172 for (
auto [hash,
data] :
extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
173 hashes.push_back(hash);
174 if (--numRows == 0) {
175 LOG(log_.
debug()) <<
"Setting cursor";
181 LOG(log_.
debug()) <<
"Txns = " << txns.size();
183 if (txns.size() == limit) {
184 LOG(log_.
debug()) <<
"Returning cursor";
185 return {txns, cursor};
203 executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_,
false, ledgerSequence_);
206 if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_,
true, ledgerSequence_ - 1))) {
207 LOG(log_.
warn()) <<
"Update failed for ledger " << ledgerSequence_;
211 LOG(log_.
info()) <<
"Committed ledger " << ledgerSequence_;
216 writeLedger(ripple::LedgerHeader
const& ledgerHeader, std::string&& blob)
override
218 executor_.write(schema_->insertLedgerHeader, ledgerHeader.seq, std::move(blob));
220 executor_.write(schema_->insertLedgerHash, ledgerHeader.hash, ledgerHeader.seq);
222 ledgerSequence_ = ledgerHeader.seq;
225 std::optional<std::uint32_t>
228 if (
auto const res = executor_.read(yield, schema_->selectLatestLedger); res) {
229 if (
auto const& result = res.value(); result) {
230 if (
auto const maybeValue = result.template get<uint32_t>(); maybeValue)
233 LOG(log_.
error()) <<
"Could not fetch latest ledger - no rows";
237 LOG(log_.
error()) <<
"Could not fetch latest ledger - no result";
239 LOG(log_.
error()) <<
"Could not fetch latest ledger: " << res.error();
245 std::optional<ripple::LedgerHeader>
248 auto const res = executor_.read(yield, schema_->selectLedgerBySeq, sequence);
250 if (
auto const& result = res.value(); result) {
251 if (
auto const maybeValue = result.template get<std::vector<unsigned char>>(); maybeValue) {
255 LOG(log_.
error()) <<
"Could not fetch ledger by sequence - no rows";
259 LOG(log_.
error()) <<
"Could not fetch ledger by sequence - no result";
261 LOG(log_.
error()) <<
"Could not fetch ledger by sequence: " << res.error();
267 std::optional<ripple::LedgerHeader>
270 if (
auto const res = executor_.read(yield, schema_->selectLedgerByHash, hash); res) {
271 if (
auto const& result = res.value(); result) {
272 if (
auto const maybeValue = result.template get<uint32_t>(); maybeValue)
275 LOG(log_.
error()) <<
"Could not fetch ledger by hash - no rows";
279 LOG(log_.
error()) <<
"Could not fetch ledger by hash - no result";
281 LOG(log_.
error()) <<
"Could not fetch ledger by hash: " << res.error();
287 std::optional<LedgerRange>
290 auto const res = executor_.read(yield, schema_->selectLedgerRange);
292 auto const& results = res.value();
293 if (not results.hasRows()) {
294 LOG(log_.
debug()) <<
"Could not fetch ledger range - no rows";
305 range.maxSequence = range.minSequence = seq;
306 }
else if (idx == 1) {
307 range.maxSequence = seq;
313 if (range.minSequence > range.maxSequence)
314 std::swap(range.minSequence, range.maxSequence);
316 LOG(log_.
debug()) <<
"After hardFetchLedgerRange range is " << range.minSequence <<
":"
317 << range.maxSequence;
320 LOG(log_.
error()) <<
"Could not fetch ledger range: " << res.error();
325 std::vector<TransactionAndMetadata>
332 std::vector<ripple::uint256>
336 auto start = std::chrono::system_clock::now();
337 auto const res = executor_.read(yield, schema_->selectAllTransactionHashesInLedger, ledgerSequence);
340 LOG(log_.
error()) <<
"Could not fetch all transaction hashes: " << res.error();
344 auto const& result = res.value();
345 if (not result.hasRows()) {
346 LOG(log_.
warn()) <<
"Could not fetch all transaction hashes - no rows; ledger = "
347 << std::to_string(ledgerSequence);
351 std::vector<ripple::uint256> hashes;
353 hashes.push_back(std::move(hash));
355 auto end = std::chrono::system_clock::now();
356 LOG(log_.
debug()) <<
"Fetched " << hashes.size() <<
" transaction hashes from database in "
357 << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
364 fetchNFT(ripple::uint256
const& tokenID, std::uint32_t
const ledgerSequence, boost::asio::yield_context yield)
367 auto const res = executor_.read(yield, schema_->selectNFT, tokenID, ledgerSequence);
371 if (
auto const maybeRow = res->template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
372 auto [seq, owner, isBurned] = *maybeRow;
373 auto result = std::make_optional<NFT>(tokenID, seq, owner, isBurned);
386 auto uriRes = executor_.read(yield, schema_->selectNFTURI, tokenID, ledgerSequence);
388 if (
auto const maybeUri = uriRes->template get<ripple::Blob>(); maybeUri)
389 result->uri = *maybeUri;
395 LOG(log_.
error()) <<
"Could not fetch NFT - no rows";
401 ripple::uint256
const& tokenID,
402 std::uint32_t
const limit,
404 std::optional<TransactionsCursor>
const& cursorIn,
405 boost::asio::yield_context yield
410 return {.txns = {}, .cursor = {}};
412 Statement const statement = [
this, forward, &tokenID]() {
414 return schema_->selectNFTTxForward.bind(tokenID);
416 return schema_->selectNFTTx.bind(tokenID);
419 auto cursor = cursorIn;
421 statement.
bindAt(1, cursor->asTuple());
422 LOG(log_.
debug()) <<
"token_id = " << ripple::strHex(tokenID) <<
" tuple = " << cursor->ledgerSequence
423 << cursor->transactionIndex;
425 auto const seq = forward ? rng->minSequence : rng->maxSequence;
426 auto const placeHolder = forward ? 0 : std::numeric_limits<std::uint32_t>::max();
428 statement.
bindAt(1, std::make_tuple(placeHolder, placeHolder));
429 LOG(log_.
debug()) <<
"token_id = " << ripple::strHex(tokenID) <<
" idx = " << seq
430 <<
" tuple = " << placeHolder;
435 auto const res = executor_.read(yield, statement);
436 auto const& results = res.value();
437 if (not results.hasRows()) {
438 LOG(log_.
debug()) <<
"No rows returned";
442 std::vector<ripple::uint256> hashes = {};
443 auto numRows = results.numRows();
444 LOG(log_.
info()) <<
"num_rows = " << numRows;
446 for (
auto [hash,
data] :
extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
447 hashes.push_back(hash);
448 if (--numRows == 0) {
449 LOG(log_.
debug()) <<
"Setting cursor";
455 ++cursor->transactionIndex;
460 LOG(log_.
debug()) <<
"NFT Txns = " << txns.size();
462 if (txns.size() == limit) {
463 LOG(log_.
debug()) <<
"Returning cursor";
464 return {txns, cursor};
472 ripple::AccountID
const& issuer,
473 std::optional<std::uint32_t>
const& taxon,
474 std::uint32_t
const ledgerSequence,
475 std::uint32_t
const limit,
476 std::optional<ripple::uint256>
const& cursorIn,
477 boost::asio::yield_context yield
482 Statement const idQueryStatement = [&taxon, &issuer, &cursorIn, &limit,
this]() {
483 if (taxon.has_value()) {
484 auto r = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
486 r.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
487 r.bindAt(3,
Limit{limit});
491 auto r = schema_->selectNFTIDsByIssuer.bind(issuer);
495 cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
496 cursorIn.value_or(ripple::uint256(0))
499 r.bindAt(2,
Limit{limit});
504 auto const res = executor_.read(yield, idQueryStatement);
506 auto const& idQueryResults = res.value();
507 if (not idQueryResults.hasRows()) {
508 LOG(log_.
debug()) <<
"No rows returned";
512 std::vector<ripple::uint256> nftIDs;
514 nftIDs.push_back(nftID);
519 if (nftIDs.size() == limit)
520 ret.cursor = nftIDs.back();
522 std::vector<Statement> selectNFTStatements;
523 selectNFTStatements.reserve(nftIDs.size());
528 std::back_inserter(selectNFTStatements),
529 [&](
auto const& nftID) {
return schema_->selectNFT.bind(nftID, ledgerSequence); }
532 auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
534 std::vector<Statement> selectNFTURIStatements;
535 selectNFTURIStatements.reserve(nftIDs.size());
540 std::back_inserter(selectNFTURIStatements),
541 [&](
auto const& nftID) {
return schema_->selectNFTURI.bind(nftID, ledgerSequence); }
544 auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
546 for (
auto i = 0u; i < nftIDs.size(); i++) {
547 if (
auto const maybeRow = nftInfos[i].
template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
548 auto [seq, owner, isBurned] = *maybeRow;
549 NFT nft(nftIDs[i], seq, owner, isBurned);
550 if (
auto const maybeUri = nftUris[i].
template get<ripple::Blob>(); maybeUri)
552 ret.nfts.push_back(nft);
560 ripple::uint192
const& mptID,
561 std::uint32_t
const limit,
562 std::optional<ripple::AccountID>
const& cursorIn,
563 std::uint32_t
const ledgerSequence,
564 boost::asio::yield_context yield
567 auto const holderEntries = executor_.read(
568 yield, schema_->selectMPTHolders, mptID, cursorIn.value_or(ripple::AccountID(0)),
Limit{limit}
571 auto const& holderResults = holderEntries.value();
572 if (not holderResults.hasRows()) {
573 LOG(log_.
debug()) <<
"No rows returned";
577 std::vector<ripple::uint256> mptKeys;
578 std::optional<ripple::AccountID> cursor;
580 mptKeys.push_back(ripple::keylet::mptoken(mptID, holder).key);
586 auto it = std::remove_if(mptObjects.begin(), mptObjects.end(), [](Blob
const& mpt) { return mpt.empty(); });
588 mptObjects.erase(it, mptObjects.end());
590 ASSERT(mptKeys.size() <= limit,
"Number of keys can't exceed the limit");
591 if (mptKeys.size() == limit)
592 return {mptObjects, cursor};
594 return {mptObjects, {}};
598 doFetchLedgerObject(ripple::uint256
const& key, std::uint32_t
const sequence, boost::asio::yield_context yield)
601 LOG(log_.
debug()) <<
"Fetching ledger object for seq " << sequence <<
", key = " << ripple::to_string(key);
602 if (
auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
603 if (
auto const result = res->template get<Blob>(); result) {
607 LOG(log_.
debug()) <<
"Could not fetch ledger object - no rows";
610 LOG(log_.
error()) <<
"Could not fetch ledger object: " << res.error();
616 std::optional<std::uint32_t>
620 LOG(log_.
debug()) <<
"Fetching ledger object for seq " << sequence <<
", key = " << ripple::to_string(key);
621 if (
auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
622 if (
auto const result = res->template get<Blob, std::uint32_t>(); result) {
623 auto [_, seq] = result.value();
626 LOG(log_.
debug()) <<
"Could not fetch ledger object sequence - no rows";
628 LOG(log_.
error()) <<
"Could not fetch ledger object sequence: " << res.error();
634 std::optional<TransactionAndMetadata>
635 fetchTransaction(ripple::uint256
const& hash, boost::asio::yield_context yield)
const override
637 if (
auto const res = executor_.read(yield, schema_->selectTransaction, hash); res) {
638 if (
auto const maybeValue = res->template get<Blob, Blob, uint32_t, uint32_t>(); maybeValue) {
639 auto [transaction, meta, seq, date] = *maybeValue;
640 return std::make_optional<TransactionAndMetadata>(transaction, meta, seq, date);
643 LOG(log_.
debug()) <<
"Could not fetch transaction - no rows";
645 LOG(log_.
error()) <<
"Could not fetch transaction: " << res.error();
651 std::optional<ripple::uint256>
652 doFetchSuccessorKey(ripple::uint256 key, std::uint32_t
const ledgerSequence, boost::asio::yield_context yield)
655 if (
auto const res = executor_.read(yield, schema_->selectSuccessor, key, ledgerSequence); res) {
656 if (
auto const result = res->template get<ripple::uint256>(); result) {
657 if (*result == kLAST_KEY)
662 LOG(log_.
debug()) <<
"Could not fetch successor - no rows";
664 LOG(log_.
error()) <<
"Could not fetch successor: " << res.error();
670 std::vector<TransactionAndMetadata>
671 fetchTransactions(std::vector<ripple::uint256>
const& hashes, boost::asio::yield_context yield)
const override
676 auto const numHashes = hashes.size();
677 std::vector<TransactionAndMetadata> results;
678 results.reserve(numHashes);
680 std::vector<Statement> statements;
681 statements.reserve(numHashes);
683 auto const timeDiff =
util::timed([
this, yield, &results, &hashes, &statements]() {
688 std::back_inserter(statements),
689 [
this](
auto const& hash) {
return schema_->selectTransaction.bind(hash); }
692 auto const entries = executor_.readEach(yield, statements);
694 std::cbegin(entries),
696 std::back_inserter(results),
698 if (
auto const maybeRow = res.template get<Blob, Blob, uint32_t, uint32_t>(); maybeRow)
706 ASSERT(numHashes == results.size(),
"Number of hashes and results must match");
707 LOG(log_.
debug()) <<
"Fetched " << numHashes <<
" transactions from database in " << timeDiff
714 std::vector<ripple::uint256>
const& keys,
715 std::uint32_t
const sequence,
716 boost::asio::yield_context yield
722 auto const numKeys = keys.size();
723 LOG(log_.
trace()) <<
"Fetching " << numKeys <<
" objects";
725 std::vector<Blob> results;
726 results.reserve(numKeys);
728 std::vector<Statement> statements;
729 statements.reserve(numKeys);
735 std::back_inserter(statements),
736 [
this, &sequence](
auto const& key) {
return schema_->selectObject.bind(key, sequence); }
739 auto const entries = executor_.readEach(yield, statements);
741 std::cbegin(entries),
743 std::back_inserter(results),
744 [](
auto const& res) -> Blob {
745 if (
auto const maybeValue = res.template get<Blob>(); maybeValue)
752 LOG(log_.
trace()) <<
"Fetched " << numKeys <<
" objects";
756 std::vector<ripple::uint256>
757 fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield)
760 std::vector<ripple::uint256> liveAccounts;
761 std::optional<ripple::AccountID> lastItem;
763 while (liveAccounts.size() < number) {
764 Statement const statement = lastItem ? schema_->selectAccountFromToken.bind(*lastItem,
Limit{pageSize})
765 : schema_->selectAccountFromBegining.bind(
Limit{pageSize});
767 auto const res = executor_.read(yield, statement);
769 auto const& results = res.value();
770 if (not results.hasRows()) {
771 LOG(log_.
debug()) <<
"No rows returned";
775 std::vector<ripple::uint256> fullAccounts;
777 fullAccounts.push_back(ripple::keylet::account(account).key);
782 for (
auto i = 0u; i < fullAccounts.size(); i++) {
783 if (not objs[i].empty()) {
784 if (liveAccounts.size() < number) {
785 liveAccounts.push_back(fullAccounts[i]);
792 LOG(log_.
error()) <<
"Could not fetch account from account_tx: " << res.error();
800 std::vector<LedgerObject>
801 fetchLedgerDiff(std::uint32_t
const ledgerSequence, boost::asio::yield_context yield)
const override
803 auto const [keys, timeDiff] =
util::timed([
this, &ledgerSequence, yield]() -> std::vector<ripple::uint256> {
804 auto const res = executor_.read(yield, schema_->selectDiff, ledgerSequence);
806 LOG(log_.
error()) <<
"Could not fetch ledger diff: " << res.error() <<
"; ledger = " << ledgerSequence;
810 auto const& results = res.value();
812 LOG(log_.
error()) <<
"Could not fetch ledger diff - no rows; ledger = " << ledgerSequence;
816 std::vector<ripple::uint256> resultKeys;
818 resultKeys.push_back(key);
827 LOG(log_.
debug()) <<
"Fetched " << keys.size() <<
" diff hashes from database in " << timeDiff
831 std::vector<LedgerObject> results;
832 results.reserve(keys.size());
838 std::back_inserter(results),
839 [](
auto const& key,
auto const& obj) {
return LedgerObject{key, obj}; }
845 std::optional<std::string>
848 auto const res = executor_.read(yield, schema_->selectMigratorStatus,
Text(migratorName));
850 LOG(log_.
error()) <<
"Could not fetch migrator status: " << res.error();
854 auto const& results = res.value();
868 LOG(log_.
trace()) <<
" Writing ledger object " << key.size() <<
":" << seq <<
" [" << blob.size() <<
" bytes]";
871 executor_.write(schema_->insertDiff, seq, key);
873 executor_.write(schema_->insertObject, std::move(key), seq, std::move(blob));
877 writeSuccessor(std::string&& key, std::uint32_t
const seq, std::string&& successor)
override
879 LOG(log_.
trace()) <<
"Writing successor. key = " << key.size() <<
" bytes. "
880 <<
" seq = " << std::to_string(seq) <<
" successor = " << successor.size() <<
" bytes.";
881 ASSERT(!key.empty(),
"Key must not be empty");
882 ASSERT(!successor.empty(),
"Successor must not be empty");
884 executor_.write(schema_->insertSuccessor, std::move(key), seq, std::move(successor));
890 std::vector<Statement> statements;
891 statements.reserve(
data.size() * 10);
893 for (
auto& record :
data) {
895 std::begin(record.accounts),
896 std::end(record.accounts),
897 std::back_inserter(statements),
898 [
this, &record](
auto&& account) {
899 return schema_->insertAccountTx.bind(
900 std::forward<decltype(account)>(account),
901 std::make_tuple(record.ledgerSequence, record.transactionIndex),
908 executor_.write(std::move(statements));
914 std::vector<Statement> statements;
915 statements.reserve(
data.size());
917 std::transform(std::cbegin(
data), std::cend(
data), std::back_inserter(statements), [
this](
auto const& record) {
918 return schema_->insertNFTTx.bind(
919 record.tokenID, std::make_tuple(record.ledgerSequence, record.transactionIndex), record.txHash
923 executor_.write(std::move(statements));
929 std::uint32_t
const seq,
930 std::uint32_t
const date,
931 std::string&& transaction,
932 std::string&& metadata
935 LOG(log_.
trace()) <<
"Writing txn to database";
937 executor_.write(schema_->insertLedgerTransaction, seq, hash);
939 schema_->insertTransaction, std::move(hash), seq, date, std::move(transaction), std::move(metadata)
946 std::vector<Statement> statements;
947 statements.reserve(
data.size() * 3);
950 if (!record.onlyUriChanged) {
951 statements.push_back(
952 schema_->insertNFT.bind(record.tokenID, record.ledgerSequence, record.owner, record.isBurned)
961 statements.push_back(schema_->insertIssuerNFT.bind(
962 ripple::nft::getIssuer(record.tokenID),
963 static_cast<uint32_t
>(ripple::nft::getTaxon(record.tokenID)),
966 statements.push_back(
967 schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
972 statements.push_back(
973 schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
978 executor_.writeEach(std::move(statements));
984 std::vector<Statement> statements;
985 statements.reserve(
data.size());
986 for (
auto [mptId, holder] :
data)
987 statements.push_back(schema_->insertMPTHolder.bind(std::move(mptId), std::move(holder)));
989 executor_.write(std::move(statements));
1002 executor_.writeSync(
1010 return executor_.isTooBusy();
1016 return executor_.stats();
1023 auto const res = executor_.writeSync(statement);
1024 auto maybeSuccess = res->template get<bool>();
1025 if (not maybeSuccess) {
1026 LOG(log_.
error()) <<
"executeSyncUpdate - error getting result - no row";
1030 if (not maybeSuccess.value()) {
1031 LOG(log_.
warn()) <<
"Update failed. Checking if DB state is what we expect";
1038 return rng && rng->maxSequence == ledgerSequence_;
1045using CassandraBackend = BasicCassandraBackend<SettingsProvider, impl::DefaultExecutionStrategy<>>;
The interface to the database used by Clio.
Definition BackendInterface.hpp:138
std::vector< Blob > fetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t sequence, boost::asio::yield_context yield) const
Fetches all ledger objects by their keys.
Definition BackendInterface.cpp:119
std::optional< LedgerRange > hardFetchLedgerRangeNoThrow() const
Fetches the ledger range from DB retrying until no DatabaseTimeout is thrown.
Definition BackendInterface.cpp:77
std::optional< LedgerRange > fetchLedgerRange() const
Fetch the current ledger range.
Definition BackendInterface.cpp:267
Implements BackendInterface for Cassandra/ScyllaDB.
Definition CassandraBackend.hpp:71
std::vector< TransactionAndMetadata > fetchTransactions(std::vector< ripple::uint256 > const &hashes, boost::asio::yield_context yield) const override
Fetches multiple transactions.
Definition CassandraBackend.hpp:671
MPTHoldersAndCursor fetchMPTHolders(ripple::uint192 const &mptID, std::uint32_t const limit, std::optional< ripple::AccountID > const &cursorIn, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all holders' balances for a MPTIssuanceID.
Definition CassandraBackend.hpp:559
void writeTransaction(std::string &&hash, std::uint32_t const seq, std::uint32_t const date, std::string &&transaction, std::string &&metadata) override
Writes a new transaction.
Definition CassandraBackend.hpp:927
void writeNFTTransactions(std::vector< NFTTransactionsData > const &data) override
Write NFTs transactions.
Definition CassandraBackend.hpp:912
std::vector< ripple::uint256 > fetchAllTransactionHashesInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transaction hashes from a specific ledger.
Definition CassandraBackend.hpp:333
std::optional< ripple::uint256 > doFetchSuccessorKey(ripple::uint256 key, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Database-specific implementation of fetching the successor key.
Definition CassandraBackend.hpp:652
std::optional< std::uint32_t > doFetchLedgerObjectSeq(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object sequence.
Definition CassandraBackend.hpp:617
bool isTooBusy() const override
Definition CassandraBackend.hpp:1008
boost::json::object stats() const override
Definition CassandraBackend.hpp:1014
std::optional< std::string > fetchMigratorStatus(std::string const &migratorName, boost::asio::yield_context yield) const override
Fetches the status of migrator by name.
Definition CassandraBackend.hpp:846
std::optional< NFT > fetchNFT(ripple::uint256 const &tokenID, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches a specific NFT.
Definition CassandraBackend.hpp:364
void startWrites() const override
Starts a write transaction with the DB. No-op for cassandra.
Definition CassandraBackend.hpp:993
void writeLedger(ripple::LedgerHeader const &ledgerHeader, std::string &&blob) override
Writes to a specific ledger.
Definition CassandraBackend.hpp:216
void waitForWritesToFinish() override
Wait for all pending writes to finish.
Definition CassandraBackend.hpp:192
void writeMPTHolders(std::vector< MPTHolderData > const &data) override
Write accounts that started holding onto a MPT.
Definition CassandraBackend.hpp:982
void writeNFTs(std::vector< NFTsData > const &data) override
Writes NFTs to the database.
Definition CassandraBackend.hpp:944
void writeAccountTransactions(std::vector< AccountTransactionsData > data) override
Write a new set of account transactions.
Definition CassandraBackend.hpp:888
std::optional< std::uint32_t > fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
Fetches the latest ledger sequence.
Definition CassandraBackend.hpp:226
TransactionsAndCursor fetchAccountTransactions(ripple::AccountID const &account, std::uint32_t const limit, bool forward, std::optional< TransactionsCursor > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all transactions for a specific account.
Definition CassandraBackend.hpp:124
void doWriteLedgerObject(std::string &&key, std::uint32_t const seq, std::string &&blob) override
Writes a ledger object to the database.
Definition CassandraBackend.hpp:866
NFTsAndCursor fetchNFTsByIssuer(ripple::AccountID const &issuer, std::optional< std::uint32_t > const &taxon, std::uint32_t const ledgerSequence, std::uint32_t const limit, std::optional< ripple::uint256 > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all NFTs issued by a given address.
Definition CassandraBackend.hpp:471
void writeSuccessor(std::string &&key, std::uint32_t const seq, std::string &&successor) override
Write a new successor.
Definition CassandraBackend.hpp:877
BasicCassandraBackend(SettingsProviderType settingsProvider, bool readOnly)
Create a new cassandra/scylla backend instance.
Definition CassandraBackend.hpp:92
std::optional< TransactionAndMetadata > fetchTransaction(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific transaction.
Definition CassandraBackend.hpp:635
std::vector< TransactionAndMetadata > fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transactions from a specific ledger.
Definition CassandraBackend.hpp:326
std::optional< Blob > doFetchLedgerObject(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object.
Definition CassandraBackend.hpp:598
bool doFinishWrites() override
The implementation should wait for all pending writes to finish.
Definition CassandraBackend.hpp:198
std::vector< ripple::uint256 > fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield) const override
Fetch the specified number of account root object indexes by page, the accounts need to exist for seq...
Definition CassandraBackend.hpp:757
std::vector< LedgerObject > fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Returns the difference between ledgers.
Definition CassandraBackend.hpp:801
void writeMigratorStatus(std::string const &migratorName, std::string const &status) override
Mark the migration status of a migrator as Migrated in the database.
Definition CassandraBackend.hpp:1000
TransactionsAndCursor fetchNFTTransactions(ripple::uint256 const &tokenID, std::uint32_t const limit, bool const forward, std::optional< TransactionsCursor > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all transactions for a specific NFT.
Definition CassandraBackend.hpp:400
std::optional< ripple::LedgerHeader > fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override
Fetches a specific ledger by sequence number.
Definition CassandraBackend.hpp:246
std::vector< Blob > doFetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching ledger objects.
Definition CassandraBackend.hpp:713
std::optional< LedgerRange > hardFetchLedgerRange(boost::asio::yield_context yield) const override
Fetches the ledger range from DB.
Definition CassandraBackend.hpp:288
std::optional< ripple::LedgerHeader > fetchLedgerByHash(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific ledger by hash.
Definition CassandraBackend.hpp:268
Represents a handle to the cassandra database cluster.
Definition Handle.hpp:46
MaybeErrorType connect() const
Synchonous version of the above.
Definition Handle.cpp:55
MaybeErrorType executeEach(std::vector< StatementType > const &statements) const
Synchonous version of the above.
Definition Handle.cpp:109
ResultOrErrorType execute(std::string_view query, Args &&... args) const
Synchonous version of the above.
Definition Handle.hpp:185
Manages the DB schema and provides access to prepared statements.
Definition Schema.hpp:55
void prepareStatements(Handle const &handle)
Recreates the prepared statements.
Definition Schema.hpp:814
Definition Statement.hpp:45
void bindAt(std::size_t const idx, Type &&value) const
Binds an argument to a specific index.
Definition Statement.hpp:91
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump warn(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::WRN severity.
Definition Logger.cpp:210
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:215
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:195
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
This namespace implements a wrapper for the Cassandra C++ driver.
Definition Concepts.hpp:37
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into series of std::tuple<Types...> by creating a simple wrapper with an STL inp...
Definition Handle.hpp:329
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:203
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:103
Represents an object in the ledger.
Definition Types.hpp:41
Stores a range of sequences as a min and max pair.
Definition Types.hpp:247
Represents an array of MPTokens.
Definition Types.hpp:239
Represents a NFToken.
Definition Types.hpp:172
Represents a bundle of NFTs with a cursor to the next page.
Definition Types.hpp:231
Represests a bundle of transactions with metadata and a cursor to the next page.
Definition Types.hpp:164
A strong type wrapper for int32_t.
Definition Types.hpp:56
A strong type wrapper for string.
Definition Types.hpp:67