Clio develop
The XRP Ledger API server.
CassandraBackendFamily.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "data/LedgerCacheInterface.hpp"
25#include "data/LedgerHeaderCache.hpp"
26#include "data/Types.hpp"
27#include "data/cassandra/Concepts.hpp"
28#include "data/cassandra/Handle.hpp"
29#include "data/cassandra/Types.hpp"
30#include "data/cassandra/impl/ExecutionStrategy.hpp"
31#include "util/Assert.hpp"
32#include "util/LedgerUtils.hpp"
33#include "util/Profiler.hpp"
34#include "util/log/Logger.hpp"
35
36#include <boost/asio/spawn.hpp>
37#include <boost/json/object.hpp>
38#include <boost/uuid/string_generator.hpp>
39#include <boost/uuid/uuid.hpp>
40#include <cassandra.h>
41#include <fmt/format.h>
42#include <xrpl/basics/Blob.h>
43#include <xrpl/basics/base_uint.h>
44#include <xrpl/basics/strHex.h>
45#include <xrpl/protocol/AccountID.h>
46#include <xrpl/protocol/Indexes.h>
47#include <xrpl/protocol/LedgerHeader.h>
48#include <xrpl/protocol/nft.h>
49
50#include <algorithm>
51#include <atomic>
52#include <chrono>
53#include <cstddef>
54#include <cstdint>
55#include <iterator>
56#include <limits>
57#include <optional>
58#include <stdexcept>
59#include <string>
60#include <tuple>
61#include <utility>
62#include <vector>
63
64class CacheBackendCassandraTest;
65
66namespace data::cassandra {
67
78template <
79 SomeSettingsProvider SettingsProviderType,
80 SomeExecutionStrategy ExecutionStrategyType,
81 typename SchemaType,
82 typename FetchLedgerCacheType = FetchLedgerCache>
83class CassandraBackendFamily : public BackendInterface {
84protected:
85 util::Logger log_{"Backend"};
86
87 SettingsProviderType settingsProvider_;
88 SchemaType schema_;
89 std::atomic_uint32_t ledgerSequence_ = 0u;
90 friend class ::CacheBackendCassandraTest;
91
92 Handle handle_;
93
94 // have to be mutable because BackendInterface constness :(
95 mutable ExecutionStrategyType executor_;
96 // TODO: move to interface level
97 mutable FetchLedgerCacheType ledgerCache_{};
98
99public:
107 CassandraBackendFamily(
108 SettingsProviderType settingsProvider,
109 data::LedgerCacheInterface& cache,
110 bool readOnly
111 )
112 : BackendInterface(cache)
113 , settingsProvider_{std::move(settingsProvider)}
114 , schema_{settingsProvider_}
115 , handle_{settingsProvider_.getSettings()}
116 , executor_{settingsProvider_.getSettings(), handle_}
117 {
118 if (auto const res = handle_.connect(); not res.has_value())
119 throw std::runtime_error("Could not connect to database: " + res.error());
120
121 if (not readOnly) {
122 if (auto const res = handle_.execute(schema_.createKeyspace); not res.has_value()) {
123 // on datastax, creation of keyspaces can be configured to only be done through the
124 // admin interface. This does not mean that the keyspace does not already exist, though.
125 if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED)
126 throw std::runtime_error("Could not create keyspace: " + res.error());
127 }
128
129 if (auto const res = handle_.executeEach(schema_.createSchema); not res.has_value())
130 throw std::runtime_error("Could not create schema: " + res.error());
131 }
132
133 try {
134 schema_.prepareStatements(handle_);
135 } catch (std::runtime_error const& ex) {
136 auto const error = fmt::format(
137 "Failed to prepare the statements: {}; readOnly: {}. ReadOnly should be turned off "
138 "or another Clio "
139 "node with write access to DB should be started first.",
140 ex.what(),
141 readOnly
142 );
143 LOG(log_.error()) << error;
144 throw std::runtime_error(error);
145 }
146 LOG(log_.info()) << "Created (revamped) CassandraBackend";
147 }
148
149 /**
150 * @brief Move constructor is deleted because handle_ is shared by reference with executor
151 */
152 CassandraBackendFamily(CassandraBackendFamily&&) = delete;
153
154 TransactionsAndCursor
155 fetchAccountTransactions(
156 ripple::AccountID const& account,
157 std::uint32_t const limit,
158 bool forward,
159 std::optional<TransactionsCursor> const& txnCursor,
160 boost::asio::yield_context yield
161 ) const override
162 {
163 auto rng = fetchLedgerRange();
164 if (!rng)
165 return {.txns = {}, .cursor = {}};
166
167 Statement const statement = [this, forward, &account]() {
168 if (forward)
169 return schema_->selectAccountTxForward.bind(account);
170
171 return schema_->selectAccountTx.bind(account);
172 }();
173
174 auto cursor = txnCursor;
175 if (cursor) {
176 statement.bindAt(1, cursor->asTuple());
177 LOG(log_.debug()) << "account = " << ripple::strHex(account)
178 << " tuple = " << cursor->ledgerSequence << cursor->transactionIndex;
179 } else {
180 auto const seq = forward ? rng->minSequence : rng->maxSequence;
181 auto const placeHolder = forward ? 0u : std::numeric_limits<std::uint32_t>::max();
182
183 statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
184 LOG(log_.debug()) << "account = " << ripple::strHex(account) << " idx = " << seq
185 << " tuple = " << placeHolder;
186 }
187
188 // FIXME: Limit is a hack to support uint32_t properly for the time
189 // being. Should be removed later and schema updated to use proper
190 // types.
191 statement.bindAt(2, Limit{limit});
192 auto const res = executor_.read(yield, statement);
193 auto const& results = res.value();
194 if (not results.hasRows()) {
195 LOG(log_.debug()) << "No rows returned";
196 return {};
197 }
198
199 std::vector<ripple::uint256> hashes = {};
200 auto numRows = results.numRows();
201 LOG(log_.info()) << "num_rows = " << numRows;
202
203 for (auto [hash, data] :
204 extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
205 hashes.push_back(hash);
206 if (--numRows == 0) {
207 LOG(log_.debug()) << "Setting cursor";
208 cursor = data;
209 }
210 }
211
212 auto const txns = fetchTransactions(hashes, yield);
213 LOG(log_.debug()) << "Txns = " << txns.size();
214
215 if (txns.size() == limit) {
216 LOG(log_.debug()) << "Returning cursor";
217 return {txns, cursor};
218 }
219
220 return {txns, {}};
221 }
222
223 void
224 waitForWritesToFinish() override
225 {
226 executor_.sync();
227 }
228
229 void
230 writeLedger(ripple::LedgerHeader const& ledgerHeader, std::string&& blob) override
231 {
232 executor_.write(schema_->insertLedgerHeader, ledgerHeader.seq, std::move(blob));
233
234 executor_.write(schema_->insertLedgerHash, ledgerHeader.hash, ledgerHeader.seq);
235
236 ledgerSequence_ = ledgerHeader.seq;
237 }
238
239 std::optional<std::uint32_t>
240 fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
241 {
242 if (auto const res = executor_.read(yield, schema_->selectLatestLedger); res.has_value()) {
243 if (auto const& rows = *res; rows) {
244 if (auto const maybeRow = rows.template get<uint32_t>(); maybeRow.has_value())
245 return maybeRow;
246
247 LOG(log_.error()) << "Could not fetch latest ledger - no rows";
248 return std::nullopt;
249 }
250
251 LOG(log_.error()) << "Could not fetch latest ledger - no result";
252 } else {
253 LOG(log_.error()) << "Could not fetch latest ledger: " << res.error();
254 }
255
256 return std::nullopt;
257 }
258
259 std::optional<ripple::LedgerHeader>
260 fetchLedgerBySequence(
261 std::uint32_t const sequence,
262 boost::asio::yield_context yield
263 ) const override
264 {
265 if (auto const lock = ledgerCache_.get(); lock.has_value() && lock->seq == sequence)
266 return lock->ledger;
267
268 auto const res = executor_.read(yield, schema_->selectLedgerBySeq, sequence);
269 if (res) {
270 if (auto const& result = res.value(); result) {
271 if (auto const maybeValue = result.template get<std::vector<unsigned char>>();
272 maybeValue) {
273 auto const header = util::deserializeHeader(ripple::makeSlice(*maybeValue));
274 ledgerCache_.put(FetchLedgerCache::CacheEntry{header, sequence});
275 return header;
276 }
277
278 LOG(log_.error()) << "Could not fetch ledger by sequence - no rows";
279 return std::nullopt;
280 }
281
282 LOG(log_.error()) << "Could not fetch ledger by sequence - no result";
283 } else {
284 LOG(log_.error()) << "Could not fetch ledger by sequence: " << res.error();
285 }
286
287 return std::nullopt;
288 }
289
290 std::optional<ripple::LedgerHeader>
291 fetchLedgerByHash(ripple::uint256 const& hash, boost::asio::yield_context yield) const override
292 {
293 if (auto const res = executor_.read(yield, schema_->selectLedgerByHash, hash); res) {
294 if (auto const& result = res.value(); result) {
295 if (auto const maybeValue = result.template get<uint32_t>(); maybeValue)
296 return fetchLedgerBySequence(*maybeValue, yield);
297
298 LOG(log_.error()) << "Could not fetch ledger by hash - no rows";
299 return std::nullopt;
300 }
301
302 LOG(log_.error()) << "Could not fetch ledger by hash - no result";
303 } else {
304 LOG(log_.error()) << "Could not fetch ledger by hash: " << res.error();
305 }
306
307 return std::nullopt;
308 }
309
310 std::optional<LedgerRange>
311 hardFetchLedgerRange(boost::asio::yield_context yield) const override
312 {
313 auto const res = executor_.read(yield, schema_->selectLedgerRange);
314 if (res) {
315 auto const& results = res.value();
316 if (not results.hasRows()) {
317 LOG(log_.debug()) << "Could not fetch ledger range - no rows";
318 return std::nullopt;
319 }
320
321 // TODO: this is probably a good place to use user type in
322 // cassandra instead of having two rows with bool flag. or maybe at
323 // least use tuple<int, int>?
324 LedgerRange range;
325 std::size_t idx = 0;
326 for (auto [seq] : extract<uint32_t>(results)) {
327 if (idx == 0) {
328 range.maxSequence = range.minSequence = seq;
329 } else if (idx == 1) {
330 range.maxSequence = seq;
331 }
332
333 ++idx;
334 }
335
336 if (range.minSequence > range.maxSequence)
337 std::swap(range.minSequence, range.maxSequence);
338
339 LOG(log_.debug()) << "After hardFetchLedgerRange range is " << range.minSequence << ":"
340 << range.maxSequence;
341 return range;
342 }
343 LOG(log_.error()) << "Could not fetch ledger range: " << res.error();
344
345 return std::nullopt;
346 }
347
348 std::vector<TransactionAndMetadata>
349 fetchAllTransactionsInLedger(
350 std::uint32_t const ledgerSequence,
351 boost::asio::yield_context yield
352 ) const override
353 {
354 auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence, yield);
355 return fetchTransactions(hashes, yield);
356 }
357
358 std::vector<ripple::uint256>
359 fetchAllTransactionHashesInLedger(
360 std::uint32_t const ledgerSequence,
361 boost::asio::yield_context yield
362 ) const override
363 {
364 auto start = std::chrono::system_clock::now();
365 auto const res =
366 executor_.read(yield, schema_->selectAllTransactionHashesInLedger, ledgerSequence);
367
368 if (not res) {
369 LOG(log_.error()) << "Could not fetch all transaction hashes: " << res.error();
370 return {};
371 }
372
373 auto const& result = res.value();
374 if (not result.hasRows()) {
375 LOG(log_.warn()) << "Could not fetch all transaction hashes - no rows; ledger = "
376 << std::to_string(ledgerSequence);
377 return {};
378 }
379
380 std::vector<ripple::uint256> hashes;
381 for (auto [hash] : extract<ripple::uint256>(result))
382 hashes.push_back(std::move(hash));
383
384 auto end = std::chrono::system_clock::now();
385 LOG(
386 log_.debug()
387 ) << "Fetched "
388 << hashes.size() << " transaction hashes from database in "
389 << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
390 << " milliseconds";
391
392 return hashes;
393 }
394
395 std::optional<NFT>
396 fetchNFT(
397 ripple::uint256 const& tokenID,
398 std::uint32_t const ledgerSequence,
399 boost::asio::yield_context yield
400 ) const override
401 {
402 auto const res = executor_.read(yield, schema_->selectNFT, tokenID, ledgerSequence);
403 if (not res)
404 return std::nullopt;
405
406 if (auto const maybeRow = res->template get<uint32_t, ripple::AccountID, bool>();
407 maybeRow) {
408 auto [seq, owner, isBurned] = *maybeRow;
409 auto result = std::make_optional<NFT>(tokenID, seq, owner, isBurned);
410
411 // now fetch URI. Usually we will have the URI even for burned NFTs,
412 // but if the first ledger on this clio included NFTokenBurn
413 // transactions we will not have the URIs for any of those tokens.
414 // In any other case not having the URI indicates something went
415 // wrong with our data.
416 //
417 // TODO - in the future would be great for any handlers that use
418 // this could inject a warning in this case (the case of not having
419 // a URI because it was burned in the first ledger) to indicate that
420 // even though we are returning a blank URI, the NFT might have had
421 // one.
422 auto uriRes = executor_.read(yield, schema_->selectNFTURI, tokenID, ledgerSequence);
423 if (uriRes) {
424 if (auto const maybeUri = uriRes->template get<ripple::Blob>(); maybeUri)
425 result->uri = *maybeUri;
426 }
427
428 return result;
429 }
430
431 LOG(log_.error()) << "Could not fetch NFT - no rows";
432 return std::nullopt;
433 }
434
435 TransactionsAndCursor
436 fetchNFTTransactions(
437 ripple::uint256 const& tokenID,
438 std::uint32_t const limit,
439 bool const forward,
440 std::optional<TransactionsCursor> const& cursorIn,
441 boost::asio::yield_context yield
442 ) const override
443 {
444 auto rng = fetchLedgerRange();
445 if (!rng)
446 return {.txns = {}, .cursor = {}};
447
448 Statement const statement = [this, forward, &tokenID]() {
449 if (forward)
450 return schema_->selectNFTTxForward.bind(tokenID);
451
452 return schema_->selectNFTTx.bind(tokenID);
453 }();
454
455 auto cursor = cursorIn;
456 if (cursor) {
457 statement.bindAt(1, cursor->asTuple());
458 LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID)
459 << " tuple = " << cursor->ledgerSequence << cursor->transactionIndex;
460 } else {
461 auto const seq = forward ? rng->minSequence : rng->maxSequence;
462 auto const placeHolder = forward ? 0 : std::numeric_limits<std::uint32_t>::max();
463
464 statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
465 LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq
466 << " tuple = " << placeHolder;
467 }
468
469 statement.bindAt(2, Limit{limit});
470
471 auto const res = executor_.read(yield, statement);
472 auto const& results = res.value();
473 if (not results.hasRows()) {
474 LOG(log_.debug()) << "No rows returned";
475 return {};
476 }
477
478 std::vector<ripple::uint256> hashes = {};
479 auto numRows = results.numRows();
480 LOG(log_.info()) << "num_rows = " << numRows;
481
482 for (auto [hash, data] :
483 extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
484 hashes.push_back(hash);
485 if (--numRows == 0) {
486 LOG(log_.debug()) << "Setting cursor";
487 cursor = data;
488
489 // forward queries by ledger/tx sequence `>=`
490 // so we have to advance the index by one
491 if (forward)
492 ++cursor->transactionIndex;
493 }
494 }
495
496 auto const txns = fetchTransactions(hashes, yield);
497 LOG(log_.debug()) << "NFT Txns = " << txns.size();
498
499 if (txns.size() == limit) {
500 LOG(log_.debug()) << "Returning cursor";
501 return {txns, cursor};
502 }
503
504 return {txns, {}};
505 }
506
507 MPTHoldersAndCursor
508 fetchMPTHolders(
509 ripple::uint192 const& mptID,
510 std::uint32_t const limit,
511 std::optional<ripple::AccountID> const& cursorIn,
512 std::uint32_t const ledgerSequence,
513 boost::asio::yield_context yield
514 ) const override
515 {
516 auto const holderEntries = executor_.read(
517 yield,
518 schema_->selectMPTHolders,
519 mptID,
520 cursorIn.value_or(ripple::AccountID(0)),
521 Limit{limit}
522 );
523
524 auto const& holderResults = holderEntries.value();
525 if (not holderResults.hasRows()) {
526 LOG(log_.debug()) << "No rows returned";
527 return {};
528 }
529
530 std::vector<ripple::uint256> mptKeys;
531 std::optional<ripple::AccountID> cursor;
532 for (auto const [holder] : extract<ripple::AccountID>(holderResults)) {
533 mptKeys.push_back(ripple::keylet::mptoken(mptID, holder).key);
534 cursor = holder;
535 }
536
537 auto mptObjects = doFetchLedgerObjects(mptKeys, ledgerSequence, yield);
538
539 auto it = std::remove_if(mptObjects.begin(), mptObjects.end(), [](Blob const& mpt) {
540 return mpt.empty();
541 });
542
543 mptObjects.erase(it, mptObjects.end());
544
545 ASSERT(mptKeys.size() <= limit, "Number of keys can't exceed the limit");
546 if (mptKeys.size() == limit)
547 return {mptObjects, cursor};
548
549 return {mptObjects, {}};
550 }
551
552 std::optional<Blob>
553 doFetchLedgerObject(
554 ripple::uint256 const& key,
555 std::uint32_t const sequence,
556 boost::asio::yield_context yield
557 ) const override
558 {
559 LOG(log_.debug()) << "Fetching ledger object for seq " << sequence
560 << ", key = " << ripple::to_string(key);
561 if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
562 if (auto const result = res->template get<Blob>(); result) {
563 if (result->size())
564 return result;
565 } else {
566 LOG(log_.debug()) << "Could not fetch ledger object - no rows";
567 }
568 } else {
569 LOG(log_.error()) << "Could not fetch ledger object: " << res.error();
570 }
571
572 return std::nullopt;
573 }
574
575 std::optional<std::uint32_t>
576 doFetchLedgerObjectSeq(
577 ripple::uint256 const& key,
578 std::uint32_t const sequence,
579 boost::asio::yield_context yield
580 ) const override
581 {
582 LOG(log_.debug()) << "Fetching ledger object for seq " << sequence
583 << ", key = " << ripple::to_string(key);
584 if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
585 if (auto const result = res->template get<Blob, std::uint32_t>(); result) {
586 auto [_, seq] = result.value();
587 return seq;
588 }
589 LOG(log_.debug()) << "Could not fetch ledger object sequence - no rows";
590 } else {
591 LOG(log_.error()) << "Could not fetch ledger object sequence: " << res.error();
592 }
593
594 return std::nullopt;
595 }
596
597 std::optional<TransactionAndMetadata>
598 fetchTransaction(ripple::uint256 const& hash, boost::asio::yield_context yield) const override
599 {
600 if (auto const res = executor_.read(yield, schema_->selectTransaction, hash); res) {
601 if (auto const maybeValue = res->template get<Blob, Blob, uint32_t, uint32_t>();
602 maybeValue) {
603 auto [transaction, meta, seq, date] = *maybeValue;
604 return std::make_optional<TransactionAndMetadata>(transaction, meta, seq, date);
605 }
606
607 LOG(log_.debug()) << "Could not fetch transaction - no rows";
608 } else {
609 LOG(log_.error()) << "Could not fetch transaction: " << res.error();
610 }
611
612 return std::nullopt;
613 }
614
615 std::optional<ripple::uint256>
616 doFetchSuccessorKey(
617 ripple::uint256 key,
618 std::uint32_t const ledgerSequence,
619 boost::asio::yield_context yield
620 ) const override
621 {
622 if (auto const res = executor_.read(yield, schema_->selectSuccessor, key, ledgerSequence);
623 res) {
624 if (auto const result = res->template get<ripple::uint256>(); result) {
625 if (*result == kLAST_KEY)
626 return std::nullopt;
627 return result;
628 }
629
630 LOG(log_.debug()) << "Could not fetch successor - no rows";
631 } else {
632 LOG(log_.error()) << "Could not fetch successor: " << res.error();
633 }
634
635 return std::nullopt;
636 }
637
638 std::vector<TransactionAndMetadata>
639 fetchTransactions(
640 std::vector<ripple::uint256> const& hashes,
641 boost::asio::yield_context yield
642 ) const override
643 {
644 if (hashes.empty())
645 return {};
646
647 auto const numHashes = hashes.size();
648 std::vector<TransactionAndMetadata> results;
649 results.reserve(numHashes);
650
651 std::vector<Statement> statements;
652 statements.reserve(numHashes);
653
654 auto const timeDiff = util::timed([this, yield, &results, &hashes, &statements]() {
655 // TODO: seems like a job for "hash IN (list of hashes)" instead?
656 std::transform(
657 std::cbegin(hashes),
658 std::cend(hashes),
659 std::back_inserter(statements),
660 [this](auto const& hash) { return schema_->selectTransaction.bind(hash); }
661 );
662
663 auto const entries = executor_.readEach(yield, statements);
664 std::transform(
665 std::cbegin(entries),
666 std::cend(entries),
667 std::back_inserter(results),
668 [](auto const& res) -> TransactionAndMetadata {
669 if (auto const maybeRow = res.template get<Blob, Blob, uint32_t, uint32_t>();
670 maybeRow)
671 return *maybeRow;
672
673 return {};
674 }
675 );
676 });
677
678 ASSERT(numHashes == results.size(), "Number of hashes and results must match");
679 LOG(log_.debug()) << "Fetched " << numHashes << " transactions from database in "
680 << timeDiff << " milliseconds";
681 return results;
682 }
683
684 std::vector<Blob>
685 doFetchLedgerObjects(
686 std::vector<ripple::uint256> const& keys,
687 std::uint32_t const sequence,
688 boost::asio::yield_context yield
689 ) const override
690 {
691 if (keys.empty())
692 return {};
693
694 auto const numKeys = keys.size();
695 LOG(log_.trace()) << "Fetching " << numKeys << " objects";
696
697 std::vector<Blob> results;
698 results.reserve(numKeys);
699
700 std::vector<Statement> statements;
701 statements.reserve(numKeys);
702
703 // TODO: seems like a job for "key IN (list of keys)" instead?
704 std::transform(
705 std::cbegin(keys),
706 std::cend(keys),
707 std::back_inserter(statements),
708 [this, &sequence](auto const& key) { return schema_->selectObject.bind(key, sequence); }
709 );
710
711 auto const entries = executor_.readEach(yield, statements);
712 std::transform(
713 std::cbegin(entries),
714 std::cend(entries),
715 std::back_inserter(results),
716 [](auto const& res) -> Blob {
717 if (auto const maybeValue = res.template get<Blob>(); maybeValue)
718 return *maybeValue;
719
720 return {};
721 }
722 );
723
724 LOG(log_.trace()) << "Fetched " << numKeys << " objects";
725 return results;
726 }
727
728 std::vector<LedgerObject>
729 fetchLedgerDiff(
730 std::uint32_t const ledgerSequence,
731 boost::asio::yield_context yield
732 ) const override
733 {
734 auto const [keys, timeDiff] =
735 util::timed([this, &ledgerSequence, yield]() -> std::vector<ripple::uint256> {
736 auto const res = executor_.read(yield, schema_->selectDiff, ledgerSequence);
737 if (not res) {
738 LOG(log_.error()) << "Could not fetch ledger diff: " << res.error()
739 << "; ledger = " << ledgerSequence;
740 return {};
741 }
742
743 auto const& results = res.value();
744 if (not results) {
745 LOG(log_.error())
746 << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence;
747 return {};
748 }
749
750 std::vector<ripple::uint256> resultKeys;
751 for (auto [key] : extract<ripple::uint256>(results))
752 resultKeys.push_back(key);
753
754 return resultKeys;
755 });
756
757 // one of the above errors must have happened
758 if (keys.empty())
759 return {};
760
761 LOG(log_.debug()) << "Fetched " << keys.size() << " diff hashes from database in "
762 << timeDiff << " milliseconds";
763
764 auto const objs = fetchLedgerObjects(keys, ledgerSequence, yield);
765 std::vector<LedgerObject> results;
766 results.reserve(keys.size());
767
768 std::transform(
769 std::cbegin(keys),
770 std::cend(keys),
771 std::cbegin(objs),
772 std::back_inserter(results),
773 [](auto const& key, auto const& obj) { return LedgerObject{key, obj}; }
774 );
775
776 return results;
777 }
778
779 std::optional<std::string>
780 fetchMigratorStatus(
781 std::string const& migratorName,
782 boost::asio::yield_context yield
783 ) const override
784 {
785 auto const res = executor_.read(yield, schema_->selectMigratorStatus, Text(migratorName));
786 if (not res) {
787 LOG(log_.error()) << "Could not fetch migrator status: " << res.error();
788 return {};
789 }
790
791 auto const& results = res.value();
792 if (not results) {
793 return {};
794 }
795
796 for (auto [statusString] : extract<std::string>(results))
797 return statusString;
798
799 return {};
800 }
801
802 std::expected<std::vector<std::pair<boost::uuids::uuid, std::string>>, std::string>
803 fetchClioNodesData(boost::asio::yield_context yield) const override
804 {
805 auto const readResult = executor_.read(yield, schema_->selectClioNodesData);
806 if (not readResult)
807 return std::unexpected{readResult.error().message()};
808
809 std::vector<std::pair<boost::uuids::uuid, std::string>> result;
810
811 for (auto [uuid, message] : extract<boost::uuids::uuid, std::string>(*readResult)) {
812 result.emplace_back(uuid, std::move(message));
813 }
814
815 return result;
816 }
817
818 void
819 doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override
820 {
821 LOG(log_.trace()) << " Writing ledger object " << key.size() << ":" << seq << " ["
822 << blob.size() << " bytes]";
823
824 if (range_)
825 executor_.write(schema_->insertDiff, seq, key);
826
827 executor_.write(schema_->insertObject, std::move(key), seq, std::move(blob));
828 }
829
830 void
831 writeSuccessor(std::string&& key, std::uint32_t const seq, std::string&& successor) override
832 {
833 LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. "
834 << " seq = " << std::to_string(seq) << " successor = " << successor.size()
835 << " bytes.";
836 ASSERT(!key.empty(), "Key must not be empty");
837 ASSERT(!successor.empty(), "Successor must not be empty");
838
839 executor_.write(schema_->insertSuccessor, std::move(key), seq, std::move(successor));
840 }
841
842 void
843 writeAccountTransactions(std::vector<AccountTransactionsData> data) override
844 {
845 std::vector<Statement> statements;
846 statements.reserve(data.size() * 10); // assume 10 transactions avg
847
848 for (auto& record : data) {
849 std::ranges::transform(
850 record.accounts, std::back_inserter(statements), [this, &record](auto&& account) {
851 return schema_->insertAccountTx.bind(
852 std::forward<decltype(account)>(account),
853 std::make_tuple(record.ledgerSequence, record.transactionIndex),
854 record.txHash
855 );
856 }
857 );
858 }
859
860 executor_.write(std::move(statements));
861 }
862
863 void
864 writeAccountTransaction(AccountTransactionsData record) override
865 {
866 std::vector<Statement> statements;
867 statements.reserve(record.accounts.size());
868
869 std::ranges::transform(
870 record.accounts, std::back_inserter(statements), [this, &record](auto&& account) {
871 return schema_->insertAccountTx.bind(
872 std::forward<decltype(account)>(account),
873 std::make_tuple(record.ledgerSequence, record.transactionIndex),
874 record.txHash
875 );
876 }
877 );
878
879 executor_.write(std::move(statements));
880 }
881
882 void
883 writeNFTTransactions(std::vector<NFTTransactionsData> const& data) override
884 {
885 std::vector<Statement> statements;
886 statements.reserve(data.size());
887
888 std::ranges::transform(data, std::back_inserter(statements), [this](auto const& record) {
889 return schema_->insertNFTTx.bind(
890 record.tokenID,
891 std::make_tuple(record.ledgerSequence, record.transactionIndex),
892 record.txHash
893 );
894 });
895
896 executor_.write(std::move(statements));
897 }
898
899 void
900 writeTransaction(
901 std::string&& hash,
902 std::uint32_t const seq,
903 std::uint32_t const date,
904 std::string&& transaction,
905 std::string&& metadata
906 ) override
907 {
908 LOG(log_.trace()) << "Writing txn to database";
909
910 executor_.write(schema_->insertLedgerTransaction, seq, hash);
911 executor_.write(
912 schema_->insertTransaction,
913 std::move(hash),
914 seq,
915 date,
916 std::move(transaction),
917 std::move(metadata)
918 );
919 }
920
921 void
922 writeNFTs(std::vector<NFTsData> const& data) override
923 {
924 std::vector<Statement> statements;
925 statements.reserve(data.size() * 3);
926
927 for (NFTsData const& record : data) {
928 if (!record.onlyUriChanged) {
929 statements.push_back(schema_->insertNFT.bind(
930 record.tokenID, record.ledgerSequence, record.owner, record.isBurned
931 ));
932
933 // If `uri` is set (and it can be set to an empty uri), we know this
934 // is a net-new NFT. That is, this NFT has not been seen before by
935 // us _OR_ it is in the extreme edge case of a re-minted NFT ID with
936 // the same NFT ID as an already-burned token. In this case, we need
937 // to record the URI and link to the issuer_nf_tokens table.
938 if (record.uri) {
939 statements.push_back(schema_->insertIssuerNFT.bind(
940 ripple::nft::getIssuer(record.tokenID),
941 static_cast<uint32_t>(ripple::nft::getTaxon(record.tokenID)),
942 record.tokenID
943 ));
944 statements.push_back(schema_->insertNFTURI.bind(
945 record.tokenID, record.ledgerSequence, record.uri.value()
946 ));
947 }
948 } else {
949 // only uri changed, we update the uri table only
950 statements.push_back(schema_->insertNFTURI.bind(
951 record.tokenID, record.ledgerSequence, record.uri.value()
952 ));
953 }
954 }
955
956 executor_.writeEach(std::move(statements));
957 }
958
959 void
960 writeMPTHolders(std::vector<MPTHolderData> const& data) override
961 {
962 std::vector<Statement> statements;
963 statements.reserve(data.size());
964 for (auto [mptId, holder] : data)
965 statements.push_back(schema_->insertMPTHolder.bind(mptId, holder));
966
967 executor_.write(std::move(statements));
968 }
969
970 void
971 startWrites() const override
972 {
973 // Note: no-op in original implementation too.
974 // probably was used in PG to start a transaction or something similar.
975 }
976
977 void
978 writeMigratorStatus(std::string const& migratorName, std::string const& status) override
979 {
980 executor_.writeSync(
981 schema_->insertMigratorStatus,
982 data::cassandra::Text{migratorName},
983 data::cassandra::Text{status}
984 );
985 }
986
987 void
988 writeNodeMessage(boost::uuids::uuid const& uuid, std::string message) override
989 {
990 executor_.writeSync(
991 schema_->updateClioNodeMessage, data::cassandra::Text{std::move(message)}, uuid
992 );
993 }
994
995 bool
996 isTooBusy() const override
997 {
998 return executor_.isTooBusy();
999 }
1000
1001 boost::json::object
1002 stats() const override
1003 {
1004 return executor_.stats();
1005 }
1006
1007protected:
1014 bool
1015 executeSyncUpdate(Statement statement)
1016 {
1017 auto const res = executor_.writeSync(statement);
1018 auto maybeSuccess = res->template get<bool>();
1019 if (not maybeSuccess) {
1020 LOG(log_.error()) << "executeSyncUpdate - error getting result - no row";
1021 return false;
1022 }
1023
1024 if (not maybeSuccess.value()) {
1025 LOG(log_.warn()) << "Update failed. Checking if DB state is what we expect";
1026
1027 // error may indicate that another writer wrote something.
1028 // in this case let's just compare the current state of things
1029 // against what we were trying to write in the first place and
1030 // use that as the source of truth for the result.
1031 auto rng = hardFetchLedgerRangeNoThrow();
1032 return rng && rng->maxSequence == ledgerSequence_;
1033 }
1034
1035 return true;
1036 }
1037};
1038
1039} // namespace data::cassandra
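The class above is a template: a concrete backend is produced by supplying a settings provider, an execution strategy and a schema type that satisfy the concepts named in the template parameter list, plus a LedgerCacheInterface implementation passed to the constructor. A minimal, hypothetical wiring sketch follows; the My* names are placeholders for illustration, not declarations from this file.

// Hypothetical sketch only: the My* types stand in for whatever satisfies
// SomeSettingsProvider / SomeExecutionStrategy and the schema requirements.
using MyBackend = data::cassandra::CassandraBackendFamily<
    MySettingsProvider,   // provides cassandra Settings via getSettings()
    MyExecutionStrategy,  // drives read()/write()/sync() against the Handle
    MySchema>;            // owns the prepared statements (selectLedgerBySeq, ...)

MyLedgerCache cache;      // some data::LedgerCacheInterface implementation (assumed)
// Construction connects to the cluster, creates the keyspace and tables unless
// readOnly is true, and prepares all statements; any failure throws.
MyBackend backend{MySettingsProvider{/* configuration */}, cache, /* readOnly = */ false};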
std::vector< Blob > fetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t sequence, boost::asio::yield_context yield) const
Fetches all ledger objects by their keys.
Definition BackendInterface.cpp:114
BackendInterface(LedgerCacheInterface &cache)
Construct a new backend interface instance.
Definition BackendInterface.hpp:158
std::optional< LedgerRange > hardFetchLedgerRangeNoThrow() const
Fetches the ledger range from DB retrying until no DatabaseTimeout is thrown.
Definition BackendInterface.cpp:72
std::optional< LedgerRange > fetchLedgerRange() const
Fetch the current ledger range.
Definition BackendInterface.cpp:268
LedgerCacheInterface const & cache() const
Definition BackendInterface.hpp:170
A simple cache holding one ripple::LedgerHeader to reduce DB lookups.
Definition LedgerHeaderCache.hpp:41
Cache for an entire ledger.
Definition LedgerCacheInterface.hpp:40
Implements BackendInterface for Cassandra/ScyllaDB/Keyspace.
Definition CassandraBackendFamily.hpp:83
void writeMigratorStatus(std::string const &migratorName, std::string const &status) override
Mark the migration status of a migrator as Migrated in the database.
Definition CassandraBackendFamily.hpp:978
std::optional< LedgerRange > hardFetchLedgerRange(boost::asio::yield_context yield) const override
Fetches the ledger range from DB.
Definition CassandraBackendFamily.hpp:311
void startWrites() const override
Starts a write transaction with the DB. No-op for cassandra.
Definition CassandraBackendFamily.hpp:971
void doWriteLedgerObject(std::string &&key, std::uint32_t const seq, std::string &&blob) override
Writes a ledger object to the database.
Definition CassandraBackendFamily.hpp:819
std::optional< TransactionAndMetadata > fetchTransaction(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific transaction.
Definition CassandraBackendFamily.hpp:598
TransactionsAndCursor fetchAccountTransactions(ripple::AccountID const &account, std::uint32_t const limit, bool forward, std::optional< TransactionsCursor > const &txnCursor, boost::asio::yield_context yield) const override
Fetches all transactions for a specific account.
Definition CassandraBackendFamily.hpp:155
MPTHoldersAndCursor fetchMPTHolders(ripple::uint192 const &mptID, std::uint32_t const limit, std::optional< ripple::AccountID > const &cursorIn, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all holders' balances for a MPTIssuanceID.
Definition CassandraBackendFamily.hpp:508
void writeNFTs(std::vector< NFTsData > const &data) override
Writes NFTs to the database.
Definition CassandraBackendFamily.hpp:922
std::optional< ripple::LedgerHeader > fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override
Fetches a specific ledger by sequence number.
Definition CassandraBackendFamily.hpp:260
void writeNFTTransactions(std::vector< NFTTransactionsData > const &data) override
Write NFTs transactions.
Definition CassandraBackendFamily.hpp:883
void writeNodeMessage(boost::uuids::uuid const &uuid, std::string message) override
Write a node message. Used by ClusterCommunicationService.
Definition CassandraBackendFamily.hpp:988
std::optional< std::uint32_t > fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
Fetches the latest ledger sequence.
Definition CassandraBackendFamily.hpp:240
CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface &cache, bool readOnly)
Create a new cassandra/scylla backend instance.
Definition CassandraBackendFamily.hpp:107
std::optional< ripple::LedgerHeader > fetchLedgerByHash(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific ledger by hash.
Definition CassandraBackendFamily.hpp:291
std::optional< std::uint32_t > doFetchLedgerObjectSeq(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object sequence.
Definition CassandraBackendFamily.hpp:576
bool isTooBusy() const override
Definition CassandraBackendFamily.hpp:996
std::optional< NFT > fetchNFT(ripple::uint256 const &tokenID, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches a specific NFT.
Definition CassandraBackendFamily.hpp:396
void writeMPTHolders(std::vector< MPTHolderData > const &data) override
Write accounts that started holding onto a MPT.
Definition CassandraBackendFamily.hpp:960
void writeAccountTransaction(AccountTransactionsData record) override
Write a new account transaction.
Definition CassandraBackendFamily.hpp:864
void writeSuccessor(std::string &&key, std::uint32_t const seq, std::string &&successor) override
Write a new successor.
Definition CassandraBackendFamily.hpp:831
std::vector< ripple::uint256 > fetchAllTransactionHashesInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transaction hashes from a specific ledger.
Definition CassandraBackendFamily.hpp:359
void waitForWritesToFinish() override
Wait for all pending writes to finish.
Definition CassandraBackendFamily.hpp:224
std::vector< TransactionAndMetadata > fetchTransactions(std::vector< ripple::uint256 > const &hashes, boost::asio::yield_context yield) const override
Fetches multiple transactions.
Definition CassandraBackendFamily.hpp:639
std::vector< Blob > doFetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching ledger objects.
Definition CassandraBackendFamily.hpp:685
std::optional< ripple::uint256 > doFetchSuccessorKey(ripple::uint256 key, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Database-specific implementation of fetching the successor key.
Definition CassandraBackendFamily.hpp:616
boost::json::object stats() const override
Definition CassandraBackendFamily.hpp:1002
std::optional< std::string > fetchMigratorStatus(std::string const &migratorName, boost::asio::yield_context yield) const override
Fetches the status of migrator by name.
Definition CassandraBackendFamily.hpp:780
std::expected< std::vector< std::pair< boost::uuids::uuid, std::string > >, std::string > fetchClioNodesData(boost::asio::yield_context yield) const override
Fetches the data of all nodes in the cluster.
Definition CassandraBackendFamily.hpp:803
void writeAccountTransactions(std::vector< AccountTransactionsData > data) override
Write a new set of account transactions.
Definition CassandraBackendFamily.hpp:843
std::optional< Blob > doFetchLedgerObject(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object.
Definition CassandraBackendFamily.hpp:553
bool executeSyncUpdate(Statement statement)
Executes statements and tries to write to DB.
Definition CassandraBackendFamily.hpp:1015
void writeLedger(ripple::LedgerHeader const &ledgerHeader, std::string &&blob) override
Writes to a specific ledger.
Definition CassandraBackendFamily.hpp:230
TransactionsAndCursor fetchNFTTransactions(ripple::uint256 const &tokenID, std::uint32_t const limit, bool const forward, std::optional< TransactionsCursor > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all transactions for a specific NFT.
Definition CassandraBackendFamily.hpp:436
void writeTransaction(std::string &&hash, std::uint32_t const seq, std::uint32_t const date, std::string &&transaction, std::string &&metadata) override
Writes a new transaction.
Definition CassandraBackendFamily.hpp:900
std::vector< LedgerObject > fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Returns the difference between ledgers.
Definition CassandraBackendFamily.hpp:729
std::vector< TransactionAndMetadata > fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transactions from a specific ledger.
Definition CassandraBackendFamily.hpp:349
Represents a handle to the cassandra database cluster.
Definition Handle.hpp:46
void bindAt(std::size_t const idx, Type &&value) const
Binds an argument to a specific index.
Definition Statement.hpp:95
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
The requirements of an execution strategy.
Definition Concepts.hpp:54
The requirements of a settings provider.
Definition Concepts.hpp:43
This namespace implements a wrapper for the Cassandra C++ driver.
Definition CassandraBackendFamily.hpp:66
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into series of std::tuple<Types...> by creating a simple wrapper with an STL inp...
Definition Handle.hpp:333
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:75
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:252
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
Struct used to keep track of what to write to account_transactions/account_tx tables.
Definition DBHelpers.hpp:45
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:112
Struct to store ledger header cache entry and the sequence it belongs to.
Definition LedgerHeaderCache.hpp:48
Represents an object in the ledger.
Definition Types.hpp:41
Stores a range of sequences as a min and max pair.
Definition Types.hpp:262
Represents an array of MPTokens.
Definition Types.hpp:254
Represents a transaction and its metadata bundled together.
Definition Types.hpp:68
Represents a bundle of transactions with metadata and a cursor to the next page.
Definition Types.hpp:172
A strong type wrapper for int32_t.
Definition Types.hpp:57
A strong type wrapper for string.
Definition Types.hpp:68