Clio develop
The XRP Ledger API server.
CassandraBackendFamily.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "data/LedgerCacheInterface.hpp"
25#include "data/LedgerHeaderCache.hpp"
26#include "data/Types.hpp"
27#include "data/cassandra/Concepts.hpp"
28#include "data/cassandra/Handle.hpp"
29#include "data/cassandra/Types.hpp"
30#include "data/cassandra/impl/ExecutionStrategy.hpp"
31#include "util/Assert.hpp"
32#include "util/LedgerUtils.hpp"
33#include "util/Profiler.hpp"
34#include "util/log/Logger.hpp"
35
36#include <boost/asio/spawn.hpp>
37#include <boost/json/object.hpp>
38#include <boost/uuid/string_generator.hpp>
39#include <boost/uuid/uuid.hpp>
40#include <cassandra.h>
41#include <fmt/format.h>
42#include <xrpl/basics/Blob.h>
43#include <xrpl/basics/base_uint.h>
44#include <xrpl/basics/strHex.h>
45#include <xrpl/protocol/AccountID.h>
46#include <xrpl/protocol/Indexes.h>
47#include <xrpl/protocol/LedgerHeader.h>
48#include <xrpl/protocol/nft.h>
49
50#include <algorithm>
51#include <atomic>
52#include <chrono>
53#include <cstddef>
54#include <cstdint>
55#include <iterator>
56#include <limits>
57#include <optional>
58#include <stdexcept>
59#include <string>
60#include <tuple>
61#include <utility>
62#include <vector>
63
64class CacheBackendCassandraTest;
65
66namespace data::cassandra {
67
78template <
79 SomeSettingsProvider SettingsProviderType,
80 SomeExecutionStrategy ExecutionStrategyType,
81 typename SchemaType,
82 typename FetchLedgerCacheType = FetchLedgerCache>
83class CassandraBackendFamily : public BackendInterface {
84protected:
85 util::Logger log_{"Backend"};
86
87 SettingsProviderType settingsProvider_;
88 SchemaType schema_;
89 std::atomic_uint32_t ledgerSequence_ = 0u;
90 friend class ::CacheBackendCassandraTest;
91
92 Handle handle_;
93
94 // These have to be mutable because of BackendInterface constness :(
95 mutable ExecutionStrategyType executor_;
96 // TODO: move to interface level
97 mutable FetchLedgerCacheType ledgerCache_{};
98
99public:
107 CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface& cache, bool readOnly)
108 : BackendInterface(cache)
109 , settingsProvider_{std::move(settingsProvider)}
110 , schema_{settingsProvider_}
111 , handle_{settingsProvider_.getSettings()}
112 , executor_{settingsProvider_.getSettings(), handle_}
113 {
114 if (auto const res = handle_.connect(); not res.has_value())
115 throw std::runtime_error("Could not connect to database: " + res.error());
116
117 if (not readOnly) {
118 if (auto const res = handle_.execute(schema_.createKeyspace); not res.has_value()) {
119 // On DataStax, keyspace creation can be restricted to the admin
120 // interface. That does not mean the keyspace does not already exist, though.
121 if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED)
122 throw std::runtime_error("Could not create keyspace: " + res.error());
123 }
124
125 if (auto const res = handle_.executeEach(schema_.createSchema); not res.has_value())
126 throw std::runtime_error("Could not create schema: " + res.error());
127 }
128
129 try {
130 schema_.prepareStatements(handle_);
131 } catch (std::runtime_error const& ex) {
132 auto const error = fmt::format(
133 "Failed to prepare the statements: {}; readOnly: {}. ReadOnly should be turned off or another Clio "
134 "node with write access to DB should be started first.",
135 ex.what(),
136 readOnly
137 );
138 LOG(log_.error()) << error;
139 throw std::runtime_error(error);
140 }
141 LOG(log_.info()) << "Created (revamped) CassandraBackend";
142 }
143
144 /*
145 * @brief Move constructor is deleted because handle_ is shared by reference with executor
146 */
147 CassandraBackendFamily(CassandraBackendFamily&&) = delete;
148
149 TransactionsAndCursor
150 fetchAccountTransactions(
151 ripple::AccountID const& account,
152 std::uint32_t const limit,
153 bool forward,
154 std::optional<TransactionsCursor> const& txnCursor,
155 boost::asio::yield_context yield
156 ) const override
157 {
158 auto rng = fetchLedgerRange();
159 if (!rng)
160 return {.txns = {}, .cursor = {}};
161
162 Statement const statement = [this, forward, &account]() {
163 if (forward)
164 return schema_->selectAccountTxForward.bind(account);
165
166 return schema_->selectAccountTx.bind(account);
167 }();
168
169 auto cursor = txnCursor;
170 if (cursor) {
171 statement.bindAt(1, cursor->asTuple());
172 LOG(log_.debug()) << "account = " << ripple::strHex(account) << " tuple = " << cursor->ledgerSequence
173 << cursor->transactionIndex;
174 } else {
175 auto const seq = forward ? rng->minSequence : rng->maxSequence;
176 auto const placeHolder = forward ? 0u : std::numeric_limits<std::uint32_t>::max();
177
178 statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
179 LOG(log_.debug()) << "account = " << ripple::strHex(account) << " idx = " << seq
180 << " tuple = " << placeHolder;
181 }
182
183 // FIXME: Limit is a hack to support uint32_t properly for the time
184 // being. Should be removed later and schema updated to use proper
185 // types.
186 statement.bindAt(2, Limit{limit});
187 auto const res = executor_.read(yield, statement);
188 auto const& results = res.value();
189 if (not results.hasRows()) {
190 LOG(log_.debug()) << "No rows returned";
191 return {};
192 }
193
194 std::vector<ripple::uint256> hashes = {};
195 auto numRows = results.numRows();
196 LOG(log_.info()) << "num_rows = " << numRows;
197
198 for (auto [hash, data] : extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
199 hashes.push_back(hash);
200 if (--numRows == 0) {
201 LOG(log_.debug()) << "Setting cursor";
202 cursor = data;
203 }
204 }
205
206 auto const txns = fetchTransactions(hashes, yield);
207 LOG(log_.debug()) << "Txns = " << txns.size();
208
209 if (txns.size() == limit) {
210 LOG(log_.debug()) << "Returning cursor";
211 return {txns, cursor};
212 }
213
214 return {txns, {}};
215 }
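    // Caller-side paging sketch (illustrative only; `backend`, `account` and `process`
    // are assumed names, not part of this header). The returned cursor is fed back into
    // the next call until no cursor comes back:
    //
    //   std::optional<TransactionsCursor> cursor;
    //   do {
    //       auto const page = backend.fetchAccountTransactions(account, 200u, false, cursor, yield);
    //       process(page.txns);
    //       cursor = page.cursor;
    //   } while (cursor.has_value());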
216
217 void
218 waitForWritesToFinish() override
219 {
220 executor_.sync();
221 }
222
223 void
224 writeLedger(ripple::LedgerHeader const& ledgerHeader, std::string&& blob) override
225 {
226 executor_.write(schema_->insertLedgerHeader, ledgerHeader.seq, std::move(blob));
227
228 executor_.write(schema_->insertLedgerHash, ledgerHeader.hash, ledgerHeader.seq);
229
230 ledgerSequence_ = ledgerHeader.seq;
231 }
232
233 std::optional<std::uint32_t>
234 fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
235 {
236 if (auto const res = executor_.read(yield, schema_->selectLatestLedger); res.has_value()) {
237 if (auto const& rows = *res; rows) {
238 if (auto const maybeRow = rows.template get<uint32_t>(); maybeRow.has_value())
239 return maybeRow;
240
241 LOG(log_.error()) << "Could not fetch latest ledger - no rows";
242 return std::nullopt;
243 }
244
245 LOG(log_.error()) << "Could not fetch latest ledger - no result";
246 } else {
247 LOG(log_.error()) << "Could not fetch latest ledger: " << res.error();
248 }
249
250 return std::nullopt;
251 }
252
253 std::optional<ripple::LedgerHeader>
254 fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override
255 {
256 if (auto const lock = ledgerCache_.get(); lock.has_value() && lock->seq == sequence)
257 return lock->ledger;
258
259 auto const res = executor_.read(yield, schema_->selectLedgerBySeq, sequence);
260 if (res) {
261 if (auto const& result = res.value(); result) {
262 if (auto const maybeValue = result.template get<std::vector<unsigned char>>(); maybeValue) {
263 auto const header = util::deserializeHeader(ripple::makeSlice(*maybeValue));
264 ledgerCache_.put(FetchLedgerCache::CacheEntry{header, sequence});
265 return header;
266 }
267
268 LOG(log_.error()) << "Could not fetch ledger by sequence - no rows";
269 return std::nullopt;
270 }
271
272 LOG(log_.error()) << "Could not fetch ledger by sequence - no result";
273 } else {
274 LOG(log_.error()) << "Could not fetch ledger by sequence: " << res.error();
275 }
276
277 return std::nullopt;
278 }
279
280 std::optional<ripple::LedgerHeader>
281 fetchLedgerByHash(ripple::uint256 const& hash, boost::asio::yield_context yield) const override
282 {
283 if (auto const res = executor_.read(yield, schema_->selectLedgerByHash, hash); res) {
284 if (auto const& result = res.value(); result) {
285 if (auto const maybeValue = result.template get<uint32_t>(); maybeValue)
286 return fetchLedgerBySequence(*maybeValue, yield);
287
288 LOG(log_.error()) << "Could not fetch ledger by hash - no rows";
289 return std::nullopt;
290 }
291
292 LOG(log_.error()) << "Could not fetch ledger by hash - no result";
293 } else {
294 LOG(log_.error()) << "Could not fetch ledger by hash: " << res.error();
295 }
296
297 return std::nullopt;
298 }
299
300 std::optional<LedgerRange>
301 hardFetchLedgerRange(boost::asio::yield_context yield) const override
302 {
303 auto const res = executor_.read(yield, schema_->selectLedgerRange);
304 if (res) {
305 auto const& results = res.value();
306 if (not results.hasRows()) {
307 LOG(log_.debug()) << "Could not fetch ledger range - no rows";
308 return std::nullopt;
309 }
310
311 // TODO: this is probably a good place to use a user-defined type in
312 // Cassandra instead of having two rows with a bool flag, or maybe at
313 // least use tuple<int, int>?
314 LedgerRange range;
315 std::size_t idx = 0;
316 for (auto [seq] : extract<uint32_t>(results)) {
317 if (idx == 0) {
318 range.maxSequence = range.minSequence = seq;
319 } else if (idx == 1) {
320 range.maxSequence = seq;
321 }
322
323 ++idx;
324 }
325
326 if (range.minSequence > range.maxSequence)
327 std::swap(range.minSequence, range.maxSequence);
328
329 LOG(log_.debug()) << "After hardFetchLedgerRange range is " << range.minSequence << ":"
330 << range.maxSequence;
331 return range;
332 }
333 LOG(log_.error()) << "Could not fetch ledger range: " << res.error();
334
335 return std::nullopt;
336 }
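    // For reference, the ledger range data read above is assumed to consist of two rows
    // distinguished by a bool flag (as the TODO above notes), roughly:
    //
    //   is_latest | sequence
    //   ----------+--------------
    //   false     | minSequence
    //   true      | maxSequence
    //
    // The column names here are assumptions; the std::swap in the function above
    // normalizes the pair regardless of which row arrives first.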
337
338 std::vector<TransactionAndMetadata>
339 fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
340 {
341 auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence, yield);
342 return fetchTransactions(hashes, yield);
343 }
344
345 std::vector<ripple::uint256>
346 fetchAllTransactionHashesInLedger(
347 std::uint32_t const ledgerSequence,
348 boost::asio::yield_context yield
349 ) const override
350 {
351 auto start = std::chrono::system_clock::now();
352 auto const res = executor_.read(yield, schema_->selectAllTransactionHashesInLedger, ledgerSequence);
353
354 if (not res) {
355 LOG(log_.error()) << "Could not fetch all transaction hashes: " << res.error();
356 return {};
357 }
358
359 auto const& result = res.value();
360 if (not result.hasRows()) {
361 LOG(log_.warn()) << "Could not fetch all transaction hashes - no rows; ledger = "
362 << std::to_string(ledgerSequence);
363 return {};
364 }
365
366 std::vector<ripple::uint256> hashes;
367 for (auto [hash] : extract<ripple::uint256>(result))
368 hashes.push_back(std::move(hash));
369
370 auto end = std::chrono::system_clock::now();
371 LOG(log_.debug()) << "Fetched " << hashes.size() << " transaction hashes from database in "
372 << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
373 << " milliseconds";
374
375 return hashes;
376 }
377
378 std::optional<NFT>
379 fetchNFT(
380 ripple::uint256 const& tokenID,
381 std::uint32_t const ledgerSequence,
382 boost::asio::yield_context yield
383 ) const override
384 {
385 auto const res = executor_.read(yield, schema_->selectNFT, tokenID, ledgerSequence);
386 if (not res)
387 return std::nullopt;
388
389 if (auto const maybeRow = res->template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
390 auto [seq, owner, isBurned] = *maybeRow;
391 auto result = std::make_optional<NFT>(tokenID, seq, owner, isBurned);
392
393 // Now fetch the URI. Usually we will have the URI even for burned NFTs,
394 // but if the first ledger on this Clio instance included NFTokenBurn
395 // transactions, we will not have the URIs for any of those tokens.
396 // In any other case, not having the URI indicates something went
397 // wrong with our data.
398 //
399 // TODO: in the future it would be great if any handlers that use
400 // this could inject a warning in this case (the case of not having
401 // a URI because it was burned in the first ledger) to indicate that
402 // even though we are returning a blank URI, the NFT might have had
403 // one.
404 auto uriRes = executor_.read(yield, schema_->selectNFTURI, tokenID, ledgerSequence);
405 if (uriRes) {
406 if (auto const maybeUri = uriRes->template get<ripple::Blob>(); maybeUri)
407 result->uri = *maybeUri;
408 }
409
410 return result;
411 }
412
413 LOG(log_.error()) << "Could not fetch NFT - no rows";
414 return std::nullopt;
415 }
416
417 TransactionsAndCursor
418 fetchNFTTransactions(
419 ripple::uint256 const& tokenID,
420 std::uint32_t const limit,
421 bool const forward,
422 std::optional<TransactionsCursor> const& cursorIn,
423 boost::asio::yield_context yield
424 ) const override
425 {
426 auto rng = fetchLedgerRange();
427 if (!rng)
428 return {.txns = {}, .cursor = {}};
429
430 Statement const statement = [this, forward, &tokenID]() {
431 if (forward)
432 return schema_->selectNFTTxForward.bind(tokenID);
433
434 return schema_->selectNFTTx.bind(tokenID);
435 }();
436
437 auto cursor = cursorIn;
438 if (cursor) {
439 statement.bindAt(1, cursor->asTuple());
440 LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " tuple = " << cursor->ledgerSequence
441 << cursor->transactionIndex;
442 } else {
443 auto const seq = forward ? rng->minSequence : rng->maxSequence;
444 auto const placeHolder = forward ? 0 : std::numeric_limits<std::uint32_t>::max();
445
446 statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
447 LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq
448 << " tuple = " << placeHolder;
449 }
450
451 statement.bindAt(2, Limit{limit});
452
453 auto const res = executor_.read(yield, statement);
454 auto const& results = res.value();
455 if (not results.hasRows()) {
456 LOG(log_.debug()) << "No rows returned";
457 return {};
458 }
459
460 std::vector<ripple::uint256> hashes = {};
461 auto numRows = results.numRows();
462 LOG(log_.info()) << "num_rows = " << numRows;
463
464 for (auto [hash, data] : extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
465 hashes.push_back(hash);
466 if (--numRows == 0) {
467 LOG(log_.debug()) << "Setting cursor";
468 cursor = data;
469
470 // forward queries compare the ledger/tx sequence tuple with `>=`,
471 // so we have to advance the index by one
472 if (forward)
473 ++cursor->transactionIndex;
474 }
475 }
476
477 auto const txns = fetchTransactions(hashes, yield);
478 LOG(log_.debug()) << "NFT Txns = " << txns.size();
479
480 if (txns.size() == limit) {
481 LOG(log_.debug()) << "Returning cursor";
482 return {txns, cursor};
483 }
484
485 return {txns, {}};
486 }
487
488 MPTHoldersAndCursor
489 fetchMPTHolders(
490 ripple::uint192 const& mptID,
491 std::uint32_t const limit,
492 std::optional<ripple::AccountID> const& cursorIn,
493 std::uint32_t const ledgerSequence,
494 boost::asio::yield_context yield
495 ) const override
496 {
497 auto const holderEntries = executor_.read(
498 yield, schema_->selectMPTHolders, mptID, cursorIn.value_or(ripple::AccountID(0)), Limit{limit}
499 );
500
501 auto const& holderResults = holderEntries.value();
502 if (not holderResults.hasRows()) {
503 LOG(log_.debug()) << "No rows returned";
504 return {};
505 }
506
507 std::vector<ripple::uint256> mptKeys;
508 std::optional<ripple::AccountID> cursor;
509 for (auto const [holder] : extract<ripple::AccountID>(holderResults)) {
510 mptKeys.push_back(ripple::keylet::mptoken(mptID, holder).key);
511 cursor = holder;
512 }
513
514 auto mptObjects = doFetchLedgerObjects(mptKeys, ledgerSequence, yield);
515
516 auto it = std::remove_if(mptObjects.begin(), mptObjects.end(), [](Blob const& mpt) { return mpt.empty(); });
517
518 mptObjects.erase(it, mptObjects.end());
519
520 ASSERT(mptKeys.size() <= limit, "Number of keys can't exceed the limit");
521 if (mptKeys.size() == limit)
522 return {mptObjects, cursor};
523
524 return {mptObjects, {}};
525 }
526
527 std::optional<Blob>
528 doFetchLedgerObject(
529 ripple::uint256 const& key,
530 std::uint32_t const sequence,
531 boost::asio::yield_context yield
532 ) const override
533 {
534 LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key);
535 if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
536 if (auto const result = res->template get<Blob>(); result) {
537 if (result->size())
538 return result;
539 } else {
540 LOG(log_.debug()) << "Could not fetch ledger object - no rows";
541 }
542 } else {
543 LOG(log_.error()) << "Could not fetch ledger object: " << res.error();
544 }
545
546 return std::nullopt;
547 }
548
549 std::optional<std::uint32_t>
550 doFetchLedgerObjectSeq(
551 ripple::uint256 const& key,
552 std::uint32_t const sequence,
553 boost::asio::yield_context yield
554 ) const override
555 {
556 LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key);
557 if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
558 if (auto const result = res->template get<Blob, std::uint32_t>(); result) {
559 auto [_, seq] = result.value();
560 return seq;
561 }
562 LOG(log_.debug()) << "Could not fetch ledger object sequence - no rows";
563 } else {
564 LOG(log_.error()) << "Could not fetch ledger object sequence: " << res.error();
565 }
566
567 return std::nullopt;
568 }
569
570 std::optional<TransactionAndMetadata>
571 fetchTransaction(ripple::uint256 const& hash, boost::asio::yield_context yield) const override
572 {
573 if (auto const res = executor_.read(yield, schema_->selectTransaction, hash); res) {
574 if (auto const maybeValue = res->template get<Blob, Blob, uint32_t, uint32_t>(); maybeValue) {
575 auto [transaction, meta, seq, date] = *maybeValue;
576 return std::make_optional<TransactionAndMetadata>(transaction, meta, seq, date);
577 }
578
579 LOG(log_.debug()) << "Could not fetch transaction - no rows";
580 } else {
581 LOG(log_.error()) << "Could not fetch transaction: " << res.error();
582 }
583
584 return std::nullopt;
585 }
586
587 std::optional<ripple::uint256>
588 doFetchSuccessorKey(
589 ripple::uint256 key,
590 std::uint32_t const ledgerSequence,
591 boost::asio::yield_context yield
592 ) const override
593 {
594 if (auto const res = executor_.read(yield, schema_->selectSuccessor, key, ledgerSequence); res) {
595 if (auto const result = res->template get<ripple::uint256>(); result) {
596 if (*result == kLAST_KEY)
597 return std::nullopt;
598 return result;
599 }
600
601 LOG(log_.debug()) << "Could not fetch successor - no rows";
602 } else {
603 LOG(log_.error()) << "Could not fetch successor: " << res.error();
604 }
605
606 return std::nullopt;
607 }
608
609 std::vector<TransactionAndMetadata>
610 fetchTransactions(std::vector<ripple::uint256> const& hashes, boost::asio::yield_context yield) const override
611 {
612 if (hashes.empty())
613 return {};
614
615 auto const numHashes = hashes.size();
616 std::vector<TransactionAndMetadata> results;
617 results.reserve(numHashes);
618
619 std::vector<Statement> statements;
620 statements.reserve(numHashes);
621
622 auto const timeDiff = util::timed([this, yield, &results, &hashes, &statements]() {
623 // TODO: seems like a job for "hash IN (list of hashes)" instead?
624 std::transform(
625 std::cbegin(hashes), std::cend(hashes), std::back_inserter(statements), [this](auto const& hash) {
626 return schema_->selectTransaction.bind(hash);
627 }
628 );
629
630 auto const entries = executor_.readEach(yield, statements);
631 std::transform(
632 std::cbegin(entries),
633 std::cend(entries),
634 std::back_inserter(results),
635 [](auto const& res) -> TransactionAndMetadata {
636 if (auto const maybeRow = res.template get<Blob, Blob, uint32_t, uint32_t>(); maybeRow)
637 return *maybeRow;
638
639 return {};
640 }
641 );
642 });
643
644 ASSERT(numHashes == results.size(), "Number of hashes and results must match");
645 LOG(log_.debug()) << "Fetched " << numHashes << " transactions from database in " << timeDiff
646 << " milliseconds";
647 return results;
648 }
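    // The fan-out above binds one prepared SELECT per hash and executes them via
    // readEach; conceptually each statement is something like
    //
    //   SELECT transaction, metadata, ledger_sequence, date
    //     FROM transactions WHERE hash = ?   -- table/column names are assumptions
    //
    // rather than a single "hash IN (...)" query (see the TODO at the top of the
    // function above).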
649
650 std::vector<Blob>
651 doFetchLedgerObjects(
652 std::vector<ripple::uint256> const& keys,
653 std::uint32_t const sequence,
654 boost::asio::yield_context yield
655 ) const override
656 {
657 if (keys.empty())
658 return {};
659
660 auto const numKeys = keys.size();
661 LOG(log_.trace()) << "Fetching " << numKeys << " objects";
662
663 std::vector<Blob> results;
664 results.reserve(numKeys);
665
666 std::vector<Statement> statements;
667 statements.reserve(numKeys);
668
669 // TODO: seems like a job for "key IN (list of keys)" instead?
670 std::transform(
671 std::cbegin(keys), std::cend(keys), std::back_inserter(statements), [this, &sequence](auto const& key) {
672 return schema_->selectObject.bind(key, sequence);
673 }
674 );
675
676 auto const entries = executor_.readEach(yield, statements);
677 std::transform(
678 std::cbegin(entries), std::cend(entries), std::back_inserter(results), [](auto const& res) -> Blob {
679 if (auto const maybeValue = res.template get<Blob>(); maybeValue)
680 return *maybeValue;
681
682 return {};
683 }
684 );
685
686 LOG(log_.trace()) << "Fetched " << numKeys << " objects";
687 return results;
688 }
689
690 std::vector<LedgerObject>
691 fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
692 {
693 auto const [keys, timeDiff] = util::timed([this, &ledgerSequence, yield]() -> std::vector<ripple::uint256> {
694 auto const res = executor_.read(yield, schema_->selectDiff, ledgerSequence);
695 if (not res) {
696 LOG(log_.error()) << "Could not fetch ledger diff: " << res.error() << "; ledger = " << ledgerSequence;
697 return {};
698 }
699
700 auto const& results = res.value();
701 if (not results) {
702 LOG(log_.error()) << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence;
703 return {};
704 }
705
706 std::vector<ripple::uint256> resultKeys;
707 for (auto [key] : extract<ripple::uint256>(results))
708 resultKeys.push_back(key);
709
710 return resultKeys;
711 });
712
713 // one of the above errors must have happened
714 if (keys.empty())
715 return {};
716
717 LOG(log_.debug()) << "Fetched " << keys.size() << " diff hashes from database in " << timeDiff
718 << " milliseconds";
719
720 auto const objs = fetchLedgerObjects(keys, ledgerSequence, yield);
721 std::vector<LedgerObject> results;
722 results.reserve(keys.size());
723
724 std::transform(
725 std::cbegin(keys),
726 std::cend(keys),
727 std::cbegin(objs),
728 std::back_inserter(results),
729 [](auto const& key, auto const& obj) { return LedgerObject{key, obj}; }
730 );
731
732 return results;
733 }
734
735 std::optional<std::string>
736 fetchMigratorStatus(std::string const& migratorName, boost::asio::yield_context yield) const override
737 {
738 auto const res = executor_.read(yield, schema_->selectMigratorStatus, Text(migratorName));
739 if (not res) {
740 LOG(log_.error()) << "Could not fetch migrator status: " << res.error();
741 return {};
742 }
743
744 auto const& results = res.value();
745 if (not results) {
746 return {};
747 }
748
749 for (auto [statusString] : extract<std::string>(results))
750 return statusString;
751
752 return {};
753 }
754
755 std::expected<std::vector<std::pair<boost::uuids::uuid, std::string>>, std::string>
756 fetchClioNodesData(boost::asio::yield_context yield) const override
757 {
758 auto const readResult = executor_.read(yield, schema_->selectClioNodesData);
759 if (not readResult)
760 return std::unexpected{readResult.error().message()};
761
762 std::vector<std::pair<boost::uuids::uuid, std::string>> result;
763
764 for (auto [uuid, message] : extract<boost::uuids::uuid, std::string>(*readResult)) {
765 result.emplace_back(uuid, std::move(message));
766 }
767
768 return result;
769 }
770
771 void
772 doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override
773 {
774 LOG(log_.trace()) << " Writing ledger object " << key.size() << ":" << seq << " [" << blob.size() << " bytes]";
775
776 if (range_)
777 executor_.write(schema_->insertDiff, seq, key);
778
779 executor_.write(schema_->insertObject, std::move(key), seq, std::move(blob));
780 }
781
782 void
783 writeSuccessor(std::string&& key, std::uint32_t const seq, std::string&& successor) override
784 {
785 LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. "
786 << " seq = " << std::to_string(seq) << " successor = " << successor.size() << " bytes.";
787 ASSERT(!key.empty(), "Key must not be empty");
788 ASSERT(!successor.empty(), "Successor must not be empty");
789
790 executor_.write(schema_->insertSuccessor, std::move(key), seq, std::move(successor));
791 }
792
793 void
794 writeAccountTransactions(std::vector<AccountTransactionsData> data) override
795 {
796 std::vector<Statement> statements;
797 statements.reserve(data.size() * 10); // assume 10 transactions avg
798
799 for (auto& record : data) {
800 std::ranges::transform(record.accounts, std::back_inserter(statements), [this, &record](auto&& account) {
801 return schema_->insertAccountTx.bind(
802 std::forward<decltype(account)>(account),
803 std::make_tuple(record.ledgerSequence, record.transactionIndex),
804 record.txHash
805 );
806 });
807 }
808
809 executor_.write(std::move(statements));
810 }
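    // Write-side sketch (illustrative; the record is built member-by-member using the
    // field names referenced above, and `senderID`, `destID`, `txHash` are assumed values):
    //
    //   AccountTransactionsData record;
    //   record.ledgerSequence = 91234567u;
    //   record.transactionIndex = 0u;
    //   record.txHash = txHash;                // ripple::uint256 of the transaction
    //   record.accounts = {senderID, destID};  // every account affected by the tx
    //   backend.writeAccountTransactions({record});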
811
812 void
813 writeAccountTransaction(AccountTransactionsData record) override
814 {
815 std::vector<Statement> statements;
816 statements.reserve(record.accounts.size());
817
818 std::ranges::transform(record.accounts, std::back_inserter(statements), [this, &record](auto&& account) {
819 return schema_->insertAccountTx.bind(
820 std::forward<decltype(account)>(account),
821 std::make_tuple(record.ledgerSequence, record.transactionIndex),
822 record.txHash
823 );
824 });
825
826 executor_.write(std::move(statements));
827 }
828
829 void
830 writeNFTTransactions(std::vector<NFTTransactionsData> const& data) override
831 {
832 std::vector<Statement> statements;
833 statements.reserve(data.size());
834
835 std::ranges::transform(data, std::back_inserter(statements), [this](auto const& record) {
836 return schema_->insertNFTTx.bind(
837 record.tokenID, std::make_tuple(record.ledgerSequence, record.transactionIndex), record.txHash
838 );
839 });
840
841 executor_.write(std::move(statements));
842 }
843
844 void
845 writeTransaction(
846 std::string&& hash,
847 std::uint32_t const seq,
848 std::uint32_t const date,
849 std::string&& transaction,
850 std::string&& metadata
851 ) override
852 {
853 LOG(log_.trace()) << "Writing txn to database";
854
855 executor_.write(schema_->insertLedgerTransaction, seq, hash);
856 executor_.write(
857 schema_->insertTransaction, std::move(hash), seq, date, std::move(transaction), std::move(metadata)
858 );
859 }
860
861 void
862 writeNFTs(std::vector<NFTsData> const& data) override
863 {
864 std::vector<Statement> statements;
865 statements.reserve(data.size() * 3);
866
867 for (NFTsData const& record : data) {
868 if (!record.onlyUriChanged) {
869 statements.push_back(
870 schema_->insertNFT.bind(record.tokenID, record.ledgerSequence, record.owner, record.isBurned)
871 );
872
873 // If `uri` is set (and it can be set to an empty URI), we know this
874 // is a net-new NFT. That is, this NFT has not been seen before by
875 // us _OR_ it is the extreme edge case of a re-minted NFT with the
876 // same NFT ID as an already-burned token. In either case, we need
877 // to record the URI and link to the issuer_nf_tokens table.
878 if (record.uri) {
879 statements.push_back(schema_->insertIssuerNFT.bind(
880 ripple::nft::getIssuer(record.tokenID),
881 static_cast<uint32_t>(ripple::nft::getTaxon(record.tokenID)),
882 record.tokenID
883 ));
884 statements.push_back(
885 schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
886 );
887 }
888 } else {
889 // only uri changed, we update the uri table only
890 statements.push_back(
891 schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
892 );
893 }
894 }
895
896 executor_.writeEach(std::move(statements));
897 }
898
899 void
900 writeMPTHolders(std::vector<MPTHolderData> const& data) override
901 {
902 std::vector<Statement> statements;
903 statements.reserve(data.size());
904 for (auto [mptId, holder] : data)
905 statements.push_back(schema_->insertMPTHolder.bind(mptId, holder));
906
907 executor_.write(std::move(statements));
908 }
909
910 void
911 startWrites() const override
912 {
913 // Note: this was a no-op in the original implementation too.
914 // It was probably used with PostgreSQL to start a transaction or something similar.
915 }
916
917 void
918 writeMigratorStatus(std::string const& migratorName, std::string const& status) override
919 {
920 executor_.writeSync(
921 schema_->insertMigratorStatus, data::cassandra::Text{migratorName}, data::cassandra::Text(status)
922 );
923 }
924
925 void
926 writeNodeMessage(boost::uuids::uuid const& uuid, std::string message) override
927 {
928 executor_.writeSync(schema_->updateClioNodeMessage, data::cassandra::Text{std::move(message)}, uuid);
929 }
930
931 bool
932 isTooBusy() const override
933 {
934 return executor_.isTooBusy();
935 }
936
937 boost::json::object
938 stats() const override
939 {
940 return executor_.stats();
941 }
942
943protected:
950 bool
951 executeSyncUpdate(Statement statement)
952 {
953 auto const res = executor_.writeSync(statement);
954 auto maybeSuccess = res->template get<bool>();
955 if (not maybeSuccess) {
956 LOG(log_.error()) << "executeSyncUpdate - error getting result - no row";
957 return false;
958 }
959
960 if (not maybeSuccess.value()) {
961 LOG(log_.warn()) << "Update failed. Checking if DB state is what we expect";
962
963 // error may indicate that another writer wrote something.
964 // in this case let's just compare the current state of things
965 // against what we were trying to write in the first place and
966 // use that as the source of truth for the result.
967 auto rng = hardFetchLedgerRangeNoThrow();
968 return rng && rng->maxSequence == ledgerSequence_;
969 }
970
971 return true;
972 }
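    // Sketch of a typical call site (illustrative; the statement name is an assumption).
    // Cassandra conditional (IF ...) updates return an "[applied]" boolean, which is what
    // the get<bool>() above inspects:
    //
    //   auto statement = schema_->updateLedgerRange.bind(/* new max sequence, condition args */);
    //   if (not executeSyncUpdate(statement))
    //       LOG(log_.warn()) << "Conditional update was not applied";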
973};
974
975} // namespace data::cassandra
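Usage sketch (not part of the header above): the class template is instantiated with concrete types satisfying the SomeSettingsProvider and SomeExecutionStrategy concepts, and every fetch method expects to be called from a Boost.Asio coroutine that supplies the yield_context. The Example* type names below are placeholders, not Clio's real aliases.

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>

// Hypothetical concrete instantiation; Clio defines its own alias elsewhere.
using ExampleBackend = data::cassandra::CassandraBackendFamily<
    ExampleSettingsProvider,   // must satisfy SomeSettingsProvider
    ExampleExecutionStrategy,  // must satisfy SomeExecutionStrategy
    ExampleSchema>;            // supplies the prepared statements used above

void
readLatestLedger(ExampleBackend const& backend)
{
    boost::asio::io_context ioc;
    boost::asio::spawn(ioc, [&backend](boost::asio::yield_context yield) {
        // Reads suspend the coroutine while the Cassandra query is in flight.
        if (auto const seq = backend.fetchLatestLedgerSequence(yield); seq.has_value()) {
            auto const header = backend.fetchLedgerBySequence(*seq, yield);
            (void)header;  // use the ledger header here
        }
    });
    ioc.run();
}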
std::vector< Blob > fetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t sequence, boost::asio::yield_context yield) const
Fetches all ledger objects by their keys.
Definition BackendInterface.cpp:114
BackendInterface(LedgerCacheInterface &cache)
Construct a new backend interface instance.
Definition BackendInterface.hpp:153
std::optional< LedgerRange > hardFetchLedgerRangeNoThrow() const
Fetches the ledger range from DB retrying until no DatabaseTimeout is thrown.
Definition BackendInterface.cpp:72
std::optional< LedgerRange > fetchLedgerRange() const
Fetch the current ledger range.
Definition BackendInterface.cpp:262
LedgerCacheInterface const & cache() const
Definition BackendInterface.hpp:165
A simple cache holding one ripple::LedgerHeader to reduce DB lookups.
Definition LedgerHeaderCache.hpp:41
Cache for an entire ledger.
Definition LedgerCacheInterface.hpp:38
Implements BackendInterface for Cassandra/ScyllaDB/Keyspace.
Definition CassandraBackendFamily.hpp:83
void writeMigratorStatus(std::string const &migratorName, std::string const &status) override
Mark the migration status of a migrator as Migrated in the database.
Definition CassandraBackendFamily.hpp:918
std::optional< LedgerRange > hardFetchLedgerRange(boost::asio::yield_context yield) const override
Fetches the ledger range from DB.
Definition CassandraBackendFamily.hpp:301
void startWrites() const override
Starts a write transaction with the DB. No-op for cassandra.
Definition CassandraBackendFamily.hpp:911
void doWriteLedgerObject(std::string &&key, std::uint32_t const seq, std::string &&blob) override
Writes a ledger object to the database.
Definition CassandraBackendFamily.hpp:772
std::optional< TransactionAndMetadata > fetchTransaction(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific transaction.
Definition CassandraBackendFamily.hpp:571
TransactionsAndCursor fetchAccountTransactions(ripple::AccountID const &account, std::uint32_t const limit, bool forward, std::optional< TransactionsCursor > const &txnCursor, boost::asio::yield_context yield) const override
Fetches all transactions for a specific account.
Definition CassandraBackendFamily.hpp:150
MPTHoldersAndCursor fetchMPTHolders(ripple::uint192 const &mptID, std::uint32_t const limit, std::optional< ripple::AccountID > const &cursorIn, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all holders' balances for a MPTIssuanceID.
Definition CassandraBackendFamily.hpp:489
void writeNFTs(std::vector< NFTsData > const &data) override
Writes NFTs to the database.
Definition CassandraBackendFamily.hpp:862
std::optional< ripple::LedgerHeader > fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override
Fetches a specific ledger by sequence number.
Definition CassandraBackendFamily.hpp:254
void writeNFTTransactions(std::vector< NFTTransactionsData > const &data) override
Write NFTs transactions.
Definition CassandraBackendFamily.hpp:830
void writeNodeMessage(boost::uuids::uuid const &uuid, std::string message) override
Write a node message. Used by ClusterCommunicationService.
Definition CassandraBackendFamily.hpp:926
std::optional< std::uint32_t > fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
Fetches the latest ledger sequence.
Definition CassandraBackendFamily.hpp:234
CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface &cache, bool readOnly)
Create a new cassandra/scylla backend instance.
Definition CassandraBackendFamily.hpp:107
std::optional< ripple::LedgerHeader > fetchLedgerByHash(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific ledger by hash.
Definition CassandraBackendFamily.hpp:281
std::optional< std::uint32_t > doFetchLedgerObjectSeq(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object sequence.
Definition CassandraBackendFamily.hpp:550
bool isTooBusy() const override
Definition CassandraBackendFamily.hpp:932
std::optional< NFT > fetchNFT(ripple::uint256 const &tokenID, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches a specific NFT.
Definition CassandraBackendFamily.hpp:379
void writeMPTHolders(std::vector< MPTHolderData > const &data) override
Write accounts that started holding onto a MPT.
Definition CassandraBackendFamily.hpp:900
void writeAccountTransaction(AccountTransactionsData record) override
Write a new account transaction.
Definition CassandraBackendFamily.hpp:813
void writeSuccessor(std::string &&key, std::uint32_t const seq, std::string &&successor) override
Write a new successor.
Definition CassandraBackendFamily.hpp:783
std::vector< ripple::uint256 > fetchAllTransactionHashesInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transaction hashes from a specific ledger.
Definition CassandraBackendFamily.hpp:346
void waitForWritesToFinish() override
Wait for all pending writes to finish.
Definition CassandraBackendFamily.hpp:218
std::vector< TransactionAndMetadata > fetchTransactions(std::vector< ripple::uint256 > const &hashes, boost::asio::yield_context yield) const override
Fetches multiple transactions.
Definition CassandraBackendFamily.hpp:610
std::vector< Blob > doFetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching ledger objects.
Definition CassandraBackendFamily.hpp:651
std::optional< ripple::uint256 > doFetchSuccessorKey(ripple::uint256 key, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Database-specific implementation of fetching the successor key.
Definition CassandraBackendFamily.hpp:588
boost::json::object stats() const override
Definition CassandraBackendFamily.hpp:938
std::optional< std::string > fetchMigratorStatus(std::string const &migratorName, boost::asio::yield_context yield) const override
Fetches the status of migrator by name.
Definition CassandraBackendFamily.hpp:736
std::expected< std::vector< std::pair< boost::uuids::uuid, std::string > >, std::string > fetchClioNodesData(boost::asio::yield_context yield) const override
Fetches the data of all nodes in the cluster.
Definition CassandraBackendFamily.hpp:756
void writeAccountTransactions(std::vector< AccountTransactionsData > data) override
Write a new set of account transactions.
Definition CassandraBackendFamily.hpp:794
std::optional< Blob > doFetchLedgerObject(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object.
Definition CassandraBackendFamily.hpp:528
bool executeSyncUpdate(Statement statement)
Executes statements and tries to write to DB.
Definition CassandraBackendFamily.hpp:951
void writeLedger(ripple::LedgerHeader const &ledgerHeader, std::string &&blob) override
Writes to a specific ledger.
Definition CassandraBackendFamily.hpp:224
TransactionsAndCursor fetchNFTTransactions(ripple::uint256 const &tokenID, std::uint32_t const limit, bool const forward, std::optional< TransactionsCursor > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all transactions for a specific NFT.
Definition CassandraBackendFamily.hpp:418
void writeTransaction(std::string &&hash, std::uint32_t const seq, std::uint32_t const date, std::string &&transaction, std::string &&metadata) override
Writes a new transaction.
Definition CassandraBackendFamily.hpp:845
std::vector< LedgerObject > fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Returns the difference between ledgers.
Definition CassandraBackendFamily.hpp:691
std::vector< TransactionAndMetadata > fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transactions from a specific ledger.
Definition CassandraBackendFamily.hpp:339
Represents a handle to the cassandra database cluster.
Definition Handle.hpp:46
void bindAt(std::size_t const idx, Type &&value) const
Binds an argument to a specific index.
Definition Statement.hpp:95
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:94
The requirements of an execution strategy.
Definition Concepts.hpp:54
The requirements of a settings provider.
Definition Concepts.hpp:43
This namespace implements a wrapper for the Cassandra C++ driver.
Definition CassandraBackendFamily.hpp:66
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into series of std::tuple<Types...> by creating a simple wrapper with an STL inp...
Definition Handle.hpp:329
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:217
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
Struct used to keep track of what to write to account_transactions/account_tx tables.
Definition DBHelpers.hpp:45
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:103
Struct to store ledger header cache entry and the sequence it belongs to.
Definition LedgerHeaderCache.hpp:48
Represents an object in the ledger.
Definition Types.hpp:41
Stores a range of sequences as a min and max pair.
Definition Types.hpp:247
Represents an array of MPTokens.
Definition Types.hpp:239
Represents a transaction and its metadata bundled together.
Definition Types.hpp:68
Represents a bundle of transactions with metadata and a cursor to the next page.
Definition Types.hpp:164
A strong type wrapper for int32_t.
Definition Types.hpp:57
A strong type wrapper for string.
Definition Types.hpp:68