22#include "data/BackendCounters.hpp"
23#include "data/BackendInterface.hpp"
24#include "data/cassandra/Handle.hpp"
25#include "data/cassandra/Types.hpp"
26#include "data/cassandra/impl/AsyncExecutor.hpp"
27#include "util/Assert.hpp"
28#include "util/Batching.hpp"
29#include "util/log/Logger.hpp"
31#include <boost/asio.hpp>
32#include <boost/asio/associated_executor.hpp>
33#include <boost/asio/executor_work_guard.hpp>
34#include <boost/asio/io_context.hpp>
35#include <boost/asio/spawn.hpp>
36#include <boost/json/object.hpp>
41#include <condition_variable>
53namespace data::cassandra::impl {
63template <
typename HandleType = Handle, SomeBackendCounters BackendCountersType = BackendCounters>
67 std::uint32_t maxWriteRequestsOutstanding_;
68 std::atomic_uint32_t numWriteRequestsOutstanding_ = 0;
70 std::uint32_t maxReadRequestsOutstanding_;
71 std::atomic_uint32_t numReadRequestsOutstanding_ = 0;
73 std::size_t writeBatchSize_;
75 std::mutex throttleMutex_;
76 std::condition_variable throttleCv_;
78 std::mutex syncMutex_;
79 std::condition_variable syncCv_;
81 boost::asio::io_context ioc_;
82 std::optional<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_;
84 std::reference_wrapper<HandleType const> handle_;
87 typename BackendCountersType::PtrType counters_;
90 using ResultOrErrorType =
typename HandleType::ResultOrErrorType;
91 using StatementType =
typename HandleType::StatementType;
92 using PreparedStatementType =
typename HandleType::PreparedStatementType;
93 using FutureType =
typename HandleType::FutureType;
94 using FutureWithCallbackType =
typename HandleType::FutureWithCallbackType;
95 using ResultType =
typename HandleType::ResultType;
96 using CompletionTokenType = boost::asio::yield_context;
104 HandleType
const& handle,
105 typename BackendCountersType::PtrType counters = BackendCountersType::make()
107 : maxWriteRequestsOutstanding_{settings.maxWriteRequestsOutstanding}
108 , maxReadRequestsOutstanding_{settings.maxReadRequestsOutstanding}
109 , writeBatchSize_{settings.writeBatchSize}
110 , work_{boost::asio::make_work_guard(ioc_)}
111 , handle_{std::cref(handle)}
112 , thread_{[this]() { ioc_.run(); }}
113 , counters_{std::move(counters)}
115 LOG(log_.
info()) <<
"Max write requests outstanding is " << maxWriteRequestsOutstanding_
116 <<
"; Max read requests outstanding is " << maxReadRequestsOutstanding_;
119 ~DefaultExecutionStrategy()
132 LOG(log_.debug()) <<
"Waiting to sync all writes...";
133 std::unique_lock<std::mutex> lck(syncMutex_);
134 syncCv_.wait(lck, [
this]() {
return finishedAllWriteRequests(); });
135 LOG(log_.debug()) <<
"Sync done.";
144 bool const result = numReadRequestsOutstanding_ >= maxReadRequestsOutstanding_;
146 counters_->registerTooBusy();
158 auto const startTime = std::chrono::steady_clock::now();
160 auto res = handle_.get().execute(statement);
162 counters_->registerWriteSync(startTime);
166 counters_->registerWriteSyncRetry();
167 LOG(log_.warn()) <<
"Cassandra sync write error, retrying: " << res.error();
168 std::this_thread::sleep_for(std::chrono::milliseconds(5));
177 template <
typename... Args>
179 writeSync(PreparedStatementType
const& preparedStatement, Args&&... args)
181 return writeSync(preparedStatement.bind(std::forward<Args>(args)...));
193 template <
typename... Args>
195 write(PreparedStatementType
const& preparedStatement, Args&&... args)
197 auto statement = preparedStatement.bind(std::forward<Args>(args)...);
198 write(std::move(statement));
212 auto const startTime = std::chrono::steady_clock::now();
214 incrementOutstandingRequestCount();
216 counters_->registerWriteStarted();
218 AsyncExecutor<std::decay_t<
decltype(statement)>, HandleType>::run(
221 std::move(statement),
222 [
this, startTime](
auto const&) {
223 decrementOutstandingRequestCount();
225 counters_->registerWriteFinished(startTime);
227 [
this]() { counters_->registerWriteRetry(); }
240 write(std::vector<StatementType>&& statements)
242 if (statements.empty())
245 util::forEachBatch(std::move(statements), writeBatchSize_, [
this](
auto begin,
auto end) {
246 auto const startTime = std::chrono::steady_clock::now();
247 auto chunk = std::vector<StatementType>{};
249 chunk.reserve(std::distance(begin, end));
250 std::move(begin, end, std::back_inserter(chunk));
252 incrementOutstandingRequestCount();
253 counters_->registerWriteStarted();
256 AsyncExecutor<std::decay_t<
decltype(chunk)>, HandleType>::run(
260 [
this, startTime](
auto const&) {
261 decrementOutstandingRequestCount();
262 counters_->registerWriteFinished(startTime);
264 [
this]() { counters_->registerWriteRetry(); }
281 std::ranges::for_each(std::move(statements), [
this](
auto& statement) {
282 this->
write(std::move(statement));
297 template <
typename... Args>
298 [[maybe_unused]] ResultOrErrorType
299 read(CompletionTokenType token, PreparedStatementType
const& preparedStatement, Args&&... args)
301 return read(token, preparedStatement.bind(std::forward<Args>(args)...));
314 [[maybe_unused]] ResultOrErrorType
315 read(CompletionTokenType token, std::vector<StatementType>
const& statements)
317 auto const startTime = std::chrono::steady_clock::now();
319 auto const numStatements = statements.size();
320 std::optional<FutureWithCallbackType> future;
321 counters_->registerReadStarted(numStatements);
325 numReadRequestsOutstanding_ += numStatements;
327 auto init = [
this, &statements, &future]<
typename Self>(Self& self) {
328 auto sself = std::make_shared<Self>(std::move(self));
330 future.emplace(handle_.get().asyncExecute(statements, [sself](
auto&& res)
mutable {
332 boost::asio::get_associated_executor(*sself),
333 [sself, res = std::forward<decltype(res)>(res)]() mutable {
334 sself->complete(std::move(res));
340 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
341 std::move(init), token, boost::asio::get_associated_executor(token)
343 numReadRequestsOutstanding_ -= numStatements;
346 counters_->registerReadFinished(startTime, numStatements);
350 LOG(log_.error()) <<
"Failed batch read in coroutine: " << res.error();
352 throwErrorIfNeeded(res.error());
354 counters_->registerReadError(numStatements);
357 counters_->registerReadRetry(numStatements);
371 [[maybe_unused]] ResultOrErrorType
372 read(CompletionTokenType token, StatementType
const& statement)
374 auto const startTime = std::chrono::steady_clock::now();
376 std::optional<FutureWithCallbackType> future;
377 counters_->registerReadStarted();
381 ++numReadRequestsOutstanding_;
382 auto init = [
this, &statement, &future]<
typename Self>(Self& self) {
383 auto sself = std::make_shared<Self>(std::move(self));
385 future.emplace(handle_.get().asyncExecute(statement, [sself](
auto&& res)
mutable {
387 boost::asio::get_associated_executor(*sself),
388 [sself, res = std::forward<decltype(res)>(res)]() mutable {
389 sself->complete(std::move(res));
395 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
396 std::move(init), token, boost::asio::get_associated_executor(token)
398 --numReadRequestsOutstanding_;
401 counters_->registerReadFinished(startTime);
405 LOG(log_.error()) <<
"Failed read in coroutine: " << res.error();
407 throwErrorIfNeeded(res.error());
409 counters_->registerReadError();
412 counters_->registerReadRetry();
427 std::vector<ResultType>
428 readEach(CompletionTokenType token, std::vector<StatementType>
const& statements)
430 auto const startTime = std::chrono::steady_clock::now();
432 std::atomic_uint64_t errorsCount = 0u;
433 std::atomic_int numOutstanding = statements.size();
434 numReadRequestsOutstanding_ += statements.size();
436 auto futures = std::vector<FutureWithCallbackType>{};
437 futures.reserve(numOutstanding);
438 counters_->registerReadStarted(statements.size());
440 auto init = [
this, &statements, &futures, &errorsCount, &numOutstanding]<
typename Self>(
443 auto sself = std::make_shared<Self>(std::move(self));
444 auto executionHandler =
445 [&errorsCount, &numOutstanding, sself](
auto const& res)
mutable {
450 if (--numOutstanding == 0) {
452 boost::asio::get_associated_executor(*sself),
453 [sself]()
mutable { sself->complete(); }
459 std::cbegin(statements),
460 std::cend(statements),
461 std::back_inserter(futures),
462 [
this, &executionHandler](
auto const& statement) {
463 return handle_.get().asyncExecute(statement, executionHandler);
468 boost::asio::async_compose<CompletionTokenType, void()>(
469 std::move(init), token, boost::asio::get_associated_executor(token)
471 numReadRequestsOutstanding_ -= statements.size();
473 if (errorsCount > 0) {
475 errorsCount <= statements.size(),
"Errors number cannot exceed statements number"
477 counters_->registerReadError(errorsCount);
478 counters_->registerReadFinished(startTime, statements.size() - errorsCount);
481 counters_->registerReadFinished(startTime, statements.size());
483 std::vector<ResultType> results;
484 results.reserve(futures.size());
489 std::make_move_iterator(std::begin(futures)),
490 std::make_move_iterator(std::end(futures)),
491 std::back_inserter(results),
493 auto entry = future.get();
494 auto&& res = entry.value();
495 return std::move(res);
500 futures.size() == statements.size(),
501 "Futures size must be equal to statements size. Got {} and {}",
506 results.size() == statements.size(),
507 "Results size must be equal to statements size. Got {} and {}",
520 return counters_->report();
525 incrementOutstandingRequestCount()
528 std::unique_lock<std::mutex> lck(throttleMutex_);
529 if (!canAddWriteRequest()) {
530 LOG(log_.
trace()) <<
"Max outstanding requests reached. "
531 <<
"Waiting for other requests to finish";
532 throttleCv_.wait(lck, [
this]() {
return canAddWriteRequest(); });
535 ++numWriteRequestsOutstanding_;
539 decrementOutstandingRequestCount()
542 ASSERT(numWriteRequestsOutstanding_ > 0,
"Decrementing num outstanding below 0");
543 size_t const cur = (--numWriteRequestsOutstanding_);
547 std::lock_guard
const lck(throttleMutex_);
548 throttleCv_.notify_one();
553 std::lock_guard
const lck(syncMutex_);
554 syncCv_.notify_one();
559 canAddWriteRequest()
const
561 return numWriteRequestsOutstanding_ < maxWriteRequestsOutstanding_;
565 finishedAllWriteRequests()
const
567 return numWriteRequestsOutstanding_ == 0;
571 throwErrorIfNeeded(CassandraError err)
const
574 throw DatabaseTimeout();
576 if (err.isInvalidQuery())
577 throw std::runtime_error(
"Invalid query");
Represents a database timeout error.
Definition BackendInterface.hpp:59
A query executor with a changeable retry policy.
Definition AsyncExecutor.hpp:56
ResultOrErrorType read(CompletionTokenType token, StatementType const &statement)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:372
void write(StatementType &&statement)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:210
ResultOrErrorType read(CompletionTokenType token, PreparedStatementType const &preparedStatement, Args &&... args)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:299
ResultOrErrorType read(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:315
void write(PreparedStatementType const &preparedStatement, Args &&... args)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:195
void write(std::vector< StatementType > &&statements)
Non-blocking batched query execution used for writing data.
Definition ExecutionStrategy.hpp:240
void sync()
Wait for all async writes to finish before unblocking.
Definition ExecutionStrategy.hpp:130
ResultOrErrorType writeSync(PreparedStatementType const &preparedStatement, Args &&... args)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:179
bool isTooBusy() const
Definition ExecutionStrategy.hpp:142
ResultOrErrorType writeSync(StatementType const &statement)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:156
std::vector< ResultType > readEach(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:428
boost::json::object stats() const
Get statistics about the backend.
Definition ExecutionStrategy.hpp:518
DefaultExecutionStrategy(Settings const &settings, HandleType const &handle, typename BackendCountersType::PtrType counters=BackendCountersType::make())
Definition ExecutionStrategy.hpp:102
void writeEach(std::vector< StatementType > &&statements)
Non-blocking query execution used for writing data. Contrast with write, this method does not execute...
Definition ExecutionStrategy.hpp:279
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:497
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:507
void forEachBatch(std::ranges::forward_range auto &&container, std::size_t batchSize, auto &&fn)
Iterate over a container in batches.
Definition Batching.hpp:38
Bundles all cassandra settings in one place.
Definition Cluster.hpp:56