22 #include "data/BackendInterface.hpp"
23 #include "etl/CacheLoader.hpp"
24 #include "etl/ETLState.hpp"
26 #include "etl/SystemState.hpp"
27 #include "etl/impl/AmendmentBlockHandler.hpp"
28 #include "etl/impl/ExtractionDataPipe.hpp"
29 #include "etl/impl/Extractor.hpp"
30 #include "etl/impl/LedgerFetcher.hpp"
31 #include "etl/impl/LedgerLoader.hpp"
32 #include "etl/impl/LedgerPublisher.hpp"
33 #include "etl/impl/Transformer.hpp"
34 #include "etlng/ETLServiceInterface.hpp"
36 #include "etlng/impl/LedgerPublisher.hpp"
37 #include "etlng/impl/TaskManagerProvider.hpp"
38 #include "feed/SubscriptionManagerInterface.hpp"
39 #include "util/async/AnyExecutionContext.hpp"
40 #include "util/log/Logger.hpp"
42 #include <boost/asio/io_context.hpp>
43 #include <boost/json/object.hpp>
44 #include <grpcpp/grpcpp.h>
45 #include <org/xrpl/rpc/v1/get_ledger.pb.h>
46 #include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
101 std::shared_ptr<BackendInterface> backend_;
102 std::shared_ptr<etlng::LoadBalancerInterface> loadBalancer_;
103 std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers_;
105 std::uint32_t extractorThreads_ = 1;
116 size_t numMarkers_ = 2;
117 std::optional<uint32_t> startSequence_;
118 std::optional<uint32_t> finishSequence_;
133 boost::asio::io_context& ioc,
134 std::shared_ptr<BackendInterface> backend,
135 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
136 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
137 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
159 static std::shared_ptr<etlng::ETLServiceInterface>
162 boost::asio::io_context& ioc,
164 std::shared_ptr<BackendInterface> backend,
165 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
166 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
167 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
186 LOG(log_.info()) << "Stop called";
191 if (worker_.joinable())
194 LOG(log_.debug()) << "Joined ETLService worker thread";
238 boost::json::object result;
240 result["etl_sources"] = loadBalancer_->toJson();
241 result["is_writer"] = static_cast<int>(state_.isWriting);
242 result["read_only"] = static_cast<int>(state_.isReadOnly);
244 if (last.time_since_epoch().count() != 0)
253 std::optional<etl::ETLState>
256 return loadBalancer_->getETLState();
276 std::optional<uint32_t>
277 runETLPipeline(uint32_t startSequence, uint32_t numExtractors);
298 publishNextSequence(uint32_t nextSequence);
326 getNumMarkers() const
Cache loading interface.
Definition CacheLoader.hpp:51
void stop() noexcept override
Requests the loader to stop asap.
Definition CacheLoader.hpp:137
This class is responsible for continuously extracting data from a p2p node, and writing that data to the databases.
Definition ETLService.hpp:87
ETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< etlng::LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
Create an instance of ETLService.
Definition ETLService.cpp:355
bool isAmendmentBlocked() const override
Check for the amendment blocked state.
Definition ETLService.hpp:214
void run() override
Start all components to run ETL service.
Definition ETLService.cpp:333
boost::json::object getInfo() const override
Get state of ETL as a JSON object.
Definition ETLService.hpp:236
std::optional< etl::ETLState > getETLState() const noexcept override
Get the etl nodes' state.
Definition ETLService.hpp:254
ETLService(ETLService &&)=delete
Move constructor is deleted because ETL service shares its fields by reference.
bool isCorruptionDetected() const override
Check whether Clio detected DB corruptions.
Definition ETLService.hpp:225
~ETLService() override
Stops components and joins worker thread.
Definition ETLService.hpp:173
void stop() override
Stop the ETL service.
Definition ETLService.hpp:184
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition ETLService.hpp:203
static std::shared_ptr< etlng::ETLServiceInterface > makeETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, util::async::AnyExecutionContext ctx, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< etlng::LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
A factory function to spawn new ETLService instances.
Definition ETLService.cpp:70
Definition AmendmentBlockHandler.hpp:34
GRPC Ledger data fetcher.
Definition LedgerFetcher.hpp:39
Loads ledger data into the DB.
Definition LedgerLoader.hpp:70
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:71
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const override
Get last publish time as a time point.
Definition LedgerPublisher.hpp:250
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:260
std::uint32_t lastPublishAgeSeconds() const override
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:240
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
Definition ETLService.hpp:72
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:39
Struct used to keep track of what to write to account_transactions/account_tx tables.
Definition DBHelpers.hpp:45
Represents a link from a tx to an NFT that was targeted/modified/created by it.
Definition DBHelpers.hpp:73
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:103
A tag class to help identify ETLService in templated code.
Definition ETLService.hpp:67
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
util::prometheus::Bool isAmendmentBlocked
Whether clio detected an amendment block.
Definition SystemState.hpp:63
util::prometheus::Bool isReadOnly
Whether the process is in strict read-only mode.
Definition SystemState.hpp:40
std::atomic_bool isStopping
Whether the software is stopping.
Definition SystemState.hpp:53
util::prometheus::Bool isCorruptionDetected
Whether clio detected a corruption that needs manual attention.
Definition SystemState.hpp:75
util::prometheus::Bool isWriting
Whether the process is writing to the database.
Definition SystemState.hpp:47
This is a base class for any ETL service implementations.
Definition ETLServiceInterface.hpp:36