Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
ETLService.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etl/CacheLoader.hpp"
24#include "etl/ETLState.hpp"
25#include "etl/LoadBalancer.hpp"
27#include "etl/SystemState.hpp"
28#include "etl/impl/AmendmentBlockHandler.hpp"
29#include "etl/impl/ExtractionDataPipe.hpp"
30#include "etl/impl/Extractor.hpp"
31#include "etl/impl/LedgerFetcher.hpp"
32#include "etl/impl/LedgerLoader.hpp"
33#include "etl/impl/LedgerPublisher.hpp"
34#include "etl/impl/Transformer.hpp"
35#include "etlng/ETLService.hpp"
36#include "etlng/ETLServiceInterface.hpp"
37#include "etlng/LoadBalancer.hpp"
39#include "feed/SubscriptionManagerInterface.hpp"
40#include "util/Assert.hpp"
41#include "util/log/Logger.hpp"
42
43#include <boost/asio/io_context.hpp>
44#include <boost/json/object.hpp>
45#include <grpcpp/grpcpp.h>
46#include <org/xrpl/rpc/v1/get_ledger.pb.h>
47#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
48
49#include <cstddef>
50#include <cstdint>
51#include <memory>
52#include <optional>
53#include <string>
54#include <thread>
55
58struct NFTsData;
59
63namespace etl {
64
69 virtual ~ETLServiceTag() = default;
70};
71
/**
 * @brief Concept satisfied by types for which `std::derived_from<T, ETLServiceTag>` holds.
 *
 * @tparam T The type to check
 */
72template <typename T>
73concept SomeETLService = std::derived_from<T, ETLServiceTag>;
74
89 // TODO: make these template parameters in ETLService
97 using TransformerType =
99
100 util::Logger log_{"ETL"};
101
102 std::shared_ptr<BackendInterface> backend_;
103 std::shared_ptr<etlng::LoadBalancerInterface> loadBalancer_;
104 std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers_;
105
106 std::uint32_t extractorThreads_ = 1;
107 std::thread worker_;
108
109 CacheLoaderType cacheLoader_;
110 LedgerFetcherType ledgerFetcher_;
111 LedgerLoaderType ledgerLoader_;
112 LedgerPublisherType ledgerPublisher_;
113 AmendmentBlockHandlerType amendmentBlockHandler_;
114
115 SystemState state_;
116
117 size_t numMarkers_ = 2;
118 std::optional<uint32_t> startSequence_;
119 std::optional<uint32_t> finishSequence_;
120
121public:
134 boost::asio::io_context& ioc,
135 std::shared_ptr<BackendInterface> backend,
136 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
137 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
138 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
139 );
140
145
/**
 * @brief A factory function to spawn new ETLService instances.
 *
 * Picks the implementation from the boolean `__ng_etl` config value: `etlng::ETLService` when it
 * is true, `etl::ETLService` otherwise. In both cases the dynamic type of `balancer` is asserted to
 * match the chosen implementation, and `run()` is called on the new service before it is returned.
 *
 * @param ioc io context; only forwarded to the `etl::ETLService` constructor
 * @param backend Forwarded to the chosen service constructor
 * @param subscriptions Forwarded to the chosen service constructor
 * @param balancer Load balancer; must be an `etlng::LoadBalancer` or an `etl::LoadBalancer` depending on the flag
 * @param ledgers Forwarded to the chosen service constructor
 * @return A shared pointer to the already running service
 */
159 static std::shared_ptr<etlng::ETLServiceInterface>
162 boost::asio::io_context& ioc,
163 std::shared_ptr<BackendInterface> backend,
164 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
165 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
166 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
167 )
168 {
169 std::shared_ptr<etlng::ETLServiceInterface> ret;
170
// The "__ng_etl" flag selects between the etlng and the etl implementation.
171 if (config.get<bool>("__ng_etl")) {
172 ASSERT(
173 std::dynamic_pointer_cast<etlng::LoadBalancer>(balancer),
174 "LoadBalancer type must be etlng::LoadBalancer"
175 );
// Note: unlike the etl variant below, this constructor is not given `ioc`.
176 ret = std::make_shared<etlng::ETLService>(config, backend, subscriptions, balancer, ledgers);
177 } else {
178 ASSERT(
179 std::dynamic_pointer_cast<etl::LoadBalancer>(balancer), "LoadBalancer type must be etl::LoadBalancer"
180 );
181 ret = std::make_shared<etl::ETLService>(config, ioc, backend, subscriptions, balancer, ledgers);
182 }
183
// The service is started here, so callers receive it already running.
184 ret->run();
185 return ret;
186 }
187
191 ~ETLService() override
192 {
193 if (not state_.isStopping)
194 stop();
195 }
196
201 void
202 stop() override
203 {
204 LOG(log_.info()) << "Stop called";
205
206 state_.isStopping = true;
207 cacheLoader_.stop();
208
209 if (worker_.joinable())
210 worker_.join();
211
212 LOG(log_.debug()) << "Joined ETLService worker thread";
213 }
214
220 std::uint32_t
221 lastCloseAgeSeconds() const override
222 {
223 return ledgerPublisher_.lastCloseAgeSeconds();
224 }
225
231 bool
232 isAmendmentBlocked() const override
233 {
234 return state_.isAmendmentBlocked;
235 }
236
242 bool
243 isCorruptionDetected() const override
244 {
245 return state_.isCorruptionDetected;
246 }
247
253 boost::json::object
254 getInfo() const override
255 {
256 boost::json::object result;
257
258 result["etl_sources"] = loadBalancer_->toJson();
259 result["is_writer"] = static_cast<int>(state_.isWriting);
260 result["read_only"] = static_cast<int>(state_.isReadOnly);
261 auto last = ledgerPublisher_.getLastPublish();
262 if (last.time_since_epoch().count() != 0)
263 result["last_publish_age_seconds"] = std::to_string(ledgerPublisher_.lastPublishAgeSeconds());
264 return result;
265 }
266
271 std::optional<etl::ETLState>
272 getETLState() const noexcept override
273 {
274 return loadBalancer_->getETLState();
275 }
276
/**
 * @brief Start all components to run ETL service.
 *
 * Only declared here; the definition lives outside this header.
 */
280 void
281 run() override;
282
283private:
/**
 * @brief Run the ETL pipeline.
 *
 * NOTE(review): only the declaration is visible here. Presumably processing begins at
 * `startSequence` with `numExtractors` extractors, and the optional result is a ledger sequence
 * that is empty when nothing was processed — confirm against the definition.
 *
 * @param startSequence Sequence passed to the pipeline as its starting point
 * @param numExtractors Number of extractors passed to the pipeline
 * @return An optional 32-bit sequence (exact meaning to be confirmed)
 */
294 std::optional<uint32_t>
295 runETLPipeline(uint32_t startSequence, uint32_t numExtractors);
296
/**
 * @brief Monitoring routine of the ETL service.
 *
 * NOTE(review): only the declaration is visible here; what is monitored and for how long must be
 * confirmed against the definition.
 */
306 void
307 monitor();
308
/**
 * @brief Publish for the given sequence.
 *
 * NOTE(review): only the declaration is visible here. Presumably the return value is the sequence
 * to attempt next — confirm against the definition.
 *
 * @param nextSequence The sequence handed to the publish step
 * @return A 32-bit sequence (exact meaning to be confirmed)
 */
315 uint32_t
316 publishNextSequence(uint32_t nextSequence);
317
/**
 * @brief Monitoring routine for read-only operation.
 *
 * NOTE(review): only the declaration is visible here; presumably the counterpart of monitor() used
 * when the process is read-only — confirm against the definition.
 */
324 void
325 monitorReadOnly();
326
330 bool
331 isStopping() const
332 {
333 return state_.isStopping;
334 }
335
343 std::uint32_t
344 getNumMarkers() const
345 {
346 return numMarkers_;
347 }
348
/**
 * @brief Perform the work of the ETL service.
 *
 * NOTE(review): only the declaration is visible here; presumably this is what the worker thread
 * executes — confirm against the definition.
 */
352 void
353 doWork();
354};
355} // namespace etl
Cache loading interface.
Definition CacheLoader.hpp:49
void stop() noexcept
Requests the loader to stop asap.
Definition CacheLoader.hpp:132
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
Definition ETLService.hpp:88
ETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< etlng::LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
Create an instance of ETLService.
Definition ETLService.cpp:265
bool isAmendmentBlocked() const override
Check for the amendment blocked state.
Definition ETLService.hpp:232
void run() override
Start all components to run ETL service.
Definition ETLService.cpp:243
boost::json::object getInfo() const override
Get state of ETL as a JSON object.
Definition ETLService.hpp:254
std::optional< etl::ETLState > getETLState() const noexcept override
Get the etl nodes' state.
Definition ETLService.hpp:272
ETLService(ETLService &&)=delete
Move constructor is deleted because ETL service shares its fields by reference.
bool isCorruptionDetected() const override
Check whether Clio detected DB corruptions.
Definition ETLService.hpp:243
~ETLService() override
Stops components and joins worker thread.
Definition ETLService.hpp:191
void stop() override
Stop the ETL service.
Definition ETLService.hpp:202
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition ETLService.hpp:221
static std::shared_ptr< etlng::ETLServiceInterface > makeETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< etlng::LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
A factory function to spawn new ETLService instances.
Definition ETLService.hpp:160
Definition AmendmentBlockHandler.hpp:34
A collection of thread safe async queues used by Extractor and Transformer to communicate.
Definition ExtractionDataPipe.hpp:37
Extractor thread that fetches GRPC data and enqueues it on the DataPipeType.
Definition Extractor.hpp:44
GRPC Ledger data fetcher.
Definition LedgerFetcher.hpp:39
Loads ledger data into the DB.
Definition LedgerLoader.hpp:70
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:69
std::uint32_t lastPublishAgeSeconds() const
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:238
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const
Get last publish time as a time point.
Definition LedgerPublisher.hpp:248
std::uint32_t lastCloseAgeSeconds() const
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:258
Transformer thread that prepares new ledger out of raw data from GRPC.
Definition Transformer.hpp:70
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
T get(std::string_view fullKey) const
Returns the value stored under the given key, if it exists.
Definition ConfigDefinition.hpp:108
Definition ETLService.hpp:73
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:37
Struct used to keep track of what to write to account_transactions/account_tx tables.
Definition DBHelpers.hpp:45
Represents a link from a tx to an NFT that was targeted/modified/created by it.
Definition DBHelpers.hpp:73
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:103
A tag class to help identify ETLService in templated code.
Definition ETLService.hpp:68
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
util::prometheus::Bool isAmendmentBlocked
Whether clio detected an amendment block.
Definition SystemState.hpp:63
util::prometheus::Bool isReadOnly
Whether the process is in strict read-only mode.
Definition SystemState.hpp:40
std::atomic_bool isStopping
Whether the software is stopping.
Definition SystemState.hpp:53
util::prometheus::Bool isCorruptionDetected
Whether clio detected a corruption that needs manual attention.
Definition SystemState.hpp:75
util::prometheus::Bool isWriting
Whether the process is writing to the database.
Definition SystemState.hpp:47
This is a base class for any ETL service implementations.
Definition ETLServiceInterface.hpp:36