22#include "data/BackendCounters.hpp"
23#include "data/BackendInterface.hpp"
24#include "data/cassandra/Handle.hpp"
25#include "data/cassandra/Types.hpp"
26#include "data/cassandra/impl/AsyncExecutor.hpp"
27#include "util/Assert.hpp"
28#include "util/Batching.hpp"
29#include "util/log/Logger.hpp"
31#include <boost/asio.hpp>
32#include <boost/asio/associated_executor.hpp>
33#include <boost/asio/io_context.hpp>
34#include <boost/asio/io_service.hpp>
35#include <boost/asio/spawn.hpp>
36#include <boost/json/object.hpp>
41#include <condition_variable>
53namespace data::cassandra::impl {
// Template parameters: HandleType is the database handle used to run statements (defaults to Handle);
// BackendCountersType must satisfy SomeBackendCounters and supplies the statistics sink.
63template <
typename HandleType = Handle, SomeBackendCounters BackendCountersType = BackendCounters>
// Write throttling: configured ceiling and the atomic number of async writes currently in flight.
67 std::uint32_t maxWriteRequestsOutstanding_;
68 std::atomic_uint32_t numWriteRequestsOutstanding_ = 0;
// Read load tracking: configured ceiling (compared in isTooBusy) and the atomic number of reads in flight.
70 std::uint32_t maxReadRequestsOutstanding_;
71 std::atomic_uint32_t numReadRequestsOutstanding_ = 0;
// Batch size handed to util::forEachBatch when a vector of statements is written.
73 std::size_t writeBatchSize_;
// Guards/wakes writers blocked in incrementOutstandingRequestCount() until canAddWriteRequest() holds.
75 std::mutex throttleMutex_;
76 std::condition_variable throttleCv_;
// Guards/wakes callers blocked in sync() until finishedAllWriteRequests() holds.
78 std::mutex syncMutex_;
79 std::condition_variable syncCv_;
// io_context that is run on the thread started in the constructor.
81 boost::asio::io_context ioc_;
// NOTE(review): presumably keeps ioc_.run() from returning while idle; its initialization is not visible here.
82 std::optional<boost::asio::io_service::work> work_;
// Non-owning reference to the handle passed to the constructor; the caller must keep it alive.
84 std::reference_wrapper<HandleType const> handle_;
// Counters object on which every register*() call and report() is made.
87 typename BackendCountersType::PtrType counters_;
// Type aliases: everything except CompletionTokenType is forwarded from HandleType, so a different
// handle type brings its own statement/future/result types with it.
90 using ResultOrErrorType =
typename HandleType::ResultOrErrorType;
91 using StatementType =
typename HandleType::StatementType;
92 using PreparedStatementType =
typename HandleType::PreparedStatementType;
93 using FutureType =
typename HandleType::FutureType;
94 using FutureWithCallbackType =
typename HandleType::FutureWithCallbackType;
95 using ResultType =
typename HandleType::ResultType;
// The read/readEach methods take a boost::asio yield_context as their completion token.
96 using CompletionTokenType = boost::asio::yield_context;
// Constructor (the leading part of the signature, including the `settings` parameter, is not visible
// in this excerpt). Takes the handle by const reference and an optional counters object that
// defaults to BackendCountersType::make().
104 HandleType
const& handle,
105 typename BackendCountersType::PtrType counters = BackendCountersType::make()
// Limits and batch size are copied out of `settings`.
107 : maxWriteRequestsOutstanding_{settings.maxWriteRequestsOutstanding}
108 , maxReadRequestsOutstanding_{settings.maxReadRequestsOutstanding}
109 , writeBatchSize_{settings.writeBatchSize}
// Only a reference to `handle` is stored, not a copy.
111 , handle_{std::cref(handle)}
// Starts the worker that runs ioc_; the lambda captures `this`.
// NOTE(review): the declaration of thread_ is not visible here — confirm it is declared after ioc_/work_.
112 , thread_{[this]() { ioc_.run(); }}
113 , counters_{std::move(counters)}
// Log the effective read/write limits once at construction.
115 LOG(log_.info()) <<
"Max write requests outstanding is " << maxWriteRequestsOutstanding_
116 <<
"; Max read requests outstanding is " << maxReadRequestsOutstanding_;
// Destructor. The body is not visible in this excerpt.
// NOTE(review): presumably releases work_ and joins the thread that runs ioc_ — confirm against the full file.
119 ~DefaultExecutionStrategy()
// sync(): blocks the calling thread until there are no async writes outstanding.
// (The signature line is not visible in this excerpt.)
132 LOG(log_.debug()) <<
"Waiting to sync all writes...";
// Predicate form of wait(): finishedAllWriteRequests() is re-checked after every wakeup.
133 std::unique_lock<std::mutex> lck(syncMutex_);
134 syncCv_.wait(lck, [
this]() {
return finishedAllWriteRequests(); });
135 LOG(log_.debug()) <<
"Sync done.";
// isTooBusy(): `result` is true when the number of reads in flight has reached or exceeded the
// configured maximum. (The signature and return statement are not visible in this excerpt.)
144 bool const result = numReadRequestsOutstanding_ >= maxReadRequestsOutstanding_;
// NOTE(review): the condition guarding this call is not visible; presumably only when `result` is true.
146 counters_->registerTooBusy();
// writeSync(statement): blocking write through the handle. (The signature and the surrounding
// loop/branch lines are not visible in this excerpt.)
158 auto const startTime = std::chrono::steady_clock::now();
// Synchronous execution on the calling thread.
160 auto res = handle_.get().execute(statement);
// NOTE(review): presumably the success path — records the sync write using startTime.
162 counters_->registerWriteSync(startTime);
// Failure path: count the retry, log the error, and back off for 5 ms.
// NOTE(review): the construct that loops back to execute() is not visible here.
166 counters_->registerWriteSyncRetry();
167 LOG(log_.warn()) <<
"Cassandra sync write error, retrying: " << res.error();
168 std::this_thread::sleep_for(std::chrono::milliseconds(5));
// writeSync(preparedStatement, args...): convenience overload that binds `args` to the prepared
// statement (perfect-forwarded) and delegates to the statement-taking writeSync overload.
177 template <
typename... Args>
179 writeSync(PreparedStatementType
const& preparedStatement, Args&&... args)
181 return writeSync(preparedStatement.bind(std::forward<Args>(args)...));
// write(preparedStatement, args...): convenience overload that binds `args` to the prepared
// statement and hands the resulting statement to the rvalue-taking write overload.
193 template <
typename... Args>
195 write(PreparedStatementType
const& preparedStatement, Args&&... args)
197 auto statement = preparedStatement.bind(std::forward<Args>(args)...);
198 write(std::move(statement));
// write(StatementType&&): asynchronous single-statement write. (The signature line and some
// AsyncExecutor::run arguments are not visible in this excerpt.)
212 auto const startTime = std::chrono::steady_clock::now();
// May block: waits inside incrementOutstandingRequestCount() while the write limit is reached.
214 incrementOutstandingRequestCount();
216 counters_->registerWriteStarted();
// Hand the statement to AsyncExecutor; the first lambda releases the throttle slot and records the
// finished write, the second lambda records a retry.
// NOTE(review): both lambdas capture `this`, so this object must outlive the in-flight write.
218 AsyncExecutor<std::decay_t<
decltype(statement)>, HandleType>::run(
221 std::move(statement),
222 [
this, startTime](
auto const&) {
223 decrementOutstandingRequestCount();
225 counters_->registerWriteFinished(startTime);
227 [
this]() { counters_->registerWriteRetry(); }
// write(std::vector<StatementType>&&): asynchronous batched write. The input is split into batches
// of writeBatchSize_ and each batch is submitted as one async request.
240 write(std::vector<StatementType>&& statements)
// NOTE(review): the body of this guard is not visible; presumably an early return for empty input.
242 if (statements.empty())
245 util::forEachBatch(std::move(statements), writeBatchSize_, [
this](
auto begin,
auto end) {
246 auto const startTime = std::chrono::steady_clock::now();
247 auto chunk = std::vector<StatementType>{};
// Move this batch's statements into their own vector, reserving the exact size first.
249 chunk.reserve(std::distance(begin, end));
250 std::move(begin, end, std::back_inserter(chunk));
// One throttle slot is taken per batch, not per statement; this call may block.
252 incrementOutstandingRequestCount();
253 counters_->registerWriteStarted();
// First lambda: release the slot and record the finished write. Second lambda: record a retry.
// (Some AsyncExecutor::run arguments are not visible in this excerpt.)
256 AsyncExecutor<std::decay_t<
decltype(chunk)>, HandleType>::run(
260 [
this, startTime](
auto const&) {
261 decrementOutstandingRequestCount();
262 counters_->registerWriteFinished(startTime);
264 [
this]() { counters_->registerWriteRetry(); }
// writeEach(std::vector<StatementType>&&): submits every statement individually through the
// single-statement write overload rather than as a batch. (The signature line is not visible.)
281 std::ranges::for_each(std::move(statements), [
this](
auto& statement) { this->write(std::move(statement)); });
// read(token, preparedStatement, args...): convenience overload that binds `args` to the prepared
// statement and delegates to the statement-taking read overload with the same completion token.
295 template <
typename... Args>
296 [[maybe_unused]] ResultOrErrorType
297 read(CompletionTokenType token, PreparedStatementType
const& preparedStatement, Args&&... args)
299 return read(token, preparedStatement.bind(std::forward<Args>(args)...));
// read(token, statements): coroutine-based read of a vector of statements issued as one
// asyncExecute call. (Loop/branch lines and the return are not visible in this excerpt.)
312 [[maybe_unused]] ResultOrErrorType
313 read(CompletionTokenType token, std::vector<StatementType>
const& statements)
315 auto const startTime = std::chrono::steady_clock::now();
317 auto const numStatements = statements.size();
// Holds the in-flight future; declared outside `init` so it outlives the initiation lambda.
318 std::optional<FutureWithCallbackType> future;
319 counters_->registerReadStarted(numStatements);
// Every statement in the vector counts toward the outstanding-reads total used by isTooBusy().
323 numReadRequestsOutstanding_ += numStatements;
// Initiation for async_compose: moves the composed-operation state into a shared_ptr so the
// asyncExecute callback can later call complete() on it with the result.
// NOTE(review): the call receiving get_associated_executor(*sself) and the inner lambda is not
// visible; presumably it posts/dispatches the completion onto the coroutine's executor.
325 auto init = [
this, &statements, &future]<
typename Self>(Self& self) {
326 auto sself = std::make_shared<Self>(std::move(self));
328 future.emplace(handle_.get().asyncExecute(statements, [sself](
auto&& res)
mutable {
330 boost::asio::get_associated_executor(*sself),
331 [sself, res = std::forward<decltype(res)>(res)]() mutable { sself->complete(std::move(res)); }
// Suspends the coroutine until complete() is called; `res` is the resulting ResultOrErrorType.
336 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
337 init, token, boost::asio::get_associated_executor(token)
339 numReadRequestsOutstanding_ -= numStatements;
// NOTE(review): presumably the success path; the branch condition is not visible.
342 counters_->registerReadFinished(startTime, numStatements);
// Error path: log, let throwErrorIfNeeded() throw where applicable, then update counters.
// NOTE(review): the conditions separating registerReadError from registerReadRetry are not visible.
346 LOG(log_.error()) <<
"Failed batch read in coroutine: " << res.error();
348 throwErrorIfNeeded(res.error());
350 counters_->registerReadError(numStatements);
353 counters_->registerReadRetry(numStatements);
// read(token, statement): coroutine-based read of a single statement.
// (Loop/branch lines and the return are not visible in this excerpt.)
367 [[maybe_unused]] ResultOrErrorType
368 read(CompletionTokenType token, StatementType
const& statement)
370 auto const startTime = std::chrono::steady_clock::now();
// Holds the in-flight future; declared outside `init` so it outlives the initiation lambda.
372 std::optional<FutureWithCallbackType> future;
373 counters_->registerReadStarted();
// Counted as one outstanding read for isTooBusy().
377 ++numReadRequestsOutstanding_;
// Initiation for async_compose: moves the composed-operation state into a shared_ptr so the
// asyncExecute callback can later call complete() on it with the result.
// NOTE(review): the call receiving get_associated_executor(*sself) and the inner lambda is not
// visible; presumably it posts/dispatches the completion onto the coroutine's executor.
378 auto init = [
this, &statement, &future]<
typename Self>(Self& self) {
379 auto sself = std::make_shared<Self>(std::move(self));
381 future.emplace(handle_.get().asyncExecute(statement, [sself](
auto&& res)
mutable {
383 boost::asio::get_associated_executor(*sself),
384 [sself, res = std::forward<decltype(res)>(res)]() mutable { sself->complete(std::move(res)); }
// Suspends the coroutine until complete() is called; `res` is the resulting ResultOrErrorType.
389 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
390 init, token, boost::asio::get_associated_executor(token)
392 --numReadRequestsOutstanding_;
// NOTE(review): presumably the success path; the branch condition is not visible.
395 counters_->registerReadFinished(startTime);
// Error path: log, let throwErrorIfNeeded() throw where applicable, then update counters.
// NOTE(review): the conditions separating registerReadError from registerReadRetry are not visible.
399 LOG(log_.error()) <<
"Failed read in coroutine: " << res.error();
401 throwErrorIfNeeded(res.error());
403 counters_->registerReadError();
406 counters_->registerReadRetry();
// readEach(token, statements): coroutine-based read that issues one asyncExecute per statement and
// returns one ResultType per statement. (Several lines of the body are not visible in this excerpt.)
421 std::vector<ResultType>
422 readEach(CompletionTokenType token, std::vector<StatementType>
const& statements)
424 auto const startTime = std::chrono::steady_clock::now();
// Shared between the callbacks: a count of failed statements and a countdown of pending callbacks.
// NOTE(review): numOutstanding is an atomic int initialised from a size_t — narrowing for very large inputs.
426 std::atomic_uint64_t errorsCount = 0u;
427 std::atomic_int numOutstanding = statements.size();
428 numReadRequestsOutstanding_ += statements.size();
430 auto futures = std::vector<FutureWithCallbackType>{};
431 futures.reserve(numOutstanding);
432 counters_->registerReadStarted(statements.size());
// Initiation for async_compose. The per-statement handler decrements numOutstanding, and the callback
// that brings it to zero posts a task onto the coroutine's associated executor.
// NOTE(review): the lines that update errorsCount and the body of the posted task are not visible.
434 auto init = [
this, &statements, &futures, &errorsCount, &numOutstanding]<
typename Self>(Self& self) {
435 auto sself = std::make_shared<Self>(std::move(self));
436 auto executionHandler = [&errorsCount, &numOutstanding, sself](
auto const& res)
mutable {
441 if (--numOutstanding == 0) {
442 boost::asio::post(boost::asio::get_associated_executor(*sself), [sself]()
mutable {
449 std::cbegin(statements),
450 std::cend(statements),
451 std::back_inserter(futures),
452 [
this, &executionHandler](
auto const& statement) {
453 return handle_.get().asyncExecute(statement, executionHandler);
// Suspends the coroutine until the composed operation completes (completion signature void()).
458 boost::asio::async_compose<CompletionTokenType, void()>(
459 init, token, boost::asio::get_associated_executor(token)
461 numReadRequestsOutstanding_ -= statements.size();
// With at least one failure: record the errors and count only the remaining statements as finished.
// NOTE(review): whatever follows inside this branch (e.g. a throw) is not visible in this excerpt.
463 if (errorsCount > 0) {
464 ASSERT(errorsCount <= statements.size(),
"Errors number cannot exceed statements number");
465 counters_->registerReadError(errorsCount);
466 counters_->registerReadFinished(startTime, statements.size() - errorsCount);
469 counters_->registerReadFinished(startTime, statements.size());
471 std::vector<ResultType> results;
472 results.reserve(futures.size());
// Move each future through a conversion that calls get() and takes value() out of the returned entry.
476 std::make_move_iterator(std::begin(futures)),
477 std::make_move_iterator(std::end(futures)),
478 std::back_inserter(results),
480 auto entry = future.get();
481 auto&& res = entry.value();
482 return std::move(res);
// Sanity checks: one future and one result per input statement.
487 futures.size() == statements.size(),
488 "Futures size must be equal to statements size. Got {} and {}",
493 results.size() == statements.size(),
494 "Results size must be equal to statements size. Got {} and {}",
// stats(): returns whatever the counters object reports. (The signature line is not visible here.)
507 return counters_->report();
// Takes one write slot. If the write limit is already reached, blocks the calling thread on
// throttleCv_ until canAddWriteRequest() becomes true, then increments the in-flight counter.
512 incrementOutstandingRequestCount()
515 std::unique_lock<std::mutex> lck(throttleMutex_);
516 if (!canAddWriteRequest()) {
517 LOG(log_.trace()) <<
"Max outstanding requests reached. "
518 <<
"Waiting for other requests to finish";
// Predicate form of wait(): the condition is re-checked after every wakeup.
519 throttleCv_.wait(lck, [
this]() {
return canAddWriteRequest(); });
522 ++numWriteRequestsOutstanding_;
// Releases one write slot and wakes waiters. (The conditions guarding the two notifications are
// not visible in this excerpt.)
526 decrementOutstandingRequestCount()
// Decrementing at zero would indicate an unbalanced increment/decrement pair.
529 ASSERT(numWriteRequestsOutstanding_ > 0,
"Decrementing num outstanding below 0");
530 size_t const cur = (--numWriteRequestsOutstanding_);
// Wake one writer blocked in incrementOutstandingRequestCount().
// NOTE(review): presumably only when `cur` is below the write limit — confirm.
534 std::lock_guard
const lck(throttleMutex_);
535 throttleCv_.notify_one();
// Wake a caller blocked in sync().
// NOTE(review): presumably only when `cur` is zero — confirm.
540 std::lock_guard
const lck(syncMutex_);
541 syncCv_.notify_one();
// True while the number of in-flight writes is strictly below the configured maximum.
546 canAddWriteRequest()
const
548 return numWriteRequestsOutstanding_ < maxWriteRequestsOutstanding_;
// True when no async writes are in flight; used as the wait predicate in sync().
552 finishedAllWriteRequests()
const
554 return numWriteRequestsOutstanding_ == 0;
// Converts selected driver errors into exceptions; called from the read error paths.
558 throwErrorIfNeeded(CassandraError err)
const
// NOTE(review): the condition guarding this throw is not visible; presumably a timeout check on `err`.
561 throw DatabaseTimeout();
// An invalid query is reported as std::runtime_error.
563 if (err.isInvalidQuery())
564 throw std::runtime_error(
"Invalid query");
Represents a database timeout error.
Definition BackendInterface.hpp:56
A query executor with a changeable retry policy.
Definition AsyncExecutor.hpp:55
Implements async and sync querying against the Cassandra DB with support for throttling.
Definition ExecutionStrategy.hpp:64
ResultOrErrorType read(CompletionTokenType token, StatementType const &statement)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:368
void write(StatementType &&statement)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:210
ResultOrErrorType read(CompletionTokenType token, PreparedStatementType const &preparedStatement, Args &&... args)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:297
ResultOrErrorType read(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:313
void write(PreparedStatementType const &preparedStatement, Args &&... args)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:195
void write(std::vector< StatementType > &&statements)
Non-blocking batched query execution used for writing data.
Definition ExecutionStrategy.hpp:240
void sync()
Wait for all async writes to finish before unblocking.
Definition ExecutionStrategy.hpp:130
ResultOrErrorType writeSync(PreparedStatementType const &preparedStatement, Args &&... args)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:179
bool isTooBusy() const
Definition ExecutionStrategy.hpp:142
ResultOrErrorType writeSync(StatementType const &statement)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:156
std::vector< ResultType > readEach(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:422
boost::json::object stats() const
Get statistics about the backend.
Definition ExecutionStrategy.hpp:505
DefaultExecutionStrategy(Settings const &settings, HandleType const &handle, typename BackendCountersType::PtrType counters=BackendCountersType::make())
Definition ExecutionStrategy.hpp:102
void writeEach(std::vector< StatementType > &&statements)
Non-blocking query execution used for writing data. In contrast with write, this method does not execut...
Definition ExecutionStrategy.hpp:279
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
void forEachBatch(std::ranges::forward_range auto &&container, std::size_t batchSize, auto &&fn)
Iterate over a container in batches.
Definition Batching.hpp:38
Bundles all cassandra settings in one place.
Definition Cluster.hpp:43