Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
AsyncExecutor.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/cassandra/Concepts.hpp"
23#include "data/cassandra/Handle.hpp"
24#include "data/cassandra/Types.hpp"
25#include "data/cassandra/impl/RetryPolicy.hpp"
26#include "util/Mutex.hpp"
27#include "util/log/Logger.hpp"
28
29#include <boost/asio.hpp>
30#include <boost/asio/io_context.hpp>
31
32#include <functional>
33#include <memory>
34#include <mutex>
35#include <optional>
36#include <utility>
37
38namespace data::cassandra::impl {
39
51template <
52 typename StatementType,
53 typename HandleType = Handle,
54 SomeRetryPolicy RetryPolicyType = ExponentialBackoffRetryPolicy>
55class AsyncExecutor : public std::enable_shared_from_this<
56 AsyncExecutor<StatementType, HandleType, RetryPolicyType>> {
57 using FutureWithCallbackType = typename HandleType::FutureWithCallbackType;
58 using CallbackType = std::function<void(typename HandleType::ResultOrErrorType)>;
59 using RetryCallbackType = std::function<void()>;
60
61 util::Logger log_{"Backend"};
62
63 StatementType data_;
64 RetryPolicyType retryPolicy_;
65 CallbackType onComplete_;
66 RetryCallbackType onRetry_;
67
68 // does not exist during initial construction, hence optional
69 using OptionalFuture = std::optional<FutureWithCallbackType>;
71
72public:
76 static void
77 run(boost::asio::io_context& ioc,
78 HandleType const& handle,
79 StatementType&& data,
80 CallbackType&& onComplete,
81 RetryCallbackType&& onRetry)
82 {
83 // this is a helper that allows us to use std::make_shared below
84 struct EnableMakeShared : public AsyncExecutor<StatementType, HandleType, RetryPolicyType> {
85 EnableMakeShared(
86 boost::asio::io_context& ioc,
87 StatementType&& data,
88 CallbackType&& onComplete,
89 RetryCallbackType&& onRetry
90 )
91 : AsyncExecutor(ioc, std::move(data), std::move(onComplete), std::move(onRetry))
92 {
93 }
94 };
95
96 auto ptr = std::make_shared<EnableMakeShared>(
97 ioc, std::move(data), std::move(onComplete), std::move(onRetry)
98 );
99 ptr->execute(handle);
100 }
101
102private:
104 boost::asio::io_context& ioc,
105 StatementType&& data,
106 CallbackType&& onComplete,
107 RetryCallbackType&& onRetry
108 )
109 : data_{std::move(data)}
110 , retryPolicy_{ioc}
111 , onComplete_{std::move(onComplete)}
112 , onRetry_{std::move(onRetry)}
113 {
114 }
115
116 void
117 execute(HandleType const& handle)
118 {
119 auto self = this->shared_from_this();
120
121 // lifetime is extended by capturing self ptr
122 auto handler = [this, &handle, self](auto&& res) mutable {
123 if (res) {
124 onComplete_(std::forward<decltype(res)>(res));
125 } else {
126 if (retryPolicy_.shouldRetry(res.error())) {
127 onRetry_();
128 retryPolicy_.retry([self, &handle]() { self->execute(handle); });
129 } else {
130 onComplete_(std::forward<decltype(res)>(res)); // report error
131 }
132 }
133
134 self = nullptr; // explicitly decrement refcount
135 };
136
137 auto future = future_.template lock<std::scoped_lock>();
138 future->emplace(handle.asyncExecute(data_, std::move(handler)));
139 }
140};
141
142} // namespace data::cassandra::impl
A query executor with a changeable retry policy.
Definition AsyncExecutor.hpp:56
static void run(boost::asio::io_context &ioc, HandleType const &handle, StatementType &&data, CallbackType &&onComplete, RetryCallbackType &&onRetry)
Create a new instance of the AsyncExecutor and execute it.
Definition AsyncExecutor.hpp:77
A retry policy that employs exponential backoff.
Definition RetryPolicy.hpp:38
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:101
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:75