22#include "data/BackendInterface.hpp"
24#include "data/LedgerCacheInterface.hpp"
25#include "data/Types.hpp"
26#include "etl/SystemState.hpp"
27#include "feed/SubscriptionManagerInterface.hpp"
28#include "util/Assert.hpp"
29#include "util/log/Logger.hpp"
30#include "util/prometheus/Counter.hpp"
31#include "util/prometheus/Prometheus.hpp"
33#include <boost/asio/io_context.hpp>
34#include <boost/asio/strand.hpp>
35#include <xrpl/basics/chrono.h>
36#include <xrpl/protocol/Fees.h>
37#include <xrpl/protocol/LedgerHeader.h>
38#include <xrpl/protocol/SField.h>
39#include <xrpl/protocol/STObject.h>
40#include <xrpl/protocol/Serializer.h>
50#include <shared_mutex>
72 boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
74 std::shared_ptr<BackendInterface> backend_;
75 std::reference_wrapper<data::LedgerCacheInterface> cache_;
76 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
77 std::reference_wrapper<SystemState const> state_;
79 std::chrono::time_point<ripple::NetClock> lastCloseTime_;
80 mutable std::shared_mutex closeTimeMtx_;
83 "etl_last_publish_seconds",
85 "Seconds since epoch of the last published ledger"
88 std::optional<uint32_t> lastPublishedSequence_;
89 mutable std::shared_mutex lastPublishedSeqMtx_;
96 boost::asio::io_context& ioc,
97 std::shared_ptr<BackendInterface> backend,
99 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
102 : publishStrand_{boost::asio::make_strand(ioc)}
103 , backend_{std::move(backend)}
105 , subscriptions_{std::move(subscriptions)}
106 , state_{std::cref(state)}
121 uint32_t ledgerSequence,
122 std::optional<uint32_t> maxAttempts,
123 std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
126 LOG(log_.
info()) <<
"Attempting to publish ledger = " << ledgerSequence;
127 size_t numAttempts = 0;
128 while (not state_.get().isStopping) {
129 auto range = backend_->hardFetchLedgerRangeNoThrow();
131 if (!range || range->maxSequence < ledgerSequence) {
133 LOG(log_.
debug()) <<
"Trying to publish. Could not find ledger with sequence = " << ledgerSequence;
136 if (maxAttempts && numAttempts >= maxAttempts) {
137 LOG(log_.
debug()) <<
"Failed to publish ledger after " << numAttempts <<
" attempts.";
140 std::this_thread::sleep_for(attemptsDelay);
145 return backend_->fetchLedgerBySequence(ledgerSequence, yield);
148 ASSERT(lgr.has_value(),
"Ledger must exist in database. Ledger sequence = {}", ledgerSequence);
166 boost::asio::post(publishStrand_, [
this, lgrInfo = lgrInfo]() {
167 LOG(log_.
info()) <<
"Publishing ledger " << std::to_string(lgrInfo.seq);
169 if (!state_.get().isWriting) {
170 LOG(log_.
info()) <<
"Updating ledger range for read node.";
172 if (!cache_.get().isDisabled()) {
174 return backend_->fetchLedgerDiff(lgrInfo.seq, yield);
177 cache_.get().update(diff, lgrInfo.seq);
180 backend_->updateRange(lgrInfo.seq);
183 setLastClose(lgrInfo.closeTime);
188 static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
189 if (age < kMAX_LEDGER_AGE_SECONDS) {
191 return backend_->fetchFees(lgrInfo.seq, yield);
193 ASSERT(fees.has_value(),
"Fees must exist for ledger {}", lgrInfo.seq);
195 std::vector<data::TransactionAndMetadata> transactions =
197 return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
200 auto const ledgerRange = backend_->fetchLedgerRange();
201 ASSERT(ledgerRange.has_value(),
"Ledger range must exist");
203 std::string
const range =
204 std::to_string(ledgerRange->minSequence) +
"-" + std::to_string(ledgerRange->maxSequence);
206 subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
209 std::ranges::sort(transactions, [](
auto const& t1,
auto const& t2) {
210 ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
211 ripple::STObject
const object1(iter1, ripple::sfMetadata);
212 ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
213 ripple::STObject
const object2(iter2, ripple::sfMetadata);
214 return object1.getFieldU32(ripple::sfTransactionIndex) <
215 object2.getFieldU32(ripple::sfTransactionIndex);
218 for (
auto& txAndMeta : transactions)
219 subscriptions_->pubTransaction(txAndMeta, lgrInfo);
221 subscriptions_->pubBookChanges(lgrInfo, transactions);
223 setLastPublishTime();
224 LOG(log_.
info()) <<
"Published ledger " << std::to_string(lgrInfo.seq);
226 LOG(log_.
info()) <<
"Skipping publishing ledger " << std::to_string(lgrInfo.seq);
231 setLastPublishedSequence(lgrInfo.seq);
240 return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
getLastPublish())
247 std::chrono::time_point<std::chrono::system_clock>
250 return std::chrono::time_point<std::chrono::system_clock>{std::chrono::seconds{lastPublishSeconds_.get().value()
260 std::shared_lock
const lck(closeTimeMtx_);
261 auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
263 auto closeTime = lastCloseTime_.time_since_epoch().count();
273 std::optional<uint32_t>
276 std::scoped_lock
const lck(lastPublishedSeqMtx_);
277 return lastPublishedSequence_;
282 setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
284 std::scoped_lock
const lck(closeTimeMtx_);
285 lastCloseTime_ = lastCloseTime;
291 using namespace std::chrono;
292 auto const nowSeconds = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
293 lastPublishSeconds_.get().set(nowSeconds);
297 setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
299 std::scoped_lock
const lck(lastPublishedSeqMtx_);
300 lastPublishedSequence_ = lastPublishedSequence;
static constexpr std::uint32_t kRIPPLE_EPOCH_START
The ripple epoch start timestamp. Midnight on 1st January 2000.
Definition DBHelpers.hpp:318
static util::prometheus::CounterInt & counterInt(std::string name, util::prometheus::Labels labels, std::optional< std::string > description=std::nullopt)
Get an integer based counter metric. It will be created if it doesn't exist.
Definition Prometheus.cpp:194
Cache for an entire ledger.
Definition LedgerCacheInterface.hpp:38
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:69
LedgerPublisher(boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, data::LedgerCacheInterface &cache, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, SystemState const &state)
Create an instance of the publisher.
Definition LedgerPublisher.hpp:95
bool publish(uint32_t ledgerSequence, std::optional< uint32_t > maxAttempts, std::chrono::steady_clock::duration attemptsDelay=std::chrono::seconds{1})
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition LedgerPublisher.hpp:120
void publish(ripple::LedgerHeader const &lgrInfo)
Publish the passed ledger asynchronously.
Definition LedgerPublisher.hpp:164
std::optional< uint32_t > getLastPublishedSequence() const
Get the sequence of the last scheduled ledger to publish. Be aware that the ledger may not have been...
Definition LedgerPublisher.hpp:274
std::uint32_t lastPublishAgeSeconds() const
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:238
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const
Get last publish time as a time point.
Definition LedgerPublisher.hpp:248
std::uint32_t lastCloseAgeSeconds() const
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:258
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:132
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33