Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
CassandraBackend.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/LedgerHeaderCache.hpp"
23#include "data/Types.hpp"
24#include "data/cassandra/CassandraBackendFamily.hpp"
25#include "data/cassandra/CassandraSchema.hpp"
26#include "data/cassandra/Concepts.hpp"
27#include "data/cassandra/Handle.hpp"
28#include "data/cassandra/SettingsProvider.hpp"
29#include "data/cassandra/Types.hpp"
30#include "data/cassandra/impl/ExecutionStrategy.hpp"
31#include "util/log/Logger.hpp"
32
33#include <boost/asio/spawn.hpp>
34#include <boost/json/object.hpp>
35#include <boost/uuid/string_generator.hpp>
36#include <boost/uuid/uuid.hpp>
37#include <cassandra.h>
38#include <fmt/format.h>
39#include <xrpl/basics/Blob.h>
40#include <xrpl/basics/base_uint.h>
41#include <xrpl/basics/strHex.h>
42#include <xrpl/protocol/AccountID.h>
43#include <xrpl/protocol/Indexes.h>
44#include <xrpl/protocol/LedgerHeader.h>
45#include <xrpl/protocol/nft.h>
46
47#include <algorithm>
48#include <cstddef>
49#include <cstdint>
50#include <iterator>
51#include <optional>
52#include <string>
53#include <tuple>
54#include <vector>
55
56namespace data::cassandra {
57
65template <
66 SomeSettingsProvider SettingsProviderType,
67 SomeExecutionStrategy ExecutionStrategyType,
68 typename FetchLedgerCacheType = FetchLedgerCache>
69class BasicCassandraBackend : public CassandraBackendFamily<
70 SettingsProviderType,
71 ExecutionStrategyType,
72 CassandraSchema<SettingsProviderType>,
73 FetchLedgerCacheType> {
74 using DefaultCassandraFamily = CassandraBackendFamily<
75 SettingsProviderType,
76 ExecutionStrategyType,
78 FetchLedgerCacheType>;
79
80 // protected because CassandraMigrationBackend inherits from this class
81protected:
82 using DefaultCassandraFamily::executor_;
83 using DefaultCassandraFamily::ledgerSequence_;
84 using DefaultCassandraFamily::log_;
85 using DefaultCassandraFamily::range_;
86 using DefaultCassandraFamily::schema_;
87
88public:
92 using DefaultCassandraFamily::DefaultCassandraFamily;
93
94 /*
95 * @brief Move constructor is deleted because handle_ is shared by reference with executor
96 */
97 BasicCassandraBackend(BasicCassandraBackend&&) = delete;
98
99 bool
100 doFinishWrites() override
101 {
102 this->waitForWritesToFinish();
103
104 if (!range_) {
105 executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);
106 }
107
108 if (not this->executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
109 LOG(log_.warn()) << "Update failed for ledger " << ledgerSequence_;
110 return false;
111 }
112
113 LOG(log_.info()) << "Committed ledger " << ledgerSequence_;
114 return true;
115 }
116
119 ripple::AccountID const& issuer,
120 std::optional<std::uint32_t> const& taxon,
121 std::uint32_t const ledgerSequence,
122 std::uint32_t const limit,
123 std::optional<ripple::uint256> const& cursorIn,
124 boost::asio::yield_context yield
125 ) const override
126 {
127 NFTsAndCursor ret;
128
129 Statement const idQueryStatement = [&taxon, &issuer, &cursorIn, &limit, this]() {
130 if (taxon.has_value()) {
131 auto r = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
132 r.bindAt(1, *taxon);
133 r.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
134 r.bindAt(3, Limit{limit});
135 return r;
136 }
137
138 auto r = schema_->selectNFTIDsByIssuer.bind(issuer);
139 r.bindAt(
140 1,
141 std::make_tuple(
142 cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
143 cursorIn.value_or(ripple::uint256(0))
144 )
145 );
146 r.bindAt(2, Limit{limit});
147 return r;
148 }();
149
150 // Query for all the NFTs issued by the account, potentially filtered by the taxon
151 auto const res = executor_.read(yield, idQueryStatement);
152
153 auto const& idQueryResults = res.value();
154 if (not idQueryResults.hasRows()) {
155 LOG(log_.debug()) << "No rows returned";
156 return {};
157 }
158
159 std::vector<ripple::uint256> nftIDs;
160 for (auto const [nftID] : extract<ripple::uint256>(idQueryResults))
161 nftIDs.push_back(nftID);
162
163 if (nftIDs.empty())
164 return ret;
165
166 if (nftIDs.size() == limit)
167 ret.cursor = nftIDs.back();
168
169 std::vector<Statement> selectNFTStatements;
170 selectNFTStatements.reserve(nftIDs.size());
171
172 std::transform(
173 std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
174 return schema_->selectNFT.bind(nftID, ledgerSequence);
175 }
176 );
177
178 auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
179
180 std::vector<Statement> selectNFTURIStatements;
181 selectNFTURIStatements.reserve(nftIDs.size());
182
183 std::transform(
184 std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
185 return schema_->selectNFTURI.bind(nftID, ledgerSequence);
186 }
187 );
188
189 auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
190
191 for (auto i = 0u; i < nftIDs.size(); i++) {
192 if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>();
193 maybeRow.has_value()) {
194 auto [seq, owner, isBurned] = *maybeRow;
195 NFT nft(nftIDs[i], seq, owner, isBurned);
196 if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri.has_value())
197 nft.uri = *maybeUri;
198 ret.nfts.push_back(nft);
199 }
200 }
201 return ret;
202 }
203
204 std::vector<ripple::uint256>
206 std::uint32_t number,
207 std::uint32_t pageSize,
208 std::uint32_t seq,
209 boost::asio::yield_context yield
210 ) const override
211 {
212 std::vector<ripple::uint256> liveAccounts;
213 std::optional<ripple::AccountID> lastItem;
214
215 while (liveAccounts.size() < number) {
216 Statement const statement = lastItem ? schema_->selectAccountFromToken.bind(*lastItem, Limit{pageSize})
217 : schema_->selectAccountFromBeginning.bind(Limit{pageSize});
218
219 auto const res = executor_.read(yield, statement);
220 if (res) {
221 auto const& results = res.value();
222 if (not results.hasRows()) {
223 LOG(log_.debug()) << "No rows returned";
224 break;
225 }
226 // The results should not contain duplicates, we just filter out deleted accounts
227 std::vector<ripple::uint256> fullAccounts;
228 for (auto [account] : extract<ripple::AccountID>(results)) {
229 fullAccounts.push_back(ripple::keylet::account(account).key);
230 lastItem = account;
231 }
232 auto const objs = this->doFetchLedgerObjects(fullAccounts, seq, yield);
233
234 for (auto i = 0u; i < fullAccounts.size(); i++) {
235 if (not objs[i].empty()) {
236 if (liveAccounts.size() < number) {
237 liveAccounts.push_back(fullAccounts[i]);
238 } else {
239 break;
240 }
241 }
242 }
243 } else {
244 LOG(log_.error()) << "Could not fetch account from account_tx: " << res.error();
245 break;
246 }
247 }
248
249 return liveAccounts;
250 }
251};
252
253using CassandraBackend = BasicCassandraBackend<SettingsProvider, impl::DefaultExecutionStrategy<>>;
254
255} // namespace data::cassandra
std::vector< ripple::uint256 > fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield) const override
Fetch the specified number of account root object indexes by page, the accounts need to exist for seq...
Definition CassandraBackend.hpp:205
NFTsAndCursor fetchNFTsByIssuer(ripple::AccountID const &issuer, std::optional< std::uint32_t > const &taxon, std::uint32_t const ledgerSequence, std::uint32_t const limit, std::optional< ripple::uint256 > const &cursorIn, boost::asio::yield_context yield) const override
Fetches all NFTs issued by a given address.
Definition CassandraBackend.hpp:118
bool doFinishWrites() override
The implementation should wait for all pending writes to finish.
Definition CassandraBackend.hpp:100
CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface &cache, bool readOnly)
Definition CassandraBackendFamily.hpp:107
std::vector< Blob > doFetchLedgerObjects(std::vector< ripple::uint256 > const &keys, std::uint32_t const sequence, boost::asio::yield_context yield) const override
Definition CassandraBackendFamily.hpp:651
Manages the DB schema and provides access to prepared statements.
Definition CassandraSchema.hpp:41
The requirements of an execution strategy.
Definition Concepts.hpp:54
The requirements of a settings provider.
Definition Concepts.hpp:43
This namespace implements a wrapper for the Cassandra C++ driver.
Definition CassandraBackendFamily.hpp:66
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into series of std::tuple<Types...> by creating a simple wrapper with an STL inp...
Definition Handle.hpp:329
Represents a NFToken.
Definition Types.hpp:172
Represents a bundle of NFTs with a cursor to the next page.
Definition Types.hpp:231
A strong type wrapper for int32_t.
Definition Types.hpp:57