22#include "data/BackendInterface.hpp"
23#include "data/LedgerCache.hpp"
24#include "data/Types.hpp"
25#include "etl/CacheLoader.hpp"
26#include "etl/ETLState.hpp"
29#include "etl/SystemState.hpp"
30#include "etl/impl/AmendmentBlockHandler.hpp"
31#include "etl/impl/LedgerFetcher.hpp"
32#include "etl/impl/LedgerPublisher.hpp"
33#include "etlng/AmendmentBlockHandlerInterface.hpp"
34#include "etlng/ETLServiceInterface.hpp"
35#include "etlng/ExtractorInterface.hpp"
37#include "etlng/impl/AmendmentBlockHandler.hpp"
38#include "etlng/impl/Extraction.hpp"
39#include "etlng/impl/Loading.hpp"
40#include "etlng/impl/Monitor.hpp"
41#include "etlng/impl/Registry.hpp"
42#include "etlng/impl/Scheduling.hpp"
43#include "etlng/impl/TaskManager.hpp"
44#include "etlng/impl/ext/Cache.hpp"
45#include "etlng/impl/ext/Core.hpp"
46#include "etlng/impl/ext/NFT.hpp"
47#include "etlng/impl/ext/Successor.hpp"
48#include "feed/SubscriptionManagerInterface.hpp"
49#include "util/Assert.hpp"
50#include "util/Profiler.hpp"
51#include "util/async/context/BasicExecutionContext.hpp"
52#include "util/config/Config.hpp"
53#include "util/log/Logger.hpp"
54#include "util/newconfig/ConfigDefinition.hpp"
56#include <boost/json/object.hpp>
58#include <xrpl/basics/Blob.h>
59#include <xrpl/basics/base_uint.h>
60#include <xrpl/basics/strHex.h>
61#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
62#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
63#include <xrpl/protocol/LedgerHeader.h>
64#include <xrpl/protocol/STTx.h>
65#include <xrpl/protocol/TxFormats.h>
66#include <xrpl/protocol/TxMeta.h>
96 std::shared_ptr<BackendInterface> backend_;
97 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
98 std::shared_ptr<etlng::LoadBalancerInterface> balancer_;
99 std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers_;
100 std::shared_ptr<etl::CacheLoader<>> cacheLoader_;
102 std::shared_ptr<etl::LedgerFetcherInterface> fetcher_;
103 std::shared_ptr<ExtractorInterface> extractor_;
108 std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler_;
109 std::shared_ptr<impl::Loader> loader_;
111 std::optional<util::async::CoroExecutionContext::Operation<void>> mainLoop_;
125 std::shared_ptr<BackendInterface> backend,
126 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
127 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
128 std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers
130 : backend_(std::move(backend))
131 , subscriptions_(std::move(subscriptions))
132 , balancer_(std::move(balancer))
133 , ledgers_(std::move(ledgers))
134 , cacheLoader_(std::make_shared<
etl::CacheLoader<>>(config, backend_, backend_->cache()))
135 , fetcher_(std::make_shared<
etl::impl::LedgerFetcher>(backend_, balancer_))
136 , extractor_(std::make_shared<impl::Extractor>(fetcher_))
137 , amendmentBlockHandler_(std::make_shared<etlng::impl::AmendmentBlockHandler>(ctx_, state_))
138 , loader_(std::make_shared<impl::Loader>(
142 impl::CacheExt{backend_->cache()},
143 impl::CoreExt{backend_},
144 impl::SuccessorExt{backend_, backend_->cache()},
145 impl::NFTExt{backend_}
147 amendmentBlockHandler_
150 LOG(log_.
info()) <<
"Creating ETLng...";
155 LOG(log_.
debug()) <<
"Stopping ETLng";
161 LOG(log_.
info()) <<
"run() in ETLng...";
163 mainLoop_.emplace(ctx_.
execute([
this] {
164 auto const rng = loadInitialLedgerIfNeeded();
166 LOG(log_.info()) <<
"Waiting for next ledger to be validated by network...";
167 std::optional<uint32_t> const mostRecentValidated = ledgers_->getMostRecent();
169 if (not mostRecentValidated) {
170 LOG(log_.info()) <<
"The wait for the next validated ledger has been aborted. "
171 "Exiting monitor loop";
175 ASSERT(rng.has_value(),
"Ledger range can't be null");
176 auto const nextSequence = rng->maxSequence + 1;
178 LOG(log_.
debug()) <<
"Database is populated. Starting monitor loop. sequence = " << nextSequence;
195 LOG(log_.info()) <<
"Stop called";
203 return {{
"ok",
true}};
220 std::optional<etl::ETLState>
236 std::optional<data::LedgerRange>
237 loadInitialLedgerIfNeeded()
239 if (
auto rng = backend_->hardFetchLedgerRangeNoThrow(); not rng.has_value()) {
240 LOG(log_.info()) <<
"Database is empty. Will download a ledger from the network.";
243 LOG(log_.info()) <<
"Waiting for next ledger to be validated by network...";
244 if (
auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) {
245 auto const seq = *mostRecentValidated;
246 LOG(log_.info()) <<
"Ledger " << seq <<
" has been validated. Downloading... ";
249 return extractor_->extractLedgerOnly(seq).and_then([
this, seq](
auto&&
data) {
251 data.edgeKeys = balancer_->loadInitialLedger(seq, *loader_);
254 return loader_->loadInitialLedger(
data);
258 LOG(log_.debug()) <<
"Time to download and store ledger = " << timeDiff;
259 LOG(log_.info()) <<
"Finished loadInitialLedger. cache size = " << backend_->cache().size();
261 if (ledger.has_value())
262 return backend_->hardFetchLedgerRangeNoThrow();
264 LOG(log_.error()) <<
"Failed to load initial ledger. Exiting monitor loop";
266 LOG(log_.info()) <<
"The wait for the next validated ledger has been aborted. "
267 "Exiting monitor loop";
269 }
catch (std::runtime_error
const& e) {
270 LOG(log_.fatal()) <<
"Failed to load initial ledger: " << e.what();
271 amendmentBlockHandler_->notifyAmendmentBlocked();
274 LOG(log_.info()) <<
"Database already populated. Picking up from the tip of history";
275 cacheLoader_->load(rng->maxSequence);
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
Definition ETLService.hpp:93
void stop() override
Stop the ETL service.
Definition ETLService.hpp:193
void run() override
Start all components to run ETL service.
Definition ETLService.hpp:159
bool isCorruptionDetected() const override
Check whether Clio detected DB corruptions.
Definition ETLService.hpp:214
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition ETLService.hpp:228
std::optional< etl::ETLState > getETLState() const override
Get the etl nodes' state.
Definition ETLService.hpp:221
ETLService(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< etlng::LoadBalancerInterface > balancer, std::shared_ptr< etl::NetworkValidatedLedgersInterface > ledgers)
Create an instance of ETLService.
Definition ETLService.hpp:123
boost::json::object getInfo() const override
Get state of ETL as a JSON object.
Definition ETLService.hpp:200
bool isAmendmentBlocked() const override
Check for the amendment blocked state.
Definition ETLService.hpp:207
Definition Scheduling.hpp:43
Definition TaskManager.hpp:40
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
A highly configurable execution context.
Definition BasicExecutionContext.hpp:132
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:299
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:37
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
This is a base class for any ETL service implementations.
Definition ETLServiceInterface.hpp:36