22#include "data/BackendInterface.hpp"
23#include "etl/CacheLoader.hpp"
24#include "etl/ETLState.hpp"
25#include "etl/LoadBalancer.hpp"
27#include "etl/SystemState.hpp"
28#include "etl/impl/AmendmentBlockHandler.hpp"
29#include "etl/impl/ExtractionDataPipe.hpp"
30#include "etl/impl/Extractor.hpp"
31#include "etl/impl/LedgerFetcher.hpp"
32#include "etl/impl/LedgerLoader.hpp"
33#include "etl/impl/LedgerPublisher.hpp"
34#include "etl/impl/Transformer.hpp"
35#include "etlng/ETLService.hpp"
36#include "etlng/ETLServiceInterface.hpp"
37#include "etlng/LoadBalancer.hpp"
39#include "feed/SubscriptionManagerInterface.hpp"
40#include "util/Assert.hpp"
41#include "util/log/Logger.hpp"
43#include <boost/asio/io_context.hpp>
44#include <boost/json/object.hpp>
45#include <grpcpp/grpcpp.h>
46#include <org/xrpl/rpc/v1/get_ledger.pb.h>
47#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
102 std::shared_ptr<BackendInterface> backend_;
103 std::shared_ptr<etlng::LoadBalancerInterface> loadBalancer_;
104 std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers_;
106 std::uint32_t extractorThreads_ = 1;
117 size_t numMarkers_ = 2;
118 std::optional<uint32_t> startSequence_;
119 std::optional<uint32_t> finishSequence_;
134 boost::asio::io_context& ioc,
135 std::shared_ptr<BackendInterface> backend,
136 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
137 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
138 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
159 static std::shared_ptr<etlng::ETLServiceInterface>
162 boost::asio::io_context& ioc,
163 std::shared_ptr<BackendInterface> backend,
164 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
165 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
166 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
169 std::shared_ptr<etlng::ETLServiceInterface> ret;
171 if (config.
get<
bool>(
"__ng_etl")) {
173 std::dynamic_pointer_cast<etlng::LoadBalancer>(balancer),
174 "LoadBalancer type must be etlng::LoadBalancer"
176 ret = std::make_shared<etlng::ETLService>(config, backend, subscriptions, balancer, ledgers);
179 std::dynamic_pointer_cast<etl::LoadBalancer>(balancer),
"LoadBalancer type must be etl::LoadBalancer"
181 ret = std::make_shared<etl::ETLService>(config, ioc, backend, subscriptions, balancer, ledgers);
204 LOG(log_.
info()) <<
"Stop called";
209 if (worker_.joinable())
212 LOG(log_.
debug()) <<
"Joined ETLService worker thread";
256 boost::json::object result;
258 result[
"etl_sources"] = loadBalancer_->toJson();
259 result[
"is_writer"] =
static_cast<int>(state_.
isWriting);
260 result[
"read_only"] =
static_cast<int>(state_.
isReadOnly);
262 if (last.time_since_epoch().count() != 0)
271 std::optional<etl::ETLState>
274 return loadBalancer_->getETLState();
294 std::optional<uint32_t>
295 runETLPipeline(uint32_t startSequence, uint32_t numExtractors);
316 publishNextSequence(uint32_t nextSequence);
344 getNumMarkers()
const
Cache loading interface.
Definition CacheLoader.hpp:49
void stop() noexcept
Requests the loader to stop asap.
Definition CacheLoader.hpp:132
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
Definition ETLService.hpp:88
ETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< etlng::LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
Create an instance of ETLService.
Definition ETLService.cpp:265
bool isAmendmentBlocked() const override
Check for the amendment blocked state.
Definition ETLService.hpp:232
void run() override
Start all components to run ETL service.
Definition ETLService.cpp:243
boost::json::object getInfo() const override
Get state of ETL as a JSON object.
Definition ETLService.hpp:254
std::optional< etl::ETLState > getETLState() const noexcept override
Get the etl nodes' state.
Definition ETLService.hpp:272
ETLService(ETLService &&)=delete
Move constructor is deleted because ETL service shares its fields by reference.
bool isCorruptionDetected() const override
Check whether Clio detected DB corruptions.
Definition ETLService.hpp:243
~ETLService() override
Stops components and joins worker thread.
Definition ETLService.hpp:191
void stop() override
Stop the ETL service.
Definition ETLService.hpp:202
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition ETLService.hpp:221
static std::shared_ptr< etlng::ETLServiceInterface > makeETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< etlng::LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
A factory function to spawn new ETLService instances.
Definition ETLService.hpp:160
Definition AmendmentBlockHandler.hpp:34
GRPC Ledger data fetcher.
Definition LedgerFetcher.hpp:39
Loads ledger data into the DB.
Definition LedgerLoader.hpp:70
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:69
std::uint32_t lastPublishAgeSeconds() const
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:238
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const
Get last publish time as a time point.
Definition LedgerPublisher.hpp:248
std::uint32_t lastCloseAgeSeconds() const
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:258
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
T get(std::string_view fullKey) const
Returns the specified value of given string if value exists.
Definition ConfigDefinition.hpp:108
Definition ETLService.hpp:73
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:37
Struct used to keep track of what to write to account_transactions/account_tx tables.
Definition DBHelpers.hpp:45
Represents a link from a tx to an NFT that was targeted/modified/created by it.
Definition DBHelpers.hpp:73
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:103
A tag class to help identify ETLService in templated code.
Definition ETLService.hpp:68
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
util::prometheus::Bool isAmendmentBlocked
Whether clio detected an amendment block.
Definition SystemState.hpp:63
util::prometheus::Bool isReadOnly
Whether the process is in strict read-only mode.
Definition SystemState.hpp:40
std::atomic_bool isStopping
Whether the software is stopping.
Definition SystemState.hpp:53
util::prometheus::Bool isCorruptionDetected
Whether clio detected a corruption that needs manual attention.
Definition SystemState.hpp:75
util::prometheus::Bool isWriting
Whether the process is writing to the database.
Definition SystemState.hpp:47
This is a base class for any ETL service implementations.
Definition ETLServiceInterface.hpp:36