Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
CacheLoader.hpp
1#pragma once
2
3#include "data/BackendInterface.hpp"
4#include "etl/ETLHelpers.hpp"
5#include "etl/impl/BaseCursorProvider.hpp"
6#include "util/async/AnyExecutionContext.hpp"
7#include "util/async/AnyOperation.hpp"
8#include "util/log/Logger.hpp"
9
10#include <boost/algorithm/string/predicate.hpp>
11#include <boost/context/detail/config.hpp>
12#include <xrpl/basics/Blob.h>
13#include <xrpl/basics/base_uint.h>
14#include <xrpl/basics/strHex.h>
15
16#include <algorithm>
17#include <atomic>
18#include <chrono>
19#include <cstddef>
20#include <cstdint>
21#include <functional>
22#include <memory>
23#include <ranges>
24#include <string>
25#include <vector>
26
27namespace etl::impl {
28
29template <typename CacheType>
30class CacheLoaderImpl {
31 util::Logger log_{"ETL"};
32
34 std::shared_ptr<BackendInterface> backend_;
35 std::reference_wrapper<CacheType> cache_;
36
38 std::atomic_int16_t remaining_;
39
40 std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now();
41 std::vector<util::async::AnyOperation<void>> tasks_;
42
43public:
44 template <typename CtxType>
45 CacheLoaderImpl(
46 CtxType& ctx,
47 std::shared_ptr<BackendInterface> backend,
48 CacheType& cache,
49 uint32_t const seq,
50 std::size_t const numCacheMarkers,
51 std::size_t const cachePageFetchSize,
52 std::vector<CursorPair> const& cursors
53 )
54 : ctx_{ctx}
55 , backend_{std::move(backend)}
56 , cache_{std::ref(cache)}
57 , queue_{cursors.size()}
58 , remaining_{cursors.size()}
59 {
60 std::ranges::for_each(cursors, [this](auto const& cursor) { queue_.push(cursor); });
61 load(seq, numCacheMarkers, cachePageFetchSize);
62 }
63
64 ~CacheLoaderImpl()
65 {
66 stop();
67 wait();
68 }
69
70 void
71 stop() noexcept
72 {
73 for (auto& t : tasks_)
74 t.abort();
75 }
76
77 void
78 wait() noexcept
79 {
80 for (auto& t : tasks_)
81 t.wait();
82 }
83
84private:
85 void
86 load(uint32_t const seq, size_t numCacheMarkers, size_t cachePageFetchSize)
87 {
88 namespace vs = std::views;
89
90 LOG(log_.info()) << "Loading cache. Num cursors = " << queue_.size();
91 tasks_.reserve(numCacheMarkers);
92
93 for ([[maybe_unused]] auto taskId : vs::iota(0u, numCacheMarkers))
94 tasks_.push_back(spawnWorker(seq, cachePageFetchSize));
95 }
96
97 [[nodiscard]] auto
98 spawnWorker(uint32_t const seq, size_t cachePageFetchSize)
99 {
100 return ctx_.execute([this, seq, cachePageFetchSize](auto token) {
101 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
102 auto cursor = queue_.tryPop();
103 if (not cursor.has_value()) {
104 return; // queue is empty
105 }
106
107 auto [start, end] = cursor.value();
108 LOG(log_.debug()) << "Starting a cursor: " << ripple::strHex(start);
109
110 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
111 auto res =
112 data::retryOnTimeout([this, seq, cachePageFetchSize, &start, token]() {
113 return backend_->fetchLedgerPage(
114 start, seq, cachePageFetchSize, false, token
115 );
116 });
117
118 cache_.get().update(res.objects, seq, true);
119
120 if (not res.cursor or res.cursor > end) {
121 if (--remaining_ <= 0) {
122 auto endTime = std::chrono::steady_clock::now();
123 auto duration = std::chrono::duration_cast<std::chrono::seconds>(
124 endTime - startTime_
125 );
126
127 LOG(log_.info())
128 << "Finished loading cache. Cache size = " << cache_.get().size()
129 << ". Took " << duration.count() << " seconds";
130
131 cache_.get().setFull();
132 } else {
133 LOG(log_.debug()) << "Finished a cursor. Remaining = " << remaining_;
134 }
135
136 break; // pick up the next cursor if available
137 }
138
139 start = std::move(res.cursor).value();
140 }
141 }
142 });
143 }
144};
145
146} // namespace etl::impl
Generic thread-safe queue with a max capacity.
Definition ETLHelpers.hpp:26
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
A type-erased execution context.
Definition AnyExecutionContext.hpp:22
auto retryOnTimeout(FnType func, size_t waitMs=kDEFAULT_WAIT_BETWEEN_RETRY)
A helper function that catches DatabaseTimeout exceptions and retries indefinitely.
Definition BackendInterface.hpp:63