Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LoadBalancer.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etl/ETLState.hpp"
24#include "etl/InitialLoadObserverInterface.hpp"
27#include "etl/Source.hpp"
28#include "feed/SubscriptionManagerInterface.hpp"
29#include "rpc/Errors.hpp"
30#include "util/Mutex.hpp"
31#include "util/Random.hpp"
32#include "util/ResponseExpirationCache.hpp"
33#include "util/config/ConfigDefinition.hpp"
34#include "util/log/Logger.hpp"
35#include "util/prometheus/Counter.hpp"
36
37#include <boost/asio.hpp>
38#include <boost/asio/io_context.hpp>
39#include <boost/asio/spawn.hpp>
40#include <boost/json/object.hpp>
41#include <boost/json/value.hpp>
42#include <grpcpp/grpcpp.h>
43#include <org/xrpl/rpc/v1/get_ledger.pb.h>
44#include <org/xrpl/rpc/v1/ledger.pb.h>
45#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
46
47#include <chrono>
48#include <concepts>
49#include <cstdint>
50#include <expected>
51#include <functional>
52#include <memory>
53#include <optional>
54#include <string>
55#include <string_view>
56#include <vector>
57
58namespace etl {
59
64 virtual ~LoadBalancerTag() = default;
65};
66
/**
 * @brief A concept satisfied by any type derived from LoadBalancerTag.
 *
 * LoadBalancerTag is the tag class used to identify a LoadBalancer in templated code.
 *
 * @tparam T The type to check
 */
67template <typename T>
68concept SomeLoadBalancer = std::derived_from<T, LoadBalancerTag>;
69
78public:
79 using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
80 using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
81 using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
82
83private:
84 static constexpr std::uint32_t kDEFAULT_DOWNLOAD_RANGES = 16;
85
86 util::Logger log_{"ETL"};
87 // The forwarding cache must be destroyed after the sources (so it is declared before them) because the sources hold a callback that invalidates the cache
88 std::optional<util::ResponseExpirationCache> forwardingCache_;
89 std::optional<std::string> forwardingXUserValue_;
90
91 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator_;
92
93 std::vector<SourcePtr> sources_;
94 std::optional<ETLState> etlState_;
95 std::uint32_t downloadRanges_ =
96 kDEFAULT_DOWNLOAD_RANGES; /**< The number of markers to use when downloading the initial ledger */
97
 // Bundle of non-owning references (std::reference_wrapper) to the integer Prometheus counters
 // kept for request forwarding; the single instance is the member forwardingCounters_.
 // NOTE(review): the field names suggest durations of successful/failed forwards, a retry count and
 // forwarding-cache hits/misses; units and update points are not visible here - confirm in LoadBalancer.cpp.
98 struct ForwardingCounters {
99 std::reference_wrapper<util::prometheus::CounterInt> successDuration;
100 std::reference_wrapper<util::prometheus::CounterInt> failDuration;
101 std::reference_wrapper<util::prometheus::CounterInt> retries;
102 std::reference_wrapper<util::prometheus::CounterInt> cacheHit;
103 std::reference_wrapper<util::prometheus::CounterInt> cacheMiss;
104 } forwardingCounters_;
105
106 // A mutex is used instead of an atomic_bool because choosing a new source to
107 // forward messages must be done under mutual exclusion; otherwise there would be a race condition
108 util::Mutex<bool> hasForwardingSource_{false};
109
110public:
114 static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE = "clio_admin";
115
119 static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE = "clio_user";
120
134 boost::asio::io_context& ioc,
135 std::shared_ptr<BackendInterface> backend,
136 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
137 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator,
138 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
139 SourceFactory sourceFactory = makeSource
140 );
141
154 static std::shared_ptr<LoadBalancerInterface>
157 boost::asio::io_context& ioc,
158 std::shared_ptr<BackendInterface> backend,
159 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
160 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator,
161 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
162 SourceFactory sourceFactory = makeSource
163 );
164
176 uint32_t sequence,
178 std::chrono::steady_clock::duration retryAfter
179 ) override;
180
194 OptionalGetLedgerResponseType
196 uint32_t ledgerSequence,
197 bool getObjects,
198 bool getObjectNeighbors,
199 std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
200 ) override;
201
 /**
  * @brief Represent the state of this load balancer as a JSON object.
  *
  * @return The state of this load balancer as a boost::json::value
  */
207 boost::json::value
208 toJson() const override;
209
219 std::expected<boost::json::object, rpc::CombinedError>
221 boost::json::object const& request,
222 std::optional<std::string> const& clientIp,
223 bool isAdmin,
224 boost::asio::yield_context yield
225 ) override;
226
 /**
  * @brief Return state of ETL nodes.
  *
  * @return The ETL state wrapped in std::optional. When the optional is empty is decided by the
  * definition in LoadBalancer.cpp, which is not visible in this header.
  */
231 std::optional<ETLState>
232 getETLState() noexcept override;
233
 /**
  * @brief Stop the load balancer. This will stop all subscription sources.
  *
  * @param yield The boost::asio coroutine context the call runs in
  */
240 void
241 stop(boost::asio::yield_context yield) override;
242
243private:
 /**
  * @brief Private helper that takes a callable together with a ledger sequence and a retry interval.
  *
  * NOTE(review): the definition is not visible in this header. Presumably f is tried against the
  * ETL sources until it succeeds, waiting retryAfter between attempts - confirm in LoadBalancer.cpp.
  *
  * @tparam Func The type of the callable
  * @param f The callable to run
  * @param ledgerSequence The ledger sequence the call relates to
  * @param retryAfter The retry interval handed to the implementation; defaults to 2 seconds
  */
256 template <typename Func>
257 void
258 execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2});
259
 /**
  * @brief Choose a source to forward messages.
  *
  * NOTE(review): presumably performed while holding the hasForwardingSource_ mutex, as the comment
  * on that member requires mutual exclusion for this choice - confirm in LoadBalancer.cpp.
  */
263 void
264 chooseForwardingSource();
265};
266
267} // namespace etl
std::expected< std::vector< std::string >, InitialLedgerLoadError > InitialLedgerLoadResult
The result type of the initial ledger load.
Definition LoadBalancerInterface.hpp:54
An interface for LoadBalancer.
Definition LoadBalancerInterface.hpp:59
static std::shared_ptr< LoadBalancerInterface > makeLoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::unique_ptr< util::RandomGeneratorInterface > randomGenerator, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
A factory function for the load balancer.
Definition LoadBalancer.cpp:69
InitialLedgerLoadResult loadInitialLedger(uint32_t sequence, InitialLoadObserverInterface &observer, std::chrono::steady_clock::duration retryAfter) override
Load the initial ledger, writing data to the queue.
Definition LoadBalancer.cpp:216
std::expected< boost::json::object, rpc::CombinedError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &clientIp, bool isAdmin, boost::asio::yield_context yield) override
Forward a JSON RPC request to a randomly selected rippled node.
Definition LoadBalancer.cpp:275
LoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::unique_ptr< util::RandomGeneratorInterface > randomGenerator, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
Create an instance of the load balancer.
Definition LoadBalancer.cpp:90
std::optional< ETLState > getETLState() noexcept override
Return state of ETL nodes.
Definition LoadBalancer.cpp:381
boost::json::value toJson() const override
Represent the state of this load balancer as a JSON object.
Definition LoadBalancer.cpp:329
void stop(boost::asio::yield_context yield) override
Stop the load balancer. This will stop all subscription sources.
Definition LoadBalancer.cpp:391
static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding user requests.
Definition LoadBalancer.hpp:119
static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding admin requests.
Definition LoadBalancer.hpp:114
OptionalGetLedgerResponseType fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors, std::chrono::steady_clock::duration retryAfter=std::chrono::seconds{2}) override
Fetch data for a specific ledger.
Definition LoadBalancer.cpp:245
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:95
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:101
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:50
Definition LoadBalancer.hpp:68
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:36
A tag class to help identify LoadBalancer in templated code.
Definition LoadBalancer.hpp:63