Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
ETLService.hpp
1#pragma once
2
3#include "data/BackendInterface.hpp"
4#include "data/LedgerCacheLoadingState.hpp"
5#include "data/Types.hpp"
6#include "etl/CacheLoaderInterface.hpp"
7#include "etl/CacheUpdaterInterface.hpp"
8#include "etl/ETLServiceInterface.hpp"
9#include "etl/ETLState.hpp"
10#include "etl/ExtractorInterface.hpp"
11#include "etl/InitialLoadObserverInterface.hpp"
12#include "etl/LedgerPublisherInterface.hpp"
14#include "etl/LoaderInterface.hpp"
15#include "etl/MonitorInterface.hpp"
16#include "etl/MonitorProviderInterface.hpp"
18#include "etl/SystemState.hpp"
19#include "etl/TaskManagerInterface.hpp"
20#include "etl/TaskManagerProviderInterface.hpp"
21#include "etl/impl/AmendmentBlockHandler.hpp"
22#include "etl/impl/CacheUpdater.hpp"
23#include "etl/impl/Extraction.hpp"
24#include "etl/impl/LedgerFetcher.hpp"
25#include "etl/impl/LedgerPublisher.hpp"
26#include "etl/impl/Loading.hpp"
27#include "etl/impl/Registry.hpp"
28#include "etl/impl/Scheduling.hpp"
29#include "etl/impl/TaskManager.hpp"
30#include "etl/impl/ext/Cache.hpp"
31#include "etl/impl/ext/Core.hpp"
32#include "etl/impl/ext/NFT.hpp"
33#include "etl/impl/ext/Successor.hpp"
34#include "feed/SubscriptionManagerInterface.hpp"
35#include "util/async/AnyExecutionContext.hpp"
36#include "util/async/AnyOperation.hpp"
37#include "util/async/AnyStrand.hpp"
38#include "util/config/ConfigDefinition.hpp"
39#include "util/log/Logger.hpp"
40
41#include <boost/asio/io_context.hpp>
42#include <boost/json/object.hpp>
43#include <boost/signals2/connection.hpp>
44#include <fmt/format.h>
45#include <xrpl/basics/Blob.h>
46#include <xrpl/basics/base_uint.h>
47#include <xrpl/basics/strHex.h>
48#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
49#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
50#include <xrpl/protocol/LedgerHeader.h>
51#include <xrpl/protocol/STTx.h>
52#include <xrpl/protocol/TxFormats.h>
53#include <xrpl/protocol/TxMeta.h>
54
55#include <atomic>
56#include <cstddef>
57#include <cstdint>
58#include <functional>
59#include <memory>
60#include <optional>
61
62namespace etl {
63
// Logger bound to the "ETL" channel.
80 util::Logger log_{"ETL"};
81
// Configuration reference plus collaborators held through shared_ptr (mostly interface types).
83 std::reference_wrapper<util::config::ClioConfigDefinition const> config_;
84 std::shared_ptr<BackendInterface> backend_;
85 std::shared_ptr<LoadBalancerInterface> balancer_;
86 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers_;
87 std::shared_ptr<LedgerPublisherInterface> publisher_;
88 std::shared_ptr<CacheLoaderInterface> cacheLoader_;
89 std::shared_ptr<CacheUpdaterInterface> cacheUpdater_;
90 std::shared_ptr<ExtractorInterface> extractor_;
91 std::shared_ptr<LoaderInterface> loader_;
92 std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver_;
93 std::shared_ptr<TaskManagerProviderInterface> taskManagerProvider_;
94 std::shared_ptr<MonitorProviderInterface> monitorProvider_;
95 std::shared_ptr<SystemState> state_;
96
// Optional 32-bit sequence bounds.
// NOTE(review): presumably the ledger sequences ETL starts from / finishes at - confirm in ETLService.cpp.
97 std::optional<uint32_t> startSequence_;
98 std::optional<uint32_t> finishSequence_;
99
// Uniquely owned monitor and task manager.
// NOTE(review): presumably created through the provider members above - confirm in ETLService.cpp.
100 std::unique_ptr<MonitorInterface> monitor_;
101 std::unique_ptr<TaskManagerInterface> taskMan_;
102
// boost::signals2 scoped connections; each one disconnects its slot when it is destroyed.
103 boost::signals2::scoped_connection monitorNewSeqSubscription_;
104 boost::signals2::scoped_connection monitorDbStalledSubscription_;
105 boost::signals2::scoped_connection systemStateWriteCommandSubscription_;
// Strand and atomic counter (initialised to 0) associated with write-command handling.
// NOTE(review): exact usage lives in ETLService.cpp - not visible from this header.
106 util::async::AnyStrand writeCommandStrand_;
107 std::atomic<size_t> runningWriteCommandHandlers_{0};
108
// Optional handle to an asynchronous operation.
// NOTE(review): presumably the running ETL main loop - confirm in ETLService.cpp.
109 std::optional<util::async::AnyOperation<void>> mainLoop_;
110
111public:
127 static std::shared_ptr<ETLServiceInterface>
130 std::shared_ptr<SystemState> state,
131 std::unique_ptr<data::LedgerCacheLoadingStateInterface const> cacheLoadingState,
133 std::shared_ptr<BackendInterface> backend,
134 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
135 std::shared_ptr<LoadBalancerInterface> balancer,
136 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
137 );
138
159 std::reference_wrapper<util::config::ClioConfigDefinition const> config,
160 std::shared_ptr<data::BackendInterface> backend,
161 std::shared_ptr<LoadBalancerInterface> balancer,
162 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers,
163 std::shared_ptr<LedgerPublisherInterface> publisher,
164 std::shared_ptr<CacheLoaderInterface> cacheLoader,
165 std::shared_ptr<CacheUpdaterInterface> cacheUpdater,
166 std::shared_ptr<ExtractorInterface> extractor,
167 std::shared_ptr<LoaderInterface> loader,
168 std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
169 std::shared_ptr<TaskManagerProviderInterface> taskManagerProvider,
170 std::shared_ptr<MonitorProviderInterface> monitorProvider,
171 std::shared_ptr<SystemState> state
172 );
173
/**
 * @brief Destructor; overrides the virtual destructor of the base class.
 */
174 ~ETLService() override;
175
/**
 * @brief Start all components to run ETL service.
 */
176 void
177 run() override;
178
/**
 * @brief Stop the ETL service.
 */
179 void
180 stop() override;
181
/**
 * @brief Get state of ETL as a JSON object.
 *
 * @return A boost::json::object describing the ETL state
 */
182 boost::json::object
183 getInfo() const override;
184
/**
 * @brief Check for the amendment blocked state.
 *
 * @return The amendment blocked flag as a bool
 */
185 bool
186 isAmendmentBlocked() const override;
187
/**
 * @brief Check whether Clio detected DB corruptions.
 *
 * @return The corruption-detected flag as a bool
 */
188 bool
189 isCorruptionDetected() const override;
190
/**
 * @brief Get the ETL nodes' state.
 *
 * @return An optional ETLState; the optional may be empty
 *         (NOTE(review): the condition for an empty result is defined in ETLService.cpp)
 */
191 std::optional<ETLState>
192 getETLState() const override;
193
/**
 * @brief Get time passed since last ledger close, in seconds.
 *
 * @return The age as an unsigned 32-bit number of seconds
 */
194 std::uint32_t
195 lastCloseAgeSeconds() const override;
196
197private:
// Returns an optional data::LedgerRange.
// NOTE(review): presumably performs the initial ledger load only when one is required and
// reports the resulting range - confirm against the definition in ETLService.cpp.
198 std::optional<data::LedgerRange>
199 loadInitialLedgerIfNeeded();
200
// The returned 32-bit value must not be ignored ([[nodiscard]]).
// NOTE(review): presumably the ledger sequence the cache was synchronised to - confirm in ETLService.cpp.
201 [[nodiscard]] uint32_t
202 syncCacheWithDb();
203
// Takes a 32-bit sequence number `seq`.
// NOTE(review): presumably brings the ledger cache up to that ledger sequence - confirm in ETLService.cpp.
204 void
205 updateCache(uint32_t seq);
206
// Takes a 32-bit sequence number `seq`.
// NOTE(review): presumably sets up the monitor starting from that ledger sequence - confirm in ETLService.cpp.
207 void
208 startMonitor(uint32_t seq);
209
// Takes a 32-bit sequence number `seq`.
// NOTE(review): presumably begins extracting/loading ledgers from that sequence - confirm in ETLService.cpp.
210 void
211 startLoading(uint32_t seq);
212
// NOTE(review): presumably tries to make this instance the database writer; counterpart of
// giveUpWriter() - confirm in ETLService.cpp.
213 void
214 attemptTakeoverWriter();
215
// NOTE(review): presumably relinquishes the writer role taken by attemptTakeoverWriter() -
// confirm in ETLService.cpp.
216 void
217 giveUpWriter();
218};
219
220} // namespace etl
bool isAmendmentBlocked() const override
Check for the amendment blocked state.
Definition ETLService.cpp:252
void run() override
Start all components to run ETL service.
Definition ETLService.cpp:182
static std::shared_ptr< ETLServiceInterface > makeETLService(util::config::ClioConfigDefinition const &config, std::shared_ptr< SystemState > state, std::unique_ptr< data::LedgerCacheLoadingStateInterface const > cacheLoadingState, util::async::AnyExecutionContext ctx, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
A factory function to spawn new ETLService instances.
Definition ETLService.cpp:61
ETLService(util::async::AnyExecutionContext ctx, std::reference_wrapper< util::config::ClioConfigDefinition const > config, std::shared_ptr< data::BackendInterface > backend, std::shared_ptr< LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers, std::shared_ptr< LedgerPublisherInterface > publisher, std::shared_ptr< CacheLoaderInterface > cacheLoader, std::shared_ptr< CacheUpdaterInterface > cacheUpdater, std::shared_ptr< ExtractorInterface > extractor, std::shared_ptr< LoaderInterface > loader, std::shared_ptr< InitialLoadObserverInterface > initialLoadObserver, std::shared_ptr< TaskManagerProviderInterface > taskManagerProvider, std::shared_ptr< MonitorProviderInterface > monitorProvider, std::shared_ptr< SystemState > state)
Create an instance of ETLService.
Definition ETLService.cpp:129
boost::json::object getInfo() const override
Get state of ETL as a JSON object.
Definition ETLService.cpp:238
bool isCorruptionDetected() const override
Check whether Clio detected DB corruptions.
Definition ETLService.cpp:258
void stop() override
Stop the ETL service.
Definition ETLService.cpp:218
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition ETLService.cpp:270
std::optional< ETLState > getETLState() const override
Get the ETL nodes' state.
Definition ETLService.cpp:264
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
A type-erased execution context.
Definition AnyExecutionContext.hpp:22
A type-erased execution strand.
Definition AnyStrand.hpp:21
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:31
This is a base class for any ETL service implementations.
Definition ETLServiceInterface.hpp:17