Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LedgerPublisher.hpp
1#pragma once
2
3#include "data/BackendInterface.hpp"
4#include "data/DBHelpers.hpp"
5#include "etl/LedgerPublisherInterface.hpp"
6#include "etl/SystemState.hpp"
7#include "etl/impl/Loading.hpp"
8#include "feed/SubscriptionManagerInterface.hpp"
9#include "util/Assert.hpp"
10#include "util/Mutex.hpp"
11#include "util/async/AnyExecutionContext.hpp"
12#include "util/async/AnyStrand.hpp"
13#include "util/log/Logger.hpp"
14#include "util/prometheus/Counter.hpp"
15#include "util/prometheus/Prometheus.hpp"
16
17#include <boost/asio/io_context.hpp>
18#include <boost/asio/post.hpp>
19#include <boost/asio/strand.hpp>
20#include <fmt/format.h>
21#include <xrpl/basics/chrono.h>
22#include <xrpl/protocol/Fees.h>
23#include <xrpl/protocol/LedgerHeader.h>
24#include <xrpl/protocol/SField.h>
25#include <xrpl/protocol/STObject.h>
26#include <xrpl/protocol/Serializer.h>
27
28#include <algorithm>
29#include <atomic>
30#include <chrono>
31#include <cstddef>
32#include <cstdint>
33#include <functional>
34#include <memory>
35#include <mutex>
36#include <optional>
37#include <shared_mutex>
38#include <string>
39#include <thread>
40#include <utility>
41#include <vector>
42
43namespace etl::impl {
44
// Logger for this class; constructed with the channel name "ETL".
58 util::Logger log_{"ETL"};
59
// Strand that publish jobs are submitted to; created from the execution context in the constructor.
60 util::async::AnyStrand publishStrand_;
61
// Stop flag; the retry loop of the sequence-based publish overload runs only while it is false.
62 std::atomic_bool stop_{false};
63
// Backend used to fetch the ledger range, ledger headers, fees and transactions.
64 std::shared_ptr<BackendInterface> backend_;
// Receiver of the published ledger, transaction and book-change notifications.
65 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
66 std::reference_wrapper<SystemState const> state_; // shared state for ETL
67
// Close time of the ledger most recently processed by a publish job, kept inside a util::Mutex.
68 util::Mutex<std::chrono::time_point<ripple::NetClock>, std::shared_mutex> lastCloseTime_;
69
// Counter metric named "etl_last_publish_seconds"; written by setLastPublishTime() and read by
// getLastPublish().
// NOTE(review): the line that opens the initializer call (listing line 71) is absent from this
// extract - presumably a counterInt(...) factory call; confirm against the original header.
70 std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ =
72 "etl_last_publish_seconds",
73 {},
74 "Seconds since epoch of the last published ledger"
75 );
76
// Sequence of the ledger most recently passed to publish(); empty until the first call.
77 util::Mutex<std::optional<uint32_t>, std::shared_mutex> lastPublishedSequence_;
78
79public:
85 std::shared_ptr<BackendInterface> backend,
86 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
87 SystemState const& state
88 )
89 : publishStrand_{ctx.makeStrand()}
90 , backend_{std::move(backend)}
91 , subscriptions_{std::move(subscriptions)}
92 , state_{std::cref(state)}
93 {
94 }
95
105 bool
107 uint32_t ledgerSequence,
108 std::optional<uint32_t> maxAttempts,
109 std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
110 ) override
111 {
112 LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
113 size_t numAttempts = 0;
114 while (not stop_) {
115 auto range = backend_->hardFetchLedgerRangeNoThrow();
116
117 if (!range || range->maxSequence < ledgerSequence) {
118 ++numAttempts;
119 LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = "
120 << ledgerSequence;
121
122 // We try maxAttempts times to publish the ledger, waiting one second in between
123 // each attempt.
124 if (maxAttempts && numAttempts >= maxAttempts) {
125 LOG(log_.debug())
126 << "Failed to publish ledger after " << numAttempts << " attempts.";
127 return false;
128 }
129 std::this_thread::sleep_for(attemptsDelay);
130 continue;
131 }
132
133 auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
134 return backend_->fetchLedgerBySequence(ledgerSequence, yield);
135 });
136
137 ASSERT(
138 lgr.has_value(),
139 "Ledger must exist in database. Ledger sequence = {}",
140 ledgerSequence
141 );
142 publish(*lgr);
143
144 return true;
145 }
146 return false;
147 }
148
157 void
158 publish(ripple::LedgerHeader const& lgrInfo)
159 {
160 publishStrand_.submit([this, lgrInfo = lgrInfo] {
161 LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
162
163 setLastClose(lgrInfo.closeTime);
164 auto age = lastCloseAgeSeconds();
165
166 // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up
167 // and don't publish
168 static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
169 if (age < kMAX_LEDGER_AGE_SECONDS) {
170 std::optional<ripple::Fees> fees =
171 data::synchronousAndRetryOnTimeout([&](auto yield) {
172 return backend_->fetchFees(lgrInfo.seq, yield);
173 });
174 ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
175
176 auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) {
177 return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
178 });
179
180 auto const ledgerRange = backend_->fetchLedgerRange();
181 ASSERT(ledgerRange.has_value(), "Ledger range must exist");
182
183 auto const range =
184 fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence);
185 subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
186
187 // order with transaction index
188 std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
189 ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
190 ripple::STObject const object1(iter1, ripple::sfMetadata);
191 ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
192 ripple::STObject const object2(iter2, ripple::sfMetadata);
193 return object1.getFieldU32(ripple::sfTransactionIndex) <
194 object2.getFieldU32(ripple::sfTransactionIndex);
195 });
196
197 for (auto const& txAndMeta : transactions)
198 subscriptions_->pubTransaction(txAndMeta, lgrInfo);
199
200 subscriptions_->pubBookChanges(lgrInfo, transactions);
201
202 setLastPublishTime();
203 LOG(log_.info()) << "Published ledger " << lgrInfo.seq;
204 } else {
205 LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq;
206 }
207 });
208
209 // we track latest publish-requested seq, not necessarily already published
210 setLastPublishedSequence(lgrInfo.seq);
211 }
212
216 std::uint32_t
217 lastPublishAgeSeconds() const override
218 {
219 return std::chrono::duration_cast<std::chrono::seconds>(
220 std::chrono::system_clock::now() - getLastPublish()
221 )
222 .count();
223 }
224
228 std::chrono::time_point<std::chrono::system_clock>
229 getLastPublish() const override
230 {
231 return std::chrono::time_point<std::chrono::system_clock>{
232 std::chrono::seconds{lastPublishSeconds_.get().value()}
233 };
234 }
235
239 std::uint32_t
240 lastCloseAgeSeconds() const override
241 {
242 auto closeTime = lastCloseTime_.lock()->time_since_epoch().count();
243 auto now = std::chrono::duration_cast<std::chrono::seconds>(
244 std::chrono::system_clock::now().time_since_epoch()
245 )
246 .count();
247 if (now < (kRIPPLE_EPOCH_START + closeTime))
248 return 0;
249 return now - (kRIPPLE_EPOCH_START + closeTime);
250 }
251
256 std::optional<uint32_t>
258 {
259 return *lastPublishedSequence_.lock();
260 }
261
268 void
270 {
271 stop_ = true;
272 }
273
274private:
275 void
276 setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
277 {
278 auto closeTime = lastCloseTime_.lock<std::scoped_lock>();
279 *closeTime = lastCloseTime;
280 }
281
282 void
283 setLastPublishTime()
284 {
285 using namespace std::chrono;
286 auto const nowSeconds =
287 duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
288 lastPublishSeconds_.get().set(nowSeconds);
289 }
290
291 void
292 setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
293 {
294 auto lastPublishSeq = lastPublishedSequence_.lock();
295 *lastPublishSeq = lastPublishedSequence;
296 }
297};
298
299} // namespace etl::impl
static constexpr std::uint32_t kRIPPLE_EPOCH_START
The ripple epoch start timestamp. Midnight on 1st January 2000.
Definition DBHelpers.hpp:273
static util::prometheus::CounterInt & counterInt(std::string name, util::prometheus::Labels labels, std::optional< std::string > description=std::nullopt)
Get an integer based counter metric. It will be created if it doesn't exist.
Definition Prometheus.cpp:211
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const override
Get last publish time as a time point.
Definition LedgerPublisher.hpp:229
LedgerPublisher(util::async::AnyExecutionContext ctx, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, SystemState const &state)
Create an instance of the publisher.
Definition LedgerPublisher.hpp:83
void publish(ripple::LedgerHeader const &lgrInfo)
Publish the passed ledger asynchronously.
Definition LedgerPublisher.hpp:158
std::optional< uint32_t > getLastPublishedSequence() const
Get the sequence of the last scheduled ledger to publish. Be aware that the ledger may not have been...
Definition LedgerPublisher.hpp:257
bool publish(uint32_t ledgerSequence, std::optional< uint32_t > maxAttempts, std::chrono::steady_clock::duration attemptsDelay=std::chrono::seconds{1}) override
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition LedgerPublisher.hpp:106
void stop()
Stops publishing.
Definition LedgerPublisher.hpp:269
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:240
std::uint32_t lastPublishAgeSeconds() const override
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:217
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:483
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:488
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:82
Lock< ProtectedDataType const, LockType, MutexType > lock() const
Lock the mutex and get a lock object allowing access to the protected data.
Definition Mutex.hpp:120
A type-erased execution context.
Definition AnyExecutionContext.hpp:22
A type-erased strand.
Definition AnyStrand.hpp:21
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:117
The interface of a scheduler for the extraction process.
Definition LedgerPublisherInterface.hpp:12
Represents the state of the ETL subsystem.
Definition SystemState.hpp:20