Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
KeyspaceBackend.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/LedgerHeaderCache.hpp"
23#include "data/Types.hpp"
24#include "data/cassandra/CassandraBackendFamily.hpp"
25#include "data/cassandra/Concepts.hpp"
26#include "data/cassandra/KeyspaceSchema.hpp"
27#include "data/cassandra/SettingsProvider.hpp"
28#include "data/cassandra/Types.hpp"
29#include "data/cassandra/impl/ExecutionStrategy.hpp"
30#include "util/Assert.hpp"
31#include "util/log/Logger.hpp"
32
33#include <boost/asio/spawn.hpp>
34#include <boost/json/object.hpp>
35#include <boost/uuid/string_generator.hpp>
36#include <boost/uuid/uuid.hpp>
37#include <cassandra.h>
38#include <fmt/format.h>
39#include <xrpl/basics/Blob.h>
40#include <xrpl/basics/base_uint.h>
41#include <xrpl/basics/strHex.h>
42#include <xrpl/protocol/AccountID.h>
43#include <xrpl/protocol/Indexes.h>
44#include <xrpl/protocol/LedgerHeader.h>
45#include <xrpl/protocol/nft.h>
46
47#include <cstddef>
48#include <cstdint>
49#include <iterator>
50#include <optional>
51#include <stdexcept>
52#include <utility>
53#include <vector>
54
55namespace data::cassandra {
56
64template <
65 SomeSettingsProvider SettingsProviderType,
66 SomeExecutionStrategy ExecutionStrategyType,
67 typename FetchLedgerCacheType = FetchLedgerCache>
69 SettingsProviderType,
70 ExecutionStrategyType,
71 KeyspaceSchema<SettingsProviderType>,
72 FetchLedgerCacheType> {
73 using DefaultCassandraFamily = CassandraBackendFamily<
74 SettingsProviderType,
75 ExecutionStrategyType,
77 FetchLedgerCacheType>;
78
79 using DefaultCassandraFamily::executor_;
80 using DefaultCassandraFamily::ledgerSequence_;
81 using DefaultCassandraFamily::log_;
82 using DefaultCassandraFamily::range_;
83 using DefaultCassandraFamily::schema_;
84
85public:
89 using DefaultCassandraFamily::DefaultCassandraFamily;
90
95
96 bool
97 doFinishWrites() override
98 {
100
101 // !range_.has_value() means the table 'ledger_range' is not populated;
102 // This would be the first write to the table.
103 // In this case, insert both min_sequence/max_sequence range into the table.
104 if (not range_.has_value()) {
105 executor_.writeSync(
106 schema_->insertLedgerRange, /* isLatestLedger =*/false, ledgerSequence_
107 );
108 executor_.writeSync(
109 schema_->insertLedgerRange, /* isLatestLedger =*/true, ledgerSequence_
110 );
111 }
112
113 if (not this->executeSyncUpdate(
114 schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1)
115 )) {
116 log_.warn() << "Update failed for ledger " << ledgerSequence_;
117 return false;
118 }
119
120 log_.info() << "Committed ledger " << ledgerSequence_;
121 return true;
122 }
123
126 ripple::AccountID const& issuer,
127 std::optional<std::uint32_t> const& taxon,
128 std::uint32_t const ledgerSequence,
129 std::uint32_t const limit,
130 std::optional<ripple::uint256> const& cursorIn,
131 boost::asio::yield_context yield
132 ) const override
133 {
134 std::vector<ripple::uint256> nftIDs;
135 if (taxon.has_value()) {
136 // Keyspace and ScyllaDB uses the same logic for taxon-filtered queries
137 nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield);
138 } else {
139 // Amazon Keyspaces Workflow for non-taxon queries
140 auto const startTaxon =
141 cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
142 auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
143
144 Statement const firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
145 firstQuery.bindAt(1, startTaxon);
146 firstQuery.bindAt(2, startTokenID);
147 firstQuery.bindAt(3, Limit{limit});
148
149 auto const firstRes = executor_.read(yield, firstQuery);
150 if (firstRes.has_value()) {
151 for (auto const [nftID] : extract<ripple::uint256>(*firstRes))
152 nftIDs.push_back(nftID);
153 }
154
155 if (nftIDs.size() < limit) {
156 auto const remainingLimit = limit - nftIDs.size();
157 Statement const secondQuery = schema_->selectNFTsAfterTaxonKeyspaces.bind(issuer);
158 secondQuery.bindAt(1, startTaxon);
159 secondQuery.bindAt(2, Limit{remainingLimit});
160
161 auto const secondRes = executor_.read(yield, secondQuery);
162 if (secondRes.has_value()) {
163 for (auto const [nftID] : extract<ripple::uint256>(*secondRes))
164 nftIDs.push_back(nftID);
165 }
166 }
167 }
168 return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield);
169 }
170
184 std::vector<ripple::uint256>
186 [[maybe_unused]] std::uint32_t number,
187 [[maybe_unused]] std::uint32_t pageSize,
188 [[maybe_unused]] std::uint32_t seq,
189 [[maybe_unused]] boost::asio::yield_context yield
190 ) const override
191 {
192 ASSERT(false, "Fetching account roots is not supported by the Keyspaces backend.");
193 std::unreachable();
194 }
195
196private:
197 std::vector<ripple::uint256>
198 fetchNFTIDsByTaxon(
199 ripple::AccountID const& issuer,
200 std::uint32_t const taxon,
201 std::uint32_t const limit,
202 std::optional<ripple::uint256> const& cursorIn,
203 boost::asio::yield_context yield
204 ) const
205 {
206 std::vector<ripple::uint256> nftIDs;
207 Statement const statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
208 statement.bindAt(1, taxon);
209 statement.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
210 statement.bindAt(3, Limit{limit});
211
212 auto const res = executor_.read(yield, statement);
213 if (res.has_value() && res->hasRows()) {
214 for (auto const [nftID] : extract<ripple::uint256>(*res))
215 nftIDs.push_back(nftID);
216 }
217 return nftIDs;
218 }
219
220 std::vector<ripple::uint256>
221 fetchNFTIDsWithoutTaxon(
222 ripple::AccountID const& issuer,
223 std::uint32_t const limit,
224 std::optional<ripple::uint256> const& cursorIn,
225 boost::asio::yield_context yield
226 ) const
227 {
228 std::vector<ripple::uint256> nftIDs;
229
230 auto const startTaxon =
231 cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
232 auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
233
234 Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
235 firstQuery.bindAt(1, startTaxon);
236 firstQuery.bindAt(2, startTokenID);
237 firstQuery.bindAt(3, Limit{limit});
238
239 auto const firstRes = executor_.read(yield, firstQuery);
240 if (firstRes.has_value()) {
241 for (auto const [nftID] : extract<ripple::uint256>(*firstRes))
242 nftIDs.push_back(nftID);
243 }
244
245 if (nftIDs.size() < limit) {
246 auto const remainingLimit = limit - nftIDs.size();
247 Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces.bind(issuer);
248 secondQuery.bindAt(1, startTaxon);
249 secondQuery.bindAt(2, Limit{remainingLimit});
250
251 auto const secondRes = executor_.read(yield, secondQuery);
252 if (secondRes.has_value()) {
253 for (auto const [nftID] : extract<ripple::uint256>(*secondRes))
254 nftIDs.push_back(nftID);
255 }
256 }
257 return nftIDs;
258 }
259
264 NFTsAndCursor
265 populateNFTsAndCreateCursor(
266 std::vector<ripple::uint256> const& nftIDs,
267 std::uint32_t const ledgerSequence,
268 std::uint32_t const limit,
269 boost::asio::yield_context yield
270 ) const
271 {
272 if (nftIDs.empty()) {
273 LOG(log_.debug()) << "No rows returned";
274 return {};
275 }
276
277 NFTsAndCursor ret;
278 if (nftIDs.size() == limit)
279 ret.cursor = nftIDs.back();
280
281 // Prepare and execute queries to fetch NFT info and URIs in parallel.
282 std::vector<Statement> selectNFTStatements;
283 selectNFTStatements.reserve(nftIDs.size());
284 std::transform(
285 std::cbegin(nftIDs),
286 std::cend(nftIDs),
287 std::back_inserter(selectNFTStatements),
288 [&](auto const& nftID) { return schema_->selectNFT.bind(nftID, ledgerSequence); }
289 );
290
291 std::vector<Statement> selectNFTURIStatements;
292 selectNFTURIStatements.reserve(nftIDs.size());
293 std::transform(
294 std::cbegin(nftIDs),
295 std::cend(nftIDs),
296 std::back_inserter(selectNFTURIStatements),
297 [&](auto const& nftID) { return schema_->selectNFTURI.bind(nftID, ledgerSequence); }
298 );
299
300 auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
301 auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
302
303 // Combine the results into final NFT objects.
304 for (auto i = 0u; i < nftIDs.size(); ++i) {
305 if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>();
306 maybeRow.has_value()) {
307 auto [seq, owner, isBurned] = *maybeRow;
308 NFT nft(nftIDs[i], seq, owner, isBurned);
309 if (auto const maybeUri = nftUris[i].template get<ripple::Blob>();
310 maybeUri.has_value())
311 nft.uri = *maybeUri;
312 ret.nfts.push_back(nft);
313 }
314 }
315 return ret;
316 }
317};
318
320
321} // namespace data::cassandra
Implements CassandraBackendFamily for Keyspace.
Definition KeyspaceBackend.hpp:72
std::vector< ripple::uint256 > fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield) const override
(Unsupported in Keyspaces) Fetches account root object indexes by page.
Definition KeyspaceBackend.hpp:185
BasicKeyspaceBackend(BasicKeyspaceBackend &&)=delete
Move constructor is deleted because handle_ is shared by reference with executor.
bool doFinishWrites() override
The implementation should wait for all pending writes to finish.
Definition KeyspaceBackend.hpp:97
NFTsAndCursor fetchNFTsByIssuer(ripple::AccountID const &issuer, std::optional< std::uint32_t > const &taxon, std::uint32_t const ledgerSequence, std::uint32_t const limit, std::optional< ripple::uint256 > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all NFTs issued by a given address.
Definition KeyspaceBackend.hpp:125
CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface &cache, bool readOnly)
Definition CassandraBackendFamily.hpp:107
Manages the DB schema and provides access to prepared statements.
Definition KeyspaceSchema.hpp:41
void bind(Args &&... args) const
Binds the given arguments to the statement.
Definition Statement.hpp:81
void bindAt(std::size_t const idx, Type &&value) const
Binds an argument to a specific index.
Definition Statement.hpp:95
The requirements of an execution strategy.
Definition Concepts.hpp:54
The requirements of a settings provider.
Definition Concepts.hpp:43
This namespace implements a wrapper for the Cassandra C++ driver.
Definition CassandraBackendFamily.hpp:66
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into series of std::tuple<Types...> by creating a simple wrapper with an STL inp...
Definition Handle.hpp:333
Represents a bundle of NFTs with a cursor to the next page.
Definition Types.hpp:246
A strong type wrapper for int32_t.
Definition Types.hpp:57