23#include "util/Assert.hpp"
24#include "util/async/AnyExecutionContext.hpp"
25#include "util/async/AnyOperation.hpp"
26#include "util/async/context/BasicExecutionContext.hpp"
28#include <boost/asio/spawn.hpp>
37namespace migration::cassandra::impl {
52 TokenRange(std::int64_t start, std::int64_t end) : start{start}, end{end}
61concept CanReadByTokenRange =
requires(T obj, TokenRange
const& range, boost::asio::yield_context yield) {
62 { obj.readByTokenRange(range, yield) } -> std::same_as<void>;
71template <CanReadByTokenRange TableAdapter>
76 struct TokenRangesProvider {
79 TokenRangesProvider(uint32_t numRanges) : numRanges{numRanges}
83 [[nodiscard]] std::vector<TokenRange>
86 auto const minValue = std::numeric_limits<std::int64_t>::min();
87 auto const maxValue = std::numeric_limits<std::int64_t>::max();
92 uint64_t
const rangeSize = (
static_cast<uint64_t
>(maxValue) * 2) / numRanges;
94 std::vector<TokenRange> ranges;
95 ranges.reserve(numRanges);
97 for (std::int64_t i = 0; i < numRanges; ++i) {
98 int64_t
const start = minValue + (i * rangeSize);
99 int64_t
const end = (i == numRanges - 1) ? maxValue : start +
static_cast<int64_t
>(rangeSize) - 1;
100 ranges.emplace_back(start, end);
110 return ctx_.
execute([
this](
auto token) {
111 while (not token.isStopRequested()) {
112 auto cursor = queue_.tryPop();
113 if (not cursor.has_value()) {
116 reader_.readByTokenRange(cursor.value(), token);
122 load(
size_t workerNum)
124 namespace vs = std::views;
126 tasks_.reserve(workerNum);
128 for ([[maybe_unused]]
auto taskId : vs::iota(0u, workerNum))
129 tasks_.push_back(spawnWorker());
133 std::size_t cursorsNum_;
135 std::vector<util::async::AnyOperation<void>> tasks_;
136 TableAdapter reader_;
156 template <
typename ExecutionContextType = util::async::CoroExecutionContext>
158 : ctx_(ExecutionContextType(settings.ctxThreadsNum))
159 , cursorsNum_(settings.jobsNum * settings.cursorsPerJob)
160 , queue_{cursorsNum_}
161 , reader_{std::move(reader)}
163 ASSERT(settings.
jobsNum > 0,
"jobsNum for full table scanner must be greater than 0");
164 ASSERT(settings.
cursorsPerJob > 0,
"cursorsPerJob for full table scanner must be greater than 0");
166 auto const cursors = TokenRangesProvider{cursorsNum_}.getRanges();
167 std::ranges::for_each(cursors, [
this](
auto const& cursor) { queue_.
push(cursor); });
177 for (
auto& task : tasks_) {
Generic thread-safe queue with a max capacity.
Definition ETLHelpers.hpp:44
void push(T const &elt)
Push element onto the queue.
Definition ETLHelpers.hpp:70
The full table scanner. It splits the full table scan into multiple token ranges and reads the data in each range using worker tasks running on an execution context.
Definition FullTableScanner.hpp:72
void wait()
Wait for all workers to finish.
Definition FullTableScanner.hpp:175
FullTableScanner(FullTableScannerSettings settings, TableAdapter &&reader)
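Construct a new Full Table Scanner object. It runs in a sync or async execution context selected by the ExecutionContextType template parameter (util::async::CoroExecutionContext by default); jobsNum and cursorsPerJob must both be greater than 0.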
Definition FullTableScanner.hpp:157
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
auto execute(SomeHandlerWithoutStopToken auto &&fn)
Execute a function on the execution context.
Definition AnyExecutionContext.hpp:86
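The concept for a table adapter: the type must provide readByTokenRange(TokenRange const&, boost::asio::yield_context) returning void.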
Definition FullTableScanner.hpp:61
The full table scanner settings.
Definition FullTableScanner.hpp:142
std::uint32_t ctxThreadsNum
Definition FullTableScanner.hpp:143
std::uint32_t jobsNum
Definition FullTableScanner.hpp:144
std::uint32_t cursorsPerJob
Definition FullTableScanner.hpp:145
The token range used to split the full table scan into multiple ranges.
Definition FullTableScanner.hpp:42
TokenRange(std::int64_t start, std::int64_t end)
Construct a new Token Range object.
Definition FullTableScanner.hpp:52