22#include "data/BackendInterface.hpp"
24#include "data/LedgerCacheInterface.hpp"
25#include "data/LedgerHeaderCache.hpp"
26#include "data/Types.hpp"
27#include "data/cassandra/Concepts.hpp"
28#include "data/cassandra/Handle.hpp"
29#include "data/cassandra/Types.hpp"
30#include "data/cassandra/impl/ExecutionStrategy.hpp"
31#include "util/Assert.hpp"
32#include "util/LedgerUtils.hpp"
33#include "util/Profiler.hpp"
34#include "util/log/Logger.hpp"
36#include <boost/asio/spawn.hpp>
37#include <boost/json/object.hpp>
38#include <boost/uuid/string_generator.hpp>
39#include <boost/uuid/uuid.hpp>
41#include <fmt/format.h>
42#include <xrpl/basics/Blob.h>
43#include <xrpl/basics/base_uint.h>
44#include <xrpl/basics/strHex.h>
45#include <xrpl/protocol/AccountID.h>
46#include <xrpl/protocol/Indexes.h>
47#include <xrpl/protocol/LedgerHeader.h>
48#include <xrpl/protocol/nft.h>
64class CacheBackendCassandraTest;
87 SettingsProviderType settingsProvider_;
89 std::atomic_uint32_t ledgerSequence_ = 0u;
90 friend class ::CacheBackendCassandraTest;
95 mutable ExecutionStrategyType executor_;
97 mutable FetchLedgerCacheType ledgerCache_{};
108 SettingsProviderType settingsProvider,
113 , settingsProvider_{std::move(settingsProvider)}
114 , schema_{settingsProvider_}
115 , handle_{settingsProvider_.getSettings()}
116 , executor_{settingsProvider_.getSettings(), handle_}
118 if (
auto const res = handle_.connect(); not res.has_value())
119 throw std::runtime_error(
"Could not connect to database: " + res.error());
122 if (
auto const res = handle_.execute(schema_.createKeyspace); not res.has_value()) {
125 if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED)
126 throw std::runtime_error(
"Could not create keyspace: " + res.error());
129 if (
auto const res = handle_.executeEach(schema_.createSchema); not res.has_value())
130 throw std::runtime_error(
"Could not create schema: " + res.error());
134 schema_.prepareStatements(handle_);
135 }
catch (std::runtime_error
const& ex) {
136 auto const error = fmt::format(
137 "Failed to prepare the statements: {}; readOnly: {}. ReadOnly should be turned off "
139 "node with write access to DB should be started first.",
143 LOG(log_.error()) << error;
144 throw std::runtime_error(error);
146 LOG(log_.info()) <<
"Created (revamped) CassandraBackend";
156 ripple::AccountID
const& account,
157 std::uint32_t
const limit,
159 std::optional<TransactionsCursor>
const& txnCursor,
160 boost::asio::yield_context yield
165 return {.txns = {}, .cursor = {}};
167 Statement
const statement = [
this, forward, &account]() {
169 return schema_->selectAccountTxForward.bind(account);
171 return schema_->selectAccountTx.bind(account);
174 auto cursor = txnCursor;
176 statement.
bindAt(1, cursor->asTuple());
177 LOG(log_.debug()) <<
"account = " << ripple::strHex(account)
178 <<
" tuple = " << cursor->ledgerSequence << cursor->transactionIndex;
180 auto const seq = forward ? rng->minSequence : rng->maxSequence;
181 auto const placeHolder = forward ? 0u : std::numeric_limits<std::uint32_t>::max();
183 statement.
bindAt(1, std::make_tuple(placeHolder, placeHolder));
184 LOG(log_.debug()) <<
"account = " << ripple::strHex(account) <<
" idx = " << seq
185 <<
" tuple = " << placeHolder;
192 auto const res = executor_.read(yield, statement);
193 auto const& results = res.value();
194 if (not results.hasRows()) {
195 LOG(log_.debug()) <<
"No rows returned";
199 std::vector<ripple::uint256> hashes = {};
200 auto numRows = results.numRows();
201 LOG(log_.info()) <<
"num_rows = " << numRows;
203 for (
auto [hash,
data] :
204 extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
205 hashes.push_back(hash);
206 if (--numRows == 0) {
207 LOG(log_.debug()) <<
"Setting cursor";
213 LOG(log_.debug()) <<
"Txns = " << txns.size();
215 if (txns.size() == limit) {
216 LOG(log_.debug()) <<
"Returning cursor";
217 return {txns, cursor};
230 writeLedger(ripple::LedgerHeader
const& ledgerHeader, std::string&& blob)
override
232 executor_.write(schema_->insertLedgerHeader, ledgerHeader.seq, std::move(blob));
234 executor_.write(schema_->insertLedgerHash, ledgerHeader.hash, ledgerHeader.seq);
236 ledgerSequence_ = ledgerHeader.seq;
239 std::optional<std::uint32_t>
242 if (
auto const res = executor_.read(yield, schema_->selectLatestLedger); res.has_value()) {
243 if (
auto const& rows = *res; rows) {
244 if (
auto const maybeRow = rows.template get<uint32_t>(); maybeRow.has_value())
247 LOG(log_.error()) <<
"Could not fetch latest ledger - no rows";
251 LOG(log_.error()) <<
"Could not fetch latest ledger - no result";
253 LOG(log_.error()) <<
"Could not fetch latest ledger: " << res.error();
259 std::optional<ripple::LedgerHeader>
261 std::uint32_t
const sequence,
262 boost::asio::yield_context yield
265 if (
auto const lock = ledgerCache_.get(); lock.has_value() && lock->seq == sequence)
268 auto const res = executor_.read(yield, schema_->selectLedgerBySeq, sequence);
270 if (
auto const& result = res.value(); result) {
271 if (
auto const maybeValue = result.template get<std::vector<unsigned char>>();
278 LOG(log_.error()) <<
"Could not fetch ledger by sequence - no rows";
282 LOG(log_.error()) <<
"Could not fetch ledger by sequence - no result";
284 LOG(log_.error()) <<
"Could not fetch ledger by sequence: " << res.error();
290 std::optional<ripple::LedgerHeader>
293 if (
auto const res = executor_.read(yield, schema_->selectLedgerByHash, hash); res) {
294 if (
auto const& result = res.value(); result) {
295 if (
auto const maybeValue = result.template get<uint32_t>(); maybeValue)
298 LOG(log_.error()) <<
"Could not fetch ledger by hash - no rows";
302 LOG(log_.error()) <<
"Could not fetch ledger by hash - no result";
304 LOG(log_.error()) <<
"Could not fetch ledger by hash: " << res.error();
310 std::optional<LedgerRange>
313 auto const res = executor_.read(yield, schema_->selectLedgerRange);
315 auto const& results = res.value();
316 if (not results.hasRows()) {
317 LOG(log_.debug()) <<
"Could not fetch ledger range - no rows";
328 range.maxSequence = range.minSequence = seq;
329 }
else if (idx == 1) {
330 range.maxSequence = seq;
336 if (range.minSequence > range.maxSequence)
337 std::swap(range.minSequence, range.maxSequence);
339 LOG(log_.debug()) <<
"After hardFetchLedgerRange range is " << range.minSequence <<
":"
340 << range.maxSequence;
343 LOG(log_.error()) <<
"Could not fetch ledger range: " << res.error();
348 std::vector<TransactionAndMetadata>
350 std::uint32_t
const ledgerSequence,
351 boost::asio::yield_context yield
358 std::vector<ripple::uint256>
360 std::uint32_t
const ledgerSequence,
361 boost::asio::yield_context yield
364 auto start = std::chrono::system_clock::now();
366 executor_.read(yield, schema_->selectAllTransactionHashesInLedger, ledgerSequence);
369 LOG(log_.error()) <<
"Could not fetch all transaction hashes: " << res.error();
373 auto const& result = res.value();
374 if (not result.hasRows()) {
375 LOG(log_.warn()) <<
"Could not fetch all transaction hashes - no rows; ledger = "
376 << std::to_string(ledgerSequence);
380 std::vector<ripple::uint256> hashes;
382 hashes.push_back(std::move(hash));
384 auto end = std::chrono::system_clock::now();
388 << hashes.size() <<
" transaction hashes from database in "
389 << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
397 ripple::uint256
const& tokenID,
398 std::uint32_t
const ledgerSequence,
399 boost::asio::yield_context yield
402 auto const res = executor_.read(yield, schema_->selectNFT, tokenID, ledgerSequence);
406 if (
auto const maybeRow = res->template get<uint32_t, ripple::AccountID, bool>();
408 auto [seq, owner, isBurned] = *maybeRow;
409 auto result = std::make_optional<NFT>(tokenID, seq, owner, isBurned);
422 auto uriRes = executor_.read(yield, schema_->selectNFTURI, tokenID, ledgerSequence);
424 if (
auto const maybeUri = uriRes->template get<ripple::Blob>(); maybeUri)
425 result->uri = *maybeUri;
431 LOG(log_.error()) <<
"Could not fetch NFT - no rows";
437 ripple::uint256
const& tokenID,
438 std::uint32_t
const limit,
440 std::optional<TransactionsCursor>
const& cursorIn,
441 boost::asio::yield_context yield
446 return {.txns = {}, .cursor = {}};
448 Statement
const statement = [
this, forward, &tokenID]() {
450 return schema_->selectNFTTxForward.bind(tokenID);
452 return schema_->selectNFTTx.bind(tokenID);
455 auto cursor = cursorIn;
457 statement.
bindAt(1, cursor->asTuple());
458 LOG(log_.debug()) <<
"token_id = " << ripple::strHex(tokenID)
459 <<
" tuple = " << cursor->ledgerSequence << cursor->transactionIndex;
461 auto const seq = forward ? rng->minSequence : rng->maxSequence;
462 auto const placeHolder = forward ? 0 : std::numeric_limits<std::uint32_t>::max();
464 statement.
bindAt(1, std::make_tuple(placeHolder, placeHolder));
465 LOG(log_.debug()) <<
"token_id = " << ripple::strHex(tokenID) <<
" idx = " << seq
466 <<
" tuple = " << placeHolder;
471 auto const res = executor_.read(yield, statement);
472 auto const& results = res.value();
473 if (not results.hasRows()) {
474 LOG(log_.debug()) <<
"No rows returned";
478 std::vector<ripple::uint256> hashes = {};
479 auto numRows = results.numRows();
480 LOG(log_.info()) <<
"num_rows = " << numRows;
482 for (
auto [hash,
data] :
483 extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
484 hashes.push_back(hash);
485 if (--numRows == 0) {
486 LOG(log_.debug()) <<
"Setting cursor";
492 ++cursor->transactionIndex;
497 LOG(log_.debug()) <<
"NFT Txns = " << txns.size();
499 if (txns.size() == limit) {
500 LOG(log_.debug()) <<
"Returning cursor";
501 return {txns, cursor};
509 ripple::uint192
const& mptID,
510 std::uint32_t
const limit,
511 std::optional<ripple::AccountID>
const& cursorIn,
512 std::uint32_t
const ledgerSequence,
513 boost::asio::yield_context yield
516 auto const holderEntries = executor_.read(
518 schema_->selectMPTHolders,
520 cursorIn.value_or(ripple::AccountID(0)),
524 auto const& holderResults = holderEntries.value();
525 if (not holderResults.hasRows()) {
526 LOG(log_.debug()) <<
"No rows returned";
530 std::vector<ripple::uint256> mptKeys;
531 std::optional<ripple::AccountID> cursor;
533 mptKeys.push_back(ripple::keylet::mptoken(mptID, holder).key);
539 auto it = std::remove_if(mptObjects.begin(), mptObjects.end(), [](Blob
const& mpt) {
543 mptObjects.erase(it, mptObjects.end());
545 ASSERT(mptKeys.size() <= limit,
"Number of keys can't exceed the limit");
546 if (mptKeys.size() == limit)
547 return {mptObjects, cursor};
549 return {mptObjects, {}};
554 ripple::uint256
const& key,
555 std::uint32_t
const sequence,
556 boost::asio::yield_context yield
559 LOG(log_.debug()) <<
"Fetching ledger object for seq " << sequence
560 <<
", key = " << ripple::to_string(key);
561 if (
auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
562 if (
auto const result = res->template get<Blob>(); result) {
566 LOG(log_.debug()) <<
"Could not fetch ledger object - no rows";
569 LOG(log_.error()) <<
"Could not fetch ledger object: " << res.error();
575 std::optional<std::uint32_t>
577 ripple::uint256
const& key,
578 std::uint32_t
const sequence,
579 boost::asio::yield_context yield
582 LOG(log_.debug()) <<
"Fetching ledger object for seq " << sequence
583 <<
", key = " << ripple::to_string(key);
584 if (
auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
585 if (
auto const result = res->template get<Blob, std::uint32_t>(); result) {
586 auto [_, seq] = result.value();
589 LOG(log_.debug()) <<
"Could not fetch ledger object sequence - no rows";
591 LOG(log_.error()) <<
"Could not fetch ledger object sequence: " << res.error();
597 std::optional<TransactionAndMetadata>
598 fetchTransaction(ripple::uint256
const& hash, boost::asio::yield_context yield)
const override
600 if (
auto const res = executor_.read(yield, schema_->selectTransaction, hash); res) {
601 if (
auto const maybeValue = res->template get<Blob, Blob, uint32_t, uint32_t>();
603 auto [transaction, meta, seq, date] = *maybeValue;
604 return std::make_optional<TransactionAndMetadata>(transaction, meta, seq, date);
607 LOG(log_.debug()) <<
"Could not fetch transaction - no rows";
609 LOG(log_.error()) <<
"Could not fetch transaction: " << res.error();
615 std::optional<ripple::uint256>
618 std::uint32_t
const ledgerSequence,
619 boost::asio::yield_context yield
622 if (
auto const res = executor_.read(yield, schema_->selectSuccessor, key, ledgerSequence);
624 if (
auto const result = res->template get<ripple::uint256>(); result) {
625 if (*result == kLAST_KEY)
630 LOG(log_.debug()) <<
"Could not fetch successor - no rows";
632 LOG(log_.error()) <<
"Could not fetch successor: " << res.error();
638 std::vector<TransactionAndMetadata>
640 std::vector<ripple::uint256>
const& hashes,
641 boost::asio::yield_context yield
647 auto const numHashes = hashes.size();
648 std::vector<TransactionAndMetadata> results;
649 results.reserve(numHashes);
651 std::vector<Statement> statements;
652 statements.reserve(numHashes);
654 auto const timeDiff =
util::timed([
this, yield, &results, &hashes, &statements]() {
659 std::back_inserter(statements),
660 [
this](
auto const& hash) {
return schema_->selectTransaction.bind(hash); }
663 auto const entries = executor_.readEach(yield, statements);
665 std::cbegin(entries),
667 std::back_inserter(results),
669 if (
auto const maybeRow = res.template get<Blob, Blob, uint32_t, uint32_t>();
678 ASSERT(numHashes == results.size(),
"Number of hashes and results must match");
679 LOG(log_.debug()) <<
"Fetched " << numHashes <<
" transactions from database in "
680 << timeDiff <<
" milliseconds";
686 std::vector<ripple::uint256>
const& keys,
687 std::uint32_t
const sequence,
688 boost::asio::yield_context yield
694 auto const numKeys = keys.size();
695 LOG(log_.trace()) <<
"Fetching " << numKeys <<
" objects";
697 std::vector<Blob> results;
698 results.reserve(numKeys);
700 std::vector<Statement> statements;
701 statements.reserve(numKeys);
707 std::back_inserter(statements),
708 [
this, &sequence](
auto const& key) {
return schema_->selectObject.bind(key, sequence); }
711 auto const entries = executor_.readEach(yield, statements);
713 std::cbegin(entries),
715 std::back_inserter(results),
716 [](
auto const& res) -> Blob {
717 if (
auto const maybeValue = res.template get<Blob>(); maybeValue)
724 LOG(log_.trace()) <<
"Fetched " << numKeys <<
" objects";
728 std::vector<LedgerObject>
730 std::uint32_t
const ledgerSequence,
731 boost::asio::yield_context yield
734 auto const [keys, timeDiff] =
735 util::timed([
this, &ledgerSequence, yield]() -> std::vector<ripple::uint256> {
736 auto const res = executor_.read(yield, schema_->selectDiff, ledgerSequence);
738 LOG(log_.error()) <<
"Could not fetch ledger diff: " << res.error()
739 <<
"; ledger = " << ledgerSequence;
743 auto const& results = res.value();
746 <<
"Could not fetch ledger diff - no rows; ledger = " << ledgerSequence;
750 std::vector<ripple::uint256> resultKeys;
752 resultKeys.push_back(key);
761 LOG(log_.debug()) <<
"Fetched " << keys.size() <<
" diff hashes from database in "
762 << timeDiff <<
" milliseconds";
765 std::vector<LedgerObject> results;
766 results.reserve(keys.size());
772 std::back_inserter(results),
773 [](
auto const& key,
auto const& obj) {
return LedgerObject{key, obj}; }
779 std::optional<std::string>
781 std::string
const& migratorName,
782 boost::asio::yield_context yield
785 auto const res = executor_.read(yield, schema_->selectMigratorStatus,
Text(migratorName));
787 LOG(log_.error()) <<
"Could not fetch migrator status: " << res.error();
791 auto const& results = res.value();
802 std::expected<std::vector<std::pair<boost::uuids::uuid, std::string>>, std::string>
805 auto const readResult = executor_.read(yield, schema_->selectClioNodesData);
807 return std::unexpected{readResult.error().message()};
809 std::vector<std::pair<boost::uuids::uuid, std::string>> result;
812 result.emplace_back(uuid, std::move(message));
821 LOG(log_.trace()) <<
" Writing ledger object " << key.size() <<
":" << seq <<
" ["
822 << blob.size() <<
" bytes]";
825 executor_.write(schema_->insertDiff, seq, key);
827 executor_.write(schema_->insertObject, std::move(key), seq, std::move(blob));
831 writeSuccessor(std::string&& key, std::uint32_t
const seq, std::string&& successor)
override
833 LOG(log_.trace()) <<
"Writing successor. key = " << key.size() <<
" bytes. "
834 <<
" seq = " << std::to_string(seq) <<
" successor = " << successor.size()
836 ASSERT(!key.empty(),
"Key must not be empty");
837 ASSERT(!successor.empty(),
"Successor must not be empty");
839 executor_.write(schema_->insertSuccessor, std::move(key), seq, std::move(successor));
845 std::vector<Statement> statements;
846 statements.reserve(
data.size() * 10);
848 for (
auto& record :
data) {
849 std::ranges::transform(
850 record.accounts, std::back_inserter(statements), [
this, &record](
auto&& account) {
851 return schema_->insertAccountTx.bind(
852 std::forward<decltype(account)>(account),
853 std::make_tuple(record.ledgerSequence, record.transactionIndex),
860 executor_.write(std::move(statements));
866 std::vector<Statement> statements;
867 statements.reserve(record.accounts.size());
869 std::ranges::transform(
870 record.accounts, std::back_inserter(statements), [
this, &record](
auto&& account) {
871 return schema_->insertAccountTx.bind(
872 std::forward<decltype(account)>(account),
873 std::make_tuple(record.ledgerSequence, record.transactionIndex),
879 executor_.write(std::move(statements));
885 std::vector<Statement> statements;
886 statements.reserve(
data.size());
888 std::ranges::transform(
data, std::back_inserter(statements), [
this](
auto const& record) {
889 return schema_->insertNFTTx.bind(
891 std::make_tuple(record.ledgerSequence, record.transactionIndex),
896 executor_.write(std::move(statements));
902 std::uint32_t
const seq,
903 std::uint32_t
const date,
904 std::string&& transaction,
905 std::string&& metadata
908 LOG(log_.trace()) <<
"Writing txn to database";
910 executor_.write(schema_->insertLedgerTransaction, seq, hash);
912 schema_->insertTransaction,
916 std::move(transaction),
924 std::vector<Statement> statements;
925 statements.reserve(
data.size() * 3);
928 if (!record.onlyUriChanged) {
929 statements.push_back(schema_->insertNFT.bind(
930 record.tokenID, record.ledgerSequence, record.owner, record.isBurned
939 statements.push_back(schema_->insertIssuerNFT.bind(
940 ripple::nft::getIssuer(record.tokenID),
941 static_cast<uint32_t
>(ripple::nft::getTaxon(record.tokenID)),
944 statements.push_back(schema_->insertNFTURI.bind(
945 record.tokenID, record.ledgerSequence, record.uri.value()
950 statements.push_back(schema_->insertNFTURI.bind(
951 record.tokenID, record.ledgerSequence, record.uri.value()
956 executor_.writeEach(std::move(statements));
962 std::vector<Statement> statements;
963 statements.reserve(
data.size());
964 for (
auto [mptId, holder] :
data)
965 statements.push_back(schema_->insertMPTHolder.bind(mptId, holder));
967 executor_.write(std::move(statements));
981 schema_->insertMigratorStatus,
998 return executor_.isTooBusy();
1004 return executor_.stats();
1017 auto const res = executor_.writeSync(statement);
1018 auto maybeSuccess = res->template get<bool>();
1019 if (not maybeSuccess) {
1020 LOG(log_.error()) <<
"executeSyncUpdate - error getting result - no row";
1024 if (not maybeSuccess.value()) {
1025 LOG(log_.warn()) <<
"Update failed. Checking if DB state is what we expect";
1032 return rng && rng->maxSequence == ledgerSequence_;
std::vector< Blob > fetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t sequence, boost::asio::yield_context yield) const
Fetches all ledger objects by their keys.
Definition BackendInterface.cpp:114
BackendInterface(LedgerCacheInterface &cache)
Construct a new backend interface instance.
Definition BackendInterface.hpp:158
std::optional< LedgerRange > hardFetchLedgerRangeNoThrow() const
Fetches the ledger range from DB retrying until no DatabaseTimeout is thrown.
Definition BackendInterface.cpp:72
std::optional< LedgerRange > fetchLedgerRange() const
Fetch the current ledger range.
Definition BackendInterface.cpp:268
LedgerCacheInterface const & cache() const
Definition BackendInterface.hpp:170
A simple cache holding one ripple::LedgerHeader to reduce DB lookups.
Definition LedgerHeaderCache.hpp:41
Cache for an entire ledger.
Definition LedgerCacheInterface.hpp:40
Implements BackendInterface for Cassandra/ScyllaDB/Keyspace.
Definition CassandraBackendFamily.hpp:83
void writeMigratorStatus(std::string const &migratorName, std::string const &status) override
Mark the migration status of a migrator as Migrated in the database.
Definition CassandraBackendFamily.hpp:978
std::optional< LedgerRange > hardFetchLedgerRange(boost::asio::yield_context yield) const override
Fetches the ledger range from DB.
Definition CassandraBackendFamily.hpp:311
void startWrites() const override
Starts a write transaction with the DB. No-op for cassandra.
Definition CassandraBackendFamily.hpp:971
void doWriteLedgerObject(std::string &&key, std::uint32_t const seq, std::string &&blob) override
Writes a ledger object to the database.
Definition CassandraBackendFamily.hpp:819
std::optional< TransactionAndMetadata > fetchTransaction(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific transaction.
Definition CassandraBackendFamily.hpp:598
TransactionsAndCursor fetchAccountTransactions(ripple::AccountID const &account, std::uint32_t const limit, bool forward, std::optional< TransactionsCursor > const &txnCursor, boost::asio::yield_context yield) const override
Fetches all transactions for a specific account.
Definition CassandraBackendFamily.hpp:155
MPTHoldersAndCursor fetchMPTHolders(ripple::uint192 const &mptID, std::uint32_t const limit, std::optional< ripple::AccountID > const &cursorIn, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all holders' balances for an MPTIssuanceID.
Definition CassandraBackendFamily.hpp:508
void writeNFTs(std::vector< NFTsData > const &data) override
Writes NFTs to the database.
Definition CassandraBackendFamily.hpp:922
std::optional< ripple::LedgerHeader > fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override
Fetches a specific ledger by sequence number.
Definition CassandraBackendFamily.hpp:260
void writeNFTTransactions(std::vector< NFTTransactionsData > const &data) override
Write NFT transactions.
Definition CassandraBackendFamily.hpp:883
void writeNodeMessage(boost::uuids::uuid const &uuid, std::string message) override
Write a node message. Used by ClusterCommunicationService.
Definition CassandraBackendFamily.hpp:988
std::optional< std::uint32_t > fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
Fetches the latest ledger sequence.
Definition CassandraBackendFamily.hpp:240
CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface &cache, bool readOnly)
Create a new cassandra/scylla backend instance.
Definition CassandraBackendFamily.hpp:107
std::optional< ripple::LedgerHeader > fetchLedgerByHash(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific ledger by hash.
Definition CassandraBackendFamily.hpp:291
std::optional< std::uint32_t > doFetchLedgerObjectSeq(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object sequence.
Definition CassandraBackendFamily.hpp:576
bool isTooBusy() const override
Definition CassandraBackendFamily.hpp:996
std::optional< NFT > fetchNFT(ripple::uint256 const &tokenID, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches a specific NFT.
Definition CassandraBackendFamily.hpp:396
void writeMPTHolders(std::vector< MPTHolderData > const &data) override
Write accounts that started holding onto an MPT.
Definition CassandraBackendFamily.hpp:960
void writeAccountTransaction(AccountTransactionsData record) override
Write a new account transaction.
Definition CassandraBackendFamily.hpp:864
void writeSuccessor(std::string &&key, std::uint32_t const seq, std::string &&successor) override
Write a new successor.
Definition CassandraBackendFamily.hpp:831
std::vector< ripple::uint256 > fetchAllTransactionHashesInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transaction hashes from a specific ledger.
Definition CassandraBackendFamily.hpp:359
void waitForWritesToFinish() override
Wait for all pending writes to finish.
Definition CassandraBackendFamily.hpp:224
std::vector< TransactionAndMetadata > fetchTransactions(std::vector< ripple::uint256 > const &hashes, boost::asio::yield_context yield) const override
Fetches multiple transactions.
Definition CassandraBackendFamily.hpp:639
std::vector< Blob > doFetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching ledger objects.
Definition CassandraBackendFamily.hpp:685
std::optional< ripple::uint256 > doFetchSuccessorKey(ripple::uint256 key, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Database-specific implementation of fetching the successor key.
Definition CassandraBackendFamily.hpp:616
boost::json::object stats() const override
Definition CassandraBackendFamily.hpp:1002
std::optional< std::string > fetchMigratorStatus(std::string const &migratorName, boost::asio::yield_context yield) const override
Fetches the status of a migrator by name.
Definition CassandraBackendFamily.hpp:780
std::expected< std::vector< std::pair< boost::uuids::uuid, std::string > >, std::string > fetchClioNodesData(boost::asio::yield_context yield) const override
Fetches the data of all nodes in the cluster.
Definition CassandraBackendFamily.hpp:803
void writeAccountTransactions(std::vector< AccountTransactionsData > data) override
Write a new set of account transactions.
Definition CassandraBackendFamily.hpp:843
std::optional< Blob > doFetchLedgerObject(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object.
Definition CassandraBackendFamily.hpp:553
bool executeSyncUpdate(Statement statement)
Executes statements and tries to write to DB.
Definition CassandraBackendFamily.hpp:1015
void writeLedger(ripple::LedgerHeader const &ledgerHeader, std::string &&blob) override
Writes to a specific ledger.
Definition CassandraBackendFamily.hpp:230
TransactionsAndCursor fetchNFTTransactions(ripple::uint256 const &tokenID, std::uint32_t const limit, bool const forward, std::optional< TransactionsCursor > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all transactions for a specific NFT.
Definition CassandraBackendFamily.hpp:436
void writeTransaction(std::string &&hash, std::uint32_t const seq, std::uint32_t const date, std::string &&transaction, std::string &&metadata) override
Writes a new transaction.
Definition CassandraBackendFamily.hpp:900
std::vector< LedgerObject > fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Returns the difference between ledgers.
Definition CassandraBackendFamily.hpp:729
std::vector< TransactionAndMetadata > fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transactions from a specific ledger.
Definition CassandraBackendFamily.hpp:349
Represents a handle to the cassandra database cluster.
Definition Handle.hpp:46
void bindAt(std::size_t const idx, Type &&value) const
Binds an argument to a specific index.
Definition Statement.hpp:95
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
The requirements of an execution strategy.
Definition Concepts.hpp:54
The requirements of a settings provider.
Definition Concepts.hpp:43
This namespace implements a wrapper for the Cassandra C++ driver.
Definition CassandraBackendFamily.hpp:66
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into a series of std::tuple<Types...> by creating a simple wrapper with an STL input iterator inside.
Definition Handle.hpp:333
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:75
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:252
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
Struct used to keep track of what to write to account_transactions/account_tx tables.
Definition DBHelpers.hpp:45
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:112
Struct to store ledger header cache entry and the sequence it belongs to.
Definition LedgerHeaderCache.hpp:48
Represents an object in the ledger.
Definition Types.hpp:41
Stores a range of sequences as a min and max pair.
Definition Types.hpp:262
Represents an array of MPTokens.
Definition Types.hpp:254
Represents a bundle of transactions with metadata and a cursor to the next page.
Definition Types.hpp:172
A strong type wrapper for int32_t.
Definition Types.hpp:57
A strong type wrapper for string.
Definition Types.hpp:68