Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
CacheLoader.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etl/ETLHelpers.hpp"
24#include "etl/impl/BaseCursorProvider.hpp"
25#include "util/async/AnyExecutionContext.hpp"
26#include "util/async/AnyOperation.hpp"
27#include "util/log/Logger.hpp"
28
29#include <boost/algorithm/string/predicate.hpp>
30#include <boost/context/detail/config.hpp>
31#include <xrpl/basics/Blob.h>
32#include <xrpl/basics/base_uint.h>
33#include <xrpl/basics/strHex.h>
34
35#include <algorithm>
36#include <atomic>
37#include <chrono>
38#include <cstddef>
39#include <cstdint>
40#include <functional>
41#include <memory>
42#include <ranges>
43#include <string>
44#include <vector>
45
46namespace etl::impl {
47
48template <typename CacheType>
49class CacheLoaderImpl {
50 util::Logger log_{"ETL"};
51
53 std::shared_ptr<BackendInterface> backend_;
54 std::reference_wrapper<CacheType> cache_;
55
57 std::atomic_int16_t remaining_;
58
59 std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now();
60 std::vector<util::async::AnyOperation<void>> tasks_;
61
62public:
63 template <typename CtxType>
64 CacheLoaderImpl(
65 CtxType& ctx,
66 std::shared_ptr<BackendInterface> const& backend,
67 CacheType& cache,
68 uint32_t const seq,
69 std::size_t const numCacheMarkers,
70 std::size_t const cachePageFetchSize,
71 std::vector<CursorPair> const& cursors
72 )
73 : ctx_{ctx}
74 , backend_{backend}
75 , cache_{std::ref(cache)}
76 , queue_{cursors.size()}
77 , remaining_{cursors.size()}
78 {
79 std::ranges::for_each(cursors, [this](auto const& cursor) { queue_.push(cursor); });
80 load(seq, numCacheMarkers, cachePageFetchSize);
81 }
82
83 ~CacheLoaderImpl()
84 {
85 stop();
86 wait();
87 }
88
89 void
90 stop() noexcept
91 {
92 for (auto& t : tasks_)
93 t.abort();
94 }
95
96 void
97 wait() noexcept
98 {
99 for (auto& t : tasks_)
100 t.wait();
101 }
102
103private:
104 void
105 load(uint32_t const seq, size_t numCacheMarkers, size_t cachePageFetchSize)
106 {
107 namespace vs = std::views;
108
109 LOG(log_.info()) << "Loading cache. Num cursors = " << queue_.size();
110 tasks_.reserve(numCacheMarkers);
111
112 for ([[maybe_unused]] auto taskId : vs::iota(0u, numCacheMarkers))
113 tasks_.push_back(spawnWorker(seq, cachePageFetchSize));
114 }
115
116 [[nodiscard]] auto
117 spawnWorker(uint32_t const seq, size_t cachePageFetchSize)
118 {
119 return ctx_.execute([this, seq, cachePageFetchSize](auto token) {
120 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
121 auto cursor = queue_.tryPop();
122 if (not cursor.has_value()) {
123 return; // queue is empty
124 }
125
126 auto [start, end] = cursor.value();
127 LOG(log_.debug()) << "Starting a cursor: " << ripple::strHex(start);
128
129 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
130 auto res =
131 data::retryOnTimeout([this, seq, cachePageFetchSize, &start, token]() {
132 return backend_->fetchLedgerPage(
133 start, seq, cachePageFetchSize, false, token
134 );
135 });
136
137 cache_.get().update(res.objects, seq, true);
138
139 if (not res.cursor or res.cursor > end) {
140 if (--remaining_ <= 0) {
141 auto endTime = std::chrono::steady_clock::now();
142 auto duration = std::chrono::duration_cast<std::chrono::seconds>(
143 endTime - startTime_
144 );
145
146 LOG(log_.info())
147 << "Finished loading cache. Cache size = " << cache_.get().size()
148 << ". Took " << duration.count() << " seconds";
149
150 cache_.get().setFull();
151 } else {
152 LOG(log_.debug()) << "Finished a cursor. Remaining = " << remaining_;
153 }
154
155 break; // pick up the next cursor if available
156 }
157
158 start = std::move(res.cursor).value();
159 }
160 }
161 });
162 }
163};
164
165} // namespace etl::impl
Generic thread-safe queue with a max capacity.
Definition ETLHelpers.hpp:45
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
auto retryOnTimeout(FnType func, size_t waitMs=kDEFAULT_WAIT_BETWEEN_RETRY)
A helper function that catches DatabaseTimeout exceptions and retries indefinitely.
Definition BackendInterface.hpp:82