Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
CacheLoader.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etl/ETLHelpers.hpp"
24#include "etl/impl/BaseCursorProvider.hpp"
25#include "util/async/AnyExecutionContext.hpp"
26#include "util/async/AnyOperation.hpp"
27#include "util/log/Logger.hpp"
28
29#include <boost/algorithm/string/predicate.hpp>
30#include <boost/context/detail/config.hpp>
31#include <xrpl/basics/Blob.h>
32#include <xrpl/basics/base_uint.h>
33#include <xrpl/basics/strHex.h>
34
35#include <algorithm>
36#include <atomic>
37#include <chrono>
38#include <cstddef>
39#include <cstdint>
40#include <functional>
41#include <memory>
42#include <ranges>
43#include <string>
44#include <vector>
45
46namespace etl::impl {
47
48template <typename CacheType>
49class CacheLoaderImpl {
50 util::Logger log_{"ETL"};
51
53 std::shared_ptr<BackendInterface> backend_;
54 std::reference_wrapper<CacheType> cache_;
55
57 std::atomic_int16_t remaining_;
58
59 std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now();
60 std::vector<util::async::AnyOperation<void>> tasks_;
61
62public:
63 template <typename CtxType>
64 CacheLoaderImpl(
65 CtxType& ctx,
66 std::shared_ptr<BackendInterface> const& backend,
67 CacheType& cache,
68 uint32_t const seq,
69 std::size_t const numCacheMarkers,
70 std::size_t const cachePageFetchSize,
71 std::vector<CursorPair> const& cursors
72 )
73 : ctx_{ctx}, backend_{backend}, cache_{std::ref(cache)}, queue_{cursors.size()}, remaining_{cursors.size()}
74 {
75 std::ranges::for_each(cursors, [this](auto const& cursor) { queue_.push(cursor); });
76 load(seq, numCacheMarkers, cachePageFetchSize);
77 }
78
79 ~CacheLoaderImpl()
80 {
81 stop();
82 wait();
83 }
84
85 void
86 stop() noexcept
87 {
88 for (auto& t : tasks_)
89 t.abort();
90 }
91
92 void
93 wait() noexcept
94 {
95 for (auto& t : tasks_)
96 t.wait();
97 }
98
99private:
100 void
101 load(uint32_t const seq, size_t numCacheMarkers, size_t cachePageFetchSize)
102 {
103 namespace vs = std::views;
104
105 LOG(log_.info()) << "Loading cache. Num cursors = " << queue_.size();
106 tasks_.reserve(numCacheMarkers);
107
108 for ([[maybe_unused]] auto taskId : vs::iota(0u, numCacheMarkers))
109 tasks_.push_back(spawnWorker(seq, cachePageFetchSize));
110 }
111
112 [[nodiscard]] auto
113 spawnWorker(uint32_t const seq, size_t cachePageFetchSize)
114 {
115 return ctx_.execute([this, seq, cachePageFetchSize](auto token) {
116 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
117 auto cursor = queue_.tryPop();
118 if (not cursor.has_value()) {
119 return; // queue is empty
120 }
121
122 auto [start, end] = cursor.value();
123 LOG(log_.debug()) << "Starting a cursor: " << ripple::strHex(start);
124
125 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
126 auto res = data::retryOnTimeout([this, seq, cachePageFetchSize, &start, token]() {
127 return backend_->fetchLedgerPage(start, seq, cachePageFetchSize, false, token);
128 });
129
130 cache_.get().update(res.objects, seq, true);
131
132 if (not res.cursor or res.cursor > end) {
133 if (--remaining_ <= 0) {
134 auto endTime = std::chrono::steady_clock::now();
135 auto duration = std::chrono::duration_cast<std::chrono::seconds>(endTime - startTime_);
136
137 LOG(log_.info()) << "Finished loading cache. Cache size = " << cache_.get().size()
138 << ". Took " << duration.count() << " seconds";
139
140 cache_.get().setFull();
141 } else {
142 LOG(log_.debug()) << "Finished a cursor. Remaining = " << remaining_;
143 }
144
145 break; // pick up the next cursor if available
146 }
147
148 start = std::move(res.cursor).value();
149 }
150 }
151 });
152 }
153};
154
155} // namespace etl::impl
Generic thread-safe queue with a max capacity.
Definition ETLHelpers.hpp:44
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:94
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
auto retryOnTimeout(FnType func, size_t waitMs=kDEFAULT_WAIT_BETWEEN_RETRY)
A helper function that catches DatabaseTimeout exceptions and retries indefinitely.
Definition BackendInterface.hpp:82