Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
ETLService.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/LedgerCache.hpp"
24#include "etl/CacheLoader.hpp"
25#include "etl/ETLState.hpp"
26#include "etl/LoadBalancer.hpp"
28#include "etl/SystemState.hpp"
29#include "etl/impl/AmendmentBlockHandler.hpp"
30#include "etl/impl/ExtractionDataPipe.hpp"
31#include "etl/impl/Extractor.hpp"
32#include "etl/impl/LedgerFetcher.hpp"
33#include "etl/impl/LedgerLoader.hpp"
34#include "etl/impl/LedgerPublisher.hpp"
35#include "etl/impl/Transformer.hpp"
36#include "feed/SubscriptionManagerInterface.hpp"
37#include "util/log/Logger.hpp"
38
39#include <boost/asio/io_context.hpp>
40#include <boost/json/object.hpp>
41#include <grpcpp/grpcpp.h>
42#include <org/xrpl/rpc/v1/get_ledger.pb.h>
43#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
44
45#include <concepts>
46#include <cstddef>
47#include <cstdint>
48#include <memory>
49#include <optional>
50#include <string>
51#include <thread>
52
55struct NFTsData;
56
60namespace etl {
61
66 virtual ~ETLServiceTag() = default;
67};
68
69template <typename T>
70concept SomeETLService = std::derived_from<T, ETLServiceTag>;
71
85class ETLService : public ETLServiceTag {
86 // TODO: make these template parameters in ETLService
96 using TransformerType =
98
99 util::Logger log_{"ETL"};
100
101 std::shared_ptr<BackendInterface> backend_;
102 std::shared_ptr<LoadBalancerType> loadBalancer_;
103 std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers_;
104
105 std::uint32_t extractorThreads_ = 1;
106 std::thread worker_;
107
108 CacheLoaderType cacheLoader_;
109 LedgerFetcherType ledgerFetcher_;
110 LedgerLoaderType ledgerLoader_;
111 LedgerPublisherType ledgerPublisher_;
112 AmendmentBlockHandlerType amendmentBlockHandler_;
113
114 SystemState state_;
115
116 size_t numMarkers_ = 2;
117 std::optional<uint32_t> startSequence_;
118 std::optional<uint32_t> finishSequence_;
119 size_t txnThreshold_ = 0;
120
121public:
134 boost::asio::io_context& ioc,
135 std::shared_ptr<BackendInterface> backend,
136 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
137 std::shared_ptr<LoadBalancerType> balancer,
138 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
139 );
140
154 static std::shared_ptr<ETLService>
157 boost::asio::io_context& ioc,
158 std::shared_ptr<BackendInterface> backend,
159 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
160 std::shared_ptr<LoadBalancerType> balancer,
161 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
162 )
163 {
164 auto etl = std::make_shared<ETLService>(config, ioc, backend, subscriptions, balancer, ledgers);
165 etl->run();
166
167 return etl;
168 }
169
173 ~ETLService() override
174 {
175 if (not state_.isStopping)
176 stop();
177 }
178
183 void
185 {
186 LOG(log_.info()) << "Stop called";
187
188 state_.isStopping = true;
189 cacheLoader_.stop();
190
191 if (worker_.joinable())
192 worker_.join();
193
194 LOG(log_.debug()) << "Joined ETLService worker thread";
195 }
196
202 std::uint32_t
204 {
205 return ledgerPublisher_.lastCloseAgeSeconds();
206 }
207
213 bool
215 {
216 return state_.isAmendmentBlocked;
217 }
218
224 bool
226 {
227 return state_.isCorruptionDetected;
228 }
229
235 boost::json::object
236 getInfo() const
237 {
238 boost::json::object result;
239
240 result["etl_sources"] = loadBalancer_->toJson();
241 result["is_writer"] = static_cast<int>(state_.isWriting);
242 result["read_only"] = static_cast<int>(state_.isReadOnly);
243 auto last = ledgerPublisher_.getLastPublish();
244 if (last.time_since_epoch().count() != 0)
245 result["last_publish_age_seconds"] = std::to_string(ledgerPublisher_.lastPublishAgeSeconds());
246 return result;
247 }
248
253 std::optional<etl::ETLState>
254 getETLState() const noexcept
255 {
256 return loadBalancer_->getETLState();
257 }
258
259private:
270 std::optional<uint32_t>
271 runETLPipeline(uint32_t startSequence, uint32_t numExtractors);
272
282 void
283 monitor();
284
291 uint32_t
292 publishNextSequence(uint32_t nextSequence);
293
300 void
301 monitorReadOnly();
302
306 bool
307 isStopping() const
308 {
309 return state_.isStopping;
310 }
311
319 std::uint32_t
320 getNumMarkers() const
321 {
322 return numMarkers_;
323 }
324
328 void
329 run();
330
334 void
335 doWork();
336};
337} // namespace etl
Cache for an entire ledger.
Definition LedgerCache.hpp:46
Cache loading interface.
Definition CacheLoader.hpp:48
void stop() noexcept
Requests the loader to stop asap.
Definition CacheLoader.hpp:131
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
Definition ETLService.hpp:85
bool isCorruptionDetected() const
Check whether Clio detected DB corruptions.
Definition ETLService.hpp:225
bool isAmendmentBlocked() const
Check for the amendment blocked state.
Definition ETLService.hpp:214
void stop()
Stop the ETL service.
Definition ETLService.hpp:184
ETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< LoadBalancerType > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
Create an instance of ETLService.
Definition ETLService.cpp:264
static std::shared_ptr< ETLService > makeETLService(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< LoadBalancerType > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
A factory function to spawn new ETLService instances.
Definition ETLService.hpp:155
~ETLService() override
Stops components and joins worker thread.
Definition ETLService.hpp:173
std::uint32_t lastCloseAgeSeconds() const
Get time passed since last ledger close, in seconds.
Definition ETLService.hpp:203
std::optional< etl::ETLState > getETLState() const noexcept
Get the etl nodes' state.
Definition ETLService.hpp:254
boost::json::object getInfo() const
Get state of ETL as a JSON object.
Definition ETLService.hpp:236
This class is used to manage connections to transaction processing processes.
Definition LoadBalancer.hpp:72
Definition AmendmentBlockHandler.hpp:34
A collection of thread safe async queues used by Extractor and Transformer to communicate.
Definition ExtractionDataPipe.hpp:37
Extractor thread that is fetching GRPC data and enqueue it on the DataPipeType.
Definition Extractor.hpp:44
GRPC Ledger data fetcher.
Definition LedgerFetcher.hpp:38
Loads ledger data into the DB.
Definition LedgerLoader.hpp:69
Publishes ledgers in a synchronized fashion.
Definition LedgerPublisher.hpp:68
std::uint32_t lastPublishAgeSeconds() const
Get time passed since last publish, in seconds.
Definition LedgerPublisher.hpp:237
std::uint32_t lastCloseAgeSeconds() const
Get time passed since last ledger close, in seconds.
Definition LedgerPublisher.hpp:257
std::chrono::time_point< std::chrono::system_clock > getLastPublish() const
Get last publish time as a time point.
Definition LedgerPublisher.hpp:247
Transformer thread that prepares new ledger out of raw data from GRPC.
Definition Transformer.hpp:70
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
Definition ETLService.hpp:70
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:36
Struct used to keep track of what to write to account_transactions/account_tx tables.
Definition DBHelpers.hpp:45
Represents a link from a tx to an NFT that was targeted/modified/created by it.
Definition DBHelpers.hpp:73
Represents an NFT state at a particular ledger.
Definition DBHelpers.hpp:103
A tag class to help identify ETLService in templated code.
Definition ETLService.hpp:65
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
util::prometheus::Bool isAmendmentBlocked
Whether clio detected an amendment block.
Definition SystemState.hpp:63
util::prometheus::Bool isReadOnly
Whether the process is in strict read-only mode.
Definition SystemState.hpp:40
std::atomic_bool isStopping
Whether the software is stopping.
Definition SystemState.hpp:53
util::prometheus::Bool isCorruptionDetected
Whether clio detected a corruption that needs manual attention.
Definition SystemState.hpp:75
util::prometheus::Bool isWriting
Whether the process is writing to the database.
Definition SystemState.hpp:47