Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerPublisher.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "etl/LedgerPublisherInterface.hpp"
25#include "etl/SystemState.hpp"
26#include "etl/impl/Loading.hpp"
27#include "feed/SubscriptionManagerInterface.hpp"
28#include "util/Assert.hpp"
29#include "util/Mutex.hpp"
30#include "util/async/AnyExecutionContext.hpp"
31#include "util/async/AnyStrand.hpp"
32#include "util/log/Logger.hpp"
33#include "util/prometheus/Counter.hpp"
34#include "util/prometheus/Prometheus.hpp"
35
36#include <boost/asio/io_context.hpp>
37#include <boost/asio/post.hpp>
38#include <boost/asio/strand.hpp>
39#include <fmt/format.h>
40#include <xrpl/basics/chrono.h>
41#include <xrpl/protocol/Fees.h>
42#include <xrpl/protocol/LedgerHeader.h>
43#include <xrpl/protocol/SField.h>
44#include <xrpl/protocol/STObject.h>
45#include <xrpl/protocol/Serializer.h>
46
47#include <algorithm>
48#include <atomic>
49#include <chrono>
50#include <cstddef>
51#include <cstdint>
52#include <functional>
53#include <memory>
54#include <mutex>
55#include <optional>
56#include <shared_mutex>
57#include <string>
58#include <thread>
59#include <utility>
60#include <vector>
61
62namespace etl::impl {
63
77 util::Logger log_{"ETL"};
78
79 util::async::AnyStrand publishStrand_;
80
81 std::atomic_bool stop_{false};
82
83 std::shared_ptr<BackendInterface> backend_;
84 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
85 std::reference_wrapper<SystemState const> state_; // shared state for ETL
86
87 util::Mutex<std::chrono::time_point<ripple::NetClock>, std::shared_mutex> lastCloseTime_;
88
89 std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ =
91 "etl_last_publish_seconds",
92 {},
93 "Seconds since epoch of the last published ledger"
94 );
95
96 util::Mutex<std::optional<uint32_t>, std::shared_mutex> lastPublishedSequence_;
97
98public:
104 std::shared_ptr<BackendInterface> backend,
105 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
106 SystemState const& state
107 )
108 : publishStrand_{ctx.makeStrand()}
109 , backend_{std::move(backend)}
110 , subscriptions_{std::move(subscriptions)}
111 , state_{std::cref(state)}
112 {
113 }
114
124 bool
126 uint32_t ledgerSequence,
127 std::optional<uint32_t> maxAttempts,
128 std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
129 ) override
130 {
131 LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
132 size_t numAttempts = 0;
133 while (not stop_) {
134 auto range = backend_->hardFetchLedgerRangeNoThrow();
135
136 if (!range || range->maxSequence < ledgerSequence) {
137 ++numAttempts;
138 LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = "
139 << ledgerSequence;
140
141 // We try maxAttempts times to publish the ledger, waiting one second in between
142 // each attempt.
143 if (maxAttempts && numAttempts >= maxAttempts) {
144 LOG(log_.debug())
145 << "Failed to publish ledger after " << numAttempts << " attempts.";
146 return false;
147 }
148 std::this_thread::sleep_for(attemptsDelay);
149 continue;
150 }
151
152 auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
153 return backend_->fetchLedgerBySequence(ledgerSequence, yield);
154 });
155
156 ASSERT(
157 lgr.has_value(),
158 "Ledger must exist in database. Ledger sequence = {}",
159 ledgerSequence
160 );
161 publish(*lgr);
162
163 return true;
164 }
165 return false;
166 }
167
176 void
177 publish(ripple::LedgerHeader const& lgrInfo)
178 {
179 publishStrand_.submit([this, lgrInfo = lgrInfo] {
180 LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
181
182 setLastClose(lgrInfo.closeTime);
183 auto age = lastCloseAgeSeconds();
184
185 // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up
186 // and don't publish
187 static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
188 if (age < kMAX_LEDGER_AGE_SECONDS) {
189 std::optional<ripple::Fees> fees =
190 data::synchronousAndRetryOnTimeout([&](auto yield) {
191 return backend_->fetchFees(lgrInfo.seq, yield);
192 });
193 ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
194
195 auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) {
196 return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
197 });
198
199 auto const ledgerRange = backend_->fetchLedgerRange();
200 ASSERT(ledgerRange.has_value(), "Ledger range must exist");
201
202 auto const range =
203 fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence);
204 subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
205
206 // order with transaction index
207 std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
208 ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
209 ripple::STObject const object1(iter1, ripple::sfMetadata);
210 ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
211 ripple::STObject const object2(iter2, ripple::sfMetadata);
212 return object1.getFieldU32(ripple::sfTransactionIndex) <
213 object2.getFieldU32(ripple::sfTransactionIndex);
214 });
215
216 for (auto const& txAndMeta : transactions)
217 subscriptions_->pubTransaction(txAndMeta, lgrInfo);
218
219 subscriptions_->pubBookChanges(lgrInfo, transactions);
220
221 setLastPublishTime();
222 LOG(log_.info()) << "Published ledger " << lgrInfo.seq;
223 } else {
224 LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq;
225 }
226 });
227
228 // we track latest publish-requested seq, not necessarily already published
229 setLastPublishedSequence(lgrInfo.seq);
230 }
231
235 std::uint32_t
236 lastPublishAgeSeconds() const override
237 {
238 return std::chrono::duration_cast<std::chrono::seconds>(
239 std::chrono::system_clock::now() - getLastPublish()
240 )
241 .count();
242 }
243
247 std::chrono::time_point<std::chrono::system_clock>
248 getLastPublish() const override
249 {
250 return std::chrono::time_point<std::chrono::system_clock>{
251 std::chrono::seconds{lastPublishSeconds_.get().value()}
252 };
253 }
254
258 std::uint32_t
259 lastCloseAgeSeconds() const override
260 {
261 auto closeTime = lastCloseTime_.lock()->time_since_epoch().count();
262 auto now = std::chrono::duration_cast<std::chrono::seconds>(
263 std::chrono::system_clock::now().time_since_epoch()
264 )
265 .count();
266 if (now < (kRIPPLE_EPOCH_START + closeTime))
267 return 0;
268 return now - (kRIPPLE_EPOCH_START + closeTime);
269 }
270
275 std::optional<uint32_t>
277 {
278 return *lastPublishedSequence_.lock();
279 }
280
287 void
289 {
290 stop_ = true;
291 }
292
293private:
294 void
295 setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
296 {
297 auto closeTime = lastCloseTime_.lock<std::scoped_lock>();
298 *closeTime = lastCloseTime;
299 }
300
301 void
302 setLastPublishTime()
303 {
304 using namespace std::chrono;
305 auto const nowSeconds =
306 duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
307 lastPublishSeconds_.get().set(nowSeconds);
308 }
309
310 void
311 setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
312 {
313 auto lastPublishSeq = lastPublishedSequence_.lock();
314 *lastPublishSeq = lastPublishedSequence;
315 }
316};
317
318} // namespace etl::impl
static constexpr std::uint32_t kRIPPLE_EPOCH_START
The ripple epoch start timestamp. Midnight on 1st January 2000.
Definition DBHelpers.hpp:292
static util::prometheus::CounterInt & counterInt(std::string name, util::prometheus::Labels labels, std::optional< std::string > description=std::nullopt)
Get an integer based counter metric. It will be created if it doesn't exist.
Definition Prometheus.cpp:230
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const override
Get last publish time as a time point.
Definition LedgerPublisher.hpp:248
LedgerPublisher(util::async::AnyExecutionContext ctx, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, SystemState const &state)
Create an instance of the publisher.
Definition LedgerPublisher.hpp:102
void publish(ripple::LedgerHeader const &lgrInfo)
Publish the passed ledger asynchronously.
Definition LedgerPublisher.hpp:177
std::optional< uint32_t > getLastPublishedSequence() const
Get the sequence of the last scheduled ledger to publish. Be aware that the ledger may not have been...
Definition LedgerPublisher.hpp:276
bool publish(uint32_t ledgerSequence, std::optional< uint32_t > maxAttempts, std::chrono::steady_clock::duration attemptsDelay=std::chrono::seconds{1}) override
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition LedgerPublisher.hpp:125
void stop()
Stops publishing.
Definition LedgerPublisher.hpp:288
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:259
std::uint32_t lastPublishAgeSeconds() const override
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:236
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:502
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:507
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:101
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:139
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
A type-erased execution context.
Definition AnyStrand.hpp:40
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:136
The interface of a scheduler for the extraction process.
Definition LedgerPublisherInterface.hpp:31
Represents the state of the ETL subsystem.
Definition SystemState.hpp:38