Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Extractor.hpp
//------------------------------------------------------------------------------
/*
    This file is part of clio: https://github.com/XRPLF/clio
    Copyright (c) 2023, the clio developers.

    Permission to use, copy, modify, and distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
19
#pragma once

#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp"
#include "util/Assert.hpp"
#include "util/Profiler.hpp"
#include "util/log/Logger.hpp"

#include <xrpl/beast/core/CurrentThreadName.h>

#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <thread>
#include <utility>
37
38namespace etl::impl {
39
43template <typename DataPipeType, typename LedgerFetcherType>
44class Extractor {
45 util::Logger log_{"ETL"};
46
47 std::reference_wrapper<DataPipeType> pipe_;
48 std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers_;
49 std::reference_wrapper<LedgerFetcherType> ledgerFetcher_;
50 uint32_t startSequence_;
51 std::optional<uint32_t> finishSequence_;
52 std::reference_wrapper<SystemState const> state_; // shared state for ETL
53
54 std::thread thread_;
55
56public:
58 DataPipeType& pipe,
59 std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers,
60 LedgerFetcherType& ledgerFetcher,
61 uint32_t startSequence,
62 std::optional<uint32_t> finishSequence,
63 SystemState const& state
64 )
65 : pipe_(std::ref(pipe))
66 , networkValidatedLedgers_{std::move(networkValidatedLedgers)}
67 , ledgerFetcher_{std::ref(ledgerFetcher)}
68 , startSequence_{startSequence}
69 , finishSequence_{finishSequence}
70 , state_{std::cref(state)}
71 {
72 thread_ = std::thread([this]() { process(); });
73 }
74
76 {
77 if (thread_.joinable())
78 thread_.join();
79 }
80
81 void
82 waitTillFinished()
83 {
84 ASSERT(thread_.joinable(), "Extractor thread must be joinable");
85 thread_.join();
86 }
87
88private:
89 void
90 process()
91 {
92 beast::setCurrentThreadName("ETLService extract");
93
94 double totalTime = 0.0;
95 auto currentSequence = startSequence_;
96
97 while (!shouldFinish(currentSequence) && networkValidatedLedgers_->waitUntilValidatedByNetwork(currentSequence)
98 ) {
99 auto [fetchResponse, time] = ::util::timed<std::chrono::duration<double>>([this, currentSequence]() {
100 return ledgerFetcher_.get().fetchDataAndDiff(currentSequence);
101 });
102 totalTime += time;
103
104 // if the fetch is unsuccessful, stop. fetchLedger only returns false if the server is shutting down, or
105 // if the ledger was found in the database (which means another process already wrote the ledger that
106 // this process was trying to extract; this is a form of a write conflict). Otherwise, fetchDataAndDiff
107 // will keep trying to fetch the specified ledger until successful.
108 if (!fetchResponse)
109 break;
110
111 // TODO: extract this part into a strategy perhaps
112 auto const tps = fetchResponse->transactions_list().transactions_size() / time;
113 LOG(log_.info()) << "Extract phase time = " << time << "; Extract phase tps = " << tps
114 << "; Avg extract time = " << totalTime / (currentSequence - startSequence_ + 1)
115 << "; seq = " << currentSequence;
116
117 pipe_.get().push(currentSequence, std::move(fetchResponse));
118 currentSequence += pipe_.get().getStride();
119 }
120
121 pipe_.get().finish(startSequence_);
122 }
123
124 [[nodiscard]] bool
125 isStopping() const
126 {
127 return state_.get().isStopping;
128 }
129
130 [[nodiscard]] bool
131 hasWriteConflict() const
132 {
133 return state_.get().writeConflict;
134 }
135
136 [[nodiscard]] bool
137 shouldFinish(uint32_t seq) const
138 {
139 // Stopping conditions:
140 // - if there is a write conflict in the load thread, the ETL mechanism should stop.
141 // - if the entire server is shutting down - this can be detected in a variety of ways.
142 // - when the given sequence is past the finishSequence in case one is specified
143 return hasWriteConflict() || isStopping() || (finishSequence_ && seq > *finishSequence_);
144 }
145};
146
147} // namespace etl::impl
Extractor thread that fetches gRPC data and enqueues it on the DataPipeType.
Definition Extractor.hpp:44
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
Represents the state of the ETL subsystem.
Definition SystemState.hpp:33