4#include "util/Assert.hpp"
5#include "util/async/AnyExecutionContext.hpp"
6#include "util/async/AnyOperation.hpp"
7#include "util/async/context/BasicExecutionContext.hpp"
9#include <boost/asio/spawn.hpp>
18namespace migration::cassandra::impl {
33 TokenRange(std::int64_t start, std::int64_t end) : start{start}, end{end}
43 requires(T obj,
TokenRange const& range, boost::asio::yield_context yield) {
44 { obj.readByTokenRange(range, yield) } -> std::same_as<void>;
53template <CanReadByTokenRange TableAdapter>
58 struct TokenRangesProvider {
61 TokenRangesProvider(uint32_t numRanges) : numRanges{numRanges}
65 [[nodiscard]] std::vector<TokenRange>
68 auto const minValue = std::numeric_limits<std::int64_t>::min();
69 auto const maxValue = std::numeric_limits<std::int64_t>::max();
74 uint64_t
const rangeSize = (
static_cast<uint64_t
>(maxValue) * 2) / numRanges;
76 std::vector<TokenRange> ranges;
77 ranges.reserve(numRanges);
79 for (std::int64_t i = 0; i < numRanges; ++i) {
80 int64_t
const start = minValue + (i * rangeSize);
82 (i == numRanges - 1) ? maxValue : start +
static_cast<int64_t
>(rangeSize) - 1;
83 ranges.emplace_back(start, end);
93 return ctx_.execute([
this](
auto token) {
94 while (not token.isStopRequested()) {
95 auto cursor = queue_.tryPop();
96 if (not cursor.has_value()) {
99 reader_.readByTokenRange(cursor.value(), token);
105 load(
size_t workerNum)
107 namespace vs = std::views;
109 tasks_.reserve(workerNum);
111 for ([[maybe_unused]]
auto taskId : vs::iota(0u, workerNum))
112 tasks_.push_back(spawnWorker());
116 std::size_t cursorsNum_;
118 std::vector<util::async::AnyOperation<void>> tasks_;
119 TableAdapter reader_;
140 template <
typename ExecutionContextType = util::async::CoroExecutionContext>
142 : ctx_(ExecutionContextType(settings.ctxThreadsNum))
143 , cursorsNum_(settings.jobsNum * settings.cursorsPerJob)
144 , queue_{cursorsNum_}
145 , reader_{std::move(reader)}
147 ASSERT(settings.jobsNum > 0,
"jobsNum for full table scanner must be greater than 0");
149 settings.cursorsPerJob > 0,
150 "cursorsPerJob for full table scanner must be greater than 0"
153 auto const cursors = TokenRangesProvider{cursorsNum_}.getRanges();
154 std::ranges::for_each(cursors, [
this](
auto const& cursor) { queue_.push(cursor); });
155 load(settings.jobsNum);
164 for (
auto& task : tasks_) {
Generic thread-safe queue with a max capacity.
Definition ETLHelpers.hpp:26
void wait()
Wait for all workers to finish.
Definition FullTableScanner.hpp:162
FullTableScanner(FullTableScannerSettings settings, TableAdapter &&reader)
Construct a new Full Table Scanner object; it will run in a sync or async context according to the pa...
Definition FullTableScanner.hpp:141
A type-erased execution context.
Definition AnyExecutionContext.hpp:22
The concept for an adapter.
Definition FullTableScanner.hpp:42
The full table scanner settings.
Definition FullTableScanner.hpp:125
std::uint32_t ctxThreadsNum
Definition FullTableScanner.hpp:126
std::uint32_t jobsNum
Definition FullTableScanner.hpp:127
std::uint32_t cursorsPerJob
Definition FullTableScanner.hpp:129
The token range used to split the full table scan into multiple ranges.
Definition FullTableScanner.hpp:23
TokenRange(std::int64_t start, std::int64_t end)
Construct a new Token Range object.
Definition FullTableScanner.hpp:33