Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
ETLService.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2025, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/LedgerCache.hpp"
24#include "data/Types.hpp"
25#include "etl/CacheLoader.hpp"
26#include "etl/ETLState.hpp"
29#include "etl/SystemState.hpp"
30#include "etl/impl/AmendmentBlockHandler.hpp"
31#include "etl/impl/LedgerFetcher.hpp"
32#include "etl/impl/LedgerPublisher.hpp"
33#include "etlng/AmendmentBlockHandlerInterface.hpp"
34#include "etlng/ETLServiceInterface.hpp"
35#include "etlng/ExtractorInterface.hpp"
37#include "etlng/impl/AmendmentBlockHandler.hpp"
38#include "etlng/impl/Extraction.hpp"
39#include "etlng/impl/Loading.hpp"
40#include "etlng/impl/Monitor.hpp"
41#include "etlng/impl/Registry.hpp"
42#include "etlng/impl/Scheduling.hpp"
43#include "etlng/impl/TaskManager.hpp"
44#include "etlng/impl/ext/Cache.hpp"
45#include "etlng/impl/ext/Core.hpp"
46#include "etlng/impl/ext/NFT.hpp"
47#include "etlng/impl/ext/Successor.hpp"
48#include "feed/SubscriptionManagerInterface.hpp"
49#include "util/Assert.hpp"
50#include "util/Profiler.hpp"
51#include "util/async/context/BasicExecutionContext.hpp"
52#include "util/config/Config.hpp"
53#include "util/log/Logger.hpp"
54#include "util/newconfig/ConfigDefinition.hpp"
55
56#include <boost/json/object.hpp>
57#include <fmt/core.h>
58#include <xrpl/basics/Blob.h>
59#include <xrpl/basics/base_uint.h>
60#include <xrpl/basics/strHex.h>
61#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
62#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
63#include <xrpl/protocol/LedgerHeader.h>
64#include <xrpl/protocol/STTx.h>
65#include <xrpl/protocol/TxFormats.h>
66#include <xrpl/protocol/TxMeta.h>
67
68#include <chrono>
69#include <cstdint>
70#include <memory>
71#include <optional>
72#include <ranges>
73#include <stdexcept>
74#include <string>
75#include <tuple>
76#include <utility>
77
78namespace etlng {
79
94 util::Logger log_{"ETL"};
95
96 std::shared_ptr<BackendInterface> backend_;
97 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
98 std::shared_ptr<etlng::LoadBalancerInterface> balancer_;
99 std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers_;
100 std::shared_ptr<etl::CacheLoader<>> cacheLoader_;
101
102 std::shared_ptr<etl::LedgerFetcherInterface> fetcher_;
103 std::shared_ptr<ExtractorInterface> extractor_;
104
105 etl::SystemState state_;
107
108 std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler_;
109 std::shared_ptr<impl::Loader> loader_;
110
111 std::optional<util::async::CoroExecutionContext::Operation<void>> mainLoop_;
112
113public:
125 std::shared_ptr<BackendInterface> backend,
126 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
127 std::shared_ptr<etlng::LoadBalancerInterface> balancer,
128 std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers
129 )
130 : backend_(std::move(backend))
131 , subscriptions_(std::move(subscriptions))
132 , balancer_(std::move(balancer))
133 , ledgers_(std::move(ledgers))
134 , cacheLoader_(std::make_shared<etl::CacheLoader<>>(config, backend_, backend_->cache()))
135 , fetcher_(std::make_shared<etl::impl::LedgerFetcher>(backend_, balancer_))
136 , extractor_(std::make_shared<impl::Extractor>(fetcher_))
137 , amendmentBlockHandler_(std::make_shared<etlng::impl::AmendmentBlockHandler>(ctx_, state_))
138 , loader_(std::make_shared<impl::Loader>(
139 backend_,
140 fetcher_,
141 impl::makeRegistry(
142 impl::CacheExt{backend_->cache()},
143 impl::CoreExt{backend_},
144 impl::SuccessorExt{backend_, backend_->cache()},
145 impl::NFTExt{backend_}
146 ),
147 amendmentBlockHandler_
148 ))
149 {
150 LOG(log_.info()) << "Creating ETLng...";
151 }
152
153 ~ETLService() override
154 {
155 LOG(log_.debug()) << "Stopping ETLng";
156 }
157
158 void
159 run() override
160 {
161 LOG(log_.info()) << "run() in ETLng...";
162
163 mainLoop_.emplace(ctx_.execute([this] {
164 auto const rng = loadInitialLedgerIfNeeded();
165
166 LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
167 std::optional<uint32_t> const mostRecentValidated = ledgers_->getMostRecent();
168
169 if (not mostRecentValidated) {
170 LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
171 "Exiting monitor loop";
172 return;
173 }
174
175 ASSERT(rng.has_value(), "Ledger range can't be null");
176 auto const nextSequence = rng->maxSequence + 1;
177
178 LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
179
180 auto scheduler = impl::makeScheduler(impl::ForwardScheduler{*ledgers_, nextSequence}
181 // impl::BackfillScheduler{nextSequence - 1, nextSequence - 1000},
182 // TODO lift limit and start with rng.minSeq
183 );
184
185 auto man = impl::TaskManager(ctx_, *scheduler, *extractor_, *loader_);
186
187 // TODO: figure out this: std::make_shared<impl::Monitor>(backend_, ledgers_, nextSequence)
188 man.run({}); // TODO: needs to be interruptable and fill out settings
189 }));
190 }
191
192 void
193 stop() override
194 {
195 LOG(log_.info()) << "Stop called";
196 // TODO: stop the service correctly
197 }
198
199 boost::json::object
200 getInfo() const override
201 {
202 // TODO
203 return {{"ok", true}};
204 }
205
206 bool
207 isAmendmentBlocked() const override
208 {
209 // TODO
210 return false;
211 }
212
213 bool
214 isCorruptionDetected() const override
215 {
216 // TODO
217 return false;
218 }
219
220 std::optional<etl::ETLState>
221 getETLState() const override
222 {
223 // TODO
224 return std::nullopt;
225 }
226
227 std::uint32_t
228 lastCloseAgeSeconds() const override
229 {
230 // TODO
231 return 0;
232 }
233
234private:
235 // TODO: this better be std::expected
236 std::optional<data::LedgerRange>
237 loadInitialLedgerIfNeeded()
238 {
239 if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); not rng.has_value()) {
240 LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
241
242 try {
243 LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
244 if (auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) {
245 auto const seq = *mostRecentValidated;
246 LOG(log_.info()) << "Ledger " << seq << " has been validated. Downloading... ";
247
248 auto [ledger, timeDiff] = ::util::timed<std::chrono::duration<double>>([this, seq]() {
249 return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) {
250 // TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar
251 data.edgeKeys = balancer_->loadInitialLedger(seq, *loader_);
252
253 // TODO: this should be interruptable for graceful shutdown
254 return loader_->loadInitialLedger(data);
255 });
256 });
257
258 LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
259 LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size();
260
261 if (ledger.has_value())
262 return backend_->hardFetchLedgerRangeNoThrow();
263
264 LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop";
265 } else {
266 LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
267 "Exiting monitor loop";
268 }
269 } catch (std::runtime_error const& e) {
270 LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what();
271 amendmentBlockHandler_->notifyAmendmentBlocked();
272 }
273 } else {
274 LOG(log_.info()) << "Database already populated. Picking up from the tip of history";
275 cacheLoader_->load(rng->maxSequence);
276
277 return rng;
278 }
279
280 return std::nullopt;
281 }
282};
283} // namespace etlng
This class is responsible for continuously extracting data from a p2p node and writing that data to the backend database.
Definition ETLService.hpp:93
void stop() override
Stop the ETL service.
Definition ETLService.hpp:193
void run() override
Start all components to run ETL service.
Definition ETLService.hpp:159
bool isCorruptionDetected() const override
Check whether Clio detected DB corruptions.
Definition ETLService.hpp:214
std::uint32_t lastCloseAgeSeconds() const override
Get time passed since last ledger close, in seconds.
Definition ETLService.hpp:228
std::optional< etl::ETLState > getETLState() const override
Get the ETL nodes' state.
Definition ETLService.hpp:221
ETLService(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< etlng::LoadBalancerInterface > balancer, std::shared_ptr< etl::NetworkValidatedLedgersInterface > ledgers)
Create an instance of ETLService.
Definition ETLService.hpp:123
boost::json::object getInfo() const override
Get state of ETL as a JSON object.
Definition ETLService.hpp:200
bool isAmendmentBlocked() const override
Check for the amendment blocked state.
Definition ETLService.hpp:207
Definition Scheduling.hpp:43
Definition TaskManager.hpp:40
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
A highly configurable execution context.
Definition BasicExecutionContext.hpp:132
auto execute(SomeHandlerWith< StopToken > auto &&fn, std::optional< std::chrono::milliseconds > timeout=std::nullopt) noexcept(kIS_NOEXCEPT)
Schedule an operation on the execution context.
Definition BasicExecutionContext.hpp:299
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:37
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33
This is a base class for any ETL service implementations.
Definition ETLServiceInterface.hpp:36