Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
ExecutionStrategy.hpp
#pragma once

#include "data/BackendCounters.hpp"
#include "data/BackendInterface.hpp"
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/Types.hpp"
#include "data/cassandra/impl/AsyncExecutor.hpp"
#include "util/Assert.hpp"
#include "util/Batching.hpp"
#include "util/log/Logger.hpp"

#include <boost/asio.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

34namespace data::cassandra::impl {
35
36// TODO: this could probably be also moved out of impl and into the main cassandra namespace.
37
44template <typename HandleType = Handle, SomeBackendCounters BackendCountersType = BackendCounters>
46 util::Logger log_{"Backend"};
47
48 std::uint32_t maxWriteRequestsOutstanding_;
49 std::atomic_uint32_t numWriteRequestsOutstanding_ = 0;
50
51 std::uint32_t maxReadRequestsOutstanding_;
52 std::atomic_uint32_t numReadRequestsOutstanding_ = 0;
53
54 std::size_t writeBatchSize_;
55
56 std::mutex throttleMutex_;
57 std::condition_variable throttleCv_;
58
59 std::mutex syncMutex_;
60 std::condition_variable syncCv_;
61
62 boost::asio::io_context ioc_;
63 std::optional<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_;
64
65 std::reference_wrapper<HandleType const> handle_;
66 std::thread thread_;
67
68 typename BackendCountersType::PtrType counters_;
69
70public:
71 using ResultOrErrorType = typename HandleType::ResultOrErrorType;
72 using StatementType = typename HandleType::StatementType;
73 using PreparedStatementType = typename HandleType::PreparedStatementType;
74 using FutureType = typename HandleType::FutureType;
75 using FutureWithCallbackType = typename HandleType::FutureWithCallbackType;
76 using ResultType = typename HandleType::ResultType;
77 using CompletionTokenType = boost::asio::yield_context;
78
84 Settings const& settings,
85 HandleType const& handle,
86 typename BackendCountersType::PtrType counters = BackendCountersType::make()
87 )
88 : maxWriteRequestsOutstanding_{settings.maxWriteRequestsOutstanding}
89 , maxReadRequestsOutstanding_{settings.maxReadRequestsOutstanding}
90 , writeBatchSize_{settings.writeBatchSize}
91 , work_{boost::asio::make_work_guard(ioc_)}
92 , handle_{std::cref(handle)}
93 , thread_{[this]() { ioc_.run(); }}
94 , counters_{std::move(counters)}
95 {
96 LOG(log_.info()) << "Max write requests outstanding is " << maxWriteRequestsOutstanding_
97 << "; Max read requests outstanding is " << maxReadRequestsOutstanding_;
98 }
99
100 ~DefaultExecutionStrategy()
101 {
102 work_.reset();
103 ioc_.stop();
104 thread_.join();
105 }
106
110 void
112 {
113 LOG(log_.debug()) << "Waiting to sync all writes...";
114 std::unique_lock<std::mutex> lck(syncMutex_);
115 syncCv_.wait(lck, [this]() { return finishedAllWriteRequests(); });
116 LOG(log_.debug()) << "Sync done.";
117 }
118
122 bool
123 isTooBusy() const
124 {
125 bool const result = numReadRequestsOutstanding_ >= maxReadRequestsOutstanding_;
126 if (result)
127 counters_->registerTooBusy();
128 return result;
129 }
130
136 ResultOrErrorType
137 writeSync(StatementType const& statement)
138 {
139 auto const startTime = std::chrono::steady_clock::now();
140 while (true) {
141 auto res = handle_.get().execute(statement);
142 if (res) {
143 counters_->registerWriteSync(startTime);
144 return res;
145 }
146
147 counters_->registerWriteSyncRetry();
148 LOG(log_.warn()) << "Cassandra sync write error, retrying: " << res.error();
149 std::this_thread::sleep_for(std::chrono::milliseconds(5));
150 }
151 }
152
158 template <typename... Args>
159 ResultOrErrorType
160 writeSync(PreparedStatementType const& preparedStatement, Args&&... args)
161 {
162 return writeSync(preparedStatement.bind(std::forward<Args>(args)...));
163 }
164
174 template <typename... Args>
175 void
176 write(PreparedStatementType const& preparedStatement, Args&&... args)
177 {
178 auto statement = preparedStatement.bind(std::forward<Args>(args)...);
179 write(std::move(statement));
180 }
181
190 void
191 write(StatementType&& statement)
192 {
193 auto const startTime = std::chrono::steady_clock::now();
194
195 incrementOutstandingRequestCount();
196
197 counters_->registerWriteStarted();
198 // Note: lifetime is controlled by std::shared_from_this internally
199 AsyncExecutor<std::decay_t<decltype(statement)>, HandleType>::run(
200 ioc_,
201 handle_,
202 std::move(statement),
203 [this, startTime](auto const&) {
204 decrementOutstandingRequestCount();
205
206 counters_->registerWriteFinished(startTime);
207 },
208 [this]() { counters_->registerWriteRetry(); }
209 );
210 }
211
220 void
221 write(std::vector<StatementType>&& statements)
222 {
223 if (statements.empty())
224 return;
225
226 util::forEachBatch(std::move(statements), writeBatchSize_, [this](auto begin, auto end) {
227 auto const startTime = std::chrono::steady_clock::now();
228 auto chunk = std::vector<StatementType>{};
229
230 chunk.reserve(std::distance(begin, end));
231 std::move(begin, end, std::back_inserter(chunk));
232
233 incrementOutstandingRequestCount();
234 counters_->registerWriteStarted();
235
236 // Note: lifetime is controlled by std::shared_from_this internally
237 AsyncExecutor<std::decay_t<decltype(chunk)>, HandleType>::run(
238 ioc_,
239 handle_,
240 std::move(chunk),
241 [this, startTime](auto const&) {
242 decrementOutstandingRequestCount();
243 counters_->registerWriteFinished(startTime);
244 },
245 [this]() { counters_->registerWriteRetry(); }
246 );
247 });
248 }
249
259 void
260 writeEach(std::vector<StatementType>&& statements)
261 {
262 std::ranges::for_each(std::move(statements), [this](auto& statement) {
263 this->write(std::move(statement));
264 });
265 }
266
278 template <typename... Args>
279 [[maybe_unused]] ResultOrErrorType
280 read(CompletionTokenType token, PreparedStatementType const& preparedStatement, Args&&... args)
281 {
282 return read(token, preparedStatement.bind(std::forward<Args>(args)...));
283 }
284
295 [[maybe_unused]] ResultOrErrorType
296 read(CompletionTokenType token, std::vector<StatementType> const& statements)
297 {
298 auto const startTime = std::chrono::steady_clock::now();
299
300 auto const numStatements = statements.size();
301 std::optional<FutureWithCallbackType> future;
302 counters_->registerReadStarted(numStatements);
303
304 // todo: perhaps use policy instead
305 while (true) {
306 numReadRequestsOutstanding_ += numStatements;
307
308 auto init = [this, &statements, &future]<typename Self>(Self& self) {
309 auto sself = std::make_shared<Self>(std::move(self));
310
311 future.emplace(handle_.get().asyncExecute(statements, [sself](auto&& res) mutable {
312 boost::asio::post(
313 boost::asio::get_associated_executor(*sself),
314 [sself, res = std::forward<decltype(res)>(res)]() mutable {
315 sself->complete(std::move(res));
316 }
317 );
318 }));
319 };
320
321 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
322 std::move(init), token, boost::asio::get_associated_executor(token)
323 );
324 numReadRequestsOutstanding_ -= numStatements;
325
326 if (res) {
327 counters_->registerReadFinished(startTime, numStatements);
328 return res;
329 }
330
331 LOG(log_.error()) << "Failed batch read in coroutine: " << res.error();
332 try {
333 throwErrorIfNeeded(res.error());
334 } catch (...) {
335 counters_->registerReadError(numStatements);
336 throw;
337 }
338 counters_->registerReadRetry(numStatements);
339 }
340 }
341
352 [[maybe_unused]] ResultOrErrorType
353 read(CompletionTokenType token, StatementType const& statement)
354 {
355 auto const startTime = std::chrono::steady_clock::now();
356
357 std::optional<FutureWithCallbackType> future;
358 counters_->registerReadStarted();
359
360 // todo: perhaps use policy instead
361 while (true) {
362 ++numReadRequestsOutstanding_;
363 auto init = [this, &statement, &future]<typename Self>(Self& self) {
364 auto sself = std::make_shared<Self>(std::move(self));
365
366 future.emplace(handle_.get().asyncExecute(statement, [sself](auto&& res) mutable {
367 boost::asio::post(
368 boost::asio::get_associated_executor(*sself),
369 [sself, res = std::forward<decltype(res)>(res)]() mutable {
370 sself->complete(std::move(res));
371 }
372 );
373 }));
374 };
375
376 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
377 std::move(init), token, boost::asio::get_associated_executor(token)
378 );
379 --numReadRequestsOutstanding_;
380
381 if (res) {
382 counters_->registerReadFinished(startTime);
383 return res;
384 }
385
386 LOG(log_.error()) << "Failed read in coroutine: " << res.error();
387 try {
388 throwErrorIfNeeded(res.error());
389 } catch (...) {
390 counters_->registerReadError();
391 throw;
392 }
393 counters_->registerReadRetry();
394 }
395 }
396
408 std::vector<ResultType>
409 readEach(CompletionTokenType token, std::vector<StatementType> const& statements)
410 {
411 auto const startTime = std::chrono::steady_clock::now();
412
413 std::atomic_uint64_t errorsCount = 0u;
414 std::atomic_int numOutstanding = statements.size();
415 numReadRequestsOutstanding_ += statements.size();
416
417 auto futures = std::vector<FutureWithCallbackType>{};
418 futures.reserve(numOutstanding);
419 counters_->registerReadStarted(statements.size());
420
421 auto init = [this, &statements, &futures, &errorsCount, &numOutstanding]<typename Self>(
422 Self& self
423 ) {
424 auto sself = std::make_shared<Self>(std::move(self));
425 auto executionHandler =
426 [&errorsCount, &numOutstanding, sself](auto const& res) mutable {
427 if (not res)
428 ++errorsCount;
429
430 // when all async operations complete unblock the result
431 if (--numOutstanding == 0) {
432 boost::asio::post(
433 boost::asio::get_associated_executor(*sself),
434 [sself]() mutable { sself->complete(); }
435 );
436 }
437 };
438
439 std::transform(
440 std::cbegin(statements),
441 std::cend(statements),
442 std::back_inserter(futures),
443 [this, &executionHandler](auto const& statement) {
444 return handle_.get().asyncExecute(statement, executionHandler);
445 }
446 );
447 };
448
449 boost::asio::async_compose<CompletionTokenType, void()>(
450 std::move(init), token, boost::asio::get_associated_executor(token)
451 );
452 numReadRequestsOutstanding_ -= statements.size();
453
454 if (errorsCount > 0) {
455 ASSERT(
456 errorsCount <= statements.size(), "Errors number cannot exceed statements number"
457 );
458 counters_->registerReadError(errorsCount);
459 counters_->registerReadFinished(startTime, statements.size() - errorsCount);
460 throw DatabaseTimeout{};
461 }
462 counters_->registerReadFinished(startTime, statements.size());
463
464 std::vector<ResultType> results;
465 results.reserve(futures.size());
466
467 // it's safe to call blocking get on futures here as we already waited for the coroutine to
468 // resume above.
469 std::transform(
470 std::make_move_iterator(std::begin(futures)),
471 std::make_move_iterator(std::end(futures)),
472 std::back_inserter(results),
473 [](auto&& future) {
474 auto entry = future.get();
475 auto&& res = entry.value();
476 return std::move(res);
477 }
478 );
479
480 ASSERT(
481 futures.size() == statements.size(),
482 "Futures size must be equal to statements size. Got {} and {}",
483 futures.size(),
484 statements.size()
485 );
486 ASSERT(
487 results.size() == statements.size(),
488 "Results size must be equal to statements size. Got {} and {}",
489 results.size(),
490 statements.size()
491 );
492 return results;
493 }
494
498 boost::json::object
499 stats() const
500 {
501 return counters_->report();
502 }
503
504private:
505 void
506 incrementOutstandingRequestCount()
507 {
508 {
509 std::unique_lock<std::mutex> lck(throttleMutex_);
510 if (!canAddWriteRequest()) {
511 LOG(log_.trace()) << "Max outstanding requests reached. "
512 << "Waiting for other requests to finish";
513 throttleCv_.wait(lck, [this]() { return canAddWriteRequest(); });
514 }
515 }
516 ++numWriteRequestsOutstanding_;
517 }
518
519 void
520 decrementOutstandingRequestCount()
521 {
522 // sanity check
523 ASSERT(numWriteRequestsOutstanding_ > 0, "Decrementing num outstanding below 0");
524 size_t const cur = (--numWriteRequestsOutstanding_);
525 {
526 // mutex lock required to prevent race condition around spurious
527 // wakeup
528 std::lock_guard const lck(throttleMutex_);
529 throttleCv_.notify_one();
530 }
531 if (cur == 0) {
532 // mutex lock required to prevent race condition around spurious
533 // wakeup
534 std::lock_guard const lck(syncMutex_);
535 syncCv_.notify_one();
536 }
537 }
538
539 bool
540 canAddWriteRequest() const
541 {
542 return numWriteRequestsOutstanding_ < maxWriteRequestsOutstanding_;
543 }
544
545 bool
546 finishedAllWriteRequests() const
547 {
548 return numWriteRequestsOutstanding_ == 0;
549 }
550
551 void
552 throwErrorIfNeeded(CassandraError err) const
553 {
554 if (err.isTimeout())
555 throw DatabaseTimeout();
556
557 if (err.isInvalidQuery())
558 throw std::runtime_error("Invalid query");
559 }
560};
561
562} // namespace data::cassandra::impl
Represents a database timeout error.
Definition BackendInterface.hpp:40
A query executor with a changeable retry policy.
Definition AsyncExecutor.hpp:37
ResultOrErrorType read(CompletionTokenType token, StatementType const &statement)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:353
void write(StatementType &&statement)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:191
ResultOrErrorType read(CompletionTokenType token, PreparedStatementType const &preparedStatement, Args &&... args)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:280
ResultOrErrorType read(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:296
void write(PreparedStatementType const &preparedStatement, Args &&... args)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:176
void write(std::vector< StatementType > &&statements)
Non-blocking batched query execution used for writing data.
Definition ExecutionStrategy.hpp:221
void sync()
Wait for all async writes to finish before unblocking.
Definition ExecutionStrategy.hpp:111
ResultOrErrorType writeSync(PreparedStatementType const &preparedStatement, Args &&... args)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:160
bool isTooBusy() const
Definition ExecutionStrategy.hpp:123
ResultOrErrorType writeSync(StatementType const &statement)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:137
std::vector< ResultType > readEach(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:409
boost::json::object stats() const
Get statistics about the backend.
Definition ExecutionStrategy.hpp:499
DefaultExecutionStrategy(Settings const &settings, HandleType const &handle, typename BackendCountersType::PtrType counters=BackendCountersType::make())
Definition ExecutionStrategy.hpp:83
void writeEach(std::vector< StatementType > &&statements)
Non-blocking query execution used for writing data. In contrast to write, this method does not batch the statements; each statement is submitted as a separate asynchronous write.
Definition ExecutionStrategy.hpp:260
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:478
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:488
void forEachBatch(std::ranges::forward_range auto &&container, std::size_t batchSize, auto &&fn)
Iterate over a container in batches.
Definition Batching.hpp:19
Bundles all cassandra settings in one place.
Definition Cluster.hpp:37