// NOTE(review): this listing is an excerpt. Every line is prefixed with its
// line number in the original file, and some original lines are absent, so
// members used further down (queue_, ctx_, log_) are declared on lines that
// are not shown here.
49class CacheLoaderImpl {
// Shared handle to the backend; spawnWorker calls fetchLedgerPage through it.
53 std::shared_ptr<BackendInterface> backend_;
// Reference to the cache being filled (bound with std::ref in the constructor);
// workers call isDisabled(), update(), size() and setFull() on it.
54 std::reference_wrapper<CacheType> cache_;
// Number of cursors not yet finished: set from cursors.size() in the constructor
// and pre-decremented by a worker each time it finishes a cursor.
// NOTE(review): this is a signed 16-bit counter initialised from a size_t, so a
// cursor count above 32767 would not fit — confirm the count is bounded upstream.
57 std::atomic_int16_t remaining_;
// Captured when the object is constructed (default member initializer).
// Presumably the start point for the "Took N seconds" log in spawnWorker — the
// expression that computes the duration is not fully visible in this excerpt.
59 std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now();
// One entry per worker: load() stores every value returned by spawnWorker() here.
60 std::vector<util::async::AnyOperation<void>> tasks_;
// Constructor fragment, templated on CtxType. The line naming the function and
// some parameters (the `seq` and `cache` values used below) are not visible in
// this excerpt.
// Visible parameters:
//   backend            - shared pointer to the backend
//   numCacheMarkers    - forwarded to load(), which spawns that many workers
//   cachePageFetchSize - forwarded to load() and on to each worker
//   cursors            - cursor pairs; each one is pushed onto queue_
63 template <
typename CtxType>
66 std::shared_ptr<BackendInterface>
const& backend,
69 std::size_t
const numCacheMarkers,
70 std::size_t
const cachePageFetchSize,
71 std::vector<CursorPair>
const& cursors
// Member initializers: bind cache_ to the caller's cache object; construct queue_
// from the cursor count (presumably its capacity — confirm against the queue type);
// start remaining_ at the number of cursors.
75 , cache_{std::ref(cache)}
76 , queue_{cursors.size()}
77 , remaining_{cursors.size()}
// Enqueue every cursor before any worker is started.
79 std::ranges::for_each(cursors, [
this](
auto const& cursor) { queue_.push(cursor); });
// Start the workers; see load() below.
80 load(seq, numCacheMarkers, cachePageFetchSize);
92 for (
auto& t : tasks_)
99 for (
auto& t : tasks_)
// load: logs how many cursors are queued, reserves space in tasks_ for
// numCacheMarkers entries, then appends one spawnWorker() result per marker.
// The return type and the braces are on lines not shown in this excerpt.
//   seq                - passed through unchanged to every worker
//   numCacheMarkers    - number of workers to spawn
//   cachePageFetchSize - passed through unchanged to every worker
105 load(uint32_t
const seq,
size_t numCacheMarkers,
size_t cachePageFetchSize)
107 namespace vs = std::views;
109 LOG(log_.info()) <<
"Loading cache. Num cursors = " << queue_.size();
// Reserve up front so the push_back calls below do not reallocate.
110 tasks_.reserve(numCacheMarkers);
// taskId is deliberately unused; the range only sets the iteration count.
112 for ([[maybe_unused]]
auto taskId : vs::iota(0u, numCacheMarkers))
113 tasks_.push_back(spawnWorker(seq, cachePageFetchSize));
// spawnWorker: submits one worker to ctx_ via execute() and returns whatever
// execute() returns. The worker repeatedly takes a cursor range from queue_ and
// pages through it, feeding each page into the cache.
// Several lines of the original body are missing from this excerpt (return type,
// braces, the call that wraps fetchLedgerPage, the bodies of some branches).
//   seq                - forwarded to fetchLedgerPage and to cache update
//   cachePageFetchSize - forwarded to fetchLedgerPage
117 spawnWorker(uint32_t
const seq,
size_t cachePageFetchSize)
// The lambda captures `this`, so it uses members of this object while it runs.
119 return ctx_.execute([
this, seq, cachePageFetchSize](
auto token) {
// Outer loop: one iteration per cursor range; stops once a stop is requested
// or the cache reports it is disabled.
120 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
121 auto cursor = queue_.tryPop();
// tryPop() yielded nothing; the body of this branch is not visible here
// (presumably the worker exits — confirm against the full source).
122 if (not cursor.has_value()) {
// start is advanced page by page below; end bounds this cursor's range.
126 auto [start, end] = cursor.value();
127 LOG(log_.debug()) <<
"Starting a cursor: " << ripple::strHex(start);
// Inner loop: one iteration per fetched page, with the same stop conditions.
129 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
// The statement that wraps this `return` (and defines `res`) is on lines not
// shown; presumably a retry helper invoked with a lambda — TODO confirm.
132 return backend_->fetchLedgerPage(
133 start, seq, cachePageFetchSize,
false, token
// Feed the fetched objects into the cache for ledger sequence `seq`.
137 cache_.get().update(res.objects, seq,
true);
// No continuation cursor, or the continuation lies past `end`: this range is done.
139 if (not res.cursor or res.cursor > end) {
// Atomic pre-decrement; the worker that brings the count to zero (or below)
// logs the completion summary and marks the cache as full.
140 if (--remaining_ <= 0) {
141 auto endTime = std::chrono::steady_clock::now();
142 auto duration = std::chrono::duration_cast<std::chrono::seconds>(
147 <<
"Finished loading cache. Cache size = " << cache_.get().size()
148 <<
". Took " << duration.count() <<
" seconds";
150 cache_.get().setFull();
152 LOG(log_.debug()) <<
"Finished a cursor. Remaining = " << remaining_;
// Continue paging from the cursor the backend returned (the lines between the
// log statement above and this one are not shown in this excerpt).
158 start = std::move(res.cursor).value();
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
auto retryOnTimeout(FnType func, size_t waitMs=kDEFAULT_WAIT_BETWEEN_RETRY)
A helper function that catches DatabaseTimeout exceptions and retries indefinitely.
Definition BackendInterface.hpp:82