Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
ExecutionStrategy.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendCounters.hpp"
23#include "data/BackendInterface.hpp"
24#include "data/cassandra/Handle.hpp"
25#include "data/cassandra/Types.hpp"
26#include "data/cassandra/impl/AsyncExecutor.hpp"
27#include "util/Assert.hpp"
28#include "util/Batching.hpp"
29#include "util/log/Logger.hpp"
30
31#include <boost/asio.hpp>
32#include <boost/asio/associated_executor.hpp>
33#include <boost/asio/io_context.hpp>
34#include <boost/asio/io_service.hpp>
35#include <boost/asio/spawn.hpp>
36#include <boost/json/object.hpp>
37
38#include <algorithm>
39#include <atomic>
40#include <chrono>
41#include <condition_variable>
42#include <cstddef>
43#include <cstdint>
44#include <functional>
45#include <memory>
46#include <mutex>
47#include <optional>
48#include <stdexcept>
49#include <thread>
50#include <type_traits>
51#include <vector>
52
53namespace data::cassandra::impl {
54
55// TODO: this could probably be also moved out of impl and into the main cassandra namespace.
56
63template <typename HandleType = Handle, SomeBackendCounters BackendCountersType = BackendCounters>
65 util::Logger log_{"Backend"};
66
67 std::uint32_t maxWriteRequestsOutstanding_;
68 std::atomic_uint32_t numWriteRequestsOutstanding_ = 0;
69
70 std::uint32_t maxReadRequestsOutstanding_;
71 std::atomic_uint32_t numReadRequestsOutstanding_ = 0;
72
73 std::size_t writeBatchSize_;
74
75 std::mutex throttleMutex_;
76 std::condition_variable throttleCv_;
77
78 std::mutex syncMutex_;
79 std::condition_variable syncCv_;
80
81 boost::asio::io_context ioc_;
82 std::optional<boost::asio::io_service::work> work_;
83
84 std::reference_wrapper<HandleType const> handle_;
85 std::thread thread_;
86
87 typename BackendCountersType::PtrType counters_;
88
89public:
90 using ResultOrErrorType = typename HandleType::ResultOrErrorType;
91 using StatementType = typename HandleType::StatementType;
92 using PreparedStatementType = typename HandleType::PreparedStatementType;
93 using FutureType = typename HandleType::FutureType;
94 using FutureWithCallbackType = typename HandleType::FutureWithCallbackType;
95 using ResultType = typename HandleType::ResultType;
96 using CompletionTokenType = boost::asio::yield_context;
97
103 Settings const& settings,
104 HandleType const& handle,
105 typename BackendCountersType::PtrType counters = BackendCountersType::make()
106 )
107 : maxWriteRequestsOutstanding_{settings.maxWriteRequestsOutstanding}
108 , maxReadRequestsOutstanding_{settings.maxReadRequestsOutstanding}
109 , writeBatchSize_{settings.writeBatchSize}
110 , work_{ioc_}
111 , handle_{std::cref(handle)}
112 , thread_{[this]() { ioc_.run(); }}
113 , counters_{std::move(counters)}
114 {
115 LOG(log_.info()) << "Max write requests outstanding is " << maxWriteRequestsOutstanding_
116 << "; Max read requests outstanding is " << maxReadRequestsOutstanding_;
117 }
118
119 ~DefaultExecutionStrategy()
120 {
121 work_.reset();
122 ioc_.stop();
123 thread_.join();
124 }
125
129 void
131 {
132 LOG(log_.debug()) << "Waiting to sync all writes...";
133 std::unique_lock<std::mutex> lck(syncMutex_);
134 syncCv_.wait(lck, [this]() { return finishedAllWriteRequests(); });
135 LOG(log_.debug()) << "Sync done.";
136 }
137
141 bool
142 isTooBusy() const
143 {
144 bool const result = numReadRequestsOutstanding_ >= maxReadRequestsOutstanding_;
145 if (result)
146 counters_->registerTooBusy();
147 return result;
148 }
149
155 ResultOrErrorType
156 writeSync(StatementType const& statement)
157 {
158 auto const startTime = std::chrono::steady_clock::now();
159 while (true) {
160 auto res = handle_.get().execute(statement);
161 if (res) {
162 counters_->registerWriteSync(startTime);
163 return res;
164 }
165
166 counters_->registerWriteSyncRetry();
167 LOG(log_.warn()) << "Cassandra sync write error, retrying: " << res.error();
168 std::this_thread::sleep_for(std::chrono::milliseconds(5));
169 }
170 }
171
177 template <typename... Args>
178 ResultOrErrorType
179 writeSync(PreparedStatementType const& preparedStatement, Args&&... args)
180 {
181 return writeSync(preparedStatement.bind(std::forward<Args>(args)...));
182 }
183
193 template <typename... Args>
194 void
195 write(PreparedStatementType const& preparedStatement, Args&&... args)
196 {
197 auto statement = preparedStatement.bind(std::forward<Args>(args)...);
198 write(std::move(statement));
199 }
200
209 void
210 write(StatementType&& statement)
211 {
212 auto const startTime = std::chrono::steady_clock::now();
213
214 incrementOutstandingRequestCount();
215
216 counters_->registerWriteStarted();
217 // Note: lifetime is controlled by std::shared_from_this internally
218 AsyncExecutor<std::decay_t<decltype(statement)>, HandleType>::run(
219 ioc_,
220 handle_,
221 std::move(statement),
222 [this, startTime](auto const&) {
223 decrementOutstandingRequestCount();
224
225 counters_->registerWriteFinished(startTime);
226 },
227 [this]() { counters_->registerWriteRetry(); }
228 );
229 }
230
239 void
240 write(std::vector<StatementType>&& statements)
241 {
242 if (statements.empty())
243 return;
244
245 util::forEachBatch(std::move(statements), writeBatchSize_, [this](auto begin, auto end) {
246 auto const startTime = std::chrono::steady_clock::now();
247 auto chunk = std::vector<StatementType>{};
248
249 chunk.reserve(std::distance(begin, end));
250 std::move(begin, end, std::back_inserter(chunk));
251
252 incrementOutstandingRequestCount();
253 counters_->registerWriteStarted();
254
255 // Note: lifetime is controlled by std::shared_from_this internally
256 AsyncExecutor<std::decay_t<decltype(chunk)>, HandleType>::run(
257 ioc_,
258 handle_,
259 std::move(chunk),
260 [this, startTime](auto const&) {
261 decrementOutstandingRequestCount();
262 counters_->registerWriteFinished(startTime);
263 },
264 [this]() { counters_->registerWriteRetry(); }
265 );
266 });
267 }
268
278 void
279 writeEach(std::vector<StatementType>&& statements)
280 {
281 std::ranges::for_each(std::move(statements), [this](auto& statement) { this->write(std::move(statement)); });
282 }
283
295 template <typename... Args>
296 [[maybe_unused]] ResultOrErrorType
297 read(CompletionTokenType token, PreparedStatementType const& preparedStatement, Args&&... args)
298 {
299 return read(token, preparedStatement.bind(std::forward<Args>(args)...));
300 }
301
312 [[maybe_unused]] ResultOrErrorType
313 read(CompletionTokenType token, std::vector<StatementType> const& statements)
314 {
315 auto const startTime = std::chrono::steady_clock::now();
316
317 auto const numStatements = statements.size();
318 std::optional<FutureWithCallbackType> future;
319 counters_->registerReadStarted(numStatements);
320
321 // todo: perhaps use policy instead
322 while (true) {
323 numReadRequestsOutstanding_ += numStatements;
324
325 auto init = [this, &statements, &future]<typename Self>(Self& self) {
326 auto sself = std::make_shared<Self>(std::move(self));
327
328 future.emplace(handle_.get().asyncExecute(statements, [sself](auto&& res) mutable {
329 boost::asio::post(
330 boost::asio::get_associated_executor(*sself),
331 [sself, res = std::forward<decltype(res)>(res)]() mutable { sself->complete(std::move(res)); }
332 );
333 }));
334 };
335
336 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
337 init, token, boost::asio::get_associated_executor(token)
338 );
339 numReadRequestsOutstanding_ -= numStatements;
340
341 if (res) {
342 counters_->registerReadFinished(startTime, numStatements);
343 return res;
344 }
345
346 LOG(log_.error()) << "Failed batch read in coroutine: " << res.error();
347 try {
348 throwErrorIfNeeded(res.error());
349 } catch (...) {
350 counters_->registerReadError(numStatements);
351 throw;
352 }
353 counters_->registerReadRetry(numStatements);
354 }
355 }
356
367 [[maybe_unused]] ResultOrErrorType
368 read(CompletionTokenType token, StatementType const& statement)
369 {
370 auto const startTime = std::chrono::steady_clock::now();
371
372 std::optional<FutureWithCallbackType> future;
373 counters_->registerReadStarted();
374
375 // todo: perhaps use policy instead
376 while (true) {
377 ++numReadRequestsOutstanding_;
378 auto init = [this, &statement, &future]<typename Self>(Self& self) {
379 auto sself = std::make_shared<Self>(std::move(self));
380
381 future.emplace(handle_.get().asyncExecute(statement, [sself](auto&& res) mutable {
382 boost::asio::post(
383 boost::asio::get_associated_executor(*sself),
384 [sself, res = std::forward<decltype(res)>(res)]() mutable { sself->complete(std::move(res)); }
385 );
386 }));
387 };
388
389 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
390 init, token, boost::asio::get_associated_executor(token)
391 );
392 --numReadRequestsOutstanding_;
393
394 if (res) {
395 counters_->registerReadFinished(startTime);
396 return res;
397 }
398
399 LOG(log_.error()) << "Failed read in coroutine: " << res.error();
400 try {
401 throwErrorIfNeeded(res.error());
402 } catch (...) {
403 counters_->registerReadError();
404 throw;
405 }
406 counters_->registerReadRetry();
407 }
408 }
409
421 std::vector<ResultType>
422 readEach(CompletionTokenType token, std::vector<StatementType> const& statements)
423 {
424 auto const startTime = std::chrono::steady_clock::now();
425
426 std::atomic_uint64_t errorsCount = 0u;
427 std::atomic_int numOutstanding = statements.size();
428 numReadRequestsOutstanding_ += statements.size();
429
430 auto futures = std::vector<FutureWithCallbackType>{};
431 futures.reserve(numOutstanding);
432 counters_->registerReadStarted(statements.size());
433
434 auto init = [this, &statements, &futures, &errorsCount, &numOutstanding]<typename Self>(Self& self) {
435 auto sself = std::make_shared<Self>(std::move(self));
436 auto executionHandler = [&errorsCount, &numOutstanding, sself](auto const& res) mutable {
437 if (not res)
438 ++errorsCount;
439
440 // when all async operations complete unblock the result
441 if (--numOutstanding == 0) {
442 boost::asio::post(boost::asio::get_associated_executor(*sself), [sself]() mutable {
443 sself->complete();
444 });
445 }
446 };
447
448 std::transform(
449 std::cbegin(statements),
450 std::cend(statements),
451 std::back_inserter(futures),
452 [this, &executionHandler](auto const& statement) {
453 return handle_.get().asyncExecute(statement, executionHandler);
454 }
455 );
456 };
457
458 boost::asio::async_compose<CompletionTokenType, void()>(
459 init, token, boost::asio::get_associated_executor(token)
460 );
461 numReadRequestsOutstanding_ -= statements.size();
462
463 if (errorsCount > 0) {
464 ASSERT(errorsCount <= statements.size(), "Errors number cannot exceed statements number");
465 counters_->registerReadError(errorsCount);
466 counters_->registerReadFinished(startTime, statements.size() - errorsCount);
467 throw DatabaseTimeout{};
468 }
469 counters_->registerReadFinished(startTime, statements.size());
470
471 std::vector<ResultType> results;
472 results.reserve(futures.size());
473
474 // it's safe to call blocking get on futures here as we already waited for the coroutine to resume above.
475 std::transform(
476 std::make_move_iterator(std::begin(futures)),
477 std::make_move_iterator(std::end(futures)),
478 std::back_inserter(results),
479 [](auto&& future) {
480 auto entry = future.get();
481 auto&& res = entry.value();
482 return std::move(res);
483 }
484 );
485
486 ASSERT(
487 futures.size() == statements.size(),
488 "Futures size must be equal to statements size. Got {} and {}",
489 futures.size(),
490 statements.size()
491 );
492 ASSERT(
493 results.size() == statements.size(),
494 "Results size must be equal to statements size. Got {} and {}",
495 results.size(),
496 statements.size()
497 );
498 return results;
499 }
500
504 boost::json::object
505 stats() const
506 {
507 return counters_->report();
508 }
509
510private:
511 void
512 incrementOutstandingRequestCount()
513 {
514 {
515 std::unique_lock<std::mutex> lck(throttleMutex_);
516 if (!canAddWriteRequest()) {
517 LOG(log_.trace()) << "Max outstanding requests reached. "
518 << "Waiting for other requests to finish";
519 throttleCv_.wait(lck, [this]() { return canAddWriteRequest(); });
520 }
521 }
522 ++numWriteRequestsOutstanding_;
523 }
524
525 void
526 decrementOutstandingRequestCount()
527 {
528 // sanity check
529 ASSERT(numWriteRequestsOutstanding_ > 0, "Decrementing num outstanding below 0");
530 size_t const cur = (--numWriteRequestsOutstanding_);
531 {
532 // mutex lock required to prevent race condition around spurious
533 // wakeup
534 std::lock_guard const lck(throttleMutex_);
535 throttleCv_.notify_one();
536 }
537 if (cur == 0) {
538 // mutex lock required to prevent race condition around spurious
539 // wakeup
540 std::lock_guard const lck(syncMutex_);
541 syncCv_.notify_one();
542 }
543 }
544
545 bool
546 canAddWriteRequest() const
547 {
548 return numWriteRequestsOutstanding_ < maxWriteRequestsOutstanding_;
549 }
550
551 bool
552 finishedAllWriteRequests() const
553 {
554 return numWriteRequestsOutstanding_ == 0;
555 }
556
557 void
558 throwErrorIfNeeded(CassandraError err) const
559 {
560 if (err.isTimeout())
561 throw DatabaseTimeout();
562
563 if (err.isInvalidQuery())
564 throw std::runtime_error("Invalid query");
565 }
566};
567
568} // namespace data::cassandra::impl
Represents a database timeout error.
Definition BackendInterface.hpp:56
A query executor with a changeable retry policy.
Definition AsyncExecutor.hpp:55
Implements async and sync querying against the cassandra DB with support for throttling.
Definition ExecutionStrategy.hpp:64
ResultOrErrorType read(CompletionTokenType token, StatementType const &statement)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:368
void write(StatementType &&statement)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:210
ResultOrErrorType read(CompletionTokenType token, PreparedStatementType const &preparedStatement, Args &&... args)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:297
ResultOrErrorType read(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:313
void write(PreparedStatementType const &preparedStatement, Args &&... args)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:195
void write(std::vector< StatementType > &&statements)
Non-blocking batched query execution used for writing data.
Definition ExecutionStrategy.hpp:240
void sync()
Wait for all async writes to finish before unblocking.
Definition ExecutionStrategy.hpp:130
ResultOrErrorType writeSync(PreparedStatementType const &preparedStatement, Args &&... args)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:179
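bool isTooBusy() const
Check whether the number of outstanding read requests has reached the configured maximum; when it has, a "too busy" event is also recorded in the backend counters.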
Definition ExecutionStrategy.hpp:142
ResultOrErrorType writeSync(StatementType const &statement)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:156
std::vector< ResultType > readEach(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:422
boost::json::object stats() const
Get statistics about the backend.
Definition ExecutionStrategy.hpp:505
DefaultExecutionStrategy(Settings const &settings, HandleType const &handle, typename BackendCountersType::PtrType counters=BackendCountersType::make())
Definition ExecutionStrategy.hpp:102
void writeEach(std::vector< StatementType > &&statements)
Non-blocking query execution used for writing data. In contrast with write, this method does not execute the statements as a batch; each statement is written individually.
Definition ExecutionStrategy.hpp:279
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
void forEachBatch(std::ranges::forward_range auto &&container, std::size_t batchSize, auto &&fn)
Iterate over a container in batches.
Definition Batching.hpp:38
Bundles all cassandra settings in one place.
Definition Cluster.hpp:43