Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
KeyspaceBackend.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/LedgerHeaderCache.hpp"
23#include "data/Types.hpp"
24#include "data/cassandra/CassandraBackendFamily.hpp"
25#include "data/cassandra/Concepts.hpp"
26#include "data/cassandra/KeyspaceSchema.hpp"
27#include "data/cassandra/SettingsProvider.hpp"
28#include "data/cassandra/Types.hpp"
29#include "data/cassandra/impl/ExecutionStrategy.hpp"
30#include "util/Assert.hpp"
31#include "util/log/Logger.hpp"
32
33#include <boost/asio/spawn.hpp>
34#include <boost/json/object.hpp>
35#include <boost/uuid/string_generator.hpp>
36#include <boost/uuid/uuid.hpp>
37#include <cassandra.h>
38#include <fmt/format.h>
39#include <xrpl/basics/Blob.h>
40#include <xrpl/basics/base_uint.h>
41#include <xrpl/basics/strHex.h>
42#include <xrpl/protocol/AccountID.h>
43#include <xrpl/protocol/Indexes.h>
44#include <xrpl/protocol/LedgerHeader.h>
45#include <xrpl/protocol/nft.h>
46
47#include <cstddef>
48#include <cstdint>
49#include <iterator>
50#include <optional>
51#include <stdexcept>
52#include <utility>
53#include <vector>
54
55namespace data::cassandra {
56
64template <
65 SomeSettingsProvider SettingsProviderType,
66 SomeExecutionStrategy ExecutionStrategyType,
67 typename FetchLedgerCacheType = FetchLedgerCache>
69 SettingsProviderType,
70 ExecutionStrategyType,
71 KeyspaceSchema<SettingsProviderType>,
72 FetchLedgerCacheType> {
73 using DefaultCassandraFamily = CassandraBackendFamily<
74 SettingsProviderType,
75 ExecutionStrategyType,
77 FetchLedgerCacheType>;
78
79 using DefaultCassandraFamily::executor_;
80 using DefaultCassandraFamily::ledgerSequence_;
81 using DefaultCassandraFamily::log_;
82 using DefaultCassandraFamily::range_;
83 using DefaultCassandraFamily::schema_;
84
85public:
89 using DefaultCassandraFamily::DefaultCassandraFamily;
90
95
96 bool
97 doFinishWrites() override
98 {
100
101 // !range_.has_value() means the table 'ledger_range' is not populated;
102 // This would be the first write to the table.
103 // In this case, insert both min_sequence/max_sequence range into the table.
104 if (not range_.has_value()) {
105 executor_.writeSync(schema_->insertLedgerRange, /* isLatestLedger =*/false, ledgerSequence_);
106 executor_.writeSync(schema_->insertLedgerRange, /* isLatestLedger =*/true, ledgerSequence_);
107 }
108
109 if (not this->executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
110 log_.warn() << "Update failed for ledger " << ledgerSequence_;
111 return false;
112 }
113
114 log_.info() << "Committed ledger " << ledgerSequence_;
115 return true;
116 }
117
120 ripple::AccountID const& issuer,
121 std::optional<std::uint32_t> const& taxon,
122 std::uint32_t const ledgerSequence,
123 std::uint32_t const limit,
124 std::optional<ripple::uint256> const& cursorIn,
125 boost::asio::yield_context yield
126 ) const override
127 {
128 std::vector<ripple::uint256> nftIDs;
129 if (taxon.has_value()) {
130 // Keyspace and ScyllaDB use the same logic for taxon-filtered queries
131 nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield);
132 } else {
133 // Amazon Keyspaces Workflow for non-taxon queries
134 auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
135 auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
136
137 Statement const firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
138 firstQuery.bindAt(1, startTaxon);
139 firstQuery.bindAt(2, startTokenID);
140 firstQuery.bindAt(3, Limit{limit});
141
142 auto const firstRes = executor_.read(yield, firstQuery);
143 if (firstRes.has_value()) {
144 for (auto const [nftID] : extract<ripple::uint256>(*firstRes))
145 nftIDs.push_back(nftID);
146 }
147
148 if (nftIDs.size() < limit) {
149 auto const remainingLimit = limit - nftIDs.size();
150 Statement const secondQuery = schema_->selectNFTsAfterTaxonKeyspaces.bind(issuer);
151 secondQuery.bindAt(1, startTaxon);
152 secondQuery.bindAt(2, Limit{remainingLimit});
153
154 auto const secondRes = executor_.read(yield, secondQuery);
155 if (secondRes.has_value()) {
156 for (auto const [nftID] : extract<ripple::uint256>(*secondRes))
157 nftIDs.push_back(nftID);
158 }
159 }
160 }
161 return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield);
162 }
163
177 std::vector<ripple::uint256>
179 [[maybe_unused]] std::uint32_t number,
180 [[maybe_unused]] std::uint32_t pageSize,
181 [[maybe_unused]] std::uint32_t seq,
182 [[maybe_unused]] boost::asio::yield_context yield
183 ) const override
184 {
185 ASSERT(false, "Fetching account roots is not supported by the Keyspaces backend.");
186 std::unreachable();
187 }
188
189private:
190 std::vector<ripple::uint256>
191 fetchNFTIDsByTaxon(
192 ripple::AccountID const& issuer,
193 std::uint32_t const taxon,
194 std::uint32_t const limit,
195 std::optional<ripple::uint256> const& cursorIn,
196 boost::asio::yield_context yield
197 ) const
198 {
199 std::vector<ripple::uint256> nftIDs;
200 Statement const statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
201 statement.bindAt(1, taxon);
202 statement.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
203 statement.bindAt(3, Limit{limit});
204
205 auto const res = executor_.read(yield, statement);
206 if (res.has_value() && res->hasRows()) {
207 for (auto const [nftID] : extract<ripple::uint256>(*res))
208 nftIDs.push_back(nftID);
209 }
210 return nftIDs;
211 }
212
213 std::vector<ripple::uint256>
214 fetchNFTIDsWithoutTaxon(
215 ripple::AccountID const& issuer,
216 std::uint32_t const limit,
217 std::optional<ripple::uint256> const& cursorIn,
218 boost::asio::yield_context yield
219 ) const
220 {
221 std::vector<ripple::uint256> nftIDs;
222
223 auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
224 auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
225
226 Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
227 firstQuery.bindAt(1, startTaxon);
228 firstQuery.bindAt(2, startTokenID);
229 firstQuery.bindAt(3, Limit{limit});
230
231 auto const firstRes = executor_.read(yield, firstQuery);
232 if (firstRes.has_value()) {
233 for (auto const [nftID] : extract<ripple::uint256>(*firstRes))
234 nftIDs.push_back(nftID);
235 }
236
237 if (nftIDs.size() < limit) {
238 auto const remainingLimit = limit - nftIDs.size();
239 Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces.bind(issuer);
240 secondQuery.bindAt(1, startTaxon);
241 secondQuery.bindAt(2, Limit{remainingLimit});
242
243 auto const secondRes = executor_.read(yield, secondQuery);
244 if (secondRes.has_value()) {
245 for (auto const [nftID] : extract<ripple::uint256>(*secondRes))
246 nftIDs.push_back(nftID);
247 }
248 }
249 return nftIDs;
250 }
251
255 NFTsAndCursor
256 populateNFTsAndCreateCursor(
257 std::vector<ripple::uint256> const& nftIDs,
258 std::uint32_t const ledgerSequence,
259 std::uint32_t const limit,
260 boost::asio::yield_context yield
261 ) const
262 {
263 if (nftIDs.empty()) {
264 LOG(log_.debug()) << "No rows returned";
265 return {};
266 }
267
268 NFTsAndCursor ret;
269 if (nftIDs.size() == limit)
270 ret.cursor = nftIDs.back();
271
272 // Prepare and execute queries to fetch NFT info and URIs in parallel.
273 std::vector<Statement> selectNFTStatements;
274 selectNFTStatements.reserve(nftIDs.size());
275 std::transform(
276 std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
277 return schema_->selectNFT.bind(nftID, ledgerSequence);
278 }
279 );
280
281 std::vector<Statement> selectNFTURIStatements;
282 selectNFTURIStatements.reserve(nftIDs.size());
283 std::transform(
284 std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
285 return schema_->selectNFTURI.bind(nftID, ledgerSequence);
286 }
287 );
288
289 auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
290 auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
291
292 // Combine the results into final NFT objects.
293 for (auto i = 0u; i < nftIDs.size(); ++i) {
294 if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>();
295 maybeRow.has_value()) {
296 auto [seq, owner, isBurned] = *maybeRow;
297 NFT nft(nftIDs[i], seq, owner, isBurned);
298 if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri.has_value())
299 nft.uri = *maybeUri;
300 ret.nfts.push_back(nft);
301 }
302 }
303 return ret;
304 }
305};
306
308
309} // namespace data::cassandra
Implements CassandraBackendFamily for Keyspace.
Definition KeyspaceBackend.hpp:72
std::vector< ripple::uint256 > fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield) const override
(Unsupported in Keyspaces) Fetches account root object indexes by page.
Definition KeyspaceBackend.hpp:178
BasicKeyspaceBackend(BasicKeyspaceBackend &&)=delete
Move constructor is deleted because handle_ is shared by reference with executor.
bool doFinishWrites() override
The implementation should wait for all pending writes to finish.
Definition KeyspaceBackend.hpp:97
NFTsAndCursor fetchNFTsByIssuer(ripple::AccountID const &issuer, std::optional< std::uint32_t > const &taxon, std::uint32_t const ledgerSequence, std::uint32_t const limit, std::optional< ripple::uint256 > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all NFTs issued by a given address.
Definition KeyspaceBackend.hpp:119
CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface &cache, bool readOnly)
Definition CassandraBackendFamily.hpp:107
Manages the DB schema and provides access to prepared statements.
Definition KeyspaceSchema.hpp:41
void bind(Args &&... args) const
Binds the given arguments to the statement.
Definition Statement.hpp:81
void bindAt(std::size_t const idx, Type &&value) const
Binds an argument to a specific index.
Definition Statement.hpp:95
The requirements of an execution strategy.
Definition Concepts.hpp:54
The requirements of a settings provider.
Definition Concepts.hpp:43
This namespace implements a wrapper for the Cassandra C++ driver.
Definition CassandraBackendFamily.hpp:66
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into a series of std::tuple<Types...> by creating a simple wrapper with an STL input iterator inside.
Definition Handle.hpp:329
Represents a bundle of NFTs with a cursor to the next page.
Definition Types.hpp:231
A strong type wrapper for int32_t.
Definition Types.hpp:57