Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
ETLService.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/Types.hpp"
24#include "etl/CacheLoaderInterface.hpp"
25#include "etl/CacheUpdaterInterface.hpp"
26#include "etl/ETLServiceInterface.hpp"
27#include "etl/ETLState.hpp"
28#include "etl/ExtractorInterface.hpp"
29#include "etl/InitialLoadObserverInterface.hpp"
30#include "etl/LedgerPublisherInterface.hpp"
32#include "etl/LoaderInterface.hpp"
33#include "etl/MonitorInterface.hpp"
34#include "etl/MonitorProviderInterface.hpp"
36#include "etl/SystemState.hpp"
37#include "etl/TaskManagerInterface.hpp"
38#include "etl/TaskManagerProviderInterface.hpp"
39#include "etl/impl/AmendmentBlockHandler.hpp"
40#include "etl/impl/CacheUpdater.hpp"
41#include "etl/impl/Extraction.hpp"
42#include "etl/impl/LedgerFetcher.hpp"
43#include "etl/impl/LedgerPublisher.hpp"
44#include "etl/impl/Loading.hpp"
45#include "etl/impl/Registry.hpp"
46#include "etl/impl/Scheduling.hpp"
47#include "etl/impl/TaskManager.hpp"
48#include "etl/impl/ext/Cache.hpp"
49#include "etl/impl/ext/Core.hpp"
50#include "etl/impl/ext/NFT.hpp"
51#include "etl/impl/ext/Successor.hpp"
52#include "feed/SubscriptionManagerInterface.hpp"
53#include "util/async/AnyExecutionContext.hpp"
54#include "util/async/AnyOperation.hpp"
55#include "util/async/AnyStrand.hpp"
56#include "util/config/ConfigDefinition.hpp"
57#include "util/log/Logger.hpp"
58
59#include <boost/asio/io_context.hpp>
60#include <boost/json/object.hpp>
61#include <boost/signals2/connection.hpp>
62#include <fmt/format.h>
63#include <xrpl/basics/Blob.h>
64#include <xrpl/basics/base_uint.h>
65#include <xrpl/basics/strHex.h>
66#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
67#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
68#include <xrpl/protocol/LedgerHeader.h>
69#include <xrpl/protocol/STTx.h>
70#include <xrpl/protocol/TxFormats.h>
71#include <xrpl/protocol/TxMeta.h>
72
73#include <atomic>
74#include <cstddef>
75#include <cstdint>
76#include <functional>
77#include <memory>
78#include <optional>
79
80namespace etl {
81
98 util::Logger log_{"ETL"};
99
101 std::reference_wrapper<util::config::ClioConfigDefinition const> config_;
102 std::shared_ptr<BackendInterface> backend_;
103 std::shared_ptr<LoadBalancerInterface> balancer_;
104 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers_;
105 std::shared_ptr<LedgerPublisherInterface> publisher_;
106 std::shared_ptr<CacheLoaderInterface> cacheLoader_;
107 std::shared_ptr<CacheUpdaterInterface> cacheUpdater_;
108 std::shared_ptr<ExtractorInterface> extractor_;
109 std::shared_ptr<LoaderInterface> loader_;
110 std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver_;
111 std::shared_ptr<TaskManagerProviderInterface> taskManagerProvider_;
112 std::shared_ptr<MonitorProviderInterface> monitorProvider_;
113 std::shared_ptr<SystemState> state_;
114
115 std::optional<uint32_t> startSequence_;
116 std::optional<uint32_t> finishSequence_;
117
118 std::unique_ptr<MonitorInterface> monitor_;
119 std::unique_ptr<TaskManagerInterface> taskMan_;
120
121 boost::signals2::scoped_connection monitorNewSeqSubscription_;
122 boost::signals2::scoped_connection monitorDbStalledSubscription_;
123 boost::signals2::scoped_connection systemStateWriteCommandSubscription_;
124 util::async::AnyStrand writeCommandStrand_;
125 std::atomic<size_t> runningWriteCommandHandlers_{0};
126
127 std::optional<util::async::AnyOperation<void>> mainLoop_;
128
129public:
144 static std::shared_ptr<ETLServiceInterface>
147 std::shared_ptr<SystemState> state,
149 std::shared_ptr<BackendInterface> backend,
150 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
151 std::shared_ptr<LoadBalancerInterface> balancer,
152 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
153 );
154
175 std::reference_wrapper<util::config::ClioConfigDefinition const> config,
176 std::shared_ptr<data::BackendInterface> backend,
177 std::shared_ptr<LoadBalancerInterface> balancer,
178 std::shared_ptr<NetworkValidatedLedgersInterface> ledgers,
179 std::shared_ptr<LedgerPublisherInterface> publisher,
180 std::shared_ptr<CacheLoaderInterface> cacheLoader,
181 std::shared_ptr<CacheUpdaterInterface> cacheUpdater,
182 std::shared_ptr<ExtractorInterface> extractor,
183 std::shared_ptr<LoaderInterface> loader,
184 std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
185 std::shared_ptr<TaskManagerProviderInterface> taskManagerProvider,
186 std::shared_ptr<MonitorProviderInterface> monitorProvider,
187 std::shared_ptr<SystemState> state
188 );
189
190 ~ETLService() override;
191
192 void
193 run() override;
194
195 void
196 stop() override;
197
198 boost::json::object
199 getInfo() const override;
200
201 bool
202 isAmendmentBlocked() const override;
203
204 bool
205 isCorruptionDetected() const override;
206
207 std::optional<ETLState>
208 getETLState() const override;
209
210 std::uint32_t
211 lastCloseAgeSeconds() const override;
212
213private:
214 std::optional<data::LedgerRange>
215 loadInitialLedgerIfNeeded();
216
217 [[nodiscard]] uint32_t
218 syncCacheWithDb();
219
220 void
221 updateCache(uint32_t seq);
222
223 void
224 startMonitor(uint32_t seq);
225
226 void
227 startLoading(uint32_t seq);
228
229 void
230 attemptTakeoverWriter();
231
232 void
233 giveUpWriter();
234};
235
236} // namespace etl
bool isAmendmentBlocked() const override
Check for the amendment blocked state.
Definition ETLService.cpp:267
void run() override
Start all components to run ETL service.
Definition ETLService.cpp:197
ETLService(util::async::AnyExecutionContext ctx, std::reference_wrapper< util::config::ClioConfigDefinition const > config, std::shared_ptr< data::BackendInterface > backend, std::shared_ptr< LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers, std::shared_ptr< LedgerPublisherInterface > publisher, std::shared_ptr< CacheLoaderInterface > cacheLoader, std::shared_ptr< CacheUpdaterInterface > cacheUpdater, std::shared_ptr< ExtractorInterface > extractor, std::shared_ptr< LoaderInterface > loader, std::shared_ptr< InitialLoadObserverInterface > initialLoadObserver, std::shared_ptr< TaskManagerProviderInterface > taskManagerProvider, std::shared_ptr< MonitorProviderInterface > monitorProvider, std::shared_ptr< SystemState > state)
Create an instance of ETLService.
Definition ETLService.cpp:144
static std::shared_ptr< ETLServiceInterface > makeETLService(util::config::ClioConfigDefinition const &config, std::shared_ptr< SystemState > state, util::async::AnyExecutionContext ctx, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< LoadBalancerInterface > balancer, std::shared_ptr< NetworkValidatedLedgersInterface > ledgers)
A factory function to spawn new ETLService instances.
Definition ETLService.cpp:79
boost::json::object getInfo() const override
Get state of ETL as a JSON object.
Definition ETLService.cpp:253
bool isCorruptionDetected() const override
Check whether Clio detected DB corruptions.
Definition ETLService.cpp:273
void stop() override
Stop the ETL service.
Definition ETLService.cpp:233
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition ETLService.cpp:285
std::optional< ETLState > getETLState() const override
Get the etl nodes' state.
Definition ETLService.cpp:279
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
A type-erased execution context.
Definition AnyStrand.hpp:40
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:50
This is a base class for any ETL service implementations.
Definition ETLServiceInterface.hpp:36