Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
KeyspaceBackend.hpp
1#pragma once
2
3#include "data/LedgerHeaderCache.hpp"
4#include "data/Types.hpp"
5#include "data/cassandra/CassandraBackendFamily.hpp"
6#include "data/cassandra/Concepts.hpp"
7#include "data/cassandra/KeyspaceSchema.hpp"
8#include "data/cassandra/SettingsProvider.hpp"
9#include "data/cassandra/Types.hpp"
10#include "data/cassandra/impl/ExecutionStrategy.hpp"
11#include "util/Assert.hpp"
12#include "util/log/Logger.hpp"
13
14#include <boost/asio/spawn.hpp>
15#include <boost/json/object.hpp>
16#include <boost/uuid/string_generator.hpp>
17#include <boost/uuid/uuid.hpp>
18#include <cassandra.h>
19#include <fmt/format.h>
20#include <xrpl/basics/Blob.h>
21#include <xrpl/basics/base_uint.h>
22#include <xrpl/basics/strHex.h>
23#include <xrpl/protocol/AccountID.h>
24#include <xrpl/protocol/Indexes.h>
25#include <xrpl/protocol/LedgerHeader.h>
26#include <xrpl/protocol/nft.h>
27
28#include <cstddef>
29#include <cstdint>
30#include <iterator>
31#include <optional>
32#include <stdexcept>
33#include <utility>
34#include <vector>
35
36namespace data::cassandra {
37
45template <
46 SomeSettingsProvider SettingsProviderType,
47 SomeExecutionStrategy ExecutionStrategyType,
48 typename FetchLedgerCacheType = FetchLedgerCache>
50 SettingsProviderType,
51 ExecutionStrategyType,
52 KeyspaceSchema<SettingsProviderType>,
53 FetchLedgerCacheType> {
54 using DefaultCassandraFamily = CassandraBackendFamily<
55 SettingsProviderType,
56 ExecutionStrategyType,
58 FetchLedgerCacheType>;
59
60 using DefaultCassandraFamily::executor_;
61 using DefaultCassandraFamily::ledgerSequence_;
62 using DefaultCassandraFamily::log_;
63 using DefaultCassandraFamily::range_;
64 using DefaultCassandraFamily::schema_;
65
66public:
70 using DefaultCassandraFamily::DefaultCassandraFamily;
71
76
77 bool
78 doFinishWrites() override
79 {
81
82 // !range_.has_value() means the table 'ledger_range' is not populated;
83 // This would be the first write to the table.
84 // In this case, insert both min_sequence/max_sequence range into the table.
85 if (not range_.has_value()) {
86 executor_.writeSync(
87 schema_->insertLedgerRange, /* isLatestLedger =*/false, ledgerSequence_
88 );
89 executor_.writeSync(
90 schema_->insertLedgerRange, /* isLatestLedger =*/true, ledgerSequence_
91 );
92 }
93
94 if (not this->executeSyncUpdate(
95 schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1)
96 )) {
97 log_.warn() << "Update failed for ledger " << ledgerSequence_;
98 return false;
99 }
100
101 log_.info() << "Committed ledger " << ledgerSequence_;
102 return true;
103 }
104
107 ripple::AccountID const& issuer,
108 std::optional<std::uint32_t> const& taxon,
109 std::uint32_t const ledgerSequence,
110 std::uint32_t const limit,
111 std::optional<ripple::uint256> const& cursorIn,
112 boost::asio::yield_context yield
113 ) const override
114 {
115 std::vector<ripple::uint256> nftIDs;
116 if (taxon.has_value()) {
117 // Keyspace and ScyllaDB uses the same logic for taxon-filtered queries
118 nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield);
119 } else {
120 // Amazon Keyspaces Workflow for non-taxon queries
121 auto const startTaxon =
122 cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
123 auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
124
125 Statement const firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
126 firstQuery.bindAt(1, startTaxon);
127 firstQuery.bindAt(2, startTokenID);
128 firstQuery.bindAt(3, Limit{limit});
129
130 auto const firstRes = executor_.read(yield, firstQuery);
131 if (firstRes.has_value()) {
132 for (auto const [nftID] : extract<ripple::uint256>(*firstRes))
133 nftIDs.push_back(nftID);
134 }
135
136 if (nftIDs.size() < limit) {
137 auto const remainingLimit = limit - nftIDs.size();
138 Statement const secondQuery = schema_->selectNFTsAfterTaxonKeyspaces.bind(issuer);
139 secondQuery.bindAt(1, startTaxon);
140 secondQuery.bindAt(2, Limit{remainingLimit});
141
142 auto const secondRes = executor_.read(yield, secondQuery);
143 if (secondRes.has_value()) {
144 for (auto const [nftID] : extract<ripple::uint256>(*secondRes))
145 nftIDs.push_back(nftID);
146 }
147 }
148 }
149 return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield);
150 }
151
165 std::vector<ripple::uint256>
167 [[maybe_unused]] std::uint32_t number,
168 [[maybe_unused]] std::uint32_t pageSize,
169 [[maybe_unused]] std::uint32_t seq,
170 [[maybe_unused]] boost::asio::yield_context yield
171 ) const override
172 {
173 ASSERT(false, "Fetching account roots is not supported by the Keyspaces backend.");
174 std::unreachable();
175 }
176
177private:
178 std::vector<ripple::uint256>
179 fetchNFTIDsByTaxon(
180 ripple::AccountID const& issuer,
181 std::uint32_t const taxon,
182 std::uint32_t const limit,
183 std::optional<ripple::uint256> const& cursorIn,
184 boost::asio::yield_context yield
185 ) const
186 {
187 std::vector<ripple::uint256> nftIDs;
188 Statement const statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
189 statement.bindAt(1, taxon);
190 statement.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
191 statement.bindAt(3, Limit{limit});
192
193 auto const res = executor_.read(yield, statement);
194 if (res.has_value() && res->hasRows()) {
195 for (auto const [nftID] : extract<ripple::uint256>(*res))
196 nftIDs.push_back(nftID);
197 }
198 return nftIDs;
199 }
200
201 std::vector<ripple::uint256>
202 fetchNFTIDsWithoutTaxon(
203 ripple::AccountID const& issuer,
204 std::uint32_t const limit,
205 std::optional<ripple::uint256> const& cursorIn,
206 boost::asio::yield_context yield
207 ) const
208 {
209 std::vector<ripple::uint256> nftIDs;
210
211 auto const startTaxon =
212 cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
213 auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
214
215 Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
216 firstQuery.bindAt(1, startTaxon);
217 firstQuery.bindAt(2, startTokenID);
218 firstQuery.bindAt(3, Limit{limit});
219
220 auto const firstRes = executor_.read(yield, firstQuery);
221 if (firstRes.has_value()) {
222 for (auto const [nftID] : extract<ripple::uint256>(*firstRes))
223 nftIDs.push_back(nftID);
224 }
225
226 if (nftIDs.size() < limit) {
227 auto const remainingLimit = limit - nftIDs.size();
228 Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces.bind(issuer);
229 secondQuery.bindAt(1, startTaxon);
230 secondQuery.bindAt(2, Limit{remainingLimit});
231
232 auto const secondRes = executor_.read(yield, secondQuery);
233 if (secondRes.has_value()) {
234 for (auto const [nftID] : extract<ripple::uint256>(*secondRes))
235 nftIDs.push_back(nftID);
236 }
237 }
238 return nftIDs;
239 }
240
245 NFTsAndCursor
246 populateNFTsAndCreateCursor(
247 std::vector<ripple::uint256> const& nftIDs,
248 std::uint32_t const ledgerSequence,
249 std::uint32_t const limit,
250 boost::asio::yield_context yield
251 ) const
252 {
253 if (nftIDs.empty()) {
254 LOG(log_.debug()) << "No rows returned";
255 return {};
256 }
257
258 NFTsAndCursor ret;
259 if (nftIDs.size() == limit)
260 ret.cursor = nftIDs.back();
261
262 // Prepare and execute queries to fetch NFT info and URIs in parallel.
263 std::vector<Statement> selectNFTStatements;
264 selectNFTStatements.reserve(nftIDs.size());
265 std::transform(
266 std::cbegin(nftIDs),
267 std::cend(nftIDs),
268 std::back_inserter(selectNFTStatements),
269 [&](auto const& nftID) { return schema_->selectNFT.bind(nftID, ledgerSequence); }
270 );
271
272 std::vector<Statement> selectNFTURIStatements;
273 selectNFTURIStatements.reserve(nftIDs.size());
274 std::transform(
275 std::cbegin(nftIDs),
276 std::cend(nftIDs),
277 std::back_inserter(selectNFTURIStatements),
278 [&](auto const& nftID) { return schema_->selectNFTURI.bind(nftID, ledgerSequence); }
279 );
280
281 auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
282 auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
283
284 // Combine the results into final NFT objects.
285 for (auto i = 0u; i < nftIDs.size(); ++i) {
286 if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>();
287 maybeRow.has_value()) {
288 auto [seq, owner, isBurned] = *maybeRow;
289 NFT nft(nftIDs[i], seq, owner, isBurned);
290 if (auto const maybeUri = nftUris[i].template get<ripple::Blob>();
291 maybeUri.has_value())
292 nft.uri = *maybeUri;
293 ret.nfts.push_back(nft);
294 }
295 }
296 return ret;
297 }
298};
299
301
302} // namespace data::cassandra
Implements CassandraBackendFamily for Keyspace.
Definition KeyspaceBackend.hpp:53
std::vector< ripple::uint256 > fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield) const override
(Unsupported in Keyspaces) Fetches account root object indexes by page.
Definition KeyspaceBackend.hpp:166
BasicKeyspaceBackend(BasicKeyspaceBackend &&)=delete
Move constructor is deleted because handle_ is shared by reference with executor.
bool doFinishWrites() override
The implementation should wait for all pending writes to finish.
Definition KeyspaceBackend.hpp:78
NFTsAndCursor fetchNFTsByIssuer(ripple::AccountID const &issuer, std::optional< std::uint32_t > const &taxon, std::uint32_t const ledgerSequence, std::uint32_t const limit, std::optional< ripple::uint256 > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all NFTs issued by a given address.
Definition KeyspaceBackend.hpp:106
CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface &cache, bool readOnly)
Definition CassandraBackendFamily.hpp:88
Manages the DB schema and provides access to prepared statements.
Definition KeyspaceSchema.hpp:22
void bind(Args &&... args) const
Binds the given arguments to the statement.
Definition Statement.hpp:62
void bindAt(std::size_t const idx, Type &&value) const
Binds an argument to a specific index.
Definition Statement.hpp:76
The requirements of an execution strategy.
Definition Concepts.hpp:35
The requirements of a settings provider.
Definition Concepts.hpp:24
This namespace implements a wrapper for the Cassandra C++ driver.
Definition CassandraBackendFamily.hpp:47
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into series of std::tuple<Types...> by creating a simple wrapper with an STL inp...
Definition Handle.hpp:314
Represents a bundle of NFTs with a cursor to the next page.
Definition Types.hpp:227
A strong type wrapper for int32_t.
Definition Types.hpp:38