Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
AsyncData.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/Types.hpp"
24#include "etl/ETLHelpers.hpp"
25#include "etl/MPTHelpers.hpp"
26#include "etl/NFTHelpers.hpp"
27#include "util/Assert.hpp"
28#include "util/log/Logger.hpp"
29
30#include <grpcpp/client_context.h>
31#include <grpcpp/grpcpp.h>
32#include <grpcpp/support/status.h>
33#include <org/xrpl/rpc/v1/get_ledger_data.pb.h>
34#include <xrpl/basics/base_uint.h>
35#include <xrpl/basics/strHex.h>
36#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
37
38#include <cstddef>
39#include <cstdint>
40#include <memory>
41#include <optional>
42#include <string>
43#include <utility>
44#include <vector>
45
46namespace etl::impl {
47
49 util::Logger log_{"ETL"};
50
51 std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> cur_;
52 std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> next_;
53
54 org::xrpl::rpc::v1::GetLedgerDataRequest request_;
55 std::unique_ptr<grpc::ClientContext> context_;
56
57 grpc::Status status_;
58 unsigned char nextPrefix_;
59
60 std::string lastKey_;
61
62public:
63 AsyncCallData(uint32_t seq, ripple::uint256 const& marker, std::optional<ripple::uint256> const& nextMarker)
64 {
65 request_.mutable_ledger()->set_sequence(seq);
66 if (marker.isNonZero()) {
67 request_.set_marker(marker.data(), ripple::uint256::size());
68 }
69 request_.set_user("ETL");
70 nextPrefix_ = 0x00;
71 if (nextMarker)
72 nextPrefix_ = nextMarker->data()[0];
73
74 unsigned char const prefix = marker.data()[0];
75
76 LOG(log_.debug()) << "Setting up AsyncCallData. marker = " << ripple::strHex(marker)
77 << " . prefix = " << ripple::strHex(std::string(1, prefix))
78 << " . nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_));
79
80 ASSERT(
81 nextPrefix_ > prefix || nextPrefix_ == 0x00,
82 "Next prefix must be greater than current prefix. Got: nextPrefix_ = {}, prefix = {}",
83 nextPrefix_,
84 prefix
85 );
86
87 cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
88 next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
89 context_ = std::make_unique<grpc::ClientContext>();
90 }
91
92 enum class CallStatus { MORE, DONE, ERRORED };
93
94 CallStatus
95 process(
96 std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
97 grpc::CompletionQueue& cq,
98 BackendInterface& backend,
99 bool abort,
100 bool cacheOnly = false
101 )
102 {
103 LOG(log_.trace()) << "Processing response. "
104 << "Marker prefix = " << getMarkerPrefix();
105 if (abort) {
106 LOG(log_.error()) << "AsyncCallData aborted";
107 return CallStatus::ERRORED;
108 }
109 if (!status_.ok()) {
110 LOG(log_.error()) << "AsyncCallData status_ not ok: code = " << status_.error_code()
111 << " message = " << status_.error_message();
112 return CallStatus::ERRORED;
113 }
114 if (!next_->is_unlimited()) {
115 LOG(log_.warn()) << "AsyncCallData is_unlimited is false. "
116 << "Make sure secure_gateway is set correctly at the ETL source";
117 }
118
119 std::swap(cur_, next_);
120
121 bool more = true;
122
123 // if no marker returned, we are done
124 if (cur_->marker().empty())
125 more = false;
126
127 // if returned marker is greater than our end, we are done
128 unsigned char const prefix = cur_->marker()[0];
129 if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
130 more = false;
131
132 // if we are not done, make the next async call
133 if (more) {
134 request_.set_marker(cur_->marker());
135 call(stub, cq);
136 }
137
138 auto const numObjects = cur_->ledger_objects().objects_size();
139 LOG(log_.debug()) << "Writing " << numObjects << " objects";
140
141 std::vector<data::LedgerObject> cacheUpdates;
142 cacheUpdates.reserve(numObjects);
143
144 for (int i = 0; i < numObjects; ++i) {
145 auto& obj = *(cur_->mutable_ledger_objects()->mutable_objects(i));
146 if (!more && nextPrefix_ != 0x00) {
147 if (static_cast<unsigned char>(obj.key()[0]) >= nextPrefix_)
148 continue;
149 }
150 cacheUpdates.push_back(
151 {*ripple::uint256::fromVoidChecked(obj.key()), {obj.data().begin(), obj.data().end()}}
152 );
153 if (!cacheOnly) {
154 if (!lastKey_.empty())
155 backend.writeSuccessor(std::move(lastKey_), request_.ledger().sequence(), std::string{obj.key()});
156 lastKey_ = obj.key();
157 backend.writeNFTs(getNFTDataFromObj(request_.ledger().sequence(), obj.key(), obj.data()));
158
159 auto const maybeMPTHolder = getMPTHolderFromObj(obj.key(), obj.data());
160 if (maybeMPTHolder)
161 backend.writeMPTHolders({*maybeMPTHolder});
162
163 backend.writeLedgerObject(
164 std::move(*obj.mutable_key()), request_.ledger().sequence(), std::move(*obj.mutable_data())
165 );
166 }
167 }
168 backend.cache().update(cacheUpdates, request_.ledger().sequence(), cacheOnly);
169 LOG(log_.debug()) << "Wrote " << numObjects << " objects. Got more: " << (more ? "YES" : "NO");
170
171 return more ? CallStatus::MORE : CallStatus::DONE;
172 }
173
174 void
175 call(std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub, grpc::CompletionQueue& cq)
176 {
177 context_ = std::make_unique<grpc::ClientContext>();
178
179 std::unique_ptr<grpc::ClientAsyncResponseReader<org::xrpl::rpc::v1::GetLedgerDataResponse>> rpc(
180 stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq)
181 );
182
183 rpc->StartCall();
184
185 rpc->Finish(next_.get(), &status_, this);
186 }
187
188 std::string
189 getMarkerPrefix()
190 {
191 if (next_->marker().empty()) {
192 return "";
193 }
194 return ripple::strHex(std::string{next_->marker().data()[0]});
195 }
196
197 std::string
198 getLastKey()
199 {
200 return lastKey_;
201 }
202};
203
204inline std::vector<AsyncCallData>
205makeAsyncCallData(uint32_t const sequence, uint32_t const numMarkers)
206{
207 auto const markers = getMarkers(numMarkers);
208
209 std::vector<AsyncCallData> result;
210 result.reserve(markers.size());
211
212 for (size_t i = 0; i + 1 < markers.size(); ++i) {
213 result.emplace_back(sequence, markers[i], markers[i + 1]);
214 }
215 if (not markers.empty()) {
216 result.emplace_back(sequence, markers.back(), std::nullopt);
217 }
218 return result;
219}
220
221} // namespace etl::impl
The interface to the database used by Clio.
Definition BackendInterface.hpp:140
virtual void writeNFTs(std::vector< NFTsData > const &data)=0
Writes NFTs to the database.
virtual void writeLedgerObject(std::string &&key, std::uint32_t seq, std::string &&blob)
Writes a new ledger object.
Definition BackendInterface.cpp:70
virtual void writeSuccessor(std::string &&key, std::uint32_t seq, std::string &&successor)=0
Write a new successor.
virtual void writeMPTHolders(std::vector< MPTHolderData > const &data)=0
Write accounts that started holding onto a MPT.
LedgerCacheInterface const & cache() const
Definition BackendInterface.hpp:165
virtual void update(std::vector< LedgerObject > const &objs, uint32_t seq, bool isBackground=false)=0
Update the cache with new ledger objects.
Definition AsyncData.hpp:48
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump warn(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::WRN severity.
Definition Logger.cpp:224
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:229
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:209
std::vector< NFTsData > getNFTDataFromObj(std::uint32_t const seq, std::string const &key, std::string const &blob)
Pull NFT data from ledger object via loadInitialLedger.
Definition NFTHelpers.cpp:359
std::vector< ripple::uint256 > getMarkers(size_t numMarkers)
Parititions the uint256 keyspace into numMarkers partitions, each of equal size.
Definition ETLHelpers.cpp:31
std::optional< MPTHolderData > getMPTHolderFromObj(std::string const &key, std::string const &blob)
Pull MPT data from ledger object via loadInitialLedger.
Definition MPTHelpers.cpp:72
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37