Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
ExecutionStrategy.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendCounters.hpp"
23#include "data/BackendInterface.hpp"
24#include "data/cassandra/Handle.hpp"
25#include "data/cassandra/Types.hpp"
26#include "data/cassandra/impl/AsyncExecutor.hpp"
27#include "util/Assert.hpp"
28#include "util/Batching.hpp"
29#include "util/log/Logger.hpp"
30
31#include <boost/asio.hpp>
32#include <boost/asio/associated_executor.hpp>
33#include <boost/asio/executor_work_guard.hpp>
34#include <boost/asio/io_context.hpp>
35#include <boost/asio/spawn.hpp>
36#include <boost/json/object.hpp>
37
38#include <algorithm>
39#include <atomic>
40#include <chrono>
41#include <condition_variable>
42#include <cstddef>
43#include <cstdint>
44#include <functional>
45#include <memory>
46#include <mutex>
47#include <optional>
48#include <stdexcept>
49#include <thread>
50#include <type_traits>
51#include <vector>
52
53namespace data::cassandra::impl {
54
55// TODO: this could probably be also moved out of impl and into the main cassandra namespace.
56
63template <typename HandleType = Handle, SomeBackendCounters BackendCountersType = BackendCounters>
65 util::Logger log_{"Backend"};
66
67 std::uint32_t maxWriteRequestsOutstanding_;
68 std::atomic_uint32_t numWriteRequestsOutstanding_ = 0;
69
70 std::uint32_t maxReadRequestsOutstanding_;
71 std::atomic_uint32_t numReadRequestsOutstanding_ = 0;
72
73 std::size_t writeBatchSize_;
74
75 std::mutex throttleMutex_;
76 std::condition_variable throttleCv_;
77
78 std::mutex syncMutex_;
79 std::condition_variable syncCv_;
80
81 boost::asio::io_context ioc_;
82 std::optional<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_;
83
84 std::reference_wrapper<HandleType const> handle_;
85 std::thread thread_;
86
87 typename BackendCountersType::PtrType counters_;
88
89public:
90 using ResultOrErrorType = typename HandleType::ResultOrErrorType;
91 using StatementType = typename HandleType::StatementType;
92 using PreparedStatementType = typename HandleType::PreparedStatementType;
93 using FutureType = typename HandleType::FutureType;
94 using FutureWithCallbackType = typename HandleType::FutureWithCallbackType;
95 using ResultType = typename HandleType::ResultType;
96 using CompletionTokenType = boost::asio::yield_context;
97
103 Settings const& settings,
104 HandleType const& handle,
105 typename BackendCountersType::PtrType counters = BackendCountersType::make()
106 )
107 : maxWriteRequestsOutstanding_{settings.maxWriteRequestsOutstanding}
108 , maxReadRequestsOutstanding_{settings.maxReadRequestsOutstanding}
109 , writeBatchSize_{settings.writeBatchSize}
110 , work_{boost::asio::make_work_guard(ioc_)}
111 , handle_{std::cref(handle)}
112 , thread_{[this]() { ioc_.run(); }}
113 , counters_{std::move(counters)}
114 {
115 LOG(log_.info()) << "Max write requests outstanding is " << maxWriteRequestsOutstanding_
116 << "; Max read requests outstanding is " << maxReadRequestsOutstanding_;
117 }
118
119 ~DefaultExecutionStrategy()
120 {
121 work_.reset();
122 ioc_.stop();
123 thread_.join();
124 }
125
129 void
131 {
132 LOG(log_.debug()) << "Waiting to sync all writes...";
133 std::unique_lock<std::mutex> lck(syncMutex_);
134 syncCv_.wait(lck, [this]() { return finishedAllWriteRequests(); });
135 LOG(log_.debug()) << "Sync done.";
136 }
137
141 bool
142 isTooBusy() const
143 {
144 bool const result = numReadRequestsOutstanding_ >= maxReadRequestsOutstanding_;
145 if (result)
146 counters_->registerTooBusy();
147 return result;
148 }
149
155 ResultOrErrorType
156 writeSync(StatementType const& statement)
157 {
158 auto const startTime = std::chrono::steady_clock::now();
159 while (true) {
160 auto res = handle_.get().execute(statement);
161 if (res) {
162 counters_->registerWriteSync(startTime);
163 return res;
164 }
165
166 counters_->registerWriteSyncRetry();
167 LOG(log_.warn()) << "Cassandra sync write error, retrying: " << res.error();
168 std::this_thread::sleep_for(std::chrono::milliseconds(5));
169 }
170 }
171
177 template <typename... Args>
178 ResultOrErrorType
179 writeSync(PreparedStatementType const& preparedStatement, Args&&... args)
180 {
181 return writeSync(preparedStatement.bind(std::forward<Args>(args)...));
182 }
183
193 template <typename... Args>
194 void
195 write(PreparedStatementType const& preparedStatement, Args&&... args)
196 {
197 auto statement = preparedStatement.bind(std::forward<Args>(args)...);
198 write(std::move(statement));
199 }
200
209 void
210 write(StatementType&& statement)
211 {
212 auto const startTime = std::chrono::steady_clock::now();
213
214 incrementOutstandingRequestCount();
215
216 counters_->registerWriteStarted();
217 // Note: lifetime is controlled by std::shared_from_this internally
218 AsyncExecutor<std::decay_t<decltype(statement)>, HandleType>::run(
219 ioc_,
220 handle_,
221 std::move(statement),
222 [this, startTime](auto const&) {
223 decrementOutstandingRequestCount();
224
225 counters_->registerWriteFinished(startTime);
226 },
227 [this]() { counters_->registerWriteRetry(); }
228 );
229 }
230
239 void
240 write(std::vector<StatementType>&& statements)
241 {
242 if (statements.empty())
243 return;
244
245 util::forEachBatch(std::move(statements), writeBatchSize_, [this](auto begin, auto end) {
246 auto const startTime = std::chrono::steady_clock::now();
247 auto chunk = std::vector<StatementType>{};
248
249 chunk.reserve(std::distance(begin, end));
250 std::move(begin, end, std::back_inserter(chunk));
251
252 incrementOutstandingRequestCount();
253 counters_->registerWriteStarted();
254
255 // Note: lifetime is controlled by std::shared_from_this internally
256 AsyncExecutor<std::decay_t<decltype(chunk)>, HandleType>::run(
257 ioc_,
258 handle_,
259 std::move(chunk),
260 [this, startTime](auto const&) {
261 decrementOutstandingRequestCount();
262 counters_->registerWriteFinished(startTime);
263 },
264 [this]() { counters_->registerWriteRetry(); }
265 );
266 });
267 }
268
278 void
279 writeEach(std::vector<StatementType>&& statements)
280 {
281 std::ranges::for_each(std::move(statements), [this](auto& statement) {
282 this->write(std::move(statement));
283 });
284 }
285
297 template <typename... Args>
298 [[maybe_unused]] ResultOrErrorType
299 read(CompletionTokenType token, PreparedStatementType const& preparedStatement, Args&&... args)
300 {
301 return read(token, preparedStatement.bind(std::forward<Args>(args)...));
302 }
303
314 [[maybe_unused]] ResultOrErrorType
315 read(CompletionTokenType token, std::vector<StatementType> const& statements)
316 {
317 auto const startTime = std::chrono::steady_clock::now();
318
319 auto const numStatements = statements.size();
320 std::optional<FutureWithCallbackType> future;
321 counters_->registerReadStarted(numStatements);
322
323 // todo: perhaps use policy instead
324 while (true) {
325 numReadRequestsOutstanding_ += numStatements;
326
327 auto init = [this, &statements, &future]<typename Self>(Self& self) {
328 auto sself = std::make_shared<Self>(std::move(self));
329
330 future.emplace(handle_.get().asyncExecute(statements, [sself](auto&& res) mutable {
331 boost::asio::post(
332 boost::asio::get_associated_executor(*sself),
333 [sself, res = std::forward<decltype(res)>(res)]() mutable {
334 sself->complete(std::move(res));
335 }
336 );
337 }));
338 };
339
340 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
341 std::move(init), token, boost::asio::get_associated_executor(token)
342 );
343 numReadRequestsOutstanding_ -= numStatements;
344
345 if (res) {
346 counters_->registerReadFinished(startTime, numStatements);
347 return res;
348 }
349
350 LOG(log_.error()) << "Failed batch read in coroutine: " << res.error();
351 try {
352 throwErrorIfNeeded(res.error());
353 } catch (...) {
354 counters_->registerReadError(numStatements);
355 throw;
356 }
357 counters_->registerReadRetry(numStatements);
358 }
359 }
360
371 [[maybe_unused]] ResultOrErrorType
372 read(CompletionTokenType token, StatementType const& statement)
373 {
374 auto const startTime = std::chrono::steady_clock::now();
375
376 std::optional<FutureWithCallbackType> future;
377 counters_->registerReadStarted();
378
379 // todo: perhaps use policy instead
380 while (true) {
381 ++numReadRequestsOutstanding_;
382 auto init = [this, &statement, &future]<typename Self>(Self& self) {
383 auto sself = std::make_shared<Self>(std::move(self));
384
385 future.emplace(handle_.get().asyncExecute(statement, [sself](auto&& res) mutable {
386 boost::asio::post(
387 boost::asio::get_associated_executor(*sself),
388 [sself, res = std::forward<decltype(res)>(res)]() mutable {
389 sself->complete(std::move(res));
390 }
391 );
392 }));
393 };
394
395 auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
396 std::move(init), token, boost::asio::get_associated_executor(token)
397 );
398 --numReadRequestsOutstanding_;
399
400 if (res) {
401 counters_->registerReadFinished(startTime);
402 return res;
403 }
404
405 LOG(log_.error()) << "Failed read in coroutine: " << res.error();
406 try {
407 throwErrorIfNeeded(res.error());
408 } catch (...) {
409 counters_->registerReadError();
410 throw;
411 }
412 counters_->registerReadRetry();
413 }
414 }
415
427 std::vector<ResultType>
428 readEach(CompletionTokenType token, std::vector<StatementType> const& statements)
429 {
430 auto const startTime = std::chrono::steady_clock::now();
431
432 std::atomic_uint64_t errorsCount = 0u;
433 std::atomic_int numOutstanding = statements.size();
434 numReadRequestsOutstanding_ += statements.size();
435
436 auto futures = std::vector<FutureWithCallbackType>{};
437 futures.reserve(numOutstanding);
438 counters_->registerReadStarted(statements.size());
439
440 auto init = [this, &statements, &futures, &errorsCount, &numOutstanding]<typename Self>(
441 Self& self
442 ) {
443 auto sself = std::make_shared<Self>(std::move(self));
444 auto executionHandler =
445 [&errorsCount, &numOutstanding, sself](auto const& res) mutable {
446 if (not res)
447 ++errorsCount;
448
449 // when all async operations complete unblock the result
450 if (--numOutstanding == 0) {
451 boost::asio::post(
452 boost::asio::get_associated_executor(*sself),
453 [sself]() mutable { sself->complete(); }
454 );
455 }
456 };
457
458 std::transform(
459 std::cbegin(statements),
460 std::cend(statements),
461 std::back_inserter(futures),
462 [this, &executionHandler](auto const& statement) {
463 return handle_.get().asyncExecute(statement, executionHandler);
464 }
465 );
466 };
467
468 boost::asio::async_compose<CompletionTokenType, void()>(
469 std::move(init), token, boost::asio::get_associated_executor(token)
470 );
471 numReadRequestsOutstanding_ -= statements.size();
472
473 if (errorsCount > 0) {
474 ASSERT(
475 errorsCount <= statements.size(), "Errors number cannot exceed statements number"
476 );
477 counters_->registerReadError(errorsCount);
478 counters_->registerReadFinished(startTime, statements.size() - errorsCount);
479 throw DatabaseTimeout{};
480 }
481 counters_->registerReadFinished(startTime, statements.size());
482
483 std::vector<ResultType> results;
484 results.reserve(futures.size());
485
486 // it's safe to call blocking get on futures here as we already waited for the coroutine to
487 // resume above.
488 std::transform(
489 std::make_move_iterator(std::begin(futures)),
490 std::make_move_iterator(std::end(futures)),
491 std::back_inserter(results),
492 [](auto&& future) {
493 auto entry = future.get();
494 auto&& res = entry.value();
495 return std::move(res);
496 }
497 );
498
499 ASSERT(
500 futures.size() == statements.size(),
501 "Futures size must be equal to statements size. Got {} and {}",
502 futures.size(),
503 statements.size()
504 );
505 ASSERT(
506 results.size() == statements.size(),
507 "Results size must be equal to statements size. Got {} and {}",
508 results.size(),
509 statements.size()
510 );
511 return results;
512 }
513
517 boost::json::object
518 stats() const
519 {
520 return counters_->report();
521 }
522
523private:
524 void
525 incrementOutstandingRequestCount()
526 {
527 {
528 std::unique_lock<std::mutex> lck(throttleMutex_);
529 if (!canAddWriteRequest()) {
530 LOG(log_.trace()) << "Max outstanding requests reached. "
531 << "Waiting for other requests to finish";
532 throttleCv_.wait(lck, [this]() { return canAddWriteRequest(); });
533 }
534 }
535 ++numWriteRequestsOutstanding_;
536 }
537
538 void
539 decrementOutstandingRequestCount()
540 {
541 // sanity check
542 ASSERT(numWriteRequestsOutstanding_ > 0, "Decrementing num outstanding below 0");
543 size_t const cur = (--numWriteRequestsOutstanding_);
544 {
545 // mutex lock required to prevent race condition around spurious
546 // wakeup
547 std::lock_guard const lck(throttleMutex_);
548 throttleCv_.notify_one();
549 }
550 if (cur == 0) {
551 // mutex lock required to prevent race condition around spurious
552 // wakeup
553 std::lock_guard const lck(syncMutex_);
554 syncCv_.notify_one();
555 }
556 }
557
558 bool
559 canAddWriteRequest() const
560 {
561 return numWriteRequestsOutstanding_ < maxWriteRequestsOutstanding_;
562 }
563
564 bool
565 finishedAllWriteRequests() const
566 {
567 return numWriteRequestsOutstanding_ == 0;
568 }
569
570 void
571 throwErrorIfNeeded(CassandraError err) const
572 {
573 if (err.isTimeout())
574 throw DatabaseTimeout();
575
576 if (err.isInvalidQuery())
577 throw std::runtime_error("Invalid query");
578 }
579};
580
581} // namespace data::cassandra::impl
Represents a database timeout error.
Definition BackendInterface.hpp:59
A query executor with a changeable retry policy.
Definition AsyncExecutor.hpp:56
ResultOrErrorType read(CompletionTokenType token, StatementType const &statement)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:372
void write(StatementType &&statement)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:210
ResultOrErrorType read(CompletionTokenType token, PreparedStatementType const &preparedStatement, Args &&... args)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:299
ResultOrErrorType read(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:315
void write(PreparedStatementType const &preparedStatement, Args &&... args)
Non-blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:195
void write(std::vector< StatementType > &&statements)
Non-blocking batched query execution used for writing data.
Definition ExecutionStrategy.hpp:240
void sync()
Wait for all async writes to finish before unblocking.
Definition ExecutionStrategy.hpp:130
ResultOrErrorType writeSync(PreparedStatementType const &preparedStatement, Args &&... args)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:179
bool isTooBusy() const
Definition ExecutionStrategy.hpp:142
ResultOrErrorType writeSync(StatementType const &statement)
Blocking query execution used for writing data.
Definition ExecutionStrategy.hpp:156
std::vector< ResultType > readEach(CompletionTokenType token, std::vector< StatementType > const &statements)
Coroutine-based query execution used for reading data.
Definition ExecutionStrategy.hpp:428
boost::json::object stats() const
Get statistics about the backend.
Definition ExecutionStrategy.hpp:518
DefaultExecutionStrategy(Settings const &settings, HandleType const &handle, typename BackendCountersType::PtrType counters=BackendCountersType::make())
Definition ExecutionStrategy.hpp:102
void writeEach(std::vector< StatementType > &&statements)
Non-blocking query execution used for writing data. Unlike write, this method does not group the statements into batches; each statement is submitted as a separate asynchronous write.
Definition ExecutionStrategy.hpp:279
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
Pump trace(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::TRC severity.
Definition Logger.cpp:497
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:507
void forEachBatch(std::ranges::forward_range auto &&container, std::size_t batchSize, auto &&fn)
Iterate over a container in batches.
Definition Batching.hpp:38
Bundles all cassandra settings in one place.
Definition Cluster.hpp:56