Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
FullTableScanner.hpp
1#pragma once
2
#include "etl/ETLHelpers.hpp"
#include "util/Assert.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/context/BasicExecutionContext.hpp"

#include <boost/asio/spawn.hpp>

#include <algorithm>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <ranges>
#include <vector>
17
18namespace migration::cassandra::impl {
19
/**
 * @brief A contiguous slice of the token space; the full table scan is split into many of these.
 */
struct TokenRange {
    /**
     * @brief Construct a new Token Range object.
     *
     * @param start The first token of the range
     * @param end The last token of the range
     */
    TokenRange(std::int64_t start, std::int64_t end) : start(start), end(end)
    {
    }

    std::int64_t start; /**< first token of the range */
    std::int64_t end;   /**< last token of the range */
};
37
41template <typename T>
43 requires(T obj, TokenRange const& range, boost::asio::yield_context yield) {
44 { obj.readByTokenRange(range, yield) } -> std::same_as<void>;
45 };
46
53template <CanReadByTokenRange TableAdapter>
58 struct TokenRangesProvider {
59 uint32_t numRanges;
60
61 TokenRangesProvider(uint32_t numRanges) : numRanges{numRanges}
62 {
63 }
64
65 [[nodiscard]] std::vector<TokenRange>
66 getRanges() const
67 {
68 auto const minValue = std::numeric_limits<std::int64_t>::min();
69 auto const maxValue = std::numeric_limits<std::int64_t>::max();
70 if (numRanges == 1)
71 return {TokenRange{minValue, maxValue}};
72
73 // Safely calculate the range size using uint64_t to avoid overflow
74 uint64_t const rangeSize = (static_cast<uint64_t>(maxValue) * 2) / numRanges;
75
76 std::vector<TokenRange> ranges;
77 ranges.reserve(numRanges);
78
79 for (std::int64_t i = 0; i < numRanges; ++i) {
80 int64_t const start = minValue + (i * rangeSize);
81 int64_t const end =
82 (i == numRanges - 1) ? maxValue : start + static_cast<int64_t>(rangeSize) - 1;
83 ranges.emplace_back(start, end);
84 }
85
86 return ranges;
87 }
88 };
89
90 [[nodiscard]] auto
91 spawnWorker()
92 {
93 return ctx_.execute([this](auto token) {
94 while (not token.isStopRequested()) {
95 auto cursor = queue_.tryPop();
96 if (not cursor.has_value()) {
97 return; // queue is empty
98 }
99 reader_.readByTokenRange(cursor.value(), token);
100 }
101 });
102 }
103
104 void
105 load(size_t workerNum)
106 {
107 namespace vs = std::views;
108
109 tasks_.reserve(workerNum);
110
111 for ([[maybe_unused]] auto taskId : vs::iota(0u, workerNum))
112 tasks_.push_back(spawnWorker());
113 }
114
116 std::size_t cursorsNum_;
118 std::vector<util::async::AnyOperation<void>> tasks_;
119 TableAdapter reader_;
120
121public:
/**
 * @brief The full table scanner settings.
 */
struct FullTableScannerSettings {
    std::uint32_t ctxThreadsNum; /**< passed to the execution context's constructor (presumably its thread count) */
    std::uint32_t jobsNum;       /**< number of workers the scanner spawns; must be greater than 0 */
    std::uint32_t cursorsPerJob; /**< token ranges queued per worker (total = jobsNum * cursorsPerJob); must be > 0 */
};
131
140 template <typename ExecutionContextType = util::async::CoroExecutionContext>
141 FullTableScanner(FullTableScannerSettings settings, TableAdapter&& reader)
142 : ctx_(ExecutionContextType(settings.ctxThreadsNum))
143 , cursorsNum_(settings.jobsNum * settings.cursorsPerJob)
144 , queue_{cursorsNum_}
145 , reader_{std::move(reader)}
146 {
147 ASSERT(settings.jobsNum > 0, "jobsNum for full table scanner must be greater than 0");
148 ASSERT(
149 settings.cursorsPerJob > 0,
150 "cursorsPerJob for full table scanner must be greater than 0"
151 );
152
153 auto const cursors = TokenRangesProvider{cursorsNum_}.getRanges();
154 std::ranges::for_each(cursors, [this](auto const& cursor) { queue_.push(cursor); });
155 load(settings.jobsNum);
156 }
157
161 void
163 {
164 for (auto& task : tasks_) {
165 task.wait();
166 }
167 }
168};
169
170} // namespace migration::cassandra::impl
Generic thread-safe queue with a max capacity.
Definition ETLHelpers.hpp:26
void wait()
Wait for all workers to finish.
Definition FullTableScanner.hpp:162
FullTableScanner(FullTableScannerSettings settings, TableAdapter &&reader)
Construct a new Full Table Scanner object; it runs in a sync or async context depending on the ExecutionContextType template parameter.
Definition FullTableScanner.hpp:141
A type-erased execution context.
Definition AnyExecutionContext.hpp:22
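The concept for a table adapter: a type whose readByTokenRange(TokenRange const&, boost::asio::yield_context) call is valid and returns void.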
Definition FullTableScanner.hpp:42
The full table scanner settings.
Definition FullTableScanner.hpp:125
std::uint32_t ctxThreadsNum
Definition FullTableScanner.hpp:126
std::uint32_t jobsNum
Definition FullTableScanner.hpp:127
std::uint32_t cursorsPerJob
Definition FullTableScanner.hpp:129
The token range used to split the full table scan into multiple ranges.
Definition FullTableScanner.hpp:23
TokenRange(std::int64_t start, std::int64_t end)
Construct a new Token Range object.
Definition FullTableScanner.hpp:33