22#include "data/BackendInterface.hpp"
23#include "etl/ETLState.hpp"
25#include "etlng/InitialLoadObserverInterface.hpp"
27#include "etlng/Source.hpp"
28#include "feed/SubscriptionManagerInterface.hpp"
30#include "util/Assert.hpp"
31#include "util/Mutex.hpp"
32#include "util/Random.hpp"
33#include "util/ResponseExpirationCache.hpp"
34#include "util/config/ConfigDefinition.hpp"
35#include "util/log/Logger.hpp"
36#include "util/prometheus/Counter.hpp"
38#include <boost/asio.hpp>
39#include <boost/asio/io_context.hpp>
40#include <boost/asio/spawn.hpp>
41#include <boost/json/object.hpp>
42#include <boost/json/value.hpp>
43#include <grpcpp/grpcpp.h>
44#include <org/xrpl/rpc/v1/get_ledger.pb.h>
45#include <org/xrpl/rpc/v1/ledger.pb.h>
46#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
// Shorthand for the org::xrpl::rpc::v1::RawLedgerObject message type.
80 using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
// Shorthand for the org::xrpl::rpc::v1::GetLedgerResponse message type.
81 using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
// A GetLedgerResponseType wrapped in std::optional, so "no response" can be expressed as an empty optional.
82 using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
// Compile-time default (16) used as the initial value of the downloadRanges_ member.
85 static constexpr std::uint32_t kDEFAULT_DOWNLOAD_RANGES = 16;
// Optional response cache; empty when no cache has been set up.
// NOTE(review): presumably caches responses of forwarded requests - confirm in the .cpp.
89 std::optional<util::ResponseExpirationCache> forwardingCache_;
// Optional string value.
// NOTE(review): presumably the X-User header value attached to forwarded requests - confirm in the .cpp.
90 std::optional<std::string> forwardingXUserValue_;
// Exclusively owned random generator (held through its interface).
// NOTE(review): presumably used to pick a source at random when forwarding - confirm in the .cpp.
92 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator_;
// The collection of sources held by this object.
94 std::vector<SourcePtr> sources_;
// Optional ETL state; empty until a value is stored.
// NOTE(review): looks like the value handed out by getETLState() - confirm in the .cpp.
95 std::optional<etl::ETLState> etlState_;
// Number of download ranges; starts at kDEFAULT_DOWNLOAD_RANGES.
96 std::uint32_t downloadRanges_ =
97 kDEFAULT_DOWNLOAD_RANGES;
// Bundle of references to integer Prometheus counters, instantiated once as forwardingCounters_.
// Each member is a std::reference_wrapper, so the struct refers to counters owned elsewhere
// rather than owning them.
// NOTE(review): the per-counter meanings below are inferred from the member names only -
// confirm against the code that increments them.
99 struct ForwardingCounters {
// presumably accumulates time spent on forwards that succeeded
100 std::reference_wrapper<util::prometheus::CounterInt> successDuration;
// presumably accumulates time spent on forwards that failed
101 std::reference_wrapper<util::prometheus::CounterInt> failDuration;
// presumably counts forwarding retries
102 std::reference_wrapper<util::prometheus::CounterInt> retries;
// presumably counts forwarding-cache hits
103 std::reference_wrapper<util::prometheus::CounterInt> cacheHit;
// presumably counts forwarding-cache misses
104 std::reference_wrapper<util::prometheus::CounterInt> cacheMiss;
105 } forwardingCounters_;
135 boost::asio::io_context& ioc,
136 std::shared_ptr<BackendInterface> backend,
137 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
138 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator,
139 std::shared_ptr<etl::NetworkValidatedLedgersInterface> validatedLedgers,
140 SourceFactory sourceFactory = makeSource
155 static std::shared_ptr<LoadBalancerInterface>
158 boost::asio::io_context& ioc,
159 std::shared_ptr<BackendInterface> backend,
160 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
161 std::unique_ptr<util::RandomGeneratorInterface> randomGenerator,
162 std::shared_ptr<etl::NetworkValidatedLedgersInterface> validatedLedgers,
163 SourceFactory sourceFactory = makeSource
174 std::vector<std::string>
176 [[maybe_unused]] uint32_t sequence,
177 [[maybe_unused]] std::chrono::steady_clock::duration retryAfter
180 ASSERT(
false,
"Not available for new ETL");
193 std::vector<std::string>
197 std::chrono::steady_clock::duration retryAfter
213 OptionalGetLedgerResponseType
215 uint32_t ledgerSequence,
217 bool getObjectNeighbors,
218 std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
238 std::expected<boost::json::object, rpc::CombinedError>
240 boost::json::object
const& request,
241 std::optional<std::string>
const& clientIp,
243 boost::asio::yield_context yield
250 std::optional<etl::ETLState>
260 stop(boost::asio::yield_context yield) override;
275 template <typename Func>
277 execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2});
283 chooseForwardingSource();
285 std::expected<boost::json::object, rpc::CombinedError>
286 forwardToRippledImpl(
287 boost::json::object
const& request,
288 std::optional<std::string>
const& clientIp,
290 boost::asio::yield_context yield
An interface for LoadBalancer.
Definition LoadBalancerInterface.hpp:45
This class manages connections to the transaction-processing (rippled) processes.
Definition LoadBalancer.hpp:78
void stop(boost::asio::yield_context yield) override
Stop the load balancer. This will stop all subscription sources.
Definition LoadBalancer.cpp:383
LoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::unique_ptr< util::RandomGeneratorInterface > randomGenerator, std::shared_ptr< etl::NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
Create an instance of the load balancer.
Definition LoadBalancer.cpp:91
std::optional< etl::ETLState > getETLState() noexcept override
Return state of ETL nodes.
Definition LoadBalancer.cpp:373
std::vector< std::string > loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter) override
Load the initial ledger, writing data to the queue.
Definition LoadBalancer.hpp:175
static std::shared_ptr< LoadBalancerInterface > makeLoadBalancer(util::config::ClioConfigDefinition const &config, boost::asio::io_context &ioc, std::shared_ptr< BackendInterface > backend, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, std::unique_ptr< util::RandomGeneratorInterface > randomGenerator, std::shared_ptr< etl::NetworkValidatedLedgersInterface > validatedLedgers, SourceFactory sourceFactory=makeSource)
A factory function for the load balancer.
Definition LoadBalancer.cpp:70
std::expected< boost::json::object, rpc::CombinedError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &clientIp, bool isAdmin, boost::asio::yield_context yield) override
Forward a JSON RPC request to a randomly selected rippled node.
Definition LoadBalancer.cpp:271
OptionalGetLedgerResponseType fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors, std::chrono::steady_clock::duration retryAfter=std::chrono::seconds{2}) override
Fetch data for a specific ledger.
Definition LoadBalancer.cpp:241
static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding user requests.
Definition LoadBalancer.hpp:120
boost::json::value toJson() const override
Represent the state of this load balancer as a JSON object.
Definition LoadBalancer.cpp:321
static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE
Value for the X-User header when forwarding admin requests.
Definition LoadBalancer.hpp:115
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
Definition LoadBalancer.hpp:69
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:36
A tag class to help identify LoadBalancer in templated code.
Definition LoadBalancer.hpp:64