47 std::reference_wrapper<DataPipeType> pipe_;
48 std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers_;
49 std::reference_wrapper<LedgerFetcherType> ledgerFetcher_;
50 uint32_t startSequence_;
51 std::optional<uint32_t> finishSequence_;
52 std::reference_wrapper<SystemState const> state_;
59 std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers,
60 LedgerFetcherType& ledgerFetcher,
61 uint32_t startSequence,
62 std::optional<uint32_t> finishSequence,
65 : pipe_(std::ref(pipe))
66 , networkValidatedLedgers_{std::move(networkValidatedLedgers)}
67 , ledgerFetcher_{std::ref(ledgerFetcher)}
68 , startSequence_{startSequence}
69 , finishSequence_{finishSequence}
70 , state_{std::cref(state)}
72 thread_ = std::thread([
this]() { process(); });
77 if (thread_.joinable())
84 ASSERT(thread_.joinable(),
"Extractor thread must be joinable");
92 beast::setCurrentThreadName(
"ETLService extract");
94 double totalTime = 0.0;
95 auto currentSequence = startSequence_;
97 while (!shouldFinish(currentSequence) && networkValidatedLedgers_->waitUntilValidatedByNetwork(currentSequence)
100 return ledgerFetcher_.get().fetchDataAndDiff(currentSequence);
112 auto const tps = fetchResponse->transactions_list().transactions_size() / time;
113 LOG(log_.
info()) <<
"Extract phase time = " << time <<
"; Extract phase tps = " << tps
114 <<
"; Avg extract time = " << totalTime / (currentSequence - startSequence_ + 1)
115 <<
"; seq = " << currentSequence;
117 pipe_.get().push(currentSequence, std::move(fetchResponse));
118 currentSequence += pipe_.get().getStride();
121 pipe_.get().finish(startSequence_);
127 return state_.get().isStopping;
131 hasWriteConflict()
const
133 return state_.get().writeConflict;
137 shouldFinish(uint32_t seq)
const
143 return hasWriteConflict() || isStopping() || (finishSequence_ && seq > *finishSequence_);
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40