Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerPublisher.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "etl/SystemState.hpp"
25#include "etlng/LedgerPublisherInterface.hpp"
26#include "etlng/impl/Loading.hpp"
27#include "feed/SubscriptionManagerInterface.hpp"
28#include "util/Assert.hpp"
29#include "util/Mutex.hpp"
30#include "util/log/Logger.hpp"
31#include "util/prometheus/Counter.hpp"
32#include "util/prometheus/Prometheus.hpp"
33
34#include <boost/asio/io_context.hpp>
35#include <boost/asio/post.hpp>
36#include <boost/asio/strand.hpp>
37#include <fmt/format.h>
38#include <xrpl/basics/chrono.h>
39#include <xrpl/protocol/Fees.h>
40#include <xrpl/protocol/LedgerHeader.h>
41#include <xrpl/protocol/SField.h>
42#include <xrpl/protocol/STObject.h>
43#include <xrpl/protocol/Serializer.h>
44
45#include <algorithm>
46#include <chrono>
47#include <cstddef>
48#include <cstdint>
49#include <functional>
50#include <memory>
51#include <mutex>
52#include <optional>
53#include <shared_mutex>
54#include <string>
55#include <thread>
56#include <utility>
57#include <vector>
58
59namespace etlng::impl {
60
// Logger bound to the "ETL" channel.
73 util::Logger log_{"ETL"};
74
// Strand built from the io_context given to the constructor; publish(LedgerHeader) posts its work onto it.
75 boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
76
// Backend queried for the ledger range, ledger headers, fees and transactions.
77 std::shared_ptr<BackendInterface> backend_;
// Receives the pubLedger / pubTransaction / pubBookChanges calls made while publishing.
78 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
// Only a reference is stored; publish(sequence, ...) polls its isStopping flag to leave the retry loop.
79 std::reference_wrapper<etl::SystemState const> state_; // shared state for ETL
80
// Close time of the ledger most recently processed by the publish task; written by setLastClose().
81 util::Mutex<std::chrono::time_point<ripple::NetClock>, std::shared_mutex> lastCloseTime_;
82
// Prometheus counter holding the wall-clock time (seconds since epoch) of the last completed publish;
// written by setLastPublishTime() and read back by getLastPublish().
83 std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt(
84 "etl_last_publish_seconds",
85 {},
86 "Seconds since epoch of the last published ledger"
87 );
88
// Sequence of the ledger most recently handed to publish(LedgerHeader); empty until the first request.
89 util::Mutex<std::optional<uint32_t>, std::shared_mutex> lastPublishedSequence_;
90
91public:
96 boost::asio::io_context& ioc, // TODO: replace with AsyncContext shared with ETLServiceNg
97 std::shared_ptr<BackendInterface> backend,
98 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
99 etl::SystemState const& state
100 )
101 : publishStrand_{boost::asio::make_strand(ioc)}
102 , backend_{std::move(backend)}
103 , subscriptions_{std::move(subscriptions)}
104 , state_{std::cref(state)}
105 {
106 }
107
117 bool
119 uint32_t ledgerSequence,
120 std::optional<uint32_t> maxAttempts,
121 std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
122 ) override
123 {
124 LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
125 size_t numAttempts = 0;
126 while (not state_.get().isStopping) {
127 auto range = backend_->hardFetchLedgerRangeNoThrow();
128
129 if (!range || range->maxSequence < ledgerSequence) {
130 ++numAttempts;
131 LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence;
132
133 // We try maxAttempts times to publish the ledger, waiting one second in between each attempt.
134 if (maxAttempts && numAttempts >= maxAttempts) {
135 LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts.";
136 return false;
137 }
138 std::this_thread::sleep_for(attemptsDelay);
139 continue;
140 }
141
142 auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
143 return backend_->fetchLedgerBySequence(ledgerSequence, yield);
144 });
145
146 ASSERT(lgr.has_value(), "Ledger must exist in database. Ledger sequence = {}", ledgerSequence);
147 publish(*lgr);
148
149 return true;
150 }
151 return false;
152 }
153
161 void
162 publish(ripple::LedgerHeader const& lgrInfo)
163 {
164 boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
165 LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
166
167 setLastClose(lgrInfo.closeTime);
168 auto age = lastCloseAgeSeconds();
169
170 // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
171 static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
172 if (age < kMAX_LEDGER_AGE_SECONDS) {
173 std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
174 return backend_->fetchFees(lgrInfo.seq, yield);
175 });
176 ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
177
178 auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) {
179 return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
180 });
181
182 auto const ledgerRange = backend_->fetchLedgerRange();
183 ASSERT(ledgerRange.has_value(), "Ledger range must exist");
184
185 auto const range = fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence);
186 subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
187
188 // order with transaction index
189 std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
190 ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
191 ripple::STObject const object1(iter1, ripple::sfMetadata);
192 ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
193 ripple::STObject const object2(iter2, ripple::sfMetadata);
194 return object1.getFieldU32(ripple::sfTransactionIndex) <
195 object2.getFieldU32(ripple::sfTransactionIndex);
196 });
197
198 for (auto const& txAndMeta : transactions)
199 subscriptions_->pubTransaction(txAndMeta, lgrInfo);
200
201 subscriptions_->pubBookChanges(lgrInfo, transactions);
202
203 setLastPublishTime();
204 LOG(log_.info()) << "Published ledger " << lgrInfo.seq;
205 } else {
206 LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq;
207 }
208 });
209
210 // we track latest publish-requested seq, not necessarily already published
211 setLastPublishedSequence(lgrInfo.seq);
212 }
213
217 std::uint32_t
218 lastPublishAgeSeconds() const override
219 {
220 return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastPublish())
221 .count();
222 }
223
227 std::chrono::time_point<std::chrono::system_clock>
228 getLastPublish() const override
229 {
230 return std::chrono::time_point<std::chrono::system_clock>{
231 std::chrono::seconds{lastPublishSeconds_.get().value()}
232 };
233 }
234
238 std::uint32_t
239 lastCloseAgeSeconds() const override
240 {
241 auto closeTime = lastCloseTime_.lock()->time_since_epoch().count();
242 auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
243 .count();
244 if (now < (kRIPPLE_EPOCH_START + closeTime))
245 return 0;
246 return now - (kRIPPLE_EPOCH_START + closeTime);
247 }
248
253 std::optional<uint32_t>
255 {
256 return *lastPublishedSequence_.lock();
257 }
258
259private:
260 void
261 setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
262 {
263 auto closeTime = lastCloseTime_.lock<std::scoped_lock>();
264 *closeTime = lastCloseTime;
265 }
266
267 void
268 setLastPublishTime()
269 {
270 using namespace std::chrono;
271 auto const nowSeconds = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
272 lastPublishSeconds_.get().set(nowSeconds);
273 }
274
275 void
276 setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
277 {
278 auto lastPublishSeq = lastPublishedSequence_.lock();
279 *lastPublishSeq = lastPublishedSequence;
280 }
281};
282
283} // namespace etlng::impl
static constexpr std::uint32_t kRIPPLE_EPOCH_START
The ripple epoch start timestamp. Midnight on 1st January 2000.
Definition DBHelpers.hpp:272
static util::prometheus::CounterInt & counterInt(std::string name, util::prometheus::Labels labels, std::optional< std::string > description=std::nullopt)
Get an integer based counter metric. It will be created if it doesn't exist.
Definition Prometheus.cpp:200
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:72
std::optional< uint32_t > getLastPublishedSequence() const
Get the sequence of the last scheduled ledger to publish. Be aware that the ledger may not have been...
Definition LedgerPublisher.hpp:254
LedgerPublisher(boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, etl::SystemState const &state)
Create an instance of the publisher.
Definition LedgerPublisher.hpp:95
void publish(ripple::LedgerHeader const &lgrInfo)
Publish the passed ledger asynchronously.
Definition LedgerPublisher.hpp:162
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:239
std::uint32_t lastPublishAgeSeconds() const override
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:218
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const override
Get last publish time as a time point.
Definition LedgerPublisher.hpp:228
bool publish(uint32_t ledgerSequence, std::optional< uint32_t > maxAttempts, std::chrono::steady_clock::duration attemptsDelay=std::chrono::seconds{1}) override
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition LedgerPublisher.hpp:118
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:307
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:312
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:134
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:131
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
The interface of a scheduler for the extraction process.
Definition LedgerPublisherInterface.hpp:31