Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
CassandraBackend.hpp
1#pragma once
2
3#include "data/LedgerHeaderCache.hpp"
4#include "data/Types.hpp"
5#include "data/cassandra/CassandraBackendFamily.hpp"
6#include "data/cassandra/CassandraSchema.hpp"
7#include "data/cassandra/Concepts.hpp"
8#include "data/cassandra/Handle.hpp"
9#include "data/cassandra/SettingsProvider.hpp"
10#include "data/cassandra/Types.hpp"
11#include "data/cassandra/impl/ExecutionStrategy.hpp"
12#include "util/log/Logger.hpp"
13
14#include <boost/asio/spawn.hpp>
15#include <boost/json/object.hpp>
16#include <boost/uuid/string_generator.hpp>
17#include <boost/uuid/uuid.hpp>
18#include <cassandra.h>
19#include <fmt/format.h>
20#include <xrpl/basics/Blob.h>
21#include <xrpl/basics/base_uint.h>
22#include <xrpl/basics/strHex.h>
23#include <xrpl/protocol/AccountID.h>
24#include <xrpl/protocol/Indexes.h>
25#include <xrpl/protocol/LedgerHeader.h>
26#include <xrpl/protocol/nft.h>
27
28#include <algorithm>
29#include <cstddef>
30#include <cstdint>
31#include <iterator>
32#include <optional>
33#include <string>
34#include <tuple>
35#include <vector>
36
37namespace data::cassandra {
38
46template <
47 SomeSettingsProvider SettingsProviderType,
48 SomeExecutionStrategy ExecutionStrategyType,
49 typename FetchLedgerCacheType = FetchLedgerCache>
50class BasicCassandraBackend : public CassandraBackendFamily<
51 SettingsProviderType,
52 ExecutionStrategyType,
53 CassandraSchema<SettingsProviderType>,
54 FetchLedgerCacheType> {
55 using DefaultCassandraFamily = CassandraBackendFamily<
56 SettingsProviderType,
57 ExecutionStrategyType,
59 FetchLedgerCacheType>;
60
61 // protected because CassandraMigrationBackend inherits from this class
62protected:
63 using DefaultCassandraFamily::executor_;
64 using DefaultCassandraFamily::ledgerSequence_;
65 using DefaultCassandraFamily::log_;
66 using DefaultCassandraFamily::range_;
67 using DefaultCassandraFamily::schema_;
68
69public:
73 using DefaultCassandraFamily::DefaultCassandraFamily;
74
75 /*
76 * @brief Move constructor is deleted because handle_ is shared by reference with executor
77 */
78 BasicCassandraBackend(BasicCassandraBackend&&) = delete;
79
80 bool
81 doFinishWrites() override
82 {
84
85 if (!range_) {
86 executor_.writeSync(
87 schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_
88 );
89 }
90
91 if (not this->executeSyncUpdate(
92 schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1)
93 )) {
94 LOG(log_.warn()) << "Update failed for ledger " << ledgerSequence_;
95 return false;
96 }
97
98 LOG(log_.info()) << "Committed ledger " << ledgerSequence_;
99 return true;
100 }
101
104 ripple::AccountID const& issuer,
105 std::optional<std::uint32_t> const& taxon,
106 std::uint32_t const ledgerSequence,
107 std::uint32_t const limit,
108 std::optional<ripple::uint256> const& cursorIn,
109 boost::asio::yield_context yield
110 ) const override
111 {
112 NFTsAndCursor ret;
113
114 Statement const idQueryStatement = [&taxon, &issuer, &cursorIn, &limit, this]() {
115 if (taxon.has_value()) {
116 auto r = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
117 r.bindAt(1, *taxon);
118 r.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
119 r.bindAt(3, Limit{limit});
120 return r;
121 }
122
123 auto r = schema_->selectNFTIDsByIssuer.bind(issuer);
124 r.bindAt(
125 1,
126 std::make_tuple(
127 cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn))
128 : 0,
129 cursorIn.value_or(ripple::uint256(0))
130 )
131 );
132 r.bindAt(2, Limit{limit});
133 return r;
134 }();
135
136 // Query for all the NFTs issued by the account, potentially filtered by the taxon
137 auto const res = executor_.read(yield, idQueryStatement);
138
139 auto const& idQueryResults = res.value();
140 if (not idQueryResults.hasRows()) {
141 LOG(log_.debug()) << "No rows returned";
142 return {};
143 }
144
145 std::vector<ripple::uint256> nftIDs;
146 for (auto const [nftID] : extract<ripple::uint256>(idQueryResults))
147 nftIDs.push_back(nftID);
148
149 if (nftIDs.empty())
150 return ret;
151
152 if (nftIDs.size() == limit)
153 ret.cursor = nftIDs.back();
154
155 std::vector<Statement> selectNFTStatements;
156 selectNFTStatements.reserve(nftIDs.size());
157
158 std::transform(
159 std::cbegin(nftIDs),
160 std::cend(nftIDs),
161 std::back_inserter(selectNFTStatements),
162 [&](auto const& nftID) { return schema_->selectNFT.bind(nftID, ledgerSequence); }
163 );
164
165 auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
166
167 std::vector<Statement> selectNFTURIStatements;
168 selectNFTURIStatements.reserve(nftIDs.size());
169
170 std::transform(
171 std::cbegin(nftIDs),
172 std::cend(nftIDs),
173 std::back_inserter(selectNFTURIStatements),
174 [&](auto const& nftID) { return schema_->selectNFTURI.bind(nftID, ledgerSequence); }
175 );
176
177 auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
178
179 for (auto i = 0u; i < nftIDs.size(); i++) {
180 if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>();
181 maybeRow.has_value()) {
182 auto [seq, owner, isBurned] = *maybeRow;
183 NFT nft(nftIDs[i], seq, owner, isBurned);
184 if (auto const maybeUri = nftUris[i].template get<ripple::Blob>();
185 maybeUri.has_value())
186 nft.uri = *maybeUri;
187 ret.nfts.push_back(nft);
188 }
189 }
190 return ret;
191 }
192
193 std::vector<ripple::uint256>
195 std::uint32_t number,
196 std::uint32_t pageSize,
197 std::uint32_t seq,
198 boost::asio::yield_context yield
199 ) const override
200 {
201 std::vector<ripple::uint256> liveAccounts;
202 std::optional<ripple::AccountID> lastItem;
203
204 while (liveAccounts.size() < number) {
205 Statement const statement = lastItem
206 ? schema_->selectAccountFromToken.bind(*lastItem, Limit{pageSize})
207 : schema_->selectAccountFromBeginning.bind(Limit{pageSize});
208
209 auto const res = executor_.read(yield, statement);
210 if (res) {
211 auto const& results = res.value();
212 if (not results.hasRows()) {
213 LOG(log_.debug()) << "No rows returned";
214 break;
215 }
216 // The results should not contain duplicates, we just filter out deleted accounts
217 std::vector<ripple::uint256> fullAccounts;
218 for (auto [account] : extract<ripple::AccountID>(results)) {
219 fullAccounts.push_back(ripple::keylet::account(account).key);
220 lastItem = account;
221 }
222 auto const objs = this->doFetchLedgerObjects(fullAccounts, seq, yield);
223
224 for (auto i = 0u; i < fullAccounts.size(); i++) {
225 if (not objs[i].empty()) {
226 if (liveAccounts.size() < number) {
227 liveAccounts.push_back(fullAccounts[i]);
228 } else {
229 break;
230 }
231 }
232 }
233 } else {
234 LOG(log_.error()) << "Could not fetch account from account_tx: " << res.error();
235 break;
236 }
237 }
238
239 return liveAccounts;
240 }
241};
242
243using CassandraBackend = BasicCassandraBackend<SettingsProvider, impl::DefaultExecutionStrategy<>>;
244
245} // namespace data::cassandra
std::vector< ripple::uint256 > fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield) const override
Fetch the specified number of account root object indexes by page; the accounts need to exist for seq...
Definition CassandraBackend.hpp:194
NFTsAndCursor fetchNFTsByIssuer(ripple::AccountID const &issuer, std::optional< std::uint32_t > const &taxon, std::uint32_t const ledgerSequence, std::uint32_t const limit, std::optional< ripple::uint256 > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all NFTs issued by a given address.
Definition CassandraBackend.hpp:103
bool doFinishWrites() override
The implementation should wait for all pending writes to finish.
Definition CassandraBackend.hpp:81
CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface &cache, bool readOnly)
Definition CassandraBackendFamily.hpp:88
std::vector< Blob > doFetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t const sequence, boost::asio::yield_context yield) const override
Definition CassandraBackendFamily.hpp:666
Manages the DB schema and provides access to prepared statements.
Definition CassandraSchema.hpp:22
The requirements of an execution strategy.
Definition Concepts.hpp:35
The requirements of a settings provider.
Definition Concepts.hpp:24
This namespace implements a wrapper for the Cassandra C++ driver.
Definition CassandraBackendFamily.hpp:47
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into a series of std::tuple<Types...> by creating a simple wrapper with an STL inp...
Definition Handle.hpp:314
Represents an NFToken.
Definition Types.hpp:161
Represents a bundle of NFTs with a cursor to the next page.
Definition Types.hpp:227
A strong type wrapper for int32_t.
Definition Types.hpp:38