Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerPublisher.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "data/Types.hpp"
25#include "etl/SystemState.hpp"
26#include "etlng/LedgerPublisherInterface.hpp"
27#include "etlng/impl/Loading.hpp"
28#include "feed/SubscriptionManagerInterface.hpp"
29#include "util/Assert.hpp"
30#include "util/Mutex.hpp"
31#include "util/log/Logger.hpp"
32#include "util/prometheus/Counter.hpp"
33#include "util/prometheus/Prometheus.hpp"
34
35#include <boost/asio/io_context.hpp>
36#include <boost/asio/post.hpp>
37#include <boost/asio/strand.hpp>
38#include <fmt/core.h>
39#include <xrpl/basics/chrono.h>
40#include <xrpl/protocol/Fees.h>
41#include <xrpl/protocol/LedgerHeader.h>
42#include <xrpl/protocol/SField.h>
43#include <xrpl/protocol/STObject.h>
44#include <xrpl/protocol/Serializer.h>
45
46#include <algorithm>
47#include <chrono>
48#include <cstddef>
49#include <cstdint>
50#include <functional>
51#include <memory>
52#include <mutex>
53#include <optional>
54#include <shared_mutex>
55#include <string>
56#include <thread>
57#include <utility>
58#include <vector>
59
60namespace etlng::impl {
61
74 util::Logger log_{"ETL"};
75
76 boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
77
78 std::shared_ptr<BackendInterface> backend_;
79 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
80 std::reference_wrapper<etl::SystemState const> state_; // shared state for ETL
81
82 util::Mutex<std::chrono::time_point<ripple::NetClock>, std::shared_mutex> lastCloseTime_;
83
84 std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt(
85 "etl_last_publish_seconds",
86 {},
87 "Seconds since epoch of the last published ledger"
88 );
89
90 util::Mutex<std::optional<uint32_t>, std::shared_mutex> lastPublishedSequence_;
91
92public:
97 boost::asio::io_context& ioc, // TODO: replace with AsyncContext shared with ETLServiceNg
98 std::shared_ptr<BackendInterface> backend,
99 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
100 etl::SystemState const& state
101 )
102 : publishStrand_{boost::asio::make_strand(ioc)}
103 , backend_{std::move(backend)}
104 , subscriptions_{std::move(subscriptions)}
105 , state_{std::cref(state)}
106 {
107 }
108
118 bool
120 uint32_t ledgerSequence,
121 std::optional<uint32_t> maxAttempts,
122 std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
123 ) override
124 {
125 LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
126 size_t numAttempts = 0;
127 while (not state_.get().isStopping) {
128 auto range = backend_->hardFetchLedgerRangeNoThrow();
129
130 if (!range || range->maxSequence < ledgerSequence) {
131 ++numAttempts;
132 LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence;
133
134 // We try maxAttempts times to publish the ledger, waiting one second in between each attempt.
135 if (maxAttempts && numAttempts >= maxAttempts) {
136 LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts.";
137 return false;
138 }
139 std::this_thread::sleep_for(attemptsDelay);
140 continue;
141 }
142
143 auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
144 return backend_->fetchLedgerBySequence(ledgerSequence, yield);
145 });
146
147 ASSERT(lgr.has_value(), "Ledger must exist in database. Ledger sequence = {}", ledgerSequence);
148 publish(*lgr);
149
150 return true;
151 }
152 return false;
153 }
154
162 void
163 publish(ripple::LedgerHeader const& lgrInfo)
164 {
165 boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
166 LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
167
168 setLastClose(lgrInfo.closeTime);
169 auto age = lastCloseAgeSeconds();
170
171 // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
172 static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
173 if (age < kMAX_LEDGER_AGE_SECONDS) {
174 std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
175 return backend_->fetchFees(lgrInfo.seq, yield);
176 });
177 ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
178
179 auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) {
180 return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
181 });
182
183 auto const ledgerRange = backend_->fetchLedgerRange();
184 ASSERT(ledgerRange.has_value(), "Ledger range must exist");
185
186 auto const range = fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence);
187 subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
188
189 // order with transaction index
190 std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
191 ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
192 ripple::STObject const object1(iter1, ripple::sfMetadata);
193 ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
194 ripple::STObject const object2(iter2, ripple::sfMetadata);
195 return object1.getFieldU32(ripple::sfTransactionIndex) <
196 object2.getFieldU32(ripple::sfTransactionIndex);
197 });
198
199 for (auto const& txAndMeta : transactions)
200 subscriptions_->pubTransaction(txAndMeta, lgrInfo);
201
202 subscriptions_->pubBookChanges(lgrInfo, transactions);
203
204 setLastPublishTime();
205 LOG(log_.info()) << "Published ledger " << lgrInfo.seq;
206 } else {
207 LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq;
208 }
209 });
210
211 // we track latest publish-requested seq, not necessarily already published
212 setLastPublishedSequence(lgrInfo.seq);
213 }
214
218 std::uint32_t
219 lastPublishAgeSeconds() const override
220 {
221 return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastPublish())
222 .count();
223 }
224
228 std::chrono::time_point<std::chrono::system_clock>
229 getLastPublish() const override
230 {
231 return std::chrono::time_point<std::chrono::system_clock>{std::chrono::seconds{lastPublishSeconds_.get().value()
232 }};
233 }
234
238 std::uint32_t
239 lastCloseAgeSeconds() const override
240 {
241 auto closeTime = lastCloseTime_.lock()->time_since_epoch().count();
242 auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
243 .count();
244 if (now < (kRIPPLE_EPOCH_START + closeTime))
245 return 0;
246 return now - (kRIPPLE_EPOCH_START + closeTime);
247 }
248
253 std::optional<uint32_t>
255 {
256 return *lastPublishedSequence_.lock();
257 }
258
259private:
260 void
261 setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
262 {
263 auto closeTime = lastCloseTime_.lock<std::scoped_lock>();
264 *closeTime = lastCloseTime;
265 }
266
267 void
268 setLastPublishTime()
269 {
270 using namespace std::chrono;
271 auto const nowSeconds = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
272 lastPublishSeconds_.get().set(nowSeconds);
273 }
274
275 void
276 setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
277 {
278 auto lastPublishSeq = lastPublishedSequence_.lock();
279 *lastPublishSeq = lastPublishedSequence;
280 }
281};
282
283} // namespace etlng::impl
static constexpr std::uint32_t kRIPPLE_EPOCH_START
The ripple epoch start timestamp. Midnight on 1st January 2000.
Definition DBHelpers.hpp:318
static util::prometheus::CounterInt & counterInt(std::string name, util::prometheus::Labels labels, std::optional< std::string > description=std::nullopt)
Get an integer based counter metric. It will be created if it doesn't exist.
Definition Prometheus.cpp:194
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:73
std::optional< uint32_t > getLastPublishedSequence() const
Get the sequence of the last schueduled ledger to publish, Be aware that the ledger may not have been...
Definition LedgerPublisher.hpp:254
LedgerPublisher(boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, etl::SystemState const &state)
Create an instance of the publisher.
Definition LedgerPublisher.hpp:96
void publish(ripple::LedgerHeader const &lgrInfo)
Publish the passed ledger asynchronously.
Definition LedgerPublisher.hpp:163
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:239
std::uint32_t lastPublishAgeSeconds() const override
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:219
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const override
Get last publish time as a time point.
Definition LedgerPublisher.hpp:229
bool publish(uint32_t ledgerSequence, std::optional< uint32_t > maxAttempts, std::chrono::steady_clock::duration attemptsDelay=std::chrono::seconds{1}) override
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition LedgerPublisher.hpp:119
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:134
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:132
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
The interface of a scheduler for the extraction process.
Definition LedgerPublisherInterface.hpp:31