Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerPublisher.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "data/LedgerCacheInterface.hpp"
25#include "data/Types.hpp"
26#include "etl/SystemState.hpp"
27#include "etlng/LedgerPublisherInterface.hpp"
28#include "feed/SubscriptionManagerInterface.hpp"
29#include "util/Assert.hpp"
30#include "util/log/Logger.hpp"
31#include "util/prometheus/Counter.hpp"
32#include "util/prometheus/Prometheus.hpp"
33
34#include <boost/asio/io_context.hpp>
35#include <boost/asio/post.hpp>
36#include <boost/asio/strand.hpp>
37#include <xrpl/basics/chrono.h>
38#include <xrpl/protocol/Fees.h>
39#include <xrpl/protocol/LedgerHeader.h>
40#include <xrpl/protocol/SField.h>
41#include <xrpl/protocol/STObject.h>
42#include <xrpl/protocol/Serializer.h>
43
44#include <algorithm>
45#include <chrono>
46#include <cstddef>
47#include <cstdint>
48#include <functional>
49#include <memory>
50#include <mutex>
51#include <optional>
52#include <shared_mutex>
53#include <string>
54#include <thread>
55#include <utility>
56#include <vector>
57
58namespace etl::impl {
59
72 util::Logger log_{"ETL"};
73
74 boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
75
76 std::shared_ptr<BackendInterface> backend_;
77 std::reference_wrapper<data::LedgerCacheInterface> cache_;
78 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
79 std::reference_wrapper<SystemState const> state_; // shared state for ETL
80
81 std::chrono::time_point<ripple::NetClock> lastCloseTime_;
82 mutable std::shared_mutex closeTimeMtx_;
83
84 std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt(
85 "etl_last_publish_seconds",
86 {},
87 "Seconds since epoch of the last published ledger"
88 );
89
90 std::optional<uint32_t> lastPublishedSequence_;
91 mutable std::shared_mutex lastPublishedSeqMtx_;
92
93public:
98 boost::asio::io_context& ioc,
99 std::shared_ptr<BackendInterface> backend,
101 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
102 SystemState const& state
103 )
104 : publishStrand_{boost::asio::make_strand(ioc)}
105 , backend_{std::move(backend)}
106 , cache_{cache}
107 , subscriptions_{std::move(subscriptions)}
108 , state_{std::cref(state)}
109 {
110 }
111
121 bool
123 uint32_t ledgerSequence,
124 std::optional<uint32_t> maxAttempts,
125 std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
126 ) override
127 {
128 LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
129 size_t numAttempts = 0;
130 while (not state_.get().isStopping) {
131 auto range = backend_->hardFetchLedgerRangeNoThrow();
132
133 if (!range || range->maxSequence < ledgerSequence) {
134 ++numAttempts;
135 LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence;
136
137 // We try maxAttempts times to publish the ledger, waiting one second in between each attempt.
138 if (maxAttempts && numAttempts >= maxAttempts) {
139 LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts.";
140 return false;
141 }
142 std::this_thread::sleep_for(attemptsDelay);
143 continue;
144 }
145
146 auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
147 return backend_->fetchLedgerBySequence(ledgerSequence, yield);
148 });
149
150 ASSERT(lgr.has_value(), "Ledger must exist in database. Ledger sequence = {}", ledgerSequence);
151 publish(*lgr);
152
153 return true;
154 }
155 return false;
156 }
157
165 void
166 publish(ripple::LedgerHeader const& lgrInfo)
167 {
168 boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
169 LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
170
171 if (!state_.get().isWriting) {
172 LOG(log_.info()) << "Updating ledger range for read node.";
173
174 if (!cache_.get().isDisabled()) {
175 std::vector<data::LedgerObject> const diff = data::synchronousAndRetryOnTimeout([&](auto yield) {
176 return backend_->fetchLedgerDiff(lgrInfo.seq, yield);
177 });
178
179 cache_.get().update(diff, lgrInfo.seq);
180 }
181
182 backend_->updateRange(lgrInfo.seq);
183 }
184
185 setLastClose(lgrInfo.closeTime);
186 auto age = lastCloseAgeSeconds();
187
188 // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
189 // TODO: this probably should be a strategy
190 static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
191 if (age < kMAX_LEDGER_AGE_SECONDS) {
192 std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
193 return backend_->fetchFees(lgrInfo.seq, yield);
194 });
195 ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
196
197 std::vector<data::TransactionAndMetadata> transactions =
198 data::synchronousAndRetryOnTimeout([&](auto yield) {
199 return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
200 });
201
202 auto const ledgerRange = backend_->fetchLedgerRange();
203 ASSERT(ledgerRange.has_value(), "Ledger range must exist");
204
205 std::string const range =
206 std::to_string(ledgerRange->minSequence) + "-" + std::to_string(ledgerRange->maxSequence);
207
208 subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
209
210 // order with transaction index
211 std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
212 ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
213 ripple::STObject const object1(iter1, ripple::sfMetadata);
214 ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
215 ripple::STObject const object2(iter2, ripple::sfMetadata);
216 return object1.getFieldU32(ripple::sfTransactionIndex) <
217 object2.getFieldU32(ripple::sfTransactionIndex);
218 });
219
220 for (auto& txAndMeta : transactions)
221 subscriptions_->pubTransaction(txAndMeta, lgrInfo);
222
223 subscriptions_->pubBookChanges(lgrInfo, transactions);
224
225 setLastPublishTime();
226 LOG(log_.info()) << "Published ledger " << std::to_string(lgrInfo.seq);
227 } else {
228 LOG(log_.info()) << "Skipping publishing ledger " << std::to_string(lgrInfo.seq);
229 }
230 });
231
232 // we track latest publish-requested seq, not necessarily already published
233 setLastPublishedSequence(lgrInfo.seq);
234 }
235
239 std::uint32_t
240 lastPublishAgeSeconds() const override
241 {
242 return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastPublish())
243 .count();
244 }
245
249 std::chrono::time_point<std::chrono::system_clock>
250 getLastPublish() const override
251 {
252 return std::chrono::time_point<std::chrono::system_clock>{std::chrono::seconds{lastPublishSeconds_.get().value()
253 }};
254 }
255
259 std::uint32_t
260 lastCloseAgeSeconds() const override
261 {
262 std::shared_lock const lck(closeTimeMtx_);
263 auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
264 .count();
265 auto closeTime = lastCloseTime_.time_since_epoch().count();
266 if (now < (kRIPPLE_EPOCH_START + closeTime))
267 return 0;
268 return now - (kRIPPLE_EPOCH_START + closeTime);
269 }
270
275 std::optional<uint32_t>
277 {
278 std::scoped_lock const lck(lastPublishedSeqMtx_);
279 return lastPublishedSequence_;
280 }
281
282private:
283 void
284 setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
285 {
286 std::scoped_lock const lck(closeTimeMtx_);
287 lastCloseTime_ = lastCloseTime;
288 }
289
290 void
291 setLastPublishTime()
292 {
293 using namespace std::chrono;
294 auto const nowSeconds = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
295 lastPublishSeconds_.get().set(nowSeconds);
296 }
297
298 void
299 setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
300 {
301 std::scoped_lock const lck(lastPublishedSeqMtx_);
302 lastPublishedSequence_ = lastPublishedSequence;
303 }
304};
305
306} // namespace etl::impl
static constexpr std::uint32_t kRIPPLE_EPOCH_START
The ripple epoch start timestamp. Midnight on 1st January 2000.
Definition DBHelpers.hpp:318
static util::prometheus::CounterInt & counterInt(std::string name, util::prometheus::Labels labels, std::optional< std::string > description=std::nullopt)
Get an integer based counter metric. It will be created if it doesn't exist.
Definition Prometheus.cpp:194
Cache for an entire ledger.
Definition LedgerCacheInterface.hpp:38
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:71
LedgerPublisher(boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, data::LedgerCacheInterface &cache, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, SystemState const &state)
Create an instance of the publisher.
Definition LedgerPublisher.hpp:97
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const override
Get last publish time as a time point.
Definition LedgerPublisher.hpp:250
void publish(ripple::LedgerHeader const &lgrInfo)
Publish the passed ledger asynchronously.
Definition LedgerPublisher.hpp:166
std::optional< uint32_t > getLastPublishedSequence() const
Get the sequence of the last scheduled ledger to publish, Be aware that the ledger may not have been ...
Definition LedgerPublisher.hpp:276
bool publish(uint32_t ledgerSequence, std::optional< uint32_t > maxAttempts, std::chrono::steady_clock::duration attemptsDelay=std::chrono::seconds{1}) override
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition LedgerPublisher.hpp:122
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:260
std::uint32_t lastPublishAgeSeconds() const override
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:240
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:132
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
The interface of a scheduler for the extraction process.
Definition LedgerPublisherInterface.hpp:31