49class CacheLoaderImpl {
 
   53    std::shared_ptr<BackendInterface> backend_;
 
   54    std::reference_wrapper<CacheType> cache_;
 
   57    std::atomic_int16_t remaining_;
 
   59    std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now();
 
   60    std::vector<util::async::AnyOperation<void>> tasks_;
 
   63    template <
typename CtxType>
 
   66        std::shared_ptr<BackendInterface> 
const& backend,
 
   69        std::size_t 
const numCacheMarkers,
 
   70        std::size_t 
const cachePageFetchSize,
 
   71        std::vector<CursorPair> 
const& cursors
 
   73        : ctx_{ctx}, backend_{backend}, cache_{std::ref(cache)}, queue_{cursors.size()}, remaining_{cursors.size()}
 
   75        std::ranges::for_each(cursors, [
this](
auto const& cursor) { queue_.push(cursor); });
 
   76        load(seq, numCacheMarkers, cachePageFetchSize);
 
   88        for (
auto& t : tasks_)
 
   95        for (
auto& t : tasks_)
 
  101    load(uint32_t 
const seq, 
size_t numCacheMarkers, 
size_t cachePageFetchSize)
 
  103        namespace vs = std::views;
 
  105        LOG(log_.info()) << 
"Loading cache. Num cursors = " << queue_.size();
 
  106        tasks_.reserve(numCacheMarkers);
 
  108        for ([[maybe_unused]] 
auto taskId : vs::iota(0u, numCacheMarkers))
 
  109            tasks_.push_back(spawnWorker(seq, cachePageFetchSize));
 
  113    spawnWorker(uint32_t 
const seq, 
size_t cachePageFetchSize)
 
  115        return ctx_.execute([
this, seq, cachePageFetchSize](
auto token) {
 
  116            while (not token.isStopRequested() and not cache_.get().isDisabled()) {
 
  117                auto cursor = queue_.tryPop();
 
  118                if (not cursor.has_value()) {
 
  122                auto [start, end] = cursor.value();
 
  123                LOG(log_.debug()) << 
"Starting a cursor: " << ripple::strHex(start);
 
  125                while (not token.isStopRequested() and not cache_.get().isDisabled()) {
 
  127                        return backend_->fetchLedgerPage(start, seq, cachePageFetchSize, 
false, token);
 
  130                    cache_.get().update(res.objects, seq, 
true);
 
  132                    if (not res.cursor or res.cursor > end) {
 
  133                        if (--remaining_ <= 0) {
 
  134                            auto endTime = std::chrono::steady_clock::now();
 
  135                            auto duration = std::chrono::duration_cast<std::chrono::seconds>(endTime - startTime_);
 
  137                            LOG(log_.info()) << 
"Finished loading cache. Cache size = " << cache_.get().size()
 
  138                                             << 
". Took " << duration.count() << 
" seconds";
 
  140                            cache_.get().setFull();
 
  142                            LOG(log_.debug()) << 
"Finished a cursor. Remaining = " << remaining_;
 
  148                    start = std::move(res.cursor).value();
 
 
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
auto retryOnTimeout(FnType func, size_t waitMs=kDEFAULT_WAIT_BETWEEN_RETRY)
A helper function that catches DatabaseTimeout exceptions and retries indefinitely.
Definition BackendInterface.hpp:82