Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
ETLService.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etl/CacheLoader.hpp"
24#include "etl/ETLState.hpp"
26#include "etl/SystemState.hpp"
27#include "etl/impl/AmendmentBlockHandler.hpp"
28#include "etl/impl/ExtractionDataPipe.hpp"
29#include "etl/impl/Extractor.hpp"
30#include "etl/impl/LedgerFetcher.hpp"
31#include "etl/impl/LedgerLoader.hpp"
32#include "etl/impl/LedgerPublisher.hpp"
33#include "etl/impl/Transformer.hpp"
34#include "etlng/ETLServiceInterface.hpp"
36#include "etlng/impl/LedgerPublisher.hpp"
37#include "etlng/impl/TaskManagerProvider.hpp"
38#include "feed/SubscriptionManagerInterface.hpp"
39#include "util/async/AnyExecutionContext.hpp"
40#include "util/log/Logger.hpp"
41
42#include <boost/asio/io_context.hpp>
43#include <boost/json/object.hpp>
44#include <grpcpp/grpcpp.h>
45#include <org/xrpl/rpc/v1/get_ledger.pb.h>
46#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
47
48#include <cstddef>
49#include <cstdint>
50#include <memory>
51#include <optional>
52#include <string>
53#include <thread>
54
57struct NFTsData;
58
62namespace etl {
63
68 virtual ~ETLServiceTag() = default;
69};
70
71template <typename T>
72concept SomeETLService = std::derived_from<T, ETLServiceTag>;
73
88 // TODO: make these template parameters in ETLService
96 using TransformerType =
98
99 util::Logger log_{"ETL"};
100
101 std::shared_ptr<BackendInterface> backend_;
102 std::shared_ptr<etlng::LoadBalancerInterface> loadBalancer_;
103 std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers_;
104
105 std::uint32_t extractorThreads_ = 1;
106 std::thread worker_;
107
108 CacheLoaderType cacheLoader_;
109 LedgerFetcherType ledgerFetcher_;
110 LedgerLoaderType ledgerLoader_;
111 LedgerPublisherType ledgerPublisher_;
112 AmendmentBlockHandlerType amendmentBlockHandler_;
113
114 SystemState state_;
115
116 size_t numMarkers_ = 2;
117 std::optional<uint32_t> startSequence_;
118 std::optional<uint32_t> finishSequence_;
119
120public:
133 boost::asio::io_context& ioc,
134 std::shared_ptr<BackendInterface> backend,
135 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
136 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
137 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
138 );
139
144
159 static std::shared_ptr<etlng::ETLServiceInterface>
162 boost::asio::io_context& ioc,
164 std::shared_ptr<BackendInterface> backend,
165 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
166 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
167 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
168 );
169
173 ~ETLService() override
174 {
175 if (not state_.isStopping)
176 stop();
177 }
178
183 void
184 stop() override
185 {
186 LOG(log_.info()) << "Stop called";
187
188 state_.isStopping = true;
189 cacheLoader_.stop();
190
191 if (worker_.joinable())
192 worker_.join();
193
194 LOG(log_.debug()) << "Joined ETLService worker thread";
195 }
196
202 std::uint32_t
203 lastCloseAgeSeconds() const override
204 {
205 return ledgerPublisher_.lastCloseAgeSeconds();
206 }
207
213 bool
214 isAmendmentBlocked() const override
215 {
216 return state_.isAmendmentBlocked;
217 }
218
224 bool
225 isCorruptionDetected() const override
226 {
227 return state_.isCorruptionDetected;
228 }
229
235 boost::json::object
236 getInfo() const override
237 {
238 boost::json::object result;
239
240 result["etl_sources"] = loadBalancer_->toJson();
241 result["is_writer"] = static_cast<int>(state_.isWriting);
242 result["read_only"] = static_cast<int>(state_.isReadOnly);
243 auto last = ledgerPublisher_.getLastPublish();
244 if (last.time_since_epoch().count() != 0)
245 result["last_publish_age_seconds"] = std::to_string(ledgerPublisher_.lastPublishAgeSeconds());
246 return result;
247 }
248
253 std::optional<etl::ETLState>
254 getETLState() const noexcept override
255 {
256 return loadBalancer_->getETLState();
257 }
258
262 void
263 run() override;
264
265private:
276 std::optional<uint32_t>
277 runETLPipeline(uint32_t startSequence, uint32_t numExtractors);
278
288 void
289 monitor();
290
297 uint32_t
298 publishNextSequence(uint32_t nextSequence);
299
306 void
307 monitorReadOnly();
308
312 bool
313 isStopping() const
314 {
315 return state_.isStopping;
316 }
317
325 std::uint32_t
326 getNumMarkers() const
327 {
328 return numMarkers_;
329 }
330
334 void
335 doWork();
336};
337} // namespace etl
Cache loading interface.
Definition CacheLoader.hpp:51
void stop() noexcept override
Requests the loader to stop asap.
Definition CacheLoader.hpp:137
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
Definition ETLService.hpp:87
ETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< etlng::LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
Create an instance of ETLService.
Definition ETLService.cpp:355
bool isAmendmentBlocked() const override
Check for the amendment blocked state.
Definition ETLService.hpp:214
void run() override
Start all components to run ETL service.
Definition ETLService.cpp:333
boost::json::object getInfo() const override
Get state of ETL as a JSON object.
Definition ETLService.hpp:236
std::optional< etl::ETLState > getETLState() const noexcept override
Get the etl nodes' state.
Definition ETLService.hpp:254
ETLService(ETLService &&)=delete
Move constructor is deleted because ETL service shares its fields by reference.
bool isCorruptionDetected() const override
Check whether Clio detected DB corruptions.
Definition ETLService.hpp:225
~ETLService() override
Stops components and joins worker thread.
Definition ETLService.hpp:173
void stop() override
Stop the ETL service.
Definition ETLService.hpp:184
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition ETLService.hpp:203
static std::shared_ptr< etlng::ETLServiceInterface > makeETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, util::async::AnyExecutionContext ctx, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< etlng::LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
A factory function to spawn new ETLService instances.
Definition ETLService.cpp:70
Definition AmendmentBlockHandler.hpp:34
A collection of thread safe async queues used by Extractor and Transformer to communicate.
Definition ExtractionDataPipe.hpp:37
Extractor thread that is fetching GRPC data and enqueue it on the DataPipeType.
Definition Extractor.hpp:44
GRPC Ledger data fetcher.
Definition LedgerFetcher.hpp:39
Loads ledger data into the DB.
Definition LedgerLoader.hpp:70
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:71
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const override
Get last publish time as a time point.
Definition LedgerPublisher.hpp:250
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:260
std::uint32_t lastPublishAgeSeconds() const override
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:240
Transformer thread that prepares new ledger out of raw data from GRPC.
Definition Transformer.hpp:70
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
Definition ETLService.hpp:72
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:39
Struct used to keep track of what to write to account_transactions/account_tx tables.
Definition DBHelpers.hpp:45
Represents a link from a tx to an NFT that was targeted/modified/created by it.
Definition DBHelpers.hpp:73
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:103
A tag class to help identify ETLService in templated code.
Definition ETLService.hpp:67
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
util::prometheus::Bool isAmendmentBlocked
Whether clio detected an amendment block.
Definition SystemState.hpp:63
util::prometheus::Bool isReadOnly
Whether the process is in strict read-only mode.
Definition SystemState.hpp:40
std::atomic_bool isStopping
Whether the software is stopping.
Definition SystemState.hpp:53
util::prometheus::Bool isCorruptionDetected
Whether clio detected a corruption that needs manual attention.
Definition SystemState.hpp:75
util::prometheus::Bool isWriting
Whether the process is writing to the database.
Definition SystemState.hpp:47
This is a base class for any ETL service implementations.
Definition ETLServiceInterface.hpp:36