Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LoadBalancer.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etl/ETLState.hpp"
24#include "etl/InitialLoadObserverInterface.hpp"
27#include "etl/Source.hpp"
28#include "feed/SubscriptionManagerInterface.hpp"
29#include "rpc/Errors.hpp"
30#include "util/Mutex.hpp"
31#include "util/Random.hpp"
32#include "util/ResponseExpirationCache.hpp"
33#include "util/config/ConfigDefinition.hpp"
34#include "util/log/Logger.hpp"
35#include "util/prometheus/Counter.hpp"
36
37#include <boost/asio.hpp>
38#include <boost/asio/io_context.hpp>
39#include <boost/asio/spawn.hpp>
40#include <boost/json/object.hpp>
41#include <boost/json/value.hpp>
42#include <grpcpp/grpcpp.h>
43#include <org/xrpl/rpc/v1/get_ledger.pb.h>
44#include <org/xrpl/rpc/v1/ledger.pb.h>
45#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
46
47#include <chrono>
48#include <concepts>
49#include <cstdint>
50#include <expected>
51#include <functional>
52#include <memory>
53#include <optional>
54#include <string>
55#include <string_view>
56#include <vector>
57
58namespace etl {
59
64 virtual ~LoadBalancerTag() = default;
65};
66
67template <typename T>
68concept SomeLoadBalancer = std::derived_from<T, LoadBalancerTag>;
69
79public:
80 using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
81 using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
82 using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
83
84private:
85 static constexpr std::uint32_t kDEFAULT_DOWNLOAD_RANGES = 16;
86
87 util::Logger log_{"ETL"};
88 // Forwarding cache must be destroyed after sources because sources have a callback to
89 // invalidate cache
90 std::optional<util::ResponseExpirationCache> forwardingCache_;
91 std::optional<std::string> forwardingXUserValue_;
92
93 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator_;
94
95 std::vector<SourcePtr> sources_;
96 std::optional<ETLState> etlState_;
97 std::uint32_t downloadRanges_ = kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when
98 downloading initial ledger */
99
100 struct ForwardingCounters {
101 std::reference_wrapper<util::prometheus::CounterInt> successDuration;
102 std::reference_wrapper<util::prometheus::CounterInt> failDuration;
103 std::reference_wrapper<util::prometheus::CounterInt> retries;
104 std::reference_wrapper<util::prometheus::CounterInt> cacheHit;
105 std::reference_wrapper<util::prometheus::CounterInt> cacheMiss;
106 } forwardingCounters_;
107
108 // Using mutex instead of atomic_bool because choosing a new source to
109 // forward messages should be done with a mutual exclusion otherwise there will be a race
110 // condition
111 util::Mutex<bool> hasForwardingSource_{false};
112
113public:
117 static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE = "clio_admin";
118
122 static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE = "clio_user";
123
137 boost::asio::io_context& ioc,
138 std::shared_ptr<BackendInterface> backend,
139 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
140 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator,
141 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
142 SourceFactory sourceFactory = makeSource
143 );
144
157 static std::shared_ptr<LoadBalancerInterface>
160 boost::asio::io_context& ioc,
161 std::shared_ptr<BackendInterface> backend,
162 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
163 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator,
164 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
165 SourceFactory sourceFactory = makeSource
166 );
167
181 uint32_t sequence,
183 std::chrono::steady_clock::duration retryAfter
184 ) override;
185
199 OptionalGetLedgerResponseType
201 uint32_t ledgerSequence,
202 bool getObjects,
203 bool getObjectNeighbors,
204 std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
205 ) override;
206
212 boost::json::value
213 toJson() const override;
214
224 std::expected<boost::json::object, rpc::CombinedError>
226 boost::json::object const& request,
227 std::optional<std::string> const& clientIp,
228 bool isAdmin,
229 boost::asio::yield_context yield
230 ) override;
231
236 std::optional<ETLState>
237 getETLState() noexcept override;
238
245 void
246 stop(boost::asio::yield_context yield) override;
247
248private:
263 template <typename Func>
264 void
265 execute(
266 Func f,
267 uint32_t ledgerSequence,
268 std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
269 );
270
274 void
275 chooseForwardingSource();
276};
277
278} // namespace etl
std::expected< std::vector< std::string >, InitialLedgerLoadError > InitialLedgerLoadResult
The result type of the initial ledger load.
Definition LoadBalancerInterface.hpp:54
An interface for LoadBalancer.
Definition LoadBalancerInterface.hpp:59
static std::shared_ptr< LoadBalancerInterface > makeLoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::unique_ptr< util::RandomGeneratorInterface > randomGenerator, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
A factory function for the load balancer.
Definition LoadBalancer.cpp:69
InitialLedgerLoadResult loadInitialLedger(uint32_t sequence, InitialLoadObserverInterface &observer, std::chrono::steady_clock::duration retryAfter) override
Load the initial ledger, writing data to the queue.
Definition LoadBalancer.cpp:222
std::expected< boost::json::object, rpc::CombinedError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &clientIp, bool isAdmin, boost::asio::yield_context yield) override
Forward a JSON RPC request to a randomly selected rippled node.
Definition LoadBalancer.cpp:285
LoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::unique_ptr< util::RandomGeneratorInterface > randomGenerator, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
Create an instance of the load balancer.
Definition LoadBalancer.cpp:90
std::optional< ETLState > getETLState() noexcept override
Return state of ETL nodes.
Definition LoadBalancer.cpp:399
boost::json::value toJson() const override
Represent the state of this load balancer as a JSON object.
Definition LoadBalancer.cpp:340
void stop(boost::asio::yield_context yield) override
Stop the load balancer. This will stop all subscription sources.
Definition LoadBalancer.cpp:409
static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding user requests.
Definition LoadBalancer.hpp:122
static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding admin requests.
Definition LoadBalancer.hpp:117
OptionalGetLedgerResponseType fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors, std::chrono::steady_clock::duration retryAfter=std::chrono::seconds{2}) override
Fetch data for a specific ledger.
Definition LoadBalancer.cpp:252
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:101
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:50
Definition LoadBalancer.hpp:68
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:36
A tag class to help identify LoadBalancer in templated code.
Definition LoadBalancer.hpp:63