Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerPublisher.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/DBHelpers.hpp"
24#include "data/Types.hpp"
25#include "etl/SystemState.hpp"
26#include "etlng/LedgerPublisherInterface.hpp"
27#include "etlng/impl/Loading.hpp"
28#include "feed/SubscriptionManagerInterface.hpp"
29#include "util/Assert.hpp"
30#include "util/Mutex.hpp"
31#include "util/log/Logger.hpp"
32#include "util/prometheus/Counter.hpp"
33#include "util/prometheus/Prometheus.hpp"
34
35#include <boost/asio/io_context.hpp>
36#include <boost/asio/post.hpp>
37#include <boost/asio/strand.hpp>
38#include <xrpl/basics/chrono.h>
39#include <xrpl/protocol/Fees.h>
40#include <xrpl/protocol/LedgerHeader.h>
41#include <xrpl/protocol/SField.h>
42#include <xrpl/protocol/STObject.h>
43#include <xrpl/protocol/Serializer.h>
44
45#include <algorithm>
46#include <chrono>
47#include <cstddef>
48#include <cstdint>
49#include <functional>
50#include <memory>
51#include <mutex>
52#include <optional>
53#include <shared_mutex>
54#include <string>
55#include <thread>
56#include <utility>
57#include <vector>
58
59namespace etlng::impl {
60
73 util::Logger log_{"ETL"};
74
75 boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
76
77 std::shared_ptr<BackendInterface> backend_;
78 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
79 std::reference_wrapper<etl::SystemState const> state_; // shared state for ETL
80
81 util::Mutex<std::chrono::time_point<ripple::NetClock>, std::shared_mutex> lastCloseTime_;
82
83 std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt(
84 "etl_last_publish_seconds",
85 {},
86 "Seconds since epoch of the last published ledger"
87 );
88
89 util::Mutex<std::optional<uint32_t>, std::shared_mutex> lastPublishedSequence_;
90
91public:
96 boost::asio::io_context& ioc, // TODO: replace with AsyncContext shared with ETLServiceNg
97 std::shared_ptr<BackendInterface> backend,
98 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
99 etl::SystemState const& state
100 )
101 : publishStrand_{boost::asio::make_strand(ioc)}
102 , backend_{std::move(backend)}
103 , subscriptions_{std::move(subscriptions)}
104 , state_{std::cref(state)}
105 {
106 }
107
117 bool
119 uint32_t ledgerSequence,
120 std::optional<uint32_t> maxAttempts,
121 std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
122 ) override
123 {
124 LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
125 size_t numAttempts = 0;
126 while (not state_.get().isStopping) {
127 auto range = backend_->hardFetchLedgerRangeNoThrow();
128
129 if (!range || range->maxSequence < ledgerSequence) {
130 ++numAttempts;
131 LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence;
132
133 // We try maxAttempts times to publish the ledger, waiting one second in between each attempt.
134 if (maxAttempts && numAttempts >= maxAttempts) {
135 LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts.";
136 return false;
137 }
138 std::this_thread::sleep_for(attemptsDelay);
139 continue;
140 }
141
142 auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
143 return backend_->fetchLedgerBySequence(ledgerSequence, yield);
144 });
145
146 ASSERT(lgr.has_value(), "Ledger must exist in database. Ledger sequence = {}", ledgerSequence);
147 publish(*lgr);
148
149 return true;
150 }
151 return false;
152 }
153
161 void
162 publish(ripple::LedgerHeader const& lgrInfo)
163 {
164 boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
165 LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
166
167 // TODO: This should probably not be part of publisher in the future
168 if (not state_.get().isWriting)
169 backend_->updateRange(lgrInfo.seq); // This can't be unit tested atm.
170
171 setLastClose(lgrInfo.closeTime);
172 auto age = lastCloseAgeSeconds();
173
174 // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
175 static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
176 if (age < kMAX_LEDGER_AGE_SECONDS) {
177 std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
178 return backend_->fetchFees(lgrInfo.seq, yield);
179 });
180 ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
181
182 auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) {
183 return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
184 });
185
186 auto const ledgerRange = backend_->fetchLedgerRange();
187 ASSERT(ledgerRange.has_value(), "Ledger range must exist");
188
189 auto const range = fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence);
190 subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
191
192 // order with transaction index
193 std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
194 ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
195 ripple::STObject const object1(iter1, ripple::sfMetadata);
196 ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
197 ripple::STObject const object2(iter2, ripple::sfMetadata);
198 return object1.getFieldU32(ripple::sfTransactionIndex) <
199 object2.getFieldU32(ripple::sfTransactionIndex);
200 });
201
202 for (auto const& txAndMeta : transactions)
203 subscriptions_->pubTransaction(txAndMeta, lgrInfo);
204
205 subscriptions_->pubBookChanges(lgrInfo, transactions);
206
207 setLastPublishTime();
208 LOG(log_.info()) << "Published ledger " << lgrInfo.seq;
209 } else {
210 LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq;
211 }
212 });
213
214 // we track latest publish-requested seq, not necessarily already published
215 setLastPublishedSequence(lgrInfo.seq);
216 }
217
221 std::uint32_t
222 lastPublishAgeSeconds() const override
223 {
224 return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastPublish())
225 .count();
226 }
227
231 std::chrono::time_point<std::chrono::system_clock>
232 getLastPublish() const override
233 {
234 return std::chrono::time_point<std::chrono::system_clock>{std::chrono::seconds{lastPublishSeconds_.get().value()
235 }};
236 }
237
241 std::uint32_t
242 lastCloseAgeSeconds() const override
243 {
244 auto closeTime = lastCloseTime_.lock()->time_since_epoch().count();
245 auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
246 .count();
247 if (now < (kRIPPLE_EPOCH_START + closeTime))
248 return 0;
249 return now - (kRIPPLE_EPOCH_START + closeTime);
250 }
251
256 std::optional<uint32_t>
258 {
259 return *lastPublishedSequence_.lock();
260 }
261
262private:
263 void
264 setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
265 {
266 auto closeTime = lastCloseTime_.lock<std::scoped_lock>();
267 *closeTime = lastCloseTime;
268 }
269
270 void
271 setLastPublishTime()
272 {
273 using namespace std::chrono;
274 auto const nowSeconds = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
275 lastPublishSeconds_.get().set(nowSeconds);
276 }
277
278 void
279 setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
280 {
281 auto lastPublishSeq = lastPublishedSequence_.lock();
282 *lastPublishSeq = lastPublishedSequence;
283 }
284};
285
286} // namespace etlng::impl
static constexpr std::uint32_t kRIPPLE_EPOCH_START
The ripple epoch start timestamp. Midnight on 1st January 2000.
Definition DBHelpers.hpp:318
static util::prometheus::CounterInt & counterInt(std::string name, util::prometheus::Labels labels, std::optional< std::string > description=std::nullopt)
Get an integer based counter metric. It will be created if it doesn't exist.
Definition Prometheus.cpp:194
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:72
std::optional< uint32_t > getLastPublishedSequence() const
Get the sequence of the last scheduled ledger to publish. Be aware that the ledger may not have been...
Definition LedgerPublisher.hpp:257
LedgerPublisher(boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, etl::SystemState const &state)
Create an instance of the publisher.
Definition LedgerPublisher.hpp:95
void publish(ripple::LedgerHeader const &lgrInfo)
Publish the passed ledger asynchronously.
Definition LedgerPublisher.hpp:162
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:242
std::uint32_t lastPublishAgeSeconds() const override
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:222
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const override
Get last publish time as a time point.
Definition LedgerPublisher.hpp:232
bool publish(uint32_t ledgerSequence, std::optional< uint32_t > maxAttempts, std::chrono::steady_clock::duration attemptsDelay=std::chrono::seconds{1}) override
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition LedgerPublisher.hpp:118
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:134
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:132
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
The interface of a scheduler for the extraction process.
Definition LedgerPublisherInterface.hpp:31