Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
CacheLoader.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etl/ETLHelpers.hpp"
24#include "etl/impl/BaseCursorProvider.hpp"
25#include "util/async/AnyExecutionContext.hpp"
26#include "util/async/AnyOperation.hpp"
27#include "util/log/Logger.hpp"
28
29#include <boost/algorithm/string/predicate.hpp>
30#include <boost/context/detail/config.hpp>
31#include <xrpl/basics/Blob.h>
32#include <xrpl/basics/base_uint.h>
33#include <xrpl/basics/strHex.h>
34
35#include <algorithm>
36#include <atomic>
37#include <chrono>
38#include <cstddef>
39#include <cstdint>
40#include <functional>
41#include <memory>
42#include <ranges>
43#include <string>
44#include <vector>
45
46namespace etl::impl {
47
48template <typename CacheType>
50 util::Logger log_{"ETL"};
51
53 std::shared_ptr<BackendInterface> backend_;
54 std::reference_wrapper<CacheType> cache_;
55
57 std::atomic_int16_t remaining_;
58
59 std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now();
60 std::vector<util::async::AnyOperation<void>> tasks_;
61
62public:
63 template <typename CtxType>
65 CtxType& ctx,
66 std::shared_ptr<BackendInterface> const& backend,
67 CacheType& cache,
68 uint32_t const seq,
69 std::size_t const numCacheMarkers,
70 std::size_t const cachePageFetchSize,
71 std::vector<CursorPair> const& cursors
72 )
73 : ctx_{ctx}, backend_{backend}, cache_{std::ref(cache)}, queue_{cursors.size()}, remaining_{cursors.size()}
74 {
75 std::ranges::for_each(cursors, [this](auto const& cursor) { queue_.push(cursor); });
76 load(seq, numCacheMarkers, cachePageFetchSize);
77 }
78
80 {
81 stop();
82 wait();
83 }
84
85 void
86 stop() noexcept
87 {
88 for (auto& t : tasks_)
89 t.abort();
90 }
91
92 void
93 wait() noexcept
94 {
95 for (auto& t : tasks_)
96 t.wait();
97 }
98
99private:
100 void
101 load(uint32_t const seq, size_t numCacheMarkers, size_t cachePageFetchSize)
102 {
103 namespace vs = std::views;
104
105 LOG(log_.info()) << "Loading cache. Num cursors = " << queue_.size();
106 tasks_.reserve(numCacheMarkers);
107
108 for ([[maybe_unused]] auto taskId : vs::iota(0u, numCacheMarkers))
109 tasks_.push_back(spawnWorker(seq, cachePageFetchSize));
110 }
111
112 [[nodiscard]] auto
113 spawnWorker(uint32_t const seq, size_t cachePageFetchSize)
114 {
115 return ctx_.execute([this, seq, cachePageFetchSize](auto token) {
116 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
117 auto cursor = queue_.tryPop();
118 if (not cursor.has_value()) {
119 return; // queue is empty
120 }
121
122 auto [start, end] = cursor.value();
123 LOG(log_.debug()) << "Starting a cursor: " << ripple::strHex(start);
124
125 while (not token.isStopRequested() and not cache_.get().isDisabled()) {
126 auto res = data::retryOnTimeout([this, seq, cachePageFetchSize, &start, token]() {
127 return backend_->fetchLedgerPage(start, seq, cachePageFetchSize, false, token);
128 });
129
130 cache_.get().update(res.objects, seq, true);
131
132 if (not res.cursor or res.cursor > end) {
133 if (--remaining_ <= 0) {
134 auto endTime = std::chrono::steady_clock::now();
135 auto duration = std::chrono::duration_cast<std::chrono::seconds>(endTime - startTime_);
136
137 LOG(log_.info()) << "Finished loading cache. Cache size = " << cache_.get().size()
138 << ". Took " << duration.count() << " seconds";
139
140 cache_.get().setFull();
141 } else {
142 LOG(log_.debug()) << "Finished a cursor. Remaining = " << remaining_;
143 }
144
145 break; // pick up the next cursor if available
146 }
147
148 start = std::move(res.cursor).value();
149 }
150 }
151 });
152 }
153};
154
155} // namespace etl::impl
Generic thread-safe queue with a max capacity.
Definition ETLHelpers.hpp:44
std::optional< T > tryPop()
Attempt to pop an element.
Definition ETLHelpers.hpp:120
std::size_t size() const
Get the size of the queue.
Definition ETLHelpers.hpp:139
void push(T const &elt)
Push element onto the queue.
Definition ETLHelpers.hpp:70
Definition CacheLoader.hpp:49
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
auto execute(SomeHandlerWithoutStopToken auto &&fn)
Execute a function on the execution context.
Definition AnyExecutionContext.hpp:86
auto retryOnTimeout(FnType func, size_t waitMs=kDEFAULT_WAIT_BETWEEN_RETRY)
A helper function that catches DatabaseTimout exceptions and retries indefinitely.
Definition BackendInterface.hpp:79