Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
CacheLoader.hpp
1#pragma once
2
3#include "data/BackendInterface.hpp"
4#include "data/LedgerCacheInterface.hpp"
5#include "data/LedgerCacheLoadingState.hpp"
6#include "data/Types.hpp"
7#include "etl/CacheLoaderInterface.hpp"
8#include "etl/CacheLoaderSettings.hpp"
9#include "etl/impl/CacheLoader.hpp"
10#include "etl/impl/CursorFromAccountProvider.hpp"
11#include "etl/impl/CursorFromDiffProvider.hpp"
12#include "etl/impl/CursorFromFixDiffNumProvider.hpp"
13#include "util/Assert.hpp"
14#include "util/Profiler.hpp"
15#include "util/async/context/BasicExecutionContext.hpp"
16#include "util/config/ConfigDefinition.hpp"
17#include "util/log/Logger.hpp"
18
19#include <algorithm>
20#include <cstdint>
21#include <functional>
22#include <memory>
23#include <utility>
24
25namespace etl {
26
36template <typename ExecutionContextType = util::async::CoroExecutionContext>
39
40 util::Logger log_{"ETL"};
41 std::shared_ptr<BackendInterface> backend_;
42 std::reference_wrapper<data::LedgerCacheInterface> cache_;
43
44 CacheLoaderSettings settings_;
45 std::unique_ptr<data::LedgerCacheLoadingStateInterface const> cacheLoadingState_;
46 ExecutionContextType ctx_;
47 std::unique_ptr<CacheLoaderType> loader_;
48
49public:
60 std::shared_ptr<BackendInterface> backend,
62 std::unique_ptr<data::LedgerCacheLoadingStateInterface const> cacheLoadingState
63 )
64 : backend_{std::move(backend)}
65 , cache_{cache}
66 , settings_{makeCacheLoaderSettings(config)}
67 , cacheLoadingState_(std::move(cacheLoadingState))
68 , ctx_{settings_.numThreads}
69 {
70 }
71
80 void
81 load(uint32_t const seq) override
82 {
83 ASSERT(not cache_.get().isFull(), "Cache must not be full. seq = {}", seq);
84
85 if (settings_.isDisabled()) {
86 cache_.get().setDisabled();
87 LOG(log_.warn()) << "Cache is disabled. Not loading";
88 return;
89 }
90
91 if (loadCacheFromFile()) {
92 // Cache file may contain outdated data, so fetch whatever left up to seq from DB
93 updateCacheToSeq(seq);
94 cache_.get().setFull();
95 return;
96 }
97
98 LOG(log_.info()) << "Waiting for ledger cache loading to become allowed";
99 cacheLoadingState_->waitForLoadingAllowed();
100 LOG(log_.info()) << "Ledger cache loading is now allowed. Start loading...";
101 cache_.get().startLoading();
102
103 std::shared_ptr<impl::BaseCursorProvider> provider;
104 if (settings_.numCacheCursorsFromDiff != 0) {
105 LOG(log_.info()) << "Loading cache with cursor from num_cursors_from_diff="
106 << settings_.numCacheCursorsFromDiff;
107 provider = std::make_shared<impl::CursorFromDiffProvider>(
108 backend_, settings_.numCacheCursorsFromDiff
109 );
110 } else if (settings_.numCacheCursorsFromAccount != 0) {
111 LOG(log_.info()) << "Loading cache with cursor from num_cursors_from_account="
112 << settings_.numCacheCursorsFromAccount;
113 provider = std::make_shared<impl::CursorFromAccountProvider>(
114 backend_, settings_.numCacheCursorsFromAccount, settings_.cachePageFetchSize
115 );
116 } else {
117 LOG(log_.info()) << "Loading cache with cursor from num_diffs="
118 << settings_.numCacheDiffs;
119 provider = std::make_shared<impl::CursorFromFixDiffNumProvider>(
120 backend_, settings_.numCacheDiffs
121 );
122 }
123
124 loader_ = std::make_unique<CacheLoaderType>(
125 ctx_,
126 backend_,
127 cache_,
128 seq,
129 settings_.numCacheMarkers,
130 settings_.cachePageFetchSize,
131 provider->getCursors(seq)
132 );
133
134 if (settings_.isSync()) {
135 loader_->wait();
136 ASSERT(cache_.get().isFull(), "Cache must be full after sync load. seq = {}", seq);
137 }
138 }
139
143 void
144 stop() noexcept override
145 {
146 if (loader_ != nullptr)
147 loader_->stop();
148 }
149
153 void
154 wait() noexcept override
155 {
156 if (loader_ != nullptr)
157 loader_->wait();
158 }
159
160private:
161 bool
162 loadCacheFromFile()
163 {
164 if (not settings_.cacheFileSettings.has_value()) {
165 return false;
166 }
167 LOG(log_.info()) << "Loading ledger cache from " << settings_.cacheFileSettings->path;
168 auto const minLatestSequence =
169 backend_->fetchLedgerRange()
170 .transform([this](data::LedgerRange const& range) {
171 return std::max(
172 range.maxSequence - settings_.cacheFileSettings->maxAge, range.minSequence
173 );
174 })
175 .value_or(0);
176
177 auto const [success, duration_ms] = util::timed([&]() {
178 return cache_.get().loadFromFile(settings_.cacheFileSettings->path, minLatestSequence);
179 });
180
181 if (not success.has_value()) {
182 LOG(log_.warn()) << "Error loading cache from file: " << success.error();
183 return false;
184 }
185
186 LOG(log_.info()) << "Loaded cache from file in " << duration_ms
187 << " ms. Latest sequence: " << cache_.get().latestLedgerSequence();
188 return true;
189 }
190
191 void
192 updateCacheToSeq(uint32_t const seq)
193 {
194 while (cache_.get().latestLedgerSequence() < seq) {
195 auto const seqToLoad = cache_.get().latestLedgerSequence() + 1;
196 LOG(log_.info()) << "Fetching ledger " << seqToLoad
197 << "from DB after loading cache from file";
198 auto const diff = data::synchronousAndRetryOnTimeout([this, seqToLoad](auto yield) {
199 return backend_->fetchLedgerDiff(seqToLoad, yield);
200 });
201 cache_.get().update(diff, seqToLoad);
202 LOG(log_.info()) << "Updated cache to " << seqToLoad;
203 }
204 }
205};
206
207} // namespace etl
Cache for an entire ledger.
Definition LedgerCacheInterface.hpp:21
void load(uint32_t const seq) override
Load the cache for the given sequence number.
Definition CacheLoader.hpp:81
void stop() noexcept override
Requests the loader to stop as soon as possible.
Definition CacheLoader.hpp:144
void wait() noexcept override
Waits for the loader to finish background work.
Definition CacheLoader.hpp:154
CacheLoader(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > backend, data::LedgerCacheInterface &cache, std::unique_ptr< data::LedgerCacheLoadingStateInterface const > cacheLoadingState)
Construct a new Cache Loader object.
Definition CacheLoader.hpp:58
Definition CacheLoader.hpp:30
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:488
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:31
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:117
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:21
Stores a range of sequences as a min and max pair.
Definition Types.hpp:243
An interface for the Cache Loader.
Definition CacheLoaderInterface.hpp:10
Settings for the cache loader.
Definition CacheLoaderSettings.hpp:15
std::optional< CacheFileSettings > cacheFileSettings
Definition CacheLoaderSettings.hpp:39