Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerPublisher.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "etl/LedgerPublisherInterface.hpp"
25#include "etl/SystemState.hpp"
26#include "etl/impl/Loading.hpp"
27#include "feed/SubscriptionManagerInterface.hpp"
28#include "util/Assert.hpp"
29#include "util/Mutex.hpp"
30#include "util/async/AnyExecutionContext.hpp"
31#include "util/async/AnyStrand.hpp"
32#include "util/log/Logger.hpp"
33#include "util/prometheus/Counter.hpp"
34#include "util/prometheus/Prometheus.hpp"
35
36#include <boost/asio/io_context.hpp>
37#include <boost/asio/post.hpp>
38#include <boost/asio/strand.hpp>
39#include <fmt/format.h>
40#include <xrpl/basics/chrono.h>
41#include <xrpl/protocol/Fees.h>
42#include <xrpl/protocol/LedgerHeader.h>
43#include <xrpl/protocol/SField.h>
44#include <xrpl/protocol/STObject.h>
45#include <xrpl/protocol/Serializer.h>
46
47#include <algorithm>
48#include <atomic>
49#include <chrono>
50#include <cstddef>
51#include <cstdint>
52#include <functional>
53#include <memory>
54#include <mutex>
55#include <optional>
56#include <shared_mutex>
57#include <string>
58#include <thread>
59#include <utility>
60#include <vector>
61
62namespace etl::impl {
63
76 util::Logger log_{"ETL"};
77
78 util::async::AnyStrand publishStrand_;
79
80 std::atomic_bool stop_{false};
81
82 std::shared_ptr<BackendInterface> backend_;
83 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
84 std::reference_wrapper<SystemState const> state_; // shared state for ETL
85
86 util::Mutex<std::chrono::time_point<ripple::NetClock>, std::shared_mutex> lastCloseTime_;
87
88 std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt(
89 "etl_last_publish_seconds",
90 {},
91 "Seconds since epoch of the last published ledger"
92 );
93
94 util::Mutex<std::optional<uint32_t>, std::shared_mutex> lastPublishedSequence_;
95
96public:
102 std::shared_ptr<BackendInterface> backend,
103 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
104 SystemState const& state
105 )
106 : publishStrand_{ctx.makeStrand()}
107 , backend_{std::move(backend)}
108 , subscriptions_{std::move(subscriptions)}
109 , state_{std::cref(state)}
110 {
111 }
112
122 bool
124 uint32_t ledgerSequence,
125 std::optional<uint32_t> maxAttempts,
126 std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
127 ) override
128 {
129 LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
130 size_t numAttempts = 0;
131 while (not stop_) {
132 auto range = backend_->hardFetchLedgerRangeNoThrow();
133
134 if (!range || range->maxSequence < ledgerSequence) {
135 ++numAttempts;
136 LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence;
137
138 // We try maxAttempts times to publish the ledger, waiting one second in between each attempt.
139 if (maxAttempts && numAttempts >= maxAttempts) {
140 LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts.";
141 return false;
142 }
143 std::this_thread::sleep_for(attemptsDelay);
144 continue;
145 }
146
147 auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
148 return backend_->fetchLedgerBySequence(ledgerSequence, yield);
149 });
150
151 ASSERT(lgr.has_value(), "Ledger must exist in database. Ledger sequence = {}", ledgerSequence);
152 publish(*lgr);
153
154 return true;
155 }
156 return false;
157 }
158
166 void
167 publish(ripple::LedgerHeader const& lgrInfo)
168 {
169 publishStrand_.submit([this, lgrInfo = lgrInfo] {
170 LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
171
172 setLastClose(lgrInfo.closeTime);
173 auto age = lastCloseAgeSeconds();
174
175 // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
176 static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
177 if (age < kMAX_LEDGER_AGE_SECONDS) {
178 std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
179 return backend_->fetchFees(lgrInfo.seq, yield);
180 });
181 ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
182
183 auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) {
184 return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
185 });
186
187 auto const ledgerRange = backend_->fetchLedgerRange();
188 ASSERT(ledgerRange.has_value(), "Ledger range must exist");
189
190 auto const range = fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence);
191 subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
192
193 // order with transaction index
194 std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
195 ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
196 ripple::STObject const object1(iter1, ripple::sfMetadata);
197 ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
198 ripple::STObject const object2(iter2, ripple::sfMetadata);
199 return object1.getFieldU32(ripple::sfTransactionIndex) <
200 object2.getFieldU32(ripple::sfTransactionIndex);
201 });
202
203 for (auto const& txAndMeta : transactions)
204 subscriptions_->pubTransaction(txAndMeta, lgrInfo);
205
206 subscriptions_->pubBookChanges(lgrInfo, transactions);
207
208 setLastPublishTime();
209 LOG(log_.info()) << "Published ledger " << lgrInfo.seq;
210 } else {
211 LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq;
212 }
213 });
214
215 // we track latest publish-requested seq, not necessarily already published
216 setLastPublishedSequence(lgrInfo.seq);
217 }
218
222 std::uint32_t
223 lastPublishAgeSeconds() const override
224 {
225 return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastPublish())
226 .count();
227 }
228
232 std::chrono::time_point<std::chrono::system_clock>
233 getLastPublish() const override
234 {
235 return std::chrono::time_point<std::chrono::system_clock>{
236 std::chrono::seconds{lastPublishSeconds_.get().value()}
237 };
238 }
239
243 std::uint32_t
244 lastCloseAgeSeconds() const override
245 {
246 auto closeTime = lastCloseTime_.lock()->time_since_epoch().count();
247 auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
248 .count();
249 if (now < (kRIPPLE_EPOCH_START + closeTime))
250 return 0;
251 return now - (kRIPPLE_EPOCH_START + closeTime);
252 }
253
258 std::optional<uint32_t>
260 {
261 return *lastPublishedSequence_.lock();
262 }
263
270 void
272 {
273 stop_ = true;
274 }
275
276private:
277 void
278 setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
279 {
280 auto closeTime = lastCloseTime_.lock<std::scoped_lock>();
281 *closeTime = lastCloseTime;
282 }
283
284 void
285 setLastPublishTime()
286 {
287 using namespace std::chrono;
288 auto const nowSeconds = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
289 lastPublishSeconds_.get().set(nowSeconds);
290 }
291
292 void
293 setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
294 {
295 auto lastPublishSeq = lastPublishedSequence_.lock();
296 *lastPublishSeq = lastPublishedSequence;
297 }
298};
299
300} // namespace etl::impl
static constexpr std::uint32_t kRIPPLE_EPOCH_START
The ripple epoch start timestamp. Midnight on 1st January 2000.
Definition DBHelpers.hpp:272
static util::prometheus::CounterInt & counterInt(std::string name, util::prometheus::Labels labels, std::optional< std::string > description=std::nullopt)
Get an integer based counter metric. It will be created if it doesn't exist.
Definition Prometheus.cpp:200
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const override
Get last publish time as a time point.
Definition LedgerPublisher.hpp:233
LedgerPublisher(util::async::AnyExecutionContext ctx, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, SystemState const &state)
Create an instance of the publisher.
Definition LedgerPublisher.hpp:100
void publish(ripple::LedgerHeader const &lgrInfo)
Publish the passed ledger asynchronously.
Definition LedgerPublisher.hpp:167
std::optional< uint32_t > getLastPublishedSequence() const
Get the sequence of the last schueduled ledger to publish, Be aware that the ledger may not have been...
Definition LedgerPublisher.hpp:259
bool publish(uint32_t ledgerSequence, std::optional< uint32_t > maxAttempts, std::chrono::steady_clock::duration attemptsDelay=std::chrono::seconds{1}) override
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition LedgerPublisher.hpp:123
void stop()
Stops publishing.
Definition LedgerPublisher.hpp:271
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:244
std::uint32_t lastPublishAgeSeconds() const override
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:223
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:95
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:481
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:486
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:101
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:139
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
A type-erased execution context.
Definition AnyStrand.hpp:40
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:131
The interface of a scheduler for the extraction process.
Definition LedgerPublisherInterface.hpp:31
Represents the state of the ETL subsystem.
Definition SystemState.hpp:38