3#include "data/BackendInterface.hpp"
4#include "data/LedgerCacheInterface.hpp"
5#include "data/LedgerCacheLoadingState.hpp"
6#include "data/Types.hpp"
7#include "etl/CacheLoaderInterface.hpp"
8#include "etl/CacheLoaderSettings.hpp"
9#include "etl/impl/CacheLoader.hpp"
10#include "etl/impl/CursorFromAccountProvider.hpp"
11#include "etl/impl/CursorFromDiffProvider.hpp"
12#include "etl/impl/CursorFromFixDiffNumProvider.hpp"
13#include "util/Assert.hpp"
14#include "util/Profiler.hpp"
15#include "util/async/context/BasicExecutionContext.hpp"
16#include "util/config/ConfigDefinition.hpp"
17#include "util/log/Logger.hpp"
36template <
typename ExecutionContextType = util::async::CoroExecutionContext>
41 std::shared_ptr<BackendInterface> backend_;
42 std::reference_wrapper<data::LedgerCacheInterface> cache_;
45 std::unique_ptr<data::LedgerCacheLoadingStateInterface const> cacheLoadingState_;
46 ExecutionContextType ctx_;
47 std::unique_ptr<CacheLoaderType> loader_;
60 std::shared_ptr<BackendInterface> backend,
62 std::unique_ptr<data::LedgerCacheLoadingStateInterface const> cacheLoadingState
64 : backend_{std::move(backend)}
66 , settings_{makeCacheLoaderSettings(config)}
67 , cacheLoadingState_(std::move(cacheLoadingState))
68 , ctx_{settings_.numThreads}
81 load(uint32_t
const seq)
override
83 ASSERT(not cache_.get().isFull(),
"Cache must not be full. seq = {}", seq);
85 if (settings_.isDisabled()) {
86 cache_.get().setDisabled();
87 LOG(log_.warn()) <<
"Cache is disabled. Not loading";
91 if (loadCacheFromFile()) {
93 updateCacheToSeq(seq);
94 cache_.get().setFull();
98 LOG(log_.info()) <<
"Waiting for ledger cache loading to become allowed";
99 cacheLoadingState_->waitForLoadingAllowed();
100 LOG(log_.info()) <<
"Ledger cache loading is now allowed. Start loading...";
101 cache_.get().startLoading();
103 std::shared_ptr<impl::BaseCursorProvider> provider;
104 if (settings_.numCacheCursorsFromDiff != 0) {
105 LOG(log_.info()) <<
"Loading cache with cursor from num_cursors_from_diff="
106 << settings_.numCacheCursorsFromDiff;
107 provider = std::make_shared<impl::CursorFromDiffProvider>(
108 backend_, settings_.numCacheCursorsFromDiff
110 }
else if (settings_.numCacheCursorsFromAccount != 0) {
111 LOG(log_.info()) <<
"Loading cache with cursor from num_cursors_from_account="
112 << settings_.numCacheCursorsFromAccount;
113 provider = std::make_shared<impl::CursorFromAccountProvider>(
114 backend_, settings_.numCacheCursorsFromAccount, settings_.cachePageFetchSize
117 LOG(log_.info()) <<
"Loading cache with cursor from num_diffs="
118 << settings_.numCacheDiffs;
119 provider = std::make_shared<impl::CursorFromFixDiffNumProvider>(
120 backend_, settings_.numCacheDiffs
124 loader_ = std::make_unique<CacheLoaderType>(
129 settings_.numCacheMarkers,
130 settings_.cachePageFetchSize,
131 provider->getCursors(seq)
134 if (settings_.isSync()) {
136 ASSERT(cache_.get().isFull(),
"Cache must be full after sync load. seq = {}", seq);
146 if (loader_ !=
nullptr)
156 if (loader_ !=
nullptr)
168 auto const minLatestSequence =
169 backend_->fetchLedgerRange()
177 auto const [success, duration_ms] =
util::timed([&]() {
178 return cache_.get().loadFromFile(settings_.cacheFileSettings->path, minLatestSequence);
181 if (not success.has_value()) {
182 LOG(log_.warn()) <<
"Error loading cache from file: " << success.error();
186 LOG(log_.info()) <<
"Loaded cache from file in " << duration_ms
187 <<
" ms. Latest sequence: " << cache_.get().latestLedgerSequence();
192 updateCacheToSeq(uint32_t
const seq)
194 while (cache_.get().latestLedgerSequence() < seq) {
195 auto const seqToLoad = cache_.get().latestLedgerSequence() + 1;
196 LOG(log_.info()) <<
"Fetching ledger " << seqToLoad
197 <<
"from DB after loading cache from file";
199 return backend_->fetchLedgerDiff(seqToLoad, yield);
201 cache_.get().update(diff, seqToLoad);
202 LOG(log_.info()) <<
"Updated cache to " << seqToLoad;
Cache for an entire ledger.
Definition LedgerCacheInterface.hpp:21
void load(uint32_t const seq) override
Load the cache for the given sequence number.
Definition CacheLoader.hpp:81
void stop() noexcept override
Requests the loader to stop asap.
Definition CacheLoader.hpp:144
void wait() noexcept override
Waits for the loader to finish background work.
Definition CacheLoader.hpp:154
CacheLoader(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > backend, data::LedgerCacheInterface &cache, std::unique_ptr< data::LedgerCacheLoadingStateInterface const > cacheLoadingState)
Construct a new Cache Loader object.
Definition CacheLoader.hpp:58
Definition CacheLoader.hpp:30
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:488
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:31
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:117
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:21
Stores a range of sequences as a min and max pair.
Definition Types.hpp:243
An interface for the Cache Loader.
Definition CacheLoaderInterface.hpp:10
Settings for the cache loader.
Definition CacheLoaderSettings.hpp:15
std::optional< CacheFileSettings > cacheFileSettings
Definition CacheLoaderSettings.hpp:39