Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LoadBalancer.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etl/ETLState.hpp"
25#include "etlng/InitialLoadObserverInterface.hpp"
27#include "etlng/Source.hpp"
28#include "feed/SubscriptionManagerInterface.hpp"
29#include "rpc/Errors.hpp"
30#include "util/Assert.hpp"
31#include "util/Mutex.hpp"
32#include "util/Random.hpp"
33#include "util/ResponseExpirationCache.hpp"
34#include "util/config/ConfigDefinition.hpp"
35#include "util/log/Logger.hpp"
36#include "util/prometheus/Counter.hpp"
37
38#include <boost/asio.hpp>
39#include <boost/asio/io_context.hpp>
40#include <boost/asio/spawn.hpp>
41#include <boost/json/object.hpp>
42#include <boost/json/value.hpp>
43#include <grpcpp/grpcpp.h>
44#include <org/xrpl/rpc/v1/get_ledger.pb.h>
45#include <org/xrpl/rpc/v1/ledger.pb.h>
46#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
47
48#include <chrono>
49#include <concepts>
50#include <cstdint>
51#include <expected>
52#include <memory>
53#include <optional>
54#include <string>
55#include <string_view>
56#include <utility>
57#include <vector>
58
59namespace etlng {
60
65 virtual ~LoadBalancerTag() = default;
66};
67
// Concept satisfied when T is LoadBalancerTag itself or is publicly and unambiguously
// derived from it (std::derived_from semantics). Lets templated code constrain on "a load balancer".
68template <typename T>
69concept SomeLoadBalancer = std::derived_from<T, LoadBalancerTag>;
70
79public:
// Shorthand aliases for message types declared in org::xrpl::rpc::v1.
80 using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
81 using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
// A GetLedgerResponse that may be absent; used as the return type of fetchLedger.
82 using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
83
84private:
// Default value used to initialise downloadRanges_ (the number of markers used when downloading the initial ledger).
85 static constexpr std::uint32_t kDEFAULT_DOWNLOAD_RANGES = 16;
86
87 util::Logger log_{"ETL"};
88 // Forwarding cache must be destroyed after sources because sources have a callback to invalidate cache
89 std::optional<util::ResponseExpirationCache> forwardingCache_;
90 std::optional<std::string> forwardingXUserValue_;
91
92 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator_;
93
94 std::vector<SourcePtr> sources_;
95 std::optional<etl::ETLState> etlState_;
96 std::uint32_t downloadRanges_ =
97 kDEFAULT_DOWNLOAD_RANGES; /**< The number of markers to use when downloading initial ledger */
98
// Bundle of references to Prometheus integer counters related to request forwarding.
// The struct stores references only, so the counters themselves must outlive this object.
// NOTE(review): std::reference_wrapper is declared in <functional>, which is not among the
// standard headers visibly included by this file — presumably pulled in transitively; confirm.
99 struct ForwardingCounters {
100 std::reference_wrapper<util::prometheus::CounterInt> successDuration;  // presumably accumulated duration of successful forwards — confirm units
101 std::reference_wrapper<util::prometheus::CounterInt> failDuration;  // presumably accumulated duration of failed forwards — confirm units
102 std::reference_wrapper<util::prometheus::CounterInt> retries;  // presumably number of forwarding retries — confirm
103 std::reference_wrapper<util::prometheus::CounterInt> cacheHit;  // presumably forwarding-cache hits — confirm
104 std::reference_wrapper<util::prometheus::CounterInt> cacheMiss;  // presumably forwarding-cache misses — confirm
105 } forwardingCounters_;
106
107 // Using a mutex instead of atomic_bool because choosing a new source to
108 // forward messages must be done under mutual exclusion; otherwise there would be a race condition
109 util::Mutex<bool> hasForwardingSource_{false};
110
111public:
// Value for the X-User header when forwarding admin requests.
115 static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE = "clio_admin";
116
// Value for the X-User header when forwarding user requests.
120 static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE = "clio_user";
121
135 boost::asio::io_context& ioc,
136 std::shared_ptr<BackendInterface> backend,
137 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
138 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator,
139 std::shared_ptr<etl::NetworkValidatedLedgersInterface> validatedLedgers,
140 SourceFactory sourceFactory = makeSource
141 );
142
155 static std::shared_ptr<LoadBalancerInterface>
158 boost::asio::io_context& ioc,
159 std::shared_ptr<BackendInterface> backend,
160 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
161 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator,
162 std::shared_ptr<etl::NetworkValidatedLedgersInterface> validatedLedgers,
163 SourceFactory sourceFactory = makeSource
164 );
165
174 std::vector<std::string>
176 [[maybe_unused]] uint32_t sequence,
177 [[maybe_unused]] std::chrono::steady_clock::duration retryAfter
178 ) override
179 {
180 ASSERT(false, "Not available for new ETL");
181 std::unreachable();
182 };
183
193 std::vector<std::string>
195 uint32_t sequence,
197 std::chrono::steady_clock::duration retryAfter
198 ) override;
199
213 OptionalGetLedgerResponseType
215 uint32_t ledgerSequence,
216 bool getObjects,
217 bool getObjectNeighbors,
218 std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
219 ) override;
220
// Represent the state of this load balancer as a JSON object.
// Returns a boost::json::value; does not modify the load balancer (const member).
226 boost::json::value
227 toJson() const override;
228
238 std::expected<boost::json::object, rpc::CombinedError>
240 boost::json::object const& request,
241 std::optional<std::string> const& clientIp,
242 bool isAdmin,
243 boost::asio::yield_context yield
244 ) override;
245
// Return the state of ETL nodes.
// The result is optional and may be empty; declared noexcept.
250 std::optional<etl::ETLState>
251 getETLState() noexcept override;
252
// Stop the load balancer. This will stop all subscription sources.
// Takes a boost::asio::yield_context, so it is meant to be called from within a coroutine.
259 void
260 stop(boost::asio::yield_context yield) override;
261
262private:
// Private helper template taking a callable `f`, a ledger sequence and a retry interval.
// `retryAfter` defaults to 2 seconds when not supplied.
// NOTE(review): presumably invokes `f` against the sources and waits `retryAfter` between
// attempts — the definition is not visible here; confirm against the implementation.
275 template <typename Func>
276 void
277 execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2});
278
// NOTE(review): presumably selects which source forwards messages, guarded by the
// hasForwardingSource_ mutex declared above — confirm against the implementation.
282 void
283 chooseForwardingSource();
284
// Private counterpart of forwardToRippled: takes the same parameters and returns the same
// std::expected<boost::json::object, rpc::CombinedError> (JSON response on success, error otherwise).
// NOTE(review): presumably performs the actual forwarding on behalf of forwardToRippled — confirm.
285 std::expected<boost::json::object, rpc::CombinedError>
286 forwardToRippledImpl(
287 boost::json::object const& request,
288 std::optional<std::string> const& clientIp,
289 bool isAdmin,
290 boost::asio::yield_context yield
291 );
292};
293
294} // namespace etlng
An interface for LoadBalancer.
Definition LoadBalancerInterface.hpp:45
This class is used to manage connections to transaction processing processes.
Definition LoadBalancer.hpp:78
void stop(boost::asio::yield_context yield) override
Stop the load balancer. This will stop all subscription sources.
Definition LoadBalancer.cpp:383
LoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::unique_ptr< util::RandomGeneratorInterface > randomGenerator, std::shared_ptr< etl::NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
Create an instance of the load balancer.
Definition LoadBalancer.cpp:91
std::optional< etl::ETLState > getETLState() noexcept override
Return state of ETL nodes.
Definition LoadBalancer.cpp:373
std::vector< std::string > loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter) override
Load the initial ledger, writing data to the queue.
Definition LoadBalancer.hpp:175
static std::shared_ptr< LoadBalancerInterface > makeLoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::unique_ptr< util::RandomGeneratorInterface > randomGenerator, std::shared_ptr< etl::NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
A factory function for the load balancer.
Definition LoadBalancer.cpp:70
std::expected< boost::json::object, rpc::CombinedError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &clientIp, bool isAdmin, boost::asio::yield_context yield) override
Forward a JSON RPC request to a randomly selected rippled node.
Definition LoadBalancer.cpp:271
OptionalGetLedgerResponseType fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors, std::chrono::steady_clock::duration retryAfter=std::chrono::seconds{2}) override
Fetch data for a specific ledger.
Definition LoadBalancer.cpp:241
static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding user requests.
Definition LoadBalancer.hpp:120
boost::json::value toJson() const override
Represent the state of this load balancer as a JSON object.
Definition LoadBalancer.cpp:321
static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding admin requests.
Definition LoadBalancer.hpp:115
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
Definition LoadBalancer.hpp:69
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:36
A tag class to help identify LoadBalancer in templated code.
Definition LoadBalancer.hpp:64