30class CacheLoaderImpl {
34 std::shared_ptr<BackendInterface> backend_;
35 std::reference_wrapper<CacheType> cache_;
38 std::atomic_int16_t remaining_;
40 std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now();
41 std::vector<util::async::AnyOperation<void>> tasks_;
44 template <
typename CtxType>
47 std::shared_ptr<BackendInterface> backend,
50 std::size_t
const numCacheMarkers,
51 std::size_t
const cachePageFetchSize,
52 std::vector<CursorPair>
const& cursors
55 , backend_{std::move(backend)}
56 , cache_{std::ref(cache)}
57 , queue_{cursors.size()}
58 , remaining_{cursors.size()}
60 std::ranges::for_each(cursors, [
this](
auto const& cursor) { queue_.push(cursor); });
61 load(seq, numCacheMarkers, cachePageFetchSize);
73 for (
auto& t : tasks_)
80 for (
auto& t : tasks_)
86 load(uint32_t
const seq,
size_t numCacheMarkers,
size_t cachePageFetchSize)
88 namespace vs = std::views;
90 LOG(log_.info()) <<
"Loading cache. Num cursors = " << queue_.size();
91 tasks_.reserve(numCacheMarkers);
93 for ([[maybe_unused]]
auto taskId : vs::iota(0u, numCacheMarkers))
94 tasks_.push_back(spawnWorker(seq, cachePageFetchSize));
98 spawnWorker(uint32_t
const seq,
size_t cachePageFetchSize)
100 return ctx_.execute([
this, seq, cachePageFetchSize](
auto token) {
101 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
102 auto cursor = queue_.tryPop();
103 if (not cursor.has_value()) {
107 auto [start, end] = cursor.value();
108 LOG(log_.debug()) <<
"Starting a cursor: " << ripple::strHex(start);
110 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
113 return backend_->fetchLedgerPage(
114 start, seq, cachePageFetchSize,
false, token
118 cache_.get().update(res.objects, seq,
true);
120 if (not res.cursor or res.cursor > end) {
121 if (--remaining_ <= 0) {
122 auto endTime = std::chrono::steady_clock::now();
123 auto duration = std::chrono::duration_cast<std::chrono::seconds>(
128 <<
"Finished loading cache. Cache size = " << cache_.get().size()
129 <<
". Took " << duration.count() <<
" seconds";
131 cache_.get().setFull();
133 LOG(log_.debug()) <<
"Finished a cursor. Remaining = " << remaining_;
139 start = std::move(res.cursor).value();
A type-erased execution context.
Definition AnyExecutionContext.hpp:22
auto retryOnTimeout(FnType func, size_t waitMs=kDEFAULT_WAIT_BETWEEN_RETRY)
A helper function that catches DatabaseTimeout exceptions and retries indefinitely.
Definition BackendInterface.hpp:63