Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
LoadBalancer.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etl/ETLState.hpp"
25#include "etl/Source.hpp"
26#include "etlng/InitialLoadObserverInterface.hpp"
28#include "feed/SubscriptionManagerInterface.hpp"
29#include "rpc/Errors.hpp"
30#include "util/Assert.hpp"
31#include "util/Mutex.hpp"
32#include "util/ResponseExpirationCache.hpp"
33#include "util/config/ConfigDefinition.hpp"
34#include "util/log/Logger.hpp"
35#include "util/prometheus/Counter.hpp"
36
37#include <boost/asio.hpp>
38#include <boost/asio/io_context.hpp>
39#include <boost/asio/spawn.hpp>
40#include <boost/json/object.hpp>
41#include <boost/json/value.hpp>
42#include <grpcpp/grpcpp.h>
43#include <org/xrpl/rpc/v1/get_ledger.pb.h>
44#include <org/xrpl/rpc/v1/ledger.pb.h>
45#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
46
47#include <chrono>
48#include <concepts>
49#include <cstdint>
50#include <expected>
51#include <memory>
52#include <optional>
53#include <string>
54#include <string_view>
55#include <utility>
56#include <vector>
57
58namespace etl {
59
64 virtual ~LoadBalancerTag() = default;
65};
66
67template <typename T>
68concept SomeLoadBalancer = std::derived_from<T, LoadBalancerTag>;
69
78public:
79 using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
80 using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
81 using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
82
83private:
84 static constexpr std::uint32_t kDEFAULT_DOWNLOAD_RANGES = 16;
85
86 util::Logger log_{"ETL"};
87 // Declared before sources_ so that the forwarding cache is destroyed after them: sources have a callback that invalidates the cache
88 std::optional<util::ResponseExpirationCache> forwardingCache_;
89 std::optional<std::string> forwardingXUserValue_;
90
91 std::vector<SourcePtr> sources_;
92 std::optional<ETLState> etlState_;
93 std::uint32_t downloadRanges_ =
94 kDEFAULT_DOWNLOAD_RANGES; /**< The number of markers to use when downloading the initial ledger */
95
96 struct ForwardingCounters {
97 std::reference_wrapper<util::prometheus::CounterInt> successDuration;
98 std::reference_wrapper<util::prometheus::CounterInt> failDuration;
99 std::reference_wrapper<util::prometheus::CounterInt> retries;
100 std::reference_wrapper<util::prometheus::CounterInt> cacheHit;
101 std::reference_wrapper<util::prometheus::CounterInt> cacheMiss;
102 } forwardingCounters_;
103
104 // Using a mutex instead of atomic_bool because choosing a new source to forward
105 // messages to must be done under mutual exclusion; otherwise there would be a race condition
106 util::Mutex<bool> hasForwardingSource_{false};
107
108public:
112 static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE = "clio_admin";
113
117 static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE = "clio_user";
118
131 boost::asio::io_context& ioc,
132 std::shared_ptr<BackendInterface> backend,
133 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
134 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
135 SourceFactory sourceFactory = makeSource
136 );
137
149 static std::shared_ptr<LoadBalancerInterface>
152 boost::asio::io_context& ioc,
153 std::shared_ptr<BackendInterface> backend,
154 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
155 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
156 SourceFactory sourceFactory = makeSource
157 );
158
167 std::vector<std::string>
168 loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2})
169 override;
170
180 std::vector<std::string>
182 [[maybe_unused]] uint32_t sequence,
183 [[maybe_unused]] etlng::InitialLoadObserverInterface& observer,
184 [[maybe_unused]] std::chrono::steady_clock::duration retryAfter
185 ) override
186 {
187 ASSERT(false, "Not available for old ETL");
188 std::unreachable();
189 }
190
204 OptionalGetLedgerResponseType
206 uint32_t ledgerSequence,
207 bool getObjects,
208 bool getObjectNeighbors,
209 std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
210 ) override;
211
217 boost::json::value
218 toJson() const override;
219
229 std::expected<boost::json::object, rpc::CombinedError>
231 boost::json::object const& request,
232 std::optional<std::string> const& clientIp,
233 bool isAdmin,
234 boost::asio::yield_context yield
235 ) override;
236
241 std::optional<ETLState>
242 getETLState() noexcept override;
243
250 void
251 stop(boost::asio::yield_context yield) override;
252
253private:
266 template <typename Func>
267 void
268 execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2});
269
273 void
274 chooseForwardingSource();
275
276 std::expected<boost::json::object, rpc::CombinedError>
277 forwardToRippledImpl(
278 boost::json::object const& request,
279 std::optional<std::string> const& clientIp,
280 bool isAdmin,
281 boost::asio::yield_context yield
282 );
283};
284
285} // namespace etl
This class manages connections to transaction-processing processes.
Definition LoadBalancer.hpp:77
std::vector< std::string > loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter=std::chrono::seconds{2}) override
Load the initial ledger, writing data to the queue.
Definition LoadBalancer.cpp:205
static std::shared_ptr< LoadBalancerInterface > makeLoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
A factory function for the load balancer.
Definition LoadBalancer.cpp:69
LoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
Create an instance of the load balancer.
Definition LoadBalancer.cpp:83
std::expected< boost::json::object, rpc::CombinedError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &clientIp, bool isAdmin, boost::asio::yield_context yield) override
Forward a JSON RPC request to a randomly selected rippled node.
Definition LoadBalancer.cpp:258
std::optional< ETLState > getETLState() noexcept override
Return state of ETL nodes.
Definition LoadBalancer.cpp:360
std::vector< std::string > loadInitialLedger(uint32_t sequence, etlng::InitialLoadObserverInterface &observer, std::chrono::steady_clock::duration retryAfter) override
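Load the initial ledger, writing data to the queue. Not available for the old ETL: this overload asserts unconditionally ("Not available for old ETL").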
Definition LoadBalancer.hpp:181
boost::json::value toJson() const override
Represent the state of this load balancer as a JSON object.
Definition LoadBalancer.cpp:308
void stop(boost::asio::yield_context yield) override
Stop the load balancer. This will stop all subscription sources.
Definition LoadBalancer.cpp:370
static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding user requests.
Definition LoadBalancer.hpp:117
static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding admin requests.
Definition LoadBalancer.hpp:112
OptionalGetLedgerResponseType fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors, std::chrono::steady_clock::duration retryAfter=std::chrono::seconds{2}) override
Fetch data for a specific ledger.
Definition LoadBalancer.cpp:228
An interface for LoadBalancer.
Definition LoadBalancerInterface.hpp:45
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
Definition LoadBalancer.hpp:68
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:39
SourcePtr makeSource(util::config::ObjectView const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, std::chrono::steady_clock::duration forwardingTimeout, SourceBase::OnConnectHook onConnect, SourceBase::OnDisconnectHook onDisconnect, SourceBase::OnLedgerClosedHook onLedgerClosed)
Create a source.
Definition Source.cpp:41
A tag class to help identify LoadBalancer in templated code.
Definition LoadBalancer.hpp:63
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:36