Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
FullTableScanner.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "etl/ETLHelpers.hpp"
23#include "util/Assert.hpp"
24#include "util/async/AnyExecutionContext.hpp"
25#include "util/async/AnyOperation.hpp"
26#include "util/async/context/BasicExecutionContext.hpp"
27
28#include <boost/asio/spawn.hpp>
29
30#include <algorithm>
31#include <cstddef>
32#include <cstdint>
33#include <limits>
34#include <ranges>
35#include <vector>
36
37namespace migration::cassandra::impl {
38
/**
 * @brief The token range used to split the full table scan into multiple ranges.
 */
struct TokenRange {
    std::int64_t start;  ///< Token the range begins at
    std::int64_t end;    ///< Token the range finishes at

    /**
     * @brief Construct a new Token Range object.
     *
     * @param start The token the range begins at
     * @param end The token the range finishes at
     */
    TokenRange(std::int64_t start, std::int64_t end) : start{start}, end{end}
    {
    }
};
56
60template <typename T>
61concept CanReadByTokenRange = requires(T obj, TokenRange const& range, boost::asio::yield_context yield) {
62 { obj.readByTokenRange(range, yield) } -> std::same_as<void>;
63};
64
71template <CanReadByTokenRange TableAdapter>
76 struct TokenRangesProvider {
77 uint32_t numRanges;
78
79 TokenRangesProvider(uint32_t numRanges) : numRanges{numRanges}
80 {
81 }
82
83 [[nodiscard]] std::vector<TokenRange>
84 getRanges() const
85 {
86 auto const minValue = std::numeric_limits<std::int64_t>::min();
87 auto const maxValue = std::numeric_limits<std::int64_t>::max();
88 if (numRanges == 1)
89 return {TokenRange{minValue, maxValue}};
90
91 // Safely calculate the range size using uint64_t to avoid overflow
92 uint64_t const rangeSize = (static_cast<uint64_t>(maxValue) * 2) / numRanges;
93
94 std::vector<TokenRange> ranges;
95 ranges.reserve(numRanges);
96
97 for (std::int64_t i = 0; i < numRanges; ++i) {
98 int64_t const start = minValue + (i * rangeSize);
99 int64_t const end = (i == numRanges - 1) ? maxValue : start + static_cast<int64_t>(rangeSize) - 1;
100 ranges.emplace_back(start, end);
101 }
102
103 return ranges;
104 }
105 };
106
107 [[nodiscard]] auto
108 spawnWorker()
109 {
110 return ctx_.execute([this](auto token) {
111 while (not token.isStopRequested()) {
112 auto cursor = queue_.tryPop();
113 if (not cursor.has_value()) {
114 return; // queue is empty
115 }
116 reader_.readByTokenRange(cursor.value(), token);
117 }
118 });
119 }
120
121 void
122 load(size_t workerNum)
123 {
124 namespace vs = std::views;
125
126 tasks_.reserve(workerNum);
127
128 for ([[maybe_unused]] auto taskId : vs::iota(0u, workerNum))
129 tasks_.push_back(spawnWorker());
130 }
131
133 std::size_t cursorsNum_;
135 std::vector<util::async::AnyOperation<void>> tasks_;
136 TableAdapter reader_;
137
138public:
/**
 * @brief The full table scanner settings.
 */
struct FullTableScannerSettings {
    std::uint32_t ctxThreadsNum; /**< value handed to the execution context constructor */
    std::uint32_t jobsNum;       /**< number of workers to spawn; must be greater than 0 */
    std::uint32_t cursorsPerJob; /**< multiplied by jobsNum to get the number of token ranges; must be greater than 0 */
};
147
156 template <typename ExecutionContextType = util::async::CoroExecutionContext>
157 FullTableScanner(FullTableScannerSettings settings, TableAdapter&& reader)
158 : ctx_(ExecutionContextType(settings.ctxThreadsNum))
159 , cursorsNum_(settings.jobsNum * settings.cursorsPerJob)
160 , queue_{cursorsNum_}
161 , reader_{std::move(reader)}
162 {
163 ASSERT(settings.jobsNum > 0, "jobsNum for full table scanner must be greater than 0");
164 ASSERT(settings.cursorsPerJob > 0, "cursorsPerJob for full table scanner must be greater than 0");
165
166 auto const cursors = TokenRangesProvider{cursorsNum_}.getRanges();
167 std::ranges::for_each(cursors, [this](auto const& cursor) { queue_.push(cursor); });
168 load(settings.jobsNum);
169 }
170
174 void
176 {
177 for (auto& task : tasks_) {
178 task.wait();
179 }
180 }
181};
182
183} // namespace migration::cassandra::impl
Generic thread-safe queue with a max capacity.
Definition ETLHelpers.hpp:44
void push(T const &elt)
Push element onto the queue.
Definition ETLHelpers.hpp:70
The full table scanner. It will split the full table scan into multiple ranges and read the data in g...
Definition FullTableScanner.hpp:72
void wait()
Wait for all workers to finish.
Definition FullTableScanner.hpp:175
FullTableScanner(FullTableScannerSettings settings, TableAdapter &&reader)
Construct a new Full Table Scanner object, it will run in a sync or async context according to the pa...
Definition FullTableScanner.hpp:157
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
auto execute(SomeHandlerWithoutStopToken auto &&fn)
Execute a function on the execution context.
Definition AnyExecutionContext.hpp:86
The concept for an adapter.
Definition FullTableScanner.hpp:61
The full table scanner settings.
Definition FullTableScanner.hpp:142
std::uint32_t ctxThreadsNum
Definition FullTableScanner.hpp:143
std::uint32_t jobsNum
Definition FullTableScanner.hpp:144
std::uint32_t cursorsPerJob
Definition FullTableScanner.hpp:145
The token range used to split the full table scan into multiple ranges.
Definition FullTableScanner.hpp:42
TokenRange(std::int64_t start, std::int64_t end)
Construct a new Token Range object.
Definition FullTableScanner.hpp:52