22#include "data/BackendInterface.hpp"
23#include "data/LedgerCache.hpp"
24#include "etl/CacheLoader.hpp"
25#include "etl/ETLState.hpp"
26#include "etl/LoadBalancer.hpp"
28#include "etl/SystemState.hpp"
29#include "etl/impl/AmendmentBlockHandler.hpp"
30#include "etl/impl/ExtractionDataPipe.hpp"
31#include "etl/impl/Extractor.hpp"
32#include "etl/impl/LedgerFetcher.hpp"
33#include "etl/impl/LedgerLoader.hpp"
34#include "etl/impl/LedgerPublisher.hpp"
35#include "etl/impl/Transformer.hpp"
36#include "feed/SubscriptionManagerInterface.hpp"
37#include "util/log/Logger.hpp"
39#include <boost/asio/io_context.hpp>
40#include <boost/json/object.hpp>
41#include <grpcpp/grpcpp.h>
42#include <org/xrpl/rpc/v1/get_ledger.pb.h>
43#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
101 std::shared_ptr<BackendInterface> backend_;
102 std::shared_ptr<LoadBalancerType> loadBalancer_;
103 std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers_;
105 std::uint32_t extractorThreads_ = 1;
116 size_t numMarkers_ = 2;
117 std::optional<uint32_t> startSequence_;
118 std::optional<uint32_t> finishSequence_;
119 size_t txnThreshold_ = 0;
134 boost::asio::io_context& ioc,
135 std::shared_ptr<BackendInterface> backend,
136 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
137 std::shared_ptr<LoadBalancerType> balancer,
138 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
154 static std::shared_ptr<ETLService>
157 boost::asio::io_context& ioc,
158 std::shared_ptr<BackendInterface> backend,
159 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
160 std::shared_ptr<LoadBalancerType> balancer,
161 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
164 auto etl = std::make_shared<ETLService>(config, ioc, backend, subscriptions, balancer, ledgers);
186 LOG(log_.info()) << "Stop called";
191 if (worker_.joinable())
194 LOG(log_.debug()) << "Joined ETLService worker thread";
238 boost::json::object result;
240 result["etl_sources"] = loadBalancer_->toJson();
241 result["is_writer"] = static_cast<int>(state_.isWriting);
242 result["read_only"] = static_cast<int>(state_.isReadOnly);
244 if (last.time_since_epoch().count() != 0)
253 std::optional<etl::ETLState>
256 return loadBalancer_->getETLState();
270 std::optional<uint32_t>
271 runETLPipeline(uint32_t startSequence, uint32_t numExtractors);
292 publishNextSequence(uint32_t nextSequence);
320 getNumMarkers() const
Cache for an entire ledger.
Definition LedgerCache.hpp:46
Cache loading interface.
Definition CacheLoader.hpp:48
void stop() noexcept
Requests the loader to stop asap.
Definition CacheLoader.hpp:131
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
Definition ETLService.hpp:85
bool isCorruptionDetected() const
Check whether Clio detected DB corruptions.
Definition ETLService.hpp:225
bool isAmendmentBlocked() const
Check for the amendment blocked state.
Definition ETLService.hpp:214
void stop()
Stop the ETL service.
Definition ETLService.hpp:184
ETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< LoadBalancerType > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
Create an instance of ETLService.
Definition ETLService.cpp:264
static std::shared_ptr< ETLService > makeETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< LoadBalancerType > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
A factory function to spawn new ETLService instances.
Definition ETLService.hpp:155
~ETLService() override
Stops components and joins worker thread.
Definition ETLService.hpp:173
std::uint32_t lastCloseAgeSeconds() const
Get time passed since last ledger close, in seconds.
Definition ETLService.hpp:203
std::optional< etl::ETLState > getETLState() const noexcept
Get the ETL nodes' state.
Definition ETLService.hpp:254
boost::json::object getInfo() const
Get state of ETL as a JSON object.
Definition ETLService.hpp:236
This class is used to manage connections to transaction processing processes.
Definition LoadBalancer.hpp:72
Definition AmendmentBlockHandler.hpp:34
GRPC Ledger data fetcher.
Definition LedgerFetcher.hpp:38
Loads ledger data into the DB.
Definition LedgerLoader.hpp:69
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:68
std::uint32_t lastPublishAgeSeconds() const
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:237
std::uint32_t lastCloseAgeSeconds() const
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:257
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const
Get last publish time as a time point.
Definition LedgerPublisher.hpp:247
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
Definition ETLService.hpp:70
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:36
Struct used to keep track of what to write to account_transactions/account_tx tables.
Definition DBHelpers.hpp:45
Represents a link from a tx to an NFT that was targeted/modified/created by it.
Definition DBHelpers.hpp:73
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:103
A tag class to help identify ETLService in templated code.
Definition ETLService.hpp:65
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
util::prometheus::Bool isAmendmentBlocked
Whether clio detected an amendment block.
Definition SystemState.hpp:63
util::prometheus::Bool isReadOnly
Whether the process is in strict read-only mode.
Definition SystemState.hpp:40
std::atomic_bool isStopping
Whether the software is stopping.
Definition SystemState.hpp:53
util::prometheus::Bool isCorruptionDetected
Whether clio detected a corruption that needs manual attention.
Definition SystemState.hpp:75
util::prometheus::Bool isWriting
Whether the process is writing to the database.
Definition SystemState.hpp:47