Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
AsyncExecutor.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/cassandra/Concepts.hpp"
23#include "data/cassandra/Handle.hpp"
24#include "data/cassandra/Types.hpp"
25#include "data/cassandra/impl/RetryPolicy.hpp"
26#include "util/Mutex.hpp"
27#include "util/log/Logger.hpp"
28
29#include <boost/asio.hpp>
30#include <boost/asio/io_context.hpp>
31
32#include <functional>
33#include <memory>
34#include <mutex>
35#include <optional>
36#include <utility>
37
38namespace data::cassandra::impl {
39
51template <
52 typename StatementType,
53 typename HandleType = Handle,
54 SomeRetryPolicy RetryPolicyType = ExponentialBackoffRetryPolicy>
55class AsyncExecutor : public std::enable_shared_from_this<AsyncExecutor<StatementType, HandleType, RetryPolicyType>> {
56 using FutureWithCallbackType = typename HandleType::FutureWithCallbackType;
57 using CallbackType = std::function<void(typename HandleType::ResultOrErrorType)>;
58 using RetryCallbackType = std::function<void()>;
59
60 util::Logger log_{"Backend"};
61
62 StatementType data_;
63 RetryPolicyType retryPolicy_;
64 CallbackType onComplete_;
65 RetryCallbackType onRetry_;
66
67 // does not exist during initial construction, hence optional
68 using OptionalFuture = std::optional<FutureWithCallbackType>;
70
71public:
75 static void
76 run(boost::asio::io_context& ioc,
77 HandleType const& handle,
78 StatementType&& data,
79 CallbackType&& onComplete,
80 RetryCallbackType&& onRetry)
81 {
82 // this is a helper that allows us to use std::make_shared below
83 struct EnableMakeShared : public AsyncExecutor<StatementType, HandleType, RetryPolicyType> {
84 EnableMakeShared(
85 boost::asio::io_context& ioc,
86 StatementType&& data,
87 CallbackType&& onComplete,
88 RetryCallbackType&& onRetry
89 )
90 : AsyncExecutor(ioc, std::move(data), std::move(onComplete), std::move(onRetry))
91 {
92 }
93 };
94
95 auto ptr = std::make_shared<EnableMakeShared>(ioc, std::move(data), std::move(onComplete), std::move(onRetry));
96 ptr->execute(handle);
97 }
98
99private:
101 boost::asio::io_context& ioc,
102 StatementType&& data,
103 CallbackType&& onComplete,
104 RetryCallbackType&& onRetry
105 )
106 : data_{std::move(data)}, retryPolicy_{ioc}, onComplete_{std::move(onComplete)}, onRetry_{std::move(onRetry)}
107 {
108 }
109
110 void
111 execute(HandleType const& handle)
112 {
113 auto self = this->shared_from_this();
114
115 // lifetime is extended by capturing self ptr
116 auto handler = [this, &handle, self](auto&& res) mutable {
117 if (res) {
118 onComplete_(std::forward<decltype(res)>(res));
119 } else {
120 if (retryPolicy_.shouldRetry(res.error())) {
121 onRetry_();
122 retryPolicy_.retry([self, &handle]() { self->execute(handle); });
123 } else {
124 onComplete_(std::forward<decltype(res)>(res)); // report error
125 }
126 }
127
128 self = nullptr; // explicitly decrement refcount
129 };
130
131 auto future = future_.template lock<std::scoped_lock>();
132 future->emplace(handle.asyncExecute(data_, std::move(handler)));
133 }
134};
135
136} // namespace data::cassandra::impl
A query executor with a changable retry policy.
Definition AsyncExecutor.hpp:55
static void run(boost::asio::io_context &ioc, HandleType const &handle, StatementType &&data, CallbackType &&onComplete, RetryCallbackType &&onRetry)
Create a new instance of the AsyncExecutor and execute it.
Definition AsyncExecutor.hpp:76
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70