Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LoadBalancer.hpp
1#pragma once
2
3#include "data/BackendInterface.hpp"
4#include "etl/ETLState.hpp"
5#include "etl/InitialLoadObserverInterface.hpp"
8#include "etl/Source.hpp"
9#include "feed/SubscriptionManagerInterface.hpp"
10#include "rpc/Errors.hpp"
11#include "util/Mutex.hpp"
12#include "util/Random.hpp"
13#include "util/ResponseExpirationCache.hpp"
14#include "util/config/ConfigDefinition.hpp"
15#include "util/log/Logger.hpp"
16#include "util/prometheus/Counter.hpp"
17
18#include <boost/asio.hpp>
19#include <boost/asio/io_context.hpp>
20#include <boost/asio/spawn.hpp>
21#include <boost/json/object.hpp>
22#include <boost/json/value.hpp>
23#include <grpcpp/grpcpp.h>
24#include <org/xrpl/rpc/v1/get_ledger.pb.h>
25#include <org/xrpl/rpc/v1/ledger.pb.h>
26#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
27
28#include <chrono>
29#include <concepts>
30#include <cstdint>
31#include <expected>
32#include <functional>
33#include <memory>
34#include <optional>
35#include <string>
36#include <string_view>
37#include <vector>
38
39namespace etl {
40
45 virtual ~LoadBalancerTag() = default;
46};
47
48template <typename T>
49concept SomeLoadBalancer = std::derived_from<T, LoadBalancerTag>;  // holds for LoadBalancerTag itself and for types publicly and unambiguously derived from it
50
60public:
61 using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
62 using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
63 using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
64
65private:
66 static constexpr std::uint32_t kDEFAULT_DOWNLOAD_RANGES = 16;
67
68 util::Logger log_{"ETL"};
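69 // The forwarding cache must be destroyed after the sources because the sources have a callback to
70 // invalidate the cache; it is therefore declared before sources_ (members are destroyed in reverse declaration order)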
71 std::optional<util::ResponseExpirationCache> forwardingCache_;
72 std::optional<std::string> forwardingXUserValue_;
73
74 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator_;
75
76 std::vector<SourcePtr> sources_;
77 std::optional<ETLState> etlState_;
78 std::uint32_t downloadRanges_ = kDEFAULT_DOWNLOAD_RANGES; /**< The number of markers to use when
79 downloading the initial ledger */
80
81 struct ForwardingCounters {  // bundle of non-owning references (reference_wrapper) to integer Prometheus counters
82 std::reference_wrapper<util::prometheus::CounterInt> successDuration;  // presumably accumulated duration of successful forwards - unit not visible here, confirm in LoadBalancer.cpp
83 std::reference_wrapper<util::prometheus::CounterInt> failDuration;  // presumably accumulated duration of failed forwards - confirm
84 std::reference_wrapper<util::prometheus::CounterInt> retries;  // presumably number of forwarding retries - confirm
85 std::reference_wrapper<util::prometheus::CounterInt> cacheHit;  // presumably forwarding-cache hits - confirm
86 std::reference_wrapper<util::prometheus::CounterInt> cacheMiss;  // presumably forwarding-cache misses - confirm
87 } forwardingCounters_;
88
89 // Using a mutex instead of atomic_bool because choosing a new source to
90 // forward messages must be done under mutual exclusion; otherwise there would be a race
91 // condition
92 util::Mutex<bool> hasForwardingSource_{false};
93
94public:
98 static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE = "clio_admin";
99
103 static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE = "clio_user";
104
118 boost::asio::io_context& ioc,
119 std::shared_ptr<BackendInterface> backend,
120 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
121 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator,
122 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
123 SourceFactory sourceFactory = makeSource
124 );
125
138 static std::shared_ptr<LoadBalancerInterface>
141 boost::asio::io_context& ioc,
142 std::shared_ptr<BackendInterface> backend,
143 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
144 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator,
145 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
146 SourceFactory sourceFactory = makeSource
147 );
148
162 uint32_t sequence,
164 std::chrono::steady_clock::duration retryAfter
165 ) override;
166
180 OptionalGetLedgerResponseType
182 uint32_t ledgerSequence,
183 bool getObjects,
184 bool getObjectNeighbors,
185 std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
186 ) override;
187
193 boost::json::value
194 toJson() const override;
195
205 std::expected<boost::json::object, rpc::CombinedError>
207 boost::json::object const& request,
208 std::optional<std::string> const& clientIp,
209 bool isAdmin,
210 boost::asio::yield_context yield
211 ) override;
212
217 std::optional<ETLState>
218 getETLState() noexcept override;
219
226 void
227 stop(boost::asio::yield_context yield) override;
228
229private:
244 template <typename Func>
245 void
246 execute(
247 Func f,
248 uint32_t ledgerSequence,
249 std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
250 );
251
255 void
256 chooseForwardingSource();
257};
258
259} // namespace etl
std::expected< std::vector< std::string >, InitialLedgerLoadError > InitialLedgerLoadResult
The result type of the initial ledger load.
Definition LoadBalancerInterface.hpp:35
An interface for LoadBalancer.
Definition LoadBalancerInterface.hpp:40
static std::shared_ptr< LoadBalancerInterface > makeLoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::unique_ptr< util::RandomGeneratorInterface > randomGenerator, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
A factory function for the load balancer.
Definition LoadBalancer.cpp:50
InitialLedgerLoadResult loadInitialLedger(uint32_t sequence, InitialLoadObserverInterface &observer, std::chrono::steady_clock::duration retryAfter) override
Load the initial ledger, writing data to the queue.
Definition LoadBalancer.cpp:204
std::expected< boost::json::object, rpc::CombinedError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &clientIp, bool isAdmin, boost::asio::yield_context yield) override
Forward a JSON RPC request to a randomly selected rippled node.
Definition LoadBalancer.cpp:267
LoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::unique_ptr< util::RandomGeneratorInterface > randomGenerator, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
Create an instance of the load balancer.
Definition LoadBalancer.cpp:71
std::optional< ETLState > getETLState() noexcept override
Return state of ETL nodes.
Definition LoadBalancer.cpp:381
boost::json::value toJson() const override
Represent the state of this load balancer as a JSON object.
Definition LoadBalancer.cpp:322
void stop(boost::asio::yield_context yield) override
Stop the load balancer. This will stop all subscription sources.
Definition LoadBalancer.cpp:391
static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding user requests.
Definition LoadBalancer.hpp:103
static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding admin requests.
Definition LoadBalancer.hpp:98
OptionalGetLedgerResponseType fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors, std::chrono::steady_clock::duration retryAfter=std::chrono::seconds{2}) override
Fetch data for a specific ledger.
Definition LoadBalancer.cpp:234
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:82
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:31
Definition LoadBalancer.hpp:49
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:17
A tag class to help identify LoadBalancer in templated code.
Definition LoadBalancer.hpp:44