Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
CassandraBackendFamily.hpp
1#pragma once
2
3#include "data/BackendInterface.hpp"
4#include "data/DBHelpers.hpp"
5#include "data/LedgerCacheInterface.hpp"
6#include "data/LedgerHeaderCache.hpp"
7#include "data/Types.hpp"
8#include "data/cassandra/Concepts.hpp"
9#include "data/cassandra/Handle.hpp"
10#include "data/cassandra/Types.hpp"
11#include "data/cassandra/impl/ExecutionStrategy.hpp"
12#include "util/Assert.hpp"
13#include "util/LedgerUtils.hpp"
14#include "util/Profiler.hpp"
15#include "util/log/Logger.hpp"
16
17#include <boost/asio/spawn.hpp>
18#include <boost/json/object.hpp>
19#include <boost/uuid/string_generator.hpp>
20#include <boost/uuid/uuid.hpp>
21#include <cassandra.h>
22#include <fmt/format.h>
23#include <xrpl/basics/Blob.h>
24#include <xrpl/basics/base_uint.h>
25#include <xrpl/basics/strHex.h>
26#include <xrpl/protocol/AccountID.h>
27#include <xrpl/protocol/Indexes.h>
28#include <xrpl/protocol/LedgerHeader.h>
29#include <xrpl/protocol/nft.h>
30
31#include <algorithm>
32#include <atomic>
33#include <chrono>
34#include <cstddef>
35#include <cstdint>
36#include <iterator>
37#include <limits>
38#include <optional>
39#include <stdexcept>
40#include <string>
41#include <tuple>
42#include <utility>
43#include <vector>
44
45class CacheBackendCassandraTest;
46
47namespace data::cassandra {
48
59template <
60 SomeSettingsProvider SettingsProviderType,
61 SomeExecutionStrategy ExecutionStrategyType,
62 typename SchemaType,
63 typename FetchLedgerCacheType = FetchLedgerCache>
65protected:
66 util::Logger log_{"Backend"};
67
68 SettingsProviderType settingsProvider_;
69 SchemaType schema_;
70 std::atomic_uint32_t ledgerSequence_ = 0u;
71 friend class ::CacheBackendCassandraTest;
72
73 Handle handle_;
74
    // Mutable because many BackendInterface methods are const, yet executing
    // queries still requires mutating executor state.
76 mutable ExecutionStrategyType executor_;
77 // TODO: move to interface level
78 mutable FetchLedgerCacheType ledgerCache_{};
79
80public:
89 SettingsProviderType settingsProvider,
91 bool readOnly
92 )
94 , settingsProvider_{std::move(settingsProvider)}
95 , schema_{settingsProvider_}
96 , handle_{settingsProvider_.getSettings()}
97 , executor_{settingsProvider_.getSettings(), handle_}
98 {
99 if (auto const res = handle_.connect(); not res.has_value())
100 throw std::runtime_error("Could not connect to database: " + res.error());
101
102 if (not readOnly) {
103 if (auto const res = handle_.execute(schema_.createKeyspace); not res.has_value()) {
            // On DataStax, creation of keyspaces can be configured to only be done through the
            // admin interface. This does not mean that the keyspace does not already exist, though.
106 if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED)
107 throw std::runtime_error("Could not create keyspace: " + res.error());
108 }
109
110 if (auto const res = handle_.executeEach(schema_.createSchema); not res.has_value())
111 throw std::runtime_error("Could not create schema: " + res.error());
112 }
113
114 try {
115 schema_.prepareStatements(handle_);
116 } catch (std::runtime_error const& ex) {
117 auto const error = fmt::format(
118 "Failed to prepare the statements: {}; readOnly: {}. ReadOnly should be turned off "
119 "or another Clio "
120 "node with write access to DB should be started first.",
121 ex.what(),
122 readOnly
123 );
124 LOG(log_.error()) << error;
125 throw std::runtime_error(error);
126 }
127 LOG(log_.info()) << "Created (revamped) CassandraBackend";
128 }
129
    /**
     * @brief Move constructor is deleted because handle_ is shared by reference with executor
     */
134
137 ripple::AccountID const& account,
138 std::uint32_t const limit,
139 bool forward,
140 std::optional<TransactionsCursor> const& txnCursor,
141 boost::asio::yield_context yield
142 ) const override
143 {
144 auto rng = fetchLedgerRange();
145 if (!rng)
146 return {.txns = {}, .cursor = {}};
147
148 Statement const statement = [this, forward, &account]() {
149 if (forward)
150 return schema_->selectAccountTxForward.bind(account);
151
152 return schema_->selectAccountTx.bind(account);
153 }();
154
155 auto cursor = txnCursor;
156 if (cursor) {
157 statement.bindAt(1, cursor->asTuple());
158 LOG(log_.debug()) << "account = " << ripple::strHex(account)
159 << " tuple = " << cursor->ledgerSequence << cursor->transactionIndex;
160 } else {
161 auto const seq = forward ? rng->minSequence : rng->maxSequence;
162 auto const placeHolder = forward ? 0u : std::numeric_limits<std::uint32_t>::max();
163
164 statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
165 LOG(log_.debug()) << "account = " << ripple::strHex(account) << " idx = " << seq
166 << " tuple = " << placeHolder;
167 }
168
169 // FIXME: Limit is a hack to support uint32_t properly for the time
170 // being. Should be removed later and schema updated to use proper
171 // types.
172 statement.bindAt(2, Limit{limit});
173 auto const res = executor_.read(yield, statement);
174 auto const& results = res.value();
175 if (not results.hasRows()) {
176 LOG(log_.debug()) << "No rows returned";
177 return {};
178 }
179
180 std::vector<ripple::uint256> hashes = {};
181 auto numRows = results.numRows();
182 LOG(log_.info()) << "num_rows = " << numRows;
183
184 for (auto [hash, data] :
185 extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
186 hashes.push_back(hash);
187 if (--numRows == 0) {
188 LOG(log_.debug()) << "Setting cursor";
189 cursor = data;
190 }
191 }
192
193 auto const txns = fetchTransactions(hashes, yield);
194 LOG(log_.debug()) << "Txns = " << txns.size();
195
196 if (txns.size() == limit) {
197 LOG(log_.debug()) << "Returning cursor";
198 return {txns, cursor};
199 }
200
201 return {txns, {}};
202 }
203
204 void
206 {
207 executor_.sync();
208 }
209
    /**
     * @brief Writes the ledger header and its hash-to-sequence mapping.
     *
     * Also stores the sequence in ledgerSequence_ so executeSyncUpdate() can
     * later compare it against the DB's ledger range.
     *
     * @param ledgerHeader The ledger header to write
     * @param blob The serialized ledger header (moved into the write)
     */
    void
    writeLedger(ripple::LedgerHeader const& ledgerHeader, std::string&& blob) override
    {
        executor_.write(schema_->insertLedgerHeader, ledgerHeader.seq, std::move(blob));

        executor_.write(schema_->insertLedgerHash, ledgerHeader.hash, ledgerHeader.seq);

        // Remember the last written sequence (atomic store).
        ledgerSequence_ = ledgerHeader.seq;
    }
219
220 std::optional<std::uint32_t>
221 fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
222 {
223 if (auto const res = executor_.read(yield, schema_->selectLatestLedger); res.has_value()) {
224 if (auto const& rows = *res; rows) {
225 if (auto const maybeRow = rows.template get<uint32_t>(); maybeRow.has_value())
226 return maybeRow;
227
228 LOG(log_.error()) << "Could not fetch latest ledger - no rows";
229 return std::nullopt;
230 }
231
232 LOG(log_.error()) << "Could not fetch latest ledger - no result";
233 } else {
234 LOG(log_.error()) << "Could not fetch latest ledger: " << res.error();
235 }
236
237 return std::nullopt;
238 }
239
240 std::optional<ripple::LedgerHeader>
242 std::uint32_t const sequence,
243 boost::asio::yield_context yield
244 ) const override
245 {
246 if (auto const lock = ledgerCache_.get(); lock.has_value() && lock->seq == sequence)
247 return lock->ledger;
248
249 auto const res = executor_.read(yield, schema_->selectLedgerBySeq, sequence);
250 if (res) {
251 if (auto const& result = res.value(); result) {
252 if (auto const maybeValue = result.template get<std::vector<unsigned char>>();
253 maybeValue) {
254 auto const header = util::deserializeHeader(ripple::makeSlice(*maybeValue));
255 ledgerCache_.put(FetchLedgerCache::CacheEntry{header, sequence});
256 return header;
257 }
258
259 LOG(log_.error()) << "Could not fetch ledger by sequence - no rows";
260 return std::nullopt;
261 }
262
263 LOG(log_.error()) << "Could not fetch ledger by sequence - no result";
264 } else {
265 LOG(log_.error()) << "Could not fetch ledger by sequence: " << res.error();
266 }
267
268 return std::nullopt;
269 }
270
271 std::optional<ripple::LedgerHeader>
272 fetchLedgerByHash(ripple::uint256 const& hash, boost::asio::yield_context yield) const override
273 {
274 if (auto const res = executor_.read(yield, schema_->selectLedgerByHash, hash); res) {
275 if (auto const& result = res.value(); result) {
276 if (auto const maybeValue = result.template get<uint32_t>(); maybeValue)
277 return fetchLedgerBySequence(*maybeValue, yield);
278
279 LOG(log_.error()) << "Could not fetch ledger by hash - no rows";
280 return std::nullopt;
281 }
282
283 LOG(log_.error()) << "Could not fetch ledger by hash - no result";
284 } else {
285 LOG(log_.error()) << "Could not fetch ledger by hash: " << res.error();
286 }
287
288 return std::nullopt;
289 }
290
291 std::optional<LedgerRange>
292 hardFetchLedgerRange(boost::asio::yield_context yield) const override
293 {
294 auto const res = executor_.read(yield, schema_->selectLedgerRange);
295 if (res) {
296 auto const& results = res.value();
297 if (not results.hasRows()) {
298 LOG(log_.debug()) << "Could not fetch ledger range - no rows";
299 return std::nullopt;
300 }
301
302 // TODO: this is probably a good place to use user type in
303 // cassandra instead of having two rows with bool flag. or maybe at
304 // least use tuple<int, int>?
305 LedgerRange range;
306 std::size_t idx = 0;
307 for (auto [seq] : extract<uint32_t>(results)) {
308 if (idx == 0) {
309 range.maxSequence = range.minSequence = seq;
310 } else if (idx == 1) {
311 range.maxSequence = seq;
312 }
313
314 ++idx;
315 }
316
317 if (range.minSequence > range.maxSequence)
318 std::swap(range.minSequence, range.maxSequence);
319
320 LOG(log_.debug()) << "After hardFetchLedgerRange range is " << range.minSequence << ":"
321 << range.maxSequence;
322 return range;
323 }
324 LOG(log_.error()) << "Could not fetch ledger range: " << res.error();
325
326 return std::nullopt;
327 }
328
329 std::vector<TransactionAndMetadata>
331 std::uint32_t const ledgerSequence,
332 boost::asio::yield_context yield
333 ) const override
334 {
335 auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence, yield);
336 return fetchTransactions(hashes, yield);
337 }
338
339 std::vector<ripple::uint256>
341 std::uint32_t const ledgerSequence,
342 boost::asio::yield_context yield
343 ) const override
344 {
345 auto start = std::chrono::system_clock::now();
346 auto const res =
347 executor_.read(yield, schema_->selectAllTransactionHashesInLedger, ledgerSequence);
348
349 if (not res) {
350 LOG(log_.error()) << "Could not fetch all transaction hashes: " << res.error();
351 return {};
352 }
353
354 auto const& result = res.value();
355 if (not result.hasRows()) {
356 LOG(log_.warn()) << "Could not fetch all transaction hashes - no rows; ledger = "
357 << std::to_string(ledgerSequence);
358 return {};
359 }
360
361 std::vector<ripple::uint256> hashes;
362 for (auto [hash] : extract<ripple::uint256>(result))
363 hashes.push_back(std::move(hash));
364
365 auto end = std::chrono::system_clock::now();
366 LOG(
367 log_.debug()
368 ) << "Fetched "
369 << hashes.size() << " transaction hashes from database in "
370 << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
371 << " milliseconds";
372
373 return hashes;
374 }
375
376 std::optional<NFT>
378 ripple::uint256 const& tokenID,
379 std::uint32_t const ledgerSequence,
380 boost::asio::yield_context yield
381 ) const override
382 {
383 auto const res = executor_.read(yield, schema_->selectNFT, tokenID, ledgerSequence);
384 if (not res)
385 return std::nullopt;
386
387 if (auto const maybeRow = res->template get<uint32_t, ripple::AccountID, bool>();
388 maybeRow) {
389 auto [seq, owner, isBurned] = *maybeRow;
390 auto result = std::make_optional<NFT>(tokenID, seq, owner, isBurned);
391
392 // now fetch URI. Usually we will have the URI even for burned NFTs,
393 // but if the first ledger on this clio included NFTokenBurn
394 // transactions we will not have the URIs for any of those tokens.
395 // In any other case not having the URI indicates something went
396 // wrong with our data.
397 //
398 // TODO - in the future would be great for any handlers that use
399 // this could inject a warning in this case (the case of not having
400 // a URI because it was burned in the first ledger) to indicate that
401 // even though we are returning a blank URI, the NFT might have had
402 // one.
403 auto uriRes = executor_.read(yield, schema_->selectNFTURI, tokenID, ledgerSequence);
404 if (uriRes) {
405 if (auto const maybeUri = uriRes->template get<ripple::Blob>(); maybeUri)
406 result->uri = *maybeUri;
407 }
408
409 return result;
410 }
411
412 LOG(log_.error()) << "Could not fetch NFT - no rows";
413 return std::nullopt;
414 }
415
418 ripple::uint256 const& tokenID,
419 std::uint32_t const limit,
420 bool const forward,
421 std::optional<TransactionsCursor> const& cursorIn,
422 boost::asio::yield_context yield
423 ) const override
424 {
425 auto rng = fetchLedgerRange();
426 if (!rng)
427 return {.txns = {}, .cursor = {}};
428
429 Statement const statement = [this, forward, &tokenID]() {
430 if (forward)
431 return schema_->selectNFTTxForward.bind(tokenID);
432
433 return schema_->selectNFTTx.bind(tokenID);
434 }();
435
436 auto cursor = cursorIn;
437 if (cursor) {
438 statement.bindAt(1, cursor->asTuple());
439 LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID)
440 << " tuple = " << cursor->ledgerSequence << cursor->transactionIndex;
441 } else {
442 auto const seq = forward ? rng->minSequence : rng->maxSequence;
443 auto const placeHolder = forward ? 0 : std::numeric_limits<std::uint32_t>::max();
444
445 statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
446 LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq
447 << " tuple = " << placeHolder;
448 }
449
450 statement.bindAt(2, Limit{limit});
451
452 auto const res = executor_.read(yield, statement);
453 auto const& results = res.value();
454 if (not results.hasRows()) {
455 LOG(log_.debug()) << "No rows returned";
456 return {};
457 }
458
459 std::vector<ripple::uint256> hashes = {};
460 auto numRows = results.numRows();
461 LOG(log_.info()) << "num_rows = " << numRows;
462
463 for (auto [hash, data] :
464 extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
465 hashes.push_back(hash);
466 if (--numRows == 0) {
467 LOG(log_.debug()) << "Setting cursor";
468 cursor = data;
469
470 // forward queries by ledger/tx sequence `>=`
471 // so we have to advance the index by one
472 if (forward)
473 ++cursor->transactionIndex;
474 }
475 }
476
477 auto const txns = fetchTransactions(hashes, yield);
478 LOG(log_.debug()) << "NFT Txns = " << txns.size();
479
480 if (txns.size() == limit) {
481 LOG(log_.debug()) << "Returning cursor";
482 return {txns, cursor};
483 }
484
485 return {txns, {}};
486 }
487
490 ripple::uint192 const& mptID,
491 std::uint32_t const limit,
492 std::optional<ripple::AccountID> const& cursorIn,
493 std::uint32_t const ledgerSequence,
494 boost::asio::yield_context yield
495 ) const override
496 {
497 auto const holderEntries = executor_.read(
498 yield,
499 schema_->selectMPTHolders,
500 mptID,
501 cursorIn.value_or(ripple::AccountID(0)),
502 Limit{limit}
503 );
504
505 auto const& holderResults = holderEntries.value();
506 if (not holderResults.hasRows()) {
507 LOG(log_.debug()) << "No rows returned";
508 return {};
509 }
510
511 std::vector<ripple::uint256> mptKeys;
512 std::optional<ripple::AccountID> cursor;
513 for (auto const [holder] : extract<ripple::AccountID>(holderResults)) {
514 mptKeys.push_back(ripple::keylet::mptoken(mptID, holder).key);
515 cursor = holder;
516 }
517
518 auto mptObjects = doFetchLedgerObjects(mptKeys, ledgerSequence, yield);
519
520 auto it = std::remove_if(mptObjects.begin(), mptObjects.end(), [](Blob const& mpt) {
521 return mpt.empty();
522 });
523
524 mptObjects.erase(it, mptObjects.end());
525
526 ASSERT(mptKeys.size() <= limit, "Number of keys can't exceed the limit");
527 if (mptKeys.size() == limit)
528 return {mptObjects, cursor};
529
530 return {mptObjects, {}};
531 }
532
533 std::optional<Blob>
535 ripple::uint256 const& key,
536 std::uint32_t const sequence,
537 boost::asio::yield_context yield
538 ) const override
539 {
540 LOG(log_.debug()) << "Fetching ledger object for seq " << sequence
541 << ", key = " << ripple::to_string(key);
542 if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
543 if (auto const result = res->template get<Blob>(); result) {
544 if (result->size())
545 return result;
546 } else {
547 LOG(log_.debug()) << "Could not fetch ledger object - no rows";
548 }
549 } else {
550 LOG(log_.error()) << "Could not fetch ledger object: " << res.error();
551 }
552
553 return std::nullopt;
554 }
555
556 std::optional<std::uint32_t>
558 ripple::uint256 const& key,
559 std::uint32_t const sequence,
560 boost::asio::yield_context yield
561 ) const override
562 {
563 LOG(log_.debug()) << "Fetching ledger object for seq " << sequence
564 << ", key = " << ripple::to_string(key);
565 if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
566 if (auto const result = res->template get<Blob, std::uint32_t>(); result) {
567 auto [_, seq] = result.value();
568 return seq;
569 }
570 LOG(log_.debug()) << "Could not fetch ledger object sequence - no rows";
571 } else {
572 LOG(log_.error()) << "Could not fetch ledger object sequence: " << res.error();
573 }
574
575 return std::nullopt;
576 }
577
578 std::optional<TransactionAndMetadata>
579 fetchTransaction(ripple::uint256 const& hash, boost::asio::yield_context yield) const override
580 {
581 if (auto const res = executor_.read(yield, schema_->selectTransaction, hash); res) {
582 if (auto const maybeValue = res->template get<Blob, Blob, uint32_t, uint32_t>();
583 maybeValue) {
584 auto [transaction, meta, seq, date] = *maybeValue;
585 return std::make_optional<TransactionAndMetadata>(transaction, meta, seq, date);
586 }
587
588 LOG(log_.debug()) << "Could not fetch transaction - no rows";
589 } else {
590 LOG(log_.error()) << "Could not fetch transaction: " << res.error();
591 }
592
593 return std::nullopt;
594 }
595
596 std::optional<ripple::uint256>
598 ripple::uint256 key,
599 std::uint32_t const ledgerSequence,
600 boost::asio::yield_context yield
601 ) const override
602 {
603 if (auto const res = executor_.read(yield, schema_->selectSuccessor, key, ledgerSequence);
604 res) {
605 if (auto const result = res->template get<ripple::uint256>(); result) {
606 if (*result == kLAST_KEY)
607 return std::nullopt;
608 return result;
609 }
610
611 LOG(log_.debug()) << "Could not fetch successor - no rows";
612 } else {
613 LOG(log_.error()) << "Could not fetch successor: " << res.error();
614 }
615
616 return std::nullopt;
617 }
618
619 std::vector<TransactionAndMetadata>
621 std::vector<ripple::uint256> const& hashes,
622 boost::asio::yield_context yield
623 ) const override
624 {
625 if (hashes.empty())
626 return {};
627
628 auto const numHashes = hashes.size();
629 std::vector<TransactionAndMetadata> results;
630 results.reserve(numHashes);
631
632 std::vector<Statement> statements;
633 statements.reserve(numHashes);
634
635 auto const timeDiff = util::timed([this, yield, &results, &hashes, &statements]() {
636 // TODO: seems like a job for "hash IN (list of hashes)" instead?
637 std::transform(
638 std::cbegin(hashes),
639 std::cend(hashes),
640 std::back_inserter(statements),
641 [this](auto const& hash) { return schema_->selectTransaction.bind(hash); }
642 );
643
644 auto const entries = executor_.readEach(yield, statements);
645 std::transform(
646 std::cbegin(entries),
647 std::cend(entries),
648 std::back_inserter(results),
649 [](auto const& res) -> TransactionAndMetadata {
650 if (auto const maybeRow = res.template get<Blob, Blob, uint32_t, uint32_t>();
651 maybeRow)
652 return *maybeRow;
653
654 return {};
655 }
656 );
657 });
658
659 ASSERT(numHashes == results.size(), "Number of hashes and results must match");
660 LOG(log_.debug()) << "Fetched " << numHashes << " transactions from database in "
661 << timeDiff << " milliseconds";
662 return results;
663 }
664
665 std::vector<Blob>
667 std::vector<ripple::uint256> const& keys,
668 std::uint32_t const sequence,
669 boost::asio::yield_context yield
670 ) const override
671 {
672 if (keys.empty())
673 return {};
674
675 auto const numKeys = keys.size();
676 LOG(log_.trace()) << "Fetching " << numKeys << " objects";
677
678 std::vector<Blob> results;
679 results.reserve(numKeys);
680
681 std::vector<Statement> statements;
682 statements.reserve(numKeys);
683
684 // TODO: seems like a job for "key IN (list of keys)" instead?
685 std::transform(
686 std::cbegin(keys),
687 std::cend(keys),
688 std::back_inserter(statements),
689 [this, &sequence](auto const& key) { return schema_->selectObject.bind(key, sequence); }
690 );
691
692 auto const entries = executor_.readEach(yield, statements);
693 std::transform(
694 std::cbegin(entries),
695 std::cend(entries),
696 std::back_inserter(results),
697 [](auto const& res) -> Blob {
698 if (auto const maybeValue = res.template get<Blob>(); maybeValue)
699 return *maybeValue;
700
701 return {};
702 }
703 );
704
705 LOG(log_.trace()) << "Fetched " << numKeys << " objects";
706 return results;
707 }
708
709 std::vector<LedgerObject>
711 std::uint32_t const ledgerSequence,
712 boost::asio::yield_context yield
713 ) const override
714 {
715 auto const [keys, timeDiff] =
716 util::timed([this, &ledgerSequence, yield]() -> std::vector<ripple::uint256> {
717 auto const res = executor_.read(yield, schema_->selectDiff, ledgerSequence);
718 if (not res) {
719 LOG(log_.error()) << "Could not fetch ledger diff: " << res.error()
720 << "; ledger = " << ledgerSequence;
721 return {};
722 }
723
724 auto const& results = res.value();
725 if (not results) {
726 LOG(log_.error())
727 << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence;
728 return {};
729 }
730
731 std::vector<ripple::uint256> resultKeys;
732 for (auto [key] : extract<ripple::uint256>(results))
733 resultKeys.push_back(key);
734
735 return resultKeys;
736 });
737
738 // one of the above errors must have happened
739 if (keys.empty())
740 return {};
741
742 LOG(log_.debug()) << "Fetched " << keys.size() << " diff hashes from database in "
743 << timeDiff << " milliseconds";
744
745 auto const objs = fetchLedgerObjects(keys, ledgerSequence, yield);
746 std::vector<LedgerObject> results;
747 results.reserve(keys.size());
748
749 std::transform(
750 std::cbegin(keys),
751 std::cend(keys),
752 std::cbegin(objs),
753 std::back_inserter(results),
754 [](auto const& key, auto const& obj) { return LedgerObject{key, obj}; }
755 );
756
757 return results;
758 }
759
760 std::optional<std::string>
762 std::string const& migratorName,
763 boost::asio::yield_context yield
764 ) const override
765 {
766 auto const res = executor_.read(yield, schema_->selectMigratorStatus, Text(migratorName));
767 if (not res) {
768 LOG(log_.error()) << "Could not fetch migrator status: " << res.error();
769 return {};
770 }
771
772 auto const& results = res.value();
773 if (not results) {
774 return {};
775 }
776
777 for (auto [statusString] : extract<std::string>(results))
778 return statusString;
779
780 return {};
781 }
782
783 std::expected<std::vector<std::pair<boost::uuids::uuid, std::string>>, std::string>
784 fetchClioNodesData(boost::asio::yield_context yield) const override
785 {
786 auto const readResult = executor_.read(yield, schema_->selectClioNodesData);
787 if (not readResult)
788 return std::unexpected{readResult.error().message()};
789
790 std::vector<std::pair<boost::uuids::uuid, std::string>> result;
791
792 for (auto [uuid, message] : extract<boost::uuids::uuid, std::string>(*readResult)) {
793 result.emplace_back(uuid, std::move(message));
794 }
795
796 return result;
797 }
798
    /**
     * @brief Writes a single ledger object blob keyed by (key, seq).
     *
     * The key is additionally recorded in the diff table, but only when a
     * ledger range is already known (range_ is set).
     *
     * @param key The object's key (moved into the object write)
     * @param seq The ledger sequence this object state belongs to
     * @param blob The serialized object data (moved into the write)
     */
    void
    doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override
    {
        LOG(log_.trace()) << " Writing ledger object " << key.size() << ":" << seq << " ["
                          << blob.size() << " bytes]";

        // insertDiff reads `key` before it is moved into insertObject below; keep this order.
        if (range_)
            executor_.write(schema_->insertDiff, seq, key);

        executor_.write(schema_->insertObject, std::move(key), seq, std::move(blob));
    }
810
    /**
     * @brief Writes one successor-table entry: `key` links to `successor` as of `seq`.
     *
     * @param key The object key (moved into the write); must not be empty
     * @param seq The ledger sequence this link belongs to
     * @param successor The next key in the chain (moved into the write); must not be empty
     */
    void
    writeSuccessor(std::string&& key, std::uint32_t const seq, std::string&& successor) override
    {
        LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. "
                          << " seq = " << std::to_string(seq) << " successor = " << successor.size()
                          << " bytes.";
        ASSERT(!key.empty(), "Key must not be empty");
        ASSERT(!successor.empty(), "Successor must not be empty");

        executor_.write(schema_->insertSuccessor, std::move(key), seq, std::move(successor));
    }
822
823 void
824 writeAccountTransactions(std::vector<AccountTransactionsData> data) override
825 {
826 std::vector<Statement> statements;
827 statements.reserve(data.size() * 10); // assume 10 transactions avg
828
829 for (auto& record : data) {
830 std::ranges::transform(
831 record.accounts, std::back_inserter(statements), [this, &record](auto&& account) {
832 return schema_->insertAccountTx.bind(
833 std::forward<decltype(account)>(account),
834 std::make_tuple(record.ledgerSequence, record.transactionIndex),
835 record.txHash
836 );
837 }
838 );
839 }
840
841 executor_.write(std::move(statements));
842 }
843
844 void
846 {
847 std::vector<Statement> statements;
848 statements.reserve(record.accounts.size());
849
850 std::ranges::transform(
851 record.accounts, std::back_inserter(statements), [this, &record](auto&& account) {
852 return schema_->insertAccountTx.bind(
853 std::forward<decltype(account)>(account),
854 std::make_tuple(record.ledgerSequence, record.transactionIndex),
855 record.txHash
856 );
857 }
858 );
859
860 executor_.write(std::move(statements));
861 }
862
863 void
864 writeNFTTransactions(std::vector<NFTTransactionsData> const& data) override
865 {
866 std::vector<Statement> statements;
867 statements.reserve(data.size());
868
869 std::ranges::transform(data, std::back_inserter(statements), [this](auto const& record) {
870 return schema_->insertNFTTx.bind(
871 record.tokenID,
872 std::make_tuple(record.ledgerSequence, record.transactionIndex),
873 record.txHash
874 );
875 });
876
877 executor_.write(std::move(statements));
878 }
879
880 void
882 std::string&& hash,
883 std::uint32_t const seq,
884 std::uint32_t const date,
885 std::string&& transaction,
886 std::string&& metadata
887 ) override
888 {
889 LOG(log_.trace()) << "Writing txn to database";
890
891 executor_.write(schema_->insertLedgerTransaction, seq, hash);
892 executor_.write(
893 schema_->insertTransaction,
894 std::move(hash),
895 seq,
896 date,
897 std::move(transaction),
898 std::move(metadata)
899 );
900 }
901
902 void
903 writeNFTs(std::vector<NFTsData> const& data) override
904 {
905 std::vector<Statement> statements;
906 statements.reserve(data.size() * 3);
907
908 for (NFTsData const& record : data) {
909 if (!record.onlyUriChanged) {
910 statements.push_back(schema_->insertNFT.bind(
911 record.tokenID, record.ledgerSequence, record.owner, record.isBurned
912 ));
913
914 // If `uri` is set (and it can be set to an empty uri), we know this
915 // is a net-new NFT. That is, this NFT has not been seen before by
916 // us _OR_ it is in the extreme edge case of a re-minted NFT ID with
917 // the same NFT ID as an already-burned token. In this case, we need
918 // to record the URI and link to the issuer_nf_tokens table.
919 if (record.uri) {
920 statements.push_back(schema_->insertIssuerNFT.bind(
921 ripple::nft::getIssuer(record.tokenID),
922 static_cast<uint32_t>(ripple::nft::getTaxon(record.tokenID)),
923 record.tokenID
924 ));
925 statements.push_back(schema_->insertNFTURI.bind(
926 record.tokenID, record.ledgerSequence, record.uri.value()
927 ));
928 }
929 } else {
930 // only uri changed, we update the uri table only
931 statements.push_back(schema_->insertNFTURI.bind(
932 record.tokenID, record.ledgerSequence, record.uri.value()
933 ));
934 }
935 }
936
937 executor_.writeEach(std::move(statements));
938 }
939
940 void
941 writeMPTHolders(std::vector<MPTHolderData> const& data) override
942 {
943 std::vector<Statement> statements;
944 statements.reserve(data.size());
945 for (auto [mptId, holder] : data)
946 statements.push_back(schema_->insertMPTHolder.bind(mptId, holder));
947
948 executor_.write(std::move(statements));
949 }
950
    /**
     * @brief Starts a write transaction with the DB. No-op for cassandra.
     */
    void
    startWrites() const override
    {
        // Intentionally empty: Cassandra needs no explicit transaction begin.
        // This hook presumably existed so the old PostgreSQL backend could open
        // a transaction; kept as a no-op to satisfy the interface.
    }
957
958 void
959 writeMigratorStatus(std::string const& migratorName, std::string const& status) override
960 {
961 executor_.writeSync(
962 schema_->insertMigratorStatus,
963 data::cassandra::Text{migratorName},
965 );
966 }
967
    /**
     * @brief Writes this node's message for cluster communication (synchronous write).
     *
     * @param uuid The node's unique identifier
     * @param message The message to store (moved into the write)
     */
    void
    writeNodeMessage(boost::uuids::uuid const& uuid, std::string message) override
    {
        executor_.writeSync(
            schema_->updateClioNodeMessage, data::cassandra::Text{std::move(message)}, uuid
        );
    }
975
    /**
     * @brief Reports whether the backend is currently too loaded to serve requests.
     *
     * @return true if the underlying execution strategy reports too much load
     */
    bool
    isTooBusy() const override
    {
        return executor_.isTooBusy();
    }
981
    /**
     * @brief Collects runtime statistics from the underlying executor.
     *
     * @return A JSON object with the executor's statistics
     */
    boost::json::object
    stats() const override
    {
        return executor_.stats();
    }
987
988protected:
    /**
     * @brief Synchronously executes an update statement and verifies it applied.
     *
     * If the database reports the conditional update as not applied, another
     * writer may have raced us; in that case the ledger range is re-read and
     * success is determined by whether its max sequence equals the sequence
     * this instance last wrote (ledgerSequence_).
     *
     * @param statement The update statement to execute
     * @return true if the update applied, or the DB already reflects our state
     */
    bool
    executeSyncUpdate(Statement statement)
    {
        auto const res = executor_.writeSync(statement);
        // NOTE(review): `res` is dereferenced without an error check here —
        // presumably writeSync only returns a result on success (or throws); confirm.
        auto maybeSuccess = res->template get<bool>();
        if (not maybeSuccess) {
            LOG(log_.error()) << "executeSyncUpdate - error getting result - no row";
            return false;
        }

        if (not maybeSuccess.value()) {
            LOG(log_.warn()) << "Update failed. Checking if DB state is what we expect";

            // error may indicate that another writer wrote something.
            // in this case let's just compare the current state of things
            // against what we were trying to write in the first place and
            // use that as the source of truth for the result.
            auto rng = hardFetchLedgerRangeNoThrow();
            return rng && rng->maxSequence == ledgerSequence_;
        }

        return true;
    }
1018};
1019
1020} // namespace data::cassandra
std::vector< Blob > fetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t sequence, boost::asio::yield_context yield) const
Fetches all ledger objects by their keys.
Definition BackendInterface.cpp:95
BackendInterface(LedgerCacheInterface &cache)
Construct a new backend interface instance.
Definition BackendInterface.hpp:139
std::optional< LedgerRange > hardFetchLedgerRangeNoThrow() const
Fetches the ledger range from DB retrying until no DatabaseTimeout is thrown.
Definition BackendInterface.cpp:53
std::optional< LedgerRange > fetchLedgerRange() const
Fetch the current ledger range.
Definition BackendInterface.cpp:249
LedgerCacheInterface const & cache() const
Definition BackendInterface.hpp:151
A simple cache holding one ripple::LedgerHeader to reduce DB lookups.
Definition LedgerHeaderCache.hpp:22
Cache for an entire ledger.
Definition LedgerCacheInterface.hpp:21
Implements BackendInterface for Cassandra/ScyllaDB/Keyspace.
Definition CassandraBackendFamily.hpp:64
void writeMigratorStatus(std::string const &migratorName, std::string const &status) override
Mark the migration status of a migrator as Migrated in the database.
Definition CassandraBackendFamily.hpp:959
std::optional< LedgerRange > hardFetchLedgerRange(boost::asio::yield_context yield) const override
Fetches the ledger range from DB.
Definition CassandraBackendFamily.hpp:292
void startWrites() const override
Starts a write transaction with the DB. No-op for cassandra.
Definition CassandraBackendFamily.hpp:952
void doWriteLedgerObject(std::string &&key, std::uint32_t const seq, std::string &&blob) override
Writes a ledger object to the database.
Definition CassandraBackendFamily.hpp:800
std::optional< TransactionAndMetadata > fetchTransaction(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific transaction.
Definition CassandraBackendFamily.hpp:579
TransactionsAndCursor fetchAccountTransactions(ripple::AccountID const &account, std::uint32_t const limit, bool forward, std::optional< TransactionsCursor > const &txnCursor, boost::asio::yield_context yield) const override
Fetches all transactions for a specific account.
Definition CassandraBackendFamily.hpp:136
MPTHoldersAndCursor fetchMPTHolders(ripple::uint192 const &mptID, std::uint32_t const limit, std::optional< ripple::AccountID > const &cursorIn, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all holders' balances for a MPTIssuanceID.
Definition CassandraBackendFamily.hpp:489
void writeNFTs(std::vector< NFTsData > const &data) override
Writes NFTs to the database.
Definition CassandraBackendFamily.hpp:903
std::optional< ripple::LedgerHeader > fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override
Fetches a specific ledger by sequence number.
Definition CassandraBackendFamily.hpp:241
void writeNFTTransactions(std::vector< NFTTransactionsData > const &data) override
Write NFTs transactions.
Definition CassandraBackendFamily.hpp:864
void writeNodeMessage(boost::uuids::uuid const &uuid, std::string message) override
Write a node message. Used by ClusterCommunicationService.
Definition CassandraBackendFamily.hpp:969
std::optional< std::uint32_t > fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
Fetches the latest ledger sequence.
Definition CassandraBackendFamily.hpp:221
CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface &cache, bool readOnly)
Create a new cassandra/scylla backend instance.
Definition CassandraBackendFamily.hpp:88
std::optional< ripple::LedgerHeader > fetchLedgerByHash(ripple::uint256 const &hash, boost::asio::yield_context yield) const override
Fetches a specific ledger by hash.
Definition CassandraBackendFamily.hpp:272
std::optional< std::uint32_t > doFetchLedgerObjectSeq(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object sequence.
Definition CassandraBackendFamily.hpp:557
bool isTooBusy() const override
Definition CassandraBackendFamily.hpp:977
std::optional< NFT > fetchNFT(ripple::uint256 const &tokenID, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches a specific NFT.
Definition CassandraBackendFamily.hpp:377
void writeMPTHolders(std::vector< MPTHolderData > const &data) override
Write accounts that started holding onto an MPT.
Definition CassandraBackendFamily.hpp:941
void writeAccountTransaction(AccountTransactionsData record) override
Write a new account transaction.
Definition CassandraBackendFamily.hpp:845
void writeSuccessor(std::string &&key, std::uint32_t const seq, std::string &&successor) override
Write a new successor.
Definition CassandraBackendFamily.hpp:812
std::vector< ripple::uint256 > fetchAllTransactionHashesInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transaction hashes from a specific ledger.
Definition CassandraBackendFamily.hpp:340
void waitForWritesToFinish() override
Wait for all pending writes to finish.
Definition CassandraBackendFamily.hpp:205
std::vector< TransactionAndMetadata > fetchTransactions(std::vector< ripple::uint256 > const &hashes, boost::asio::yield_context yield) const override
Fetches multiple transactions.
Definition CassandraBackendFamily.hpp:620
std::vector< Blob > doFetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching ledger objects.
Definition CassandraBackendFamily.hpp:666
std::optional< ripple::uint256 > doFetchSuccessorKey(ripple::uint256 key, std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Database-specific implementation of fetching the successor key.
Definition CassandraBackendFamily.hpp:597
boost::json::object stats() const override
Definition CassandraBackendFamily.hpp:983
std::optional< std::string > fetchMigratorStatus(std::string const &migratorName, boost::asio::yield_context yield) const override
Fetches the status of migrator by name.
Definition CassandraBackendFamily.hpp:761
std::expected< std::vector< std::pair< boost::uuids::uuid, std::string > >, std::string > fetchClioNodesData(boost::asio::yield_context yield) const override
Fetches the data of all nodes in the cluster.
Definition CassandraBackendFamily.hpp:784
void writeAccountTransactions(std::vector< AccountTransactionsData > data) override
Write a new set of account transactions.
Definition CassandraBackendFamily.hpp:824
std::optional< Blob > doFetchLedgerObject(ripple::uint256 const &key, std::uint32_t const sequence, boost::asio::yield_context yield) const override
The database-specific implementation for fetching a ledger object.
Definition CassandraBackendFamily.hpp:534
bool executeSyncUpdate(Statement statement)
Executes statements and tries to write to DB.
Definition CassandraBackendFamily.hpp:996
void writeLedger(ripple::LedgerHeader const &ledgerHeader, std::string &&blob) override
Writes to a specific ledger.
Definition CassandraBackendFamily.hpp:211
TransactionsAndCursor fetchNFTTransactions(ripple::uint256 const &tokenID, std::uint32_t const limit, bool const forward, std::optional< TransactionsCursor > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all transactions for a specific NFT.
Definition CassandraBackendFamily.hpp:417
void writeTransaction(std::string &&hash, std::uint32_t const seq, std::uint32_t const date, std::string &&transaction, std::string &&metadata) override
Writes a new transaction.
Definition CassandraBackendFamily.hpp:881
std::vector< LedgerObject > fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Returns the difference between ledgers.
Definition CassandraBackendFamily.hpp:710
std::vector< TransactionAndMetadata > fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
Fetches all transactions from a specific ledger.
Definition CassandraBackendFamily.hpp:330
Represents a handle to the cassandra database cluster.
Definition Handle.hpp:27
void bindAt(std::size_t const idx, Type &&value) const
Binds an argument to a specific index.
Definition Statement.hpp:76
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
The requirements of an execution strategy.
Definition Concepts.hpp:35
The requirements of a settings provider.
Definition Concepts.hpp:24
This namespace implements a wrapper for the Cassandra C++ driver.
Definition CassandraBackendFamily.hpp:47
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into series of std::tuple<Types...> by creating a simple wrapper with an STL inp...
Definition Handle.hpp:314
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:56
ripple::LedgerHeader deserializeHeader(ripple::Slice data)
Deserializes a ripple::LedgerHeader from ripple::Slice of data.
Definition LedgerUtils.hpp:233
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:21
Struct used to keep track of what to write to account_transactions/account_tx tables.
Definition DBHelpers.hpp:26
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:93
Struct to store ledger header cache entry and the sequence it belongs to.
Definition LedgerHeaderCache.hpp:29
Represents an object in the ledger.
Definition Types.hpp:22
Stores a range of sequences as a min and max pair.
Definition Types.hpp:243
Represents an array of MPTokens.
Definition Types.hpp:235
Represents a transaction and its metadata bundled together.
Definition Types.hpp:49
Represents a bundle of transactions with metadata and a cursor to the next page.
Definition Types.hpp:153
A strong type wrapper for int32_t.
Definition Types.hpp:38
A strong type wrapper for string.
Definition Types.hpp:49