3#include "data/BackendCounters.hpp"
4#include "data/BackendInterface.hpp"
5#include "data/cassandra/Handle.hpp"
6#include "data/cassandra/Types.hpp"
7#include "data/cassandra/impl/AsyncExecutor.hpp"
8#include "util/Assert.hpp"
9#include "util/Batching.hpp"
10#include "util/log/Logger.hpp"
12#include <boost/asio.hpp>
13#include <boost/asio/associated_executor.hpp>
14#include <boost/asio/executor_work_guard.hpp>
15#include <boost/asio/io_context.hpp>
16#include <boost/asio/spawn.hpp>
17#include <boost/json/object.hpp>
22#include <condition_variable>
34namespace data::cassandra::impl {
// Template header and data members of the execution strategy (the `class` line itself is
// not part of this listing). HandleType supplies the statement/result/future types aliased
// further down; BackendCountersType is constrained by SomeBackendCounters.
44template <
typename HandleType = Handle, SomeBackendCounters BackendCountersType = BackendCounters>
// Write throttling: configured limit and the atomic count of writes currently in flight.
48 std::uint32_t maxWriteRequestsOutstanding_;
49 std::atomic_uint32_t numWriteRequestsOutstanding_ = 0;
// Read accounting: configured limit and the atomic count of reads currently in flight.
51 std::uint32_t maxReadRequestsOutstanding_;
52 std::atomic_uint32_t numReadRequestsOutstanding_ = 0;
// Chunk size handed to util::forEachBatch by the batched write overload.
54 std::size_t writeBatchSize_;
// Mutex/condition variable pair waited on by incrementOutstandingRequestCount().
56 std::mutex throttleMutex_;
57 std::condition_variable throttleCv_;
// Mutex/condition variable pair waited on by sync().
59 std::mutex syncMutex_;
60 std::condition_variable syncCv_;
// io_context run on thread_ by the constructor, plus the work guard created on it.
62 boost::asio::io_context ioc_;
63 std::optional<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_;
// Non-owning reference to the database handle used for execute()/asyncExecute().
65 std::reference_wrapper<HandleType const> handle_;
// Counters object receiving every register*() call made by this class; report() feeds stats().
68 typename BackendCountersType::PtrType counters_;
// Public type aliases: everything except the completion token is forwarded from HandleType,
// so a different handle type brings its own statement/result/future types with it.
71 using ResultOrErrorType =
typename HandleType::ResultOrErrorType;
72 using StatementType =
typename HandleType::StatementType;
73 using PreparedStatementType =
typename HandleType::PreparedStatementType;
74 using FutureType =
typename HandleType::FutureType;
75 using FutureWithCallbackType =
typename HandleType::FutureWithCallbackType;
76 using ResultType =
typename HandleType::ResultType;
// The read operations take a Boost.Asio yield_context as their completion token.
77 using CompletionTokenType = boost::asio::yield_context;
// Constructor (its first signature line, which introduces `settings`, is not part of this listing).
// Copies the request limits and the write batch size out of `settings`, creates a work guard
// on ioc_, and starts thread_ running ioc_.run().
// Only a reference to `handle` is stored (std::cref), so the handle has to outlive this object.
// `counters` defaults to a freshly made BackendCountersType instance.
85 HandleType
const& handle,
86 typename BackendCountersType::PtrType counters = BackendCountersType::make()
88 : maxWriteRequestsOutstanding_{settings.maxWriteRequestsOutstanding}
89 , maxReadRequestsOutstanding_{settings.maxReadRequestsOutstanding}
90 , writeBatchSize_{settings.writeBatchSize}
91 , work_{boost::asio::make_work_guard(ioc_)}
92 , handle_{std::cref(handle)}
93 , thread_{[this]() { ioc_.run(); }}
94 , counters_{std::move(counters)}
// Log the configured limits once at info level.
96 LOG(log_.
info()) <<
"Max write requests outstanding is " << maxWriteRequestsOutstanding_
97 <<
"; Max read requests outstanding is " << maxReadRequestsOutstanding_;
// Destructor.
// NOTE(review): the body is not part of this listing; presumably it releases work_ and joins
// thread_ so that ioc_.run() can return — confirm against the full header.
100 ~DefaultExecutionStrategy()
// sync(): blocks the calling thread on syncCv_ until finishedAllWriteRequests() returns true,
// logging at debug level before and after the wait.
113 LOG(log_.debug()) <<
"Waiting to sync all writes...";
114 std::unique_lock<std::mutex> lck(syncMutex_);
115 syncCv_.wait(lck, [
this]() {
return finishedAllWriteRequests(); });
116 LOG(log_.debug()) <<
"Sync done.";
// isTooBusy(): true when the number of in-flight reads has reached the configured read limit.
125 bool const result = numReadRequestsOutstanding_ >= maxReadRequestsOutstanding_;
// NOTE(review): the guarding line is missing from this listing; presumably the counter is only
// bumped when `result` is true — confirm against the full header.
127 counters_->registerTooBusy();
// writeSync(StatementType const&): executes the statement through the handle on the calling thread.
// NOTE(review): the loop and branch lines are missing from this listing. The visible statements
// suggest that success is recorded via registerWriteSync(startTime), while a failure is counted,
// logged at warn level and retried after a 5 ms sleep — confirm against the full header.
139 auto const startTime = std::chrono::steady_clock::now();
141 auto res = handle_.get().execute(statement);
143 counters_->registerWriteSync(startTime);
147 counters_->registerWriteSyncRetry();
148 LOG(log_.warn()) <<
"Cassandra sync write error, retrying: " << res.error();
// Short pause before the next attempt.
149 std::this_thread::sleep_for(std::chrono::milliseconds(5));
// Convenience overload: binds `args` to `preparedStatement` (perfectly forwarded) and
// delegates to the writeSync overload that takes the bound statement.
158 template <
typename... Args>
160 writeSync(PreparedStatementType
const& preparedStatement, Args&&... args)
162 return writeSync(preparedStatement.bind(std::forward<Args>(args)...));
// Convenience overload: binds `args` to `preparedStatement` (perfectly forwarded) and
// hands the resulting statement to the write overload that takes a statement by rvalue.
174 template <
typename... Args>
176 write(PreparedStatementType
const& preparedStatement, Args&&... args)
178 auto statement = preparedStatement.bind(std::forward<Args>(args)...);
179 write(std::move(statement));
// write(StatementType&&): submits one statement through AsyncExecutor (signature line and some
// call arguments are not part of this listing).
193 auto const startTime = std::chrono::steady_clock::now();
// Reserve a slot first; this waits on throttleCv_ while the write limit is reached.
195 incrementOutstandingRequestCount();
197 counters_->registerWriteStarted();
199 AsyncExecutor<std::decay_t<
decltype(statement)>, HandleType>::run(
202 std::move(statement),
// First callback: gives the slot back and records the finished write; its argument is ignored.
203 [
this, startTime](
auto const&) {
204 decrementOutstandingRequestCount();
206 counters_->registerWriteFinished(startTime);
// Second callback: counts a write retry.
208 [
this]() { counters_->registerWriteRetry(); }
// Batched write: splits `statements` into chunks of writeBatchSize_ via util::forEachBatch and
// submits each chunk through AsyncExecutor as one outstanding write request.
221 write(std::vector<StatementType>&& statements)
// Nothing is submitted for an empty input (the statement under this `if` is not part of this listing).
223 if (statements.empty())
226 util::forEachBatch(std::move(statements), writeBatchSize_, [
this](
auto begin,
auto end) {
227 auto const startTime = std::chrono::steady_clock::now();
228 auto chunk = std::vector<StatementType>{};
// Move the statements of this batch into their own vector, reserving the exact size up front.
230 chunk.reserve(std::distance(begin, end));
231 std::move(begin, end, std::back_inserter(chunk));
// One slot and one started-write count per chunk, not per statement.
233 incrementOutstandingRequestCount();
234 counters_->registerWriteStarted();
237 AsyncExecutor<std::decay_t<
decltype(chunk)>, HandleType>::run(
// First callback: gives the slot back and records the finished write; its argument is ignored.
241 [
this, startTime](
auto const&) {
242 decrementOutstandingRequestCount();
243 counters_->registerWriteFinished(startTime);
// Second callback: counts a write retry.
245 [
this]() { counters_->registerWriteRetry(); }
// writeEach(): forwards every statement individually to write(StatementType&&), so each one is
// its own outstanding write request (no batching).
262 std::ranges::for_each(std::move(statements), [
this](
auto& statement) {
263 this->
write(std::move(statement));
// Convenience overload: binds `args` to `preparedStatement` (perfectly forwarded) and
// delegates to the read overload that takes the bound statement together with `token`.
278 template <
typename... Args>
279 [[maybe_unused]] ResultOrErrorType
280 read(CompletionTokenType token, PreparedStatementType
const& preparedStatement, Args&&... args)
282 return read(token, preparedStatement.bind(std::forward<Args>(args)...));
// Batch read: executes all `statements` through a single asyncExecute call and suspends the
// calling coroutine (via `token`) until the result is delivered.
// NOTE(review): loop/branch lines are missing from this listing; the trailing counter calls
// suggest a retry-on-error loop — confirm against the full header.
295 [[maybe_unused]] ResultOrErrorType
296 read(CompletionTokenType token, std::vector<StatementType>
const& statements)
298 auto const startTime = std::chrono::steady_clock::now();
300 auto const numStatements = statements.size();
// Holds the in-flight future; filled in by `init`.
301 std::optional<FutureWithCallbackType> future;
302 counters_->registerReadStarted(numStatements);
// The whole batch counts towards the in-flight read total used by isTooBusy().
306 numReadRequestsOutstanding_ += numStatements;
// Initiation function for async_compose: moves the composed-operation handle into a shared_ptr
// so the driver callback can own it, then starts the query.
308 auto init = [
this, &statements, &future]<
typename Self>(Self& self) {
309 auto sself = std::make_shared<Self>(std::move(self));
311 future.emplace(handle_.get().asyncExecute(statements, [sself](
auto&& res)
mutable {
// The result is handed to complete() from a handler bound to the operation's associated executor.
313 boost::asio::get_associated_executor(*sself),
314 [sself, res = std::forward<decltype(res)>(res)]() mutable {
315 sself->complete(std::move(res));
321 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
322 std::move(init), token, boost::asio::get_associated_executor(token)
// Operation finished: give the batch's share of the in-flight count back.
324 numReadRequestsOutstanding_ -= numStatements;
327 counters_->registerReadFinished(startTime, numStatements);
331 LOG(log_.error()) <<
"Failed batch read in coroutine: " << res.error();
// Throws for the error kinds handled by throwErrorIfNeeded(); returns normally otherwise.
333 throwErrorIfNeeded(res.error());
335 counters_->registerReadError(numStatements);
338 counters_->registerReadRetry(numStatements);
// Single-statement read: same flow as the batch overload, but counts exactly one request.
// NOTE(review): loop/branch lines are missing from this listing; the trailing counter calls
// suggest a retry-on-error loop — confirm against the full header.
352 [[maybe_unused]] ResultOrErrorType
353 read(CompletionTokenType token, StatementType
const& statement)
355 auto const startTime = std::chrono::steady_clock::now();
// Holds the in-flight future; filled in by `init`.
357 std::optional<FutureWithCallbackType> future;
358 counters_->registerReadStarted();
362 ++numReadRequestsOutstanding_;
// Initiation function for async_compose: moves the composed-operation handle into a shared_ptr
// so the driver callback can own it, then starts the query.
363 auto init = [
this, &statement, &future]<
typename Self>(Self& self) {
364 auto sself = std::make_shared<Self>(std::move(self));
366 future.emplace(handle_.get().asyncExecute(statement, [sself](
auto&& res)
mutable {
// The result is handed to complete() from a handler bound to the operation's associated executor.
368 boost::asio::get_associated_executor(*sself),
369 [sself, res = std::forward<decltype(res)>(res)]() mutable {
370 sself->complete(std::move(res));
376 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
377 std::move(init), token, boost::asio::get_associated_executor(token)
// Operation finished: this request no longer counts as in flight.
379 --numReadRequestsOutstanding_;
382 counters_->registerReadFinished(startTime);
386 LOG(log_.error()) <<
"Failed read in coroutine: " << res.error();
// Throws for the error kinds handled by throwErrorIfNeeded(); returns normally otherwise.
388 throwErrorIfNeeded(res.error());
390 counters_->registerReadError();
393 counters_->registerReadRetry();
// readEach(): issues one asyncExecute per statement, suspends the calling coroutine until the
// last handler has run, then collects one ResultType per future.
// NOTE(review): several lines are missing from this listing (error counting, what happens after
// the error branch, the transform calls' heads) — confirm details against the full header.
408 std::vector<ResultType>
409 readEach(CompletionTokenType token, std::vector<StatementType>
const& statements)
411 auto const startTime = std::chrono::steady_clock::now();
// Shared with the per-statement handlers by reference, hence atomic.
413 std::atomic_uint64_t errorsCount = 0u;
414 std::atomic_int numOutstanding = statements.size();
415 numReadRequestsOutstanding_ += statements.size();
417 auto futures = std::vector<FutureWithCallbackType>{};
418 futures.reserve(numOutstanding);
419 counters_->registerReadStarted(statements.size());
// Initiation function for async_compose.
421 auto init = [
this, &statements, &futures, &errorsCount, &numOutstanding]<
typename Self>(
424 auto sself = std::make_shared<Self>(std::move(self));
425 auto executionHandler =
426 [&errorsCount, &numOutstanding, sself](
auto const& res)
mutable {
// The handler that brings the counter to zero is the one that completes the operation.
// NOTE(review): with an empty `statements` no handler appears to run, so complete() may never
// be reached from the visible code — confirm callers never pass an empty vector.
431 if (--numOutstanding == 0) {
433 boost::asio::get_associated_executor(*sself),
434 [sself]()
mutable { sself->complete(); }
// Start one query per statement, keeping every returned future.
440 std::cbegin(statements),
441 std::cend(statements),
442 std::back_inserter(futures),
443 [
this, &executionHandler](
auto const& statement) {
444 return handle_.get().asyncExecute(statement, executionHandler);
449 boost::asio::async_compose<CompletionTokenType, void()>(
450 std::move(init), token, boost::asio::get_associated_executor(token)
// All handlers have run: give the in-flight read count back.
452 numReadRequestsOutstanding_ -= statements.size();
// Failure path: failed statements are reported as errors, the remainder as finished.
454 if (errorsCount > 0) {
456 errorsCount <= statements.size(),
"Errors number cannot exceed statements number"
458 counters_->registerReadError(errorsCount);
459 counters_->registerReadFinished(startTime, statements.size() - errorsCount);
462 counters_->registerReadFinished(startTime, statements.size());
464 std::vector<ResultType> results;
465 results.reserve(futures.size());
// Convert each future into its result; value() is taken from what future.get() returns.
470 std::make_move_iterator(std::begin(futures)),
471 std::make_move_iterator(std::end(futures)),
472 std::back_inserter(results),
474 auto entry = future.get();
475 auto&& res = entry.value();
// `res` is a reference into `entry`, so the explicit move is what avoids a copy here.
476 return std::move(res);
// Sanity checks: exactly one future and one result per input statement.
481 futures.size() == statements.size(),
482 "Futures size must be equal to statements size. Got {} and {}",
487 results.size() == statements.size(),
488 "Results size must be equal to statements size. Got {} and {}",
// stats(): returns whatever the counters object reports.
501 return counters_->report();
// Reserves one write slot. While the write limit is reached the caller waits on throttleCv_
// until canAddWriteRequest() holds; the increment then happens.
506 incrementOutstandingRequestCount()
509 std::unique_lock<std::mutex> lck(throttleMutex_);
// The explicit check exists so the trace message is only logged when a wait is actually needed.
510 if (!canAddWriteRequest()) {
511 LOG(log_.
trace()) <<
"Max outstanding requests reached. "
512 <<
"Waiting for other requests to finish";
513 throttleCv_.wait(lck, [
this]() {
return canAddWriteRequest(); });
516 ++numWriteRequestsOutstanding_;
// Releases one write slot and wakes waiters.
// NOTE(review): the conditions guarding the two notifications are missing from this listing;
// presumably throttleCv_ is signalled when a slot is free again and syncCv_ when the count
// reaches zero — confirm against the full header.
520 decrementOutstandingRequestCount()
// Guards against an unmatched decrement.
523 ASSERT(numWriteRequestsOutstanding_ > 0,
"Decrementing num outstanding below 0");
// `cur` is the value after the atomic decrement.
524 size_t const cur = (--numWriteRequestsOutstanding_);
// Wakes one thread waiting in incrementOutstandingRequestCount().
528 std::lock_guard
const lck(throttleMutex_);
529 throttleCv_.notify_one();
// Wakes one thread waiting in sync().
534 std::lock_guard
const lck(syncMutex_);
535 syncCv_.notify_one();
// True while the number of in-flight writes is below the configured write limit.
540 canAddWriteRequest()
const
542 return numWriteRequestsOutstanding_ < maxWriteRequestsOutstanding_;
// True when no write is in flight; used as the wait predicate in sync().
546 finishedAllWriteRequests()
const
548 return numWriteRequestsOutstanding_ == 0;
// Translates selected driver errors into exceptions: DatabaseTimeout, or std::runtime_error
// for an invalid query.
// NOTE(review): the condition guarding the DatabaseTimeout throw is missing from this listing;
// presumably it checks for a timeout error — confirm against the full header.
552 throwErrorIfNeeded(CassandraError err)
const
555 throw DatabaseTimeout();
557 if (err.isInvalidQuery())
558 throw std::runtime_error(
"Invalid query");
Represents a database timeout error.
Definition BackendInterface.hpp:40
A query executor with a changeable retry policy.
Definition AsyncExecutor.hpp:37
ResultOrErrorType read(CompletionTokenType token, StatementType const &statement)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:353
void write(StatementType &&statement)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:191
ResultOrErrorType read(CompletionTokenType token, PreparedStatementType const &preparedStatement, Args &&... args)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:280
ResultOrErrorType read(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:296
void write(PreparedStatementType const &preparedStatement, Args &&... args)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:176
void write(std::vector< StatementType > &&statements)
Non-blocking batched query execution used for writing data.
Definition ExecutionStrategy.hpp:221
void sync()
Wait for all async writes to finish before unblocking.
Definition ExecutionStrategy.hpp:111
ResultOrErrorType writeSync(PreparedStatementType const &preparedStatement, Args &&... args)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:160
bool isTooBusy() const
Definition ExecutionStrategy.hpp:123
ResultOrErrorType writeSync(StatementType const &statement)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:137
std::vector< ResultType > readEach(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:409
boost::json::object stats() const
Get statistics about the backend.
Definition ExecutionStrategy.hpp:499
DefaultExecutionStrategy(Settings const &settings, HandleType const &handle, typename BackendCountersType::PtrType counters=BackendCountersType::make())
Definition ExecutionStrategy.hpp:83
void writeEach(std::vector< StatementType > &&statements)
Non-blocking query execution used for writing data. In contrast to write, this method does not execute the statements as a batch; each statement is submitted individually.
Definition ExecutionStrategy.hpp:260
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:478
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:488
void forEachBatch(std::ranges::forward_range auto &&container, std::size_t batchSize, auto &&fn)
Iterate over a container in batches.
Definition Batching.hpp:19
Bundles all cassandra settings in one place.
Definition Cluster.hpp:37