Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
AsyncExecutor.hpp
1#pragma once
2
3#include "data/cassandra/Concepts.hpp"
4#include "data/cassandra/Handle.hpp"
5#include "data/cassandra/Types.hpp"
6#include "data/cassandra/impl/RetryPolicy.hpp"
7#include "util/Mutex.hpp"
8#include "util/log/Logger.hpp"
9
10#include <boost/asio.hpp>
11#include <boost/asio/io_context.hpp>
12
13#include <functional>
14#include <memory>
15#include <mutex>
16#include <optional>
17#include <utility>
18
19namespace data::cassandra::impl {
20
32template <
33 typename StatementType,
34 typename HandleType = Handle,
35 SomeRetryPolicy RetryPolicyType = ExponentialBackoffRetryPolicy>
36class AsyncExecutor : public std::enable_shared_from_this<
37 AsyncExecutor<StatementType, HandleType, RetryPolicyType>> {
38 using FutureWithCallbackType = typename HandleType::FutureWithCallbackType;
39 using CallbackType = std::function<void(typename HandleType::ResultOrErrorType)>;
40 using RetryCallbackType = std::function<void()>;
41
42 util::Logger log_{"Backend"};
43
44 StatementType data_;
45 RetryPolicyType retryPolicy_;
46 CallbackType onComplete_;
47 RetryCallbackType onRetry_;
48
49 // does not exist during initial construction, hence optional
50 using OptionalFuture = std::optional<FutureWithCallbackType>;
52
53public:
57 static void
58 run(boost::asio::io_context& ioc,
59 HandleType const& handle,
60 StatementType&& data,
61 CallbackType&& onComplete,
62 RetryCallbackType&& onRetry)
63 {
64 // this is a helper that allows us to use std::make_shared below
65 struct EnableMakeShared : public AsyncExecutor<StatementType, HandleType, RetryPolicyType> {
66 EnableMakeShared(
67 boost::asio::io_context& ioc,
68 StatementType&& data,
69 CallbackType&& onComplete,
70 RetryCallbackType&& onRetry
71 )
72 : AsyncExecutor(ioc, std::move(data), std::move(onComplete), std::move(onRetry))
73 {
74 }
75 };
76
77 auto ptr = std::make_shared<EnableMakeShared>(
78 ioc, std::move(data), std::move(onComplete), std::move(onRetry)
79 );
80 ptr->execute(handle);
81 }
82
83private:
85 boost::asio::io_context& ioc,
86 StatementType&& data,
87 CallbackType&& onComplete,
88 RetryCallbackType&& onRetry
89 )
90 : data_{std::move(data)}
91 , retryPolicy_{ioc}
92 , onComplete_{std::move(onComplete)}
93 , onRetry_{std::move(onRetry)}
94 {
95 }
96
97 void
98 execute(HandleType const& handle)
99 {
100 auto self = this->shared_from_this();
101
102 // lifetime is extended by capturing self ptr
103 auto handler = [this, &handle, self](auto&& res) mutable {
104 if (res) {
105 onComplete_(std::forward<decltype(res)>(res));
106 } else {
107 if (retryPolicy_.shouldRetry(res.error())) {
108 onRetry_();
109 retryPolicy_.retry([self, &handle]() { self->execute(handle); });
110 } else {
111 onComplete_(std::forward<decltype(res)>(res)); // report error
112 }
113 }
114
115 self = nullptr; // explicitly decrement refcount
116 };
117
118 auto future = future_.template lock<std::scoped_lock>();
119 future->emplace(handle.asyncExecute(data_, std::move(handler)));
120 }
121};
122
123} // namespace data::cassandra::impl
A query executor with a changeable retry policy.
Definition AsyncExecutor.hpp:37
static void run(boost::asio::io_context &ioc, HandleType const &handle, StatementType &&data, CallbackType &&onComplete, RetryCallbackType &&onRetry)
Create a new instance of the AsyncExecutor and execute it.
Definition AsyncExecutor.hpp:58
A retry policy that employs exponential backoff.
Definition RetryPolicy.hpp:19
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:82
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:56