53 std::shared_ptr<BackendInterface> backend_;
54 std::reference_wrapper<CacheType> cache_;
57 std::atomic_int16_t remaining_;
59 std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now();
60 std::vector<util::async::AnyOperation<void>> tasks_;
63 template <
typename CtxType>
66 std::shared_ptr<BackendInterface>
const& backend,
69 std::size_t
const numCacheMarkers,
70 std::size_t
const cachePageFetchSize,
71 std::vector<CursorPair>
const& cursors
73 : ctx_{ctx}, backend_{backend}, cache_{std::ref(cache)}, queue_{cursors.size()}, remaining_{cursors.size()}
75 std::ranges::for_each(cursors, [
this](
auto const& cursor) { queue_.
push(cursor); });
76 load(seq, numCacheMarkers, cachePageFetchSize);
88 for (
auto& t : tasks_)
95 for (
auto& t : tasks_)
101 load(uint32_t
const seq,
size_t numCacheMarkers,
size_t cachePageFetchSize)
103 namespace vs = std::views;
105 LOG(log_.
info()) <<
"Loading cache. Num cursors = " << queue_.
size();
106 tasks_.reserve(numCacheMarkers);
108 for ([[maybe_unused]]
auto taskId : vs::iota(0u, numCacheMarkers))
109 tasks_.push_back(spawnWorker(seq, cachePageFetchSize));
113 spawnWorker(uint32_t
const seq,
size_t cachePageFetchSize)
115 return ctx_.
execute([
this, seq, cachePageFetchSize](
auto token) {
116 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
117 auto cursor = queue_.
tryPop();
118 if (not cursor.has_value()) {
122 auto [start, end] = cursor.value();
123 LOG(log_.
debug()) <<
"Starting a cursor: " << ripple::strHex(start);
125 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
127 return backend_->fetchLedgerPage(start, seq, cachePageFetchSize,
false, token);
130 cache_.get().update(res.objects, seq,
true);
132 if (not res.cursor or res.cursor > end) {
133 if (--remaining_ <= 0) {
134 auto endTime = std::chrono::steady_clock::now();
135 auto duration = std::chrono::duration_cast<std::chrono::seconds>(endTime - startTime_);
137 LOG(log_.
info()) <<
"Finished loading cache. Cache size = " << cache_.get().size()
138 <<
". Took " << duration.count() <<
" seconds";
140 cache_.get().setFull();
142 LOG(log_.
debug()) <<
"Finished a cursor. Remaining = " << remaining_;
148 start = std::move(res.cursor).value();
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
auto execute(SomeHandlerWithoutStopToken auto &&fn)
Execute a function on the execution context.
Definition AnyExecutionContext.hpp:86
auto retryOnTimeout(FnType func, size_t waitMs=kDEFAULT_WAIT_BETWEEN_RETRY)
A helper function that catches DatabaseTimeout exceptions and retries indefinitely.
Definition BackendInterface.hpp:79