// Response currently being processed; exchanged with next_ via std::swap once a reply has arrived.
51 std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> cur_;
// Response object handed to the RPC's Finish(), i.e. where the upcoming reply is written.
52 std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> next_;
// Request reused for every call; its marker is overwritten with the marker of the latest response.
54 org::xrpl::rpc::v1::GetLedgerDataRequest request_;
// Client context; a new instance is created in the constructor and again before each call is issued.
55 std::unique_ptr<grpc::ClientContext> context_;
// First byte of the next marker. Compared against the first byte of returned markers/keys;
// the value 0x00 disables those comparisons.
58 unsigned char nextPrefix_;
// Builds the request for ledger sequence `seq`, records the prefix bound taken from `nextMarker`,
// and allocates the two response objects plus the client context.
//
// seq        - ledger sequence written into the request.
// marker     - starting marker; only written into the request when it is non-zero.
// nextMarker - optional marker whose first byte becomes nextPrefix_.
63 AsyncCallData(uint32_t seq, ripple::uint256
const& marker, std::optional<ripple::uint256>
const& nextMarker)
65 request_.mutable_ledger()->set_sequence(seq);
// A zero marker leaves the request's marker field unset.
66 if (marker.isNonZero()) {
67 request_.set_marker(marker.data(), ripple::uint256::size());
69 request_.set_user(
"ETL");
// NOTE(review): nextMarker is a std::optional and is dereferenced here; any guard or default
// assignment for nextPrefix_ is not visible in this excerpt - confirm one exists.
72 nextPrefix_ = nextMarker->data()[0];
// First byte of the starting marker, used for the log line and the ordering check below.
74 unsigned char const prefix = marker.data()[0];
76 LOG(log_.
debug()) <<
"Setting up AsyncCallData. marker = " << ripple::strHex(marker)
77 <<
" . prefix = " << ripple::strHex(std::string(1, prefix))
78 <<
" . nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_));
// Condition and message of a check whose opening call is not visible here:
// the next prefix must exceed the current one unless it is 0x00.
81 nextPrefix_ > prefix || nextPrefix_ == 0x00,
82 "Next prefix must be greater than current prefix. Got: nextPrefix_ = {}, prefix = {}",
87 cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
88 next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
89 context_ = std::make_unique<grpc::ClientContext>();
// Result of handling one response: MORE and DONE are chosen from the `more` flag at the end of
// processing; ERRORED is returned after the "aborted" / "status_ not ok" error logs.
92 enum class CallStatus { MORE, DONE, ERRORED };
// Parameters and body of the response-handling routine (its name line is not visible in this
// excerpt). It validates the finished call, swaps the response buffers, forwards the returned
// ledger objects to `backend`, and returns a CallStatus:
//   ERRORED - after the "aborted" or "status_ not ok" error logs,
//   MORE    - when `more` is true at the end,
//   DONE    - otherwise.
// NOTE(review): the declaration of `more` and the statements that change it are not visible here.
96 std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
97 grpc::CompletionQueue& cq,
100 bool cacheOnly =
false
103 LOG(log_.
trace()) <<
"Processing response. "
104 <<
"Marker prefix = " << getMarkerPrefix();
// Error exit; the condition guarding it is not visible in this excerpt.
106 LOG(log_.
error()) <<
"AsyncCallData aborted";
107 return CallStatus::ERRORED;
// Error exit reporting status_'s code and message; the guarding condition is not visible here.
110 LOG(log_.
error()) <<
"AsyncCallData status_ not ok: code = " << status_.error_code()
111 <<
" message = " << status_.error_message();
112 return CallStatus::ERRORED;
// Only warns; processing continues when the response is not flagged as unlimited.
114 if (!next_->is_unlimited()) {
115 LOG(log_.
warn()) <<
"AsyncCallData is_unlimited is false. "
116 <<
"Make sure secure_gateway is set correctly at the ETL source";
// After the swap cur_ holds the reply that was written into next_, and next_ holds the
// previously processed response object.
119 std::swap(cur_, next_);
// Empty-marker case; the statement it guards is not visible (presumably clears `more` - confirm).
124 if (cur_->marker().empty())
// First byte of the returned marker, compared against the prefix bound unless that bound is 0x00.
128 unsigned char const prefix = cur_->marker()[0];
129 if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
// The next request continues from the marker of the response just received.
134 request_.set_marker(cur_->marker());
138 auto const numObjects = cur_->ledger_objects().objects_size();
139 LOG(log_.
debug()) <<
"Writing " << numObjects <<
" objects";
// Key/data pairs collected for the cache update issued after the loop.
141 std::vector<data::LedgerObject> cacheUpdates;
142 cacheUpdates.reserve(numObjects);
144 for (
int i = 0; i < numObjects; ++i) {
// Mutable access is needed because key and data are moved out of the object further down.
145 auto& obj = *(cur_->mutable_ledger_objects()->mutable_objects(i));
// When no more data is expected and a prefix bound is set, objects whose key's first byte
// reaches the bound are singled out; the action taken is not visible in this excerpt.
146 if (!more && nextPrefix_ != 0x00) {
147 if (
static_cast<unsigned char>(obj.key()[0]) >= nextPrefix_)
// Copies key and data into the cache batch (the originals are moved from later).
// NOTE(review): the result of fromVoidChecked is dereferenced without a visible check.
150 cacheUpdates.push_back(
151 {*ripple::uint256::fromVoidChecked(obj.key()), {obj.data().begin(), obj.data().end()}}
// Records the current key as the successor of the previously seen key, then remembers it.
154 if (!lastKey_.empty())
155 backend.
writeSuccessor(std::move(lastKey_), request_.ledger().sequence(), std::string{obj.key()});
156 lastKey_ = obj.key();
// Arguments of a call whose opening is not visible: key and data are moved out of `obj`.
164 std::move(*obj.mutable_key()), request_.ledger().sequence(), std::move(*obj.mutable_data())
// Passes the collected batch to the backend cache for this ledger sequence.
168 backend.
cache().
update(cacheUpdates, request_.ledger().sequence(), cacheOnly);
169 LOG(log_.
debug()) <<
"Wrote " << numObjects <<
" objects. Got more: " << (more ?
"YES" :
"NO");
171 return more ? CallStatus::MORE : CallStatus::DONE;
// Issues the asynchronous GetLedgerData request described by request_ on completion queue `cq`.
//
// stub - service stub used to prepare the call.
// cq   - completion queue the call is bound to.
175 call(std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub, grpc::CompletionQueue& cq)
// A new client context is created for this call, replacing the previous one.
177 context_ = std::make_unique<grpc::ClientContext>();
179 std::unique_ptr<grpc::ClientAsyncResponseReader<org::xrpl::rpc::v1::GetLedgerDataResponse>>
rpc(
180 stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq)
// The reply is written into next_ and the outcome into status_; `this` is passed as the
// third argument (the completion tag in gRPC's async API).
185 rpc->Finish(next_.get(), &status_,
this);
// Body of a helper whose header is not visible in this excerpt (presumably the getMarkerPrefix()
// used in the trace log - confirm). The empty-marker branch's body is also not visible.
191 if (next_->marker().empty()) {
// Hex-encodes a one-character string holding the first byte of next_'s marker.
194 return ripple::strHex(std::string{next_->marker().data()[0]});