Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerPublisher.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "etl/LedgerPublisherInterface.hpp"
25#include "etl/SystemState.hpp"
26#include "etl/impl/Loading.hpp"
27#include "feed/SubscriptionManagerInterface.hpp"
28#include "util/Assert.hpp"
29#include "util/Mutex.hpp"
30#include "util/async/AnyExecutionContext.hpp"
31#include "util/async/AnyStrand.hpp"
32#include "util/log/Logger.hpp"
33#include "util/prometheus/Counter.hpp"
34#include "util/prometheus/Prometheus.hpp"
35
36#include <boost/asio/io_context.hpp>
37#include <boost/asio/post.hpp>
38#include <boost/asio/strand.hpp>
39#include <fmt/format.h>
40#include <xrpl/basics/chrono.h>
41#include <xrpl/protocol/Fees.h>
42#include <xrpl/protocol/LedgerHeader.h>
43#include <xrpl/protocol/SField.h>
44#include <xrpl/protocol/STObject.h>
45#include <xrpl/protocol/Serializer.h>
46
47#include <algorithm>
48#include <chrono>
49#include <cstddef>
50#include <cstdint>
51#include <functional>
52#include <memory>
53#include <mutex>
54#include <optional>
55#include <shared_mutex>
56#include <string>
57#include <thread>
58#include <utility>
59#include <vector>
60
61namespace etl::impl {
62
75 util::Logger log_{"ETL"};
76
77 util::async::AnyStrand publishStrand_;
78
79 std::shared_ptr<BackendInterface> backend_;
80 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
81 std::reference_wrapper<SystemState const> state_; // shared state for ETL
82
83 util::Mutex<std::chrono::time_point<ripple::NetClock>, std::shared_mutex> lastCloseTime_;
84
85 std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt(
86 "etl_last_publish_seconds",
87 {},
88 "Seconds since epoch of the last published ledger"
89 );
90
91 util::Mutex<std::optional<uint32_t>, std::shared_mutex> lastPublishedSequence_;
92
93public:
99 std::shared_ptr<BackendInterface> backend,
100 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
101 SystemState const& state
102 )
103 : publishStrand_{ctx.makeStrand()}
104 , backend_{std::move(backend)}
105 , subscriptions_{std::move(subscriptions)}
106 , state_{std::cref(state)}
107 {
108 }
109
119 bool
121 uint32_t ledgerSequence,
122 std::optional<uint32_t> maxAttempts,
123 std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
124 ) override
125 {
126 LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
127 size_t numAttempts = 0;
128 while (not state_.get().isStopping) {
129 auto range = backend_->hardFetchLedgerRangeNoThrow();
130
131 if (!range || range->maxSequence < ledgerSequence) {
132 ++numAttempts;
133 LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence;
134
135 // We try maxAttempts times to publish the ledger, waiting one second in between each attempt.
136 if (maxAttempts && numAttempts >= maxAttempts) {
137 LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts.";
138 return false;
139 }
140 std::this_thread::sleep_for(attemptsDelay);
141 continue;
142 }
143
144 auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
145 return backend_->fetchLedgerBySequence(ledgerSequence, yield);
146 });
147
148 ASSERT(lgr.has_value(), "Ledger must exist in database. Ledger sequence = {}", ledgerSequence);
149 publish(*lgr);
150
151 return true;
152 }
153 return false;
154 }
155
163 void
164 publish(ripple::LedgerHeader const& lgrInfo)
165 {
166 publishStrand_.submit([this, lgrInfo = lgrInfo] {
167 LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
168
169 setLastClose(lgrInfo.closeTime);
170 auto age = lastCloseAgeSeconds();
171
172 // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
173 static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
174 if (age < kMAX_LEDGER_AGE_SECONDS) {
175 std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
176 return backend_->fetchFees(lgrInfo.seq, yield);
177 });
178 ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
179
180 auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) {
181 return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
182 });
183
184 auto const ledgerRange = backend_->fetchLedgerRange();
185 ASSERT(ledgerRange.has_value(), "Ledger range must exist");
186
187 auto const range = fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence);
188 subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
189
190 // order with transaction index
191 std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
192 ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
193 ripple::STObject const object1(iter1, ripple::sfMetadata);
194 ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
195 ripple::STObject const object2(iter2, ripple::sfMetadata);
196 return object1.getFieldU32(ripple::sfTransactionIndex) <
197 object2.getFieldU32(ripple::sfTransactionIndex);
198 });
199
200 for (auto const& txAndMeta : transactions)
201 subscriptions_->pubTransaction(txAndMeta, lgrInfo);
202
203 subscriptions_->pubBookChanges(lgrInfo, transactions);
204
205 setLastPublishTime();
206 LOG(log_.info()) << "Published ledger " << lgrInfo.seq;
207 } else {
208 LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq;
209 }
210 });
211
212 // we track latest publish-requested seq, not necessarily already published
213 setLastPublishedSequence(lgrInfo.seq);
214 }
215
219 std::uint32_t
220 lastPublishAgeSeconds() const override
221 {
222 return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastPublish())
223 .count();
224 }
225
229 std::chrono::time_point<std::chrono::system_clock>
230 getLastPublish() const override
231 {
232 return std::chrono::time_point<std::chrono::system_clock>{
233 std::chrono::seconds{lastPublishSeconds_.get().value()}
234 };
235 }
236
240 std::uint32_t
241 lastCloseAgeSeconds() const override
242 {
243 auto closeTime = lastCloseTime_.lock()->time_since_epoch().count();
244 auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
245 .count();
246 if (now < (kRIPPLE_EPOCH_START + closeTime))
247 return 0;
248 return now - (kRIPPLE_EPOCH_START + closeTime);
249 }
250
255 std::optional<uint32_t>
257 {
258 return *lastPublishedSequence_.lock();
259 }
260
261private:
262 void
263 setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
264 {
265 auto closeTime = lastCloseTime_.lock<std::scoped_lock>();
266 *closeTime = lastCloseTime;
267 }
268
269 void
270 setLastPublishTime()
271 {
272 using namespace std::chrono;
273 auto const nowSeconds = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
274 lastPublishSeconds_.get().set(nowSeconds);
275 }
276
277 void
278 setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
279 {
280 auto lastPublishSeq = lastPublishedSequence_.lock();
281 *lastPublishSeq = lastPublishedSequence;
282 }
283};
284
285} // namespace etl::impl
static constexpr std::uint32_t kRIPPLE_EPOCH_START
The ripple epoch start timestamp. Midnight on 1st January 2000.
Definition DBHelpers.hpp:272
static util::prometheus::CounterInt & counterInt(std::string name, util::prometheus::Labels labels, std::optional< std::string > description=std::nullopt)
Get an integer based counter metric. It will be created if it doesn't exist.
Definition Prometheus.cpp:200
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const override
Get last publish time as a time point.
Definition LedgerPublisher.hpp:230
LedgerPublisher(util::async::AnyExecutionContext ctx, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, SystemState const &state)
Create an instance of the publisher.
Definition LedgerPublisher.hpp:97
void publish(ripple::LedgerHeader const &lgrInfo)
Publish the passed ledger asynchronously.
Definition LedgerPublisher.hpp:164
std::optional< uint32_t > getLastPublishedSequence() const
Get the sequence of the last scheduled ledger to publish. Be aware that the ledger may not have been...
Definition LedgerPublisher.hpp:256
bool publish(uint32_t ledgerSequence, std::optional< uint32_t > maxAttempts, std::chrono::steady_clock::duration attemptsDelay=std::chrono::seconds{1}) override
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition LedgerPublisher.hpp:120
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:241
std::uint32_t lastPublishAgeSeconds() const override
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:220
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:95
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:475
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:480
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:101
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:139
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
A type-erased execution context.
Definition AnyStrand.hpp:40
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:131
The interface of a scheduler for the extraction process.
Definition LedgerPublisherInterface.hpp:31
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33