Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerPublisher.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "data/LedgerCacheInterface.hpp"
25#include "data/Types.hpp"
26#include "etl/SystemState.hpp"
27#include "feed/SubscriptionManagerInterface.hpp"
28#include "util/Assert.hpp"
29#include "util/log/Logger.hpp"
30#include "util/prometheus/Counter.hpp"
31#include "util/prometheus/Prometheus.hpp"
32
33#include <boost/asio/io_context.hpp>
34#include <boost/asio/strand.hpp>
35#include <xrpl/basics/chrono.h>
36#include <xrpl/protocol/Fees.h>
37#include <xrpl/protocol/LedgerHeader.h>
38#include <xrpl/protocol/SField.h>
39#include <xrpl/protocol/STObject.h>
40#include <xrpl/protocol/Serializer.h>
41
42#include <algorithm>
43#include <chrono>
44#include <cstddef>
45#include <cstdint>
46#include <functional>
47#include <memory>
48#include <mutex>
49#include <optional>
50#include <shared_mutex>
51#include <string>
52#include <thread>
53#include <utility>
54#include <vector>
55
56namespace etl::impl {
57
70 util::Logger log_{"ETL"};
71
72 boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
73
74 std::shared_ptr<BackendInterface> backend_;
75 std::reference_wrapper<data::LedgerCacheInterface> cache_;
76 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
77 std::reference_wrapper<SystemState const> state_; // shared state for ETL
78
79 std::chrono::time_point<ripple::NetClock> lastCloseTime_;
80 mutable std::shared_mutex closeTimeMtx_;
81
82 std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt(
83 "etl_last_publish_seconds",
84 {},
85 "Seconds since epoch of the last published ledger"
86 );
87
88 std::optional<uint32_t> lastPublishedSequence_;
89 mutable std::shared_mutex lastPublishedSeqMtx_;
90
91public:
96 boost::asio::io_context& ioc,
97 std::shared_ptr<BackendInterface> backend,
99 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
100 SystemState const& state
101 )
102 : publishStrand_{boost::asio::make_strand(ioc)}
103 , backend_{std::move(backend)}
104 , cache_{cache}
105 , subscriptions_{std::move(subscriptions)}
106 , state_{std::cref(state)}
107 {
108 }
109
119 bool
121 uint32_t ledgerSequence,
122 std::optional<uint32_t> maxAttempts,
123 std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
124 )
125 {
126 LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
127 size_t numAttempts = 0;
128 while (not state_.get().isStopping) {
129 auto range = backend_->hardFetchLedgerRangeNoThrow();
130
131 if (!range || range->maxSequence < ledgerSequence) {
132 ++numAttempts;
133 LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence;
134
135 // We try maxAttempts times to publish the ledger, waiting one second in between each attempt.
136 if (maxAttempts && numAttempts >= maxAttempts) {
137 LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts.";
138 return false;
139 }
140 std::this_thread::sleep_for(attemptsDelay);
141 continue;
142 }
143
144 auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
145 return backend_->fetchLedgerBySequence(ledgerSequence, yield);
146 });
147
148 ASSERT(lgr.has_value(), "Ledger must exist in database. Ledger sequence = {}", ledgerSequence);
149 publish(*lgr);
150
151 return true;
152 }
153 return false;
154 }
155
163 void
164 publish(ripple::LedgerHeader const& lgrInfo)
165 {
166 boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
167 LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
168
169 if (!state_.get().isWriting) {
170 LOG(log_.info()) << "Updating ledger range for read node.";
171
172 if (!cache_.get().isDisabled()) {
173 std::vector<data::LedgerObject> const diff = data::synchronousAndRetryOnTimeout([&](auto yield) {
174 return backend_->fetchLedgerDiff(lgrInfo.seq, yield);
175 });
176
177 cache_.get().update(diff, lgrInfo.seq);
178 }
179
180 backend_->updateRange(lgrInfo.seq);
181 }
182
183 setLastClose(lgrInfo.closeTime);
184 auto age = lastCloseAgeSeconds();
185
186 // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
187 // TODO: this probably should be a strategy
188 static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
189 if (age < kMAX_LEDGER_AGE_SECONDS) {
190 std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
191 return backend_->fetchFees(lgrInfo.seq, yield);
192 });
193 ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
194
195 std::vector<data::TransactionAndMetadata> transactions =
196 data::synchronousAndRetryOnTimeout([&](auto yield) {
197 return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
198 });
199
200 auto const ledgerRange = backend_->fetchLedgerRange();
201 ASSERT(ledgerRange.has_value(), "Ledger range must exist");
202
203 std::string const range =
204 std::to_string(ledgerRange->minSequence) + "-" + std::to_string(ledgerRange->maxSequence);
205
206 subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
207
208 // order with transaction index
209 std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
210 ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
211 ripple::STObject const object1(iter1, ripple::sfMetadata);
212 ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
213 ripple::STObject const object2(iter2, ripple::sfMetadata);
214 return object1.getFieldU32(ripple::sfTransactionIndex) <
215 object2.getFieldU32(ripple::sfTransactionIndex);
216 });
217
218 for (auto& txAndMeta : transactions)
219 subscriptions_->pubTransaction(txAndMeta, lgrInfo);
220
221 subscriptions_->pubBookChanges(lgrInfo, transactions);
222
223 setLastPublishTime();
224 LOG(log_.info()) << "Published ledger " << std::to_string(lgrInfo.seq);
225 } else {
226 LOG(log_.info()) << "Skipping publishing ledger " << std::to_string(lgrInfo.seq);
227 }
228 });
229
230 // we track latest publish-requested seq, not necessarily already published
231 setLastPublishedSequence(lgrInfo.seq);
232 }
233
237 std::uint32_t
239 {
240 return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastPublish())
241 .count();
242 }
243
247 std::chrono::time_point<std::chrono::system_clock>
249 {
250 return std::chrono::time_point<std::chrono::system_clock>{std::chrono::seconds{lastPublishSeconds_.get().value()
251 }};
252 }
253
257 std::uint32_t
259 {
260 std::shared_lock const lck(closeTimeMtx_);
261 auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
262 .count();
263 auto closeTime = lastCloseTime_.time_since_epoch().count();
264 if (now < (kRIPPLE_EPOCH_START + closeTime))
265 return 0;
266 return now - (kRIPPLE_EPOCH_START + closeTime);
267 }
268
273 std::optional<uint32_t>
275 {
276 std::scoped_lock const lck(lastPublishedSeqMtx_);
277 return lastPublishedSequence_;
278 }
279
280private:
281 void
282 setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
283 {
284 std::scoped_lock const lck(closeTimeMtx_);
285 lastCloseTime_ = lastCloseTime;
286 }
287
288 void
289 setLastPublishTime()
290 {
291 using namespace std::chrono;
292 auto const nowSeconds = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
293 lastPublishSeconds_.get().set(nowSeconds);
294 }
295
296 void
297 setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
298 {
299 std::scoped_lock const lck(lastPublishedSeqMtx_);
300 lastPublishedSequence_ = lastPublishedSequence;
301 }
302};
303
304} // namespace etl::impl
static constexpr std::uint32_t kRIPPLE_EPOCH_START
The ripple epoch start timestamp. Midnight on 1st January 2000.
Definition DBHelpers.hpp:318
static util::prometheus::CounterInt & counterInt(std::string name, util::prometheus::Labels labels, std::optional< std::string > description=std::nullopt)
Get an integer based counter metric. It will be created if it doesn't exist.
Definition Prometheus.cpp:194
Cache for an entire ledger.
Definition LedgerCacheInterface.hpp:38
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:69
LedgerPublisher(boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, data::LedgerCacheInterface &cache, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, SystemState const &state)
Create an instance of the publisher.
Definition LedgerPublisher.hpp:95
bool publish(uint32_t ledgerSequence, std::optional< uint32_t > maxAttempts, std::chrono::steady_clock::duration attemptsDelay=std::chrono::seconds{1})
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition LedgerPublisher.hpp:120
void publish(ripple::LedgerHeader const &lgrInfo)
Publish the passed ledger asynchronously.
Definition LedgerPublisher.hpp:164
std::optional< uint32_t > getLastPublishedSequence() const
Get the sequence of the last scheduled ledger to publish. Be aware that the ledger may not have been...
Definition LedgerPublisher.hpp:274
std::uint32_t lastPublishAgeSeconds() const
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:238
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const
Get last publish time as a time point.
Definition LedgerPublisher.hpp:248
std::uint32_t lastCloseAgeSeconds() const
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:258
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:132
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33