3#include "etl/InitialLoadObserverInterface.hpp"
5#include "etl/Source.hpp"
6#include "etl/impl/ForwardingSource.hpp"
7#include "etl/impl/GrpcSource.hpp"
8#include "etl/impl/SubscriptionSource.hpp"
11#include <boost/asio/spawn.hpp>
12#include <boost/json/object.hpp>
13#include <grpcpp/support/status.h>
14#include <org/xrpl/rpc/v1/get_ledger.pb.h>
37 typename SubscriptionSourceTypePtr = std::unique_ptr<impl::SubscriptionSource>,
38 typename ForwardingSourceType = impl::ForwardingSource>
42 std::string grpcPort_;
44 GrpcSourceType grpcSource_;
45 SubscriptionSourceTypePtr subscriptionSource_;
46 ForwardingSourceType forwardingSource_;
59 template <
typename SomeGrpcSourceType,
typename SomeForwardingSourceType>
60 requires std::is_same_v<GrpcSourceType, SomeGrpcSourceType> and
61 std::is_same_v<ForwardingSourceType, SomeForwardingSourceType>
66 SomeGrpcSourceType&& grpcSource,
67 SubscriptionSourceTypePtr subscriptionSource,
68 SomeForwardingSourceType&& forwardingSource
71 , wsPort_(std::move(wsPort))
72 , grpcPort_(std::move(grpcPort))
73 , grpcSource_(std::forward<SomeGrpcSourceType>(grpcSource))
74 , subscriptionSource_(std::move(subscriptionSource))
75 , forwardingSource_(std::forward<SomeForwardingSourceType>(forwardingSource))
85 subscriptionSource_->run();
89 stop(boost::asio::yield_context yield)
final
91 subscriptionSource_->stop(yield);
92 grpcSource_.stop(yield);
103 return subscriptionSource_->isConnected();
114 subscriptionSource_->setForwarding(isForwarding);
122 [[nodiscard]] boost::json::object
125 boost::json::object res;
127 res[
"validated_range"] = subscriptionSource_->validatedRange();
128 res[
"is_connected"] = std::to_string(
static_cast<int>(subscriptionSource_->isConnected()));
130 res[
"ws_port"] = wsPort_;
131 res[
"grpc_port"] = grpcPort_;
133 auto last = subscriptionSource_->lastMessageTime();
134 if (last.time_since_epoch().count() != 0) {
135 res[
"last_msg_age_seconds"] = std::to_string(
136 std::chrono::duration_cast<std::chrono::seconds>(
137 std::chrono::steady_clock::now() - last
147 [[nodiscard]] std::string
150 return "{validated range: " + subscriptionSource_->validatedRange() +
", ip: " + ip_ +
151 ", web socket port: " + wsPort_ +
", grpc port: " + grpcPort_ +
"}";
163 return subscriptionSource_->hasLedger(sequence);
178 std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
179 fetchLedger(uint32_t sequence,
bool getObjects =
true,
bool getObjectNeighbors =
false) final
181 return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors);
195 std::uint32_t numMarkers,
199 return grpcSource_.loadInitialLedger(sequence, numMarkers, loader);
211 [[nodiscard]] std::expected<boost::json::object, rpc::ClioError>
213 boost::json::object
const& request,
214 std::optional<std::string>
const& forwardToRippledClientIp,
215 std::string_view xUserValue,
216 boost::asio::yield_context yield
219 return forwardingSource_.forwardToRippled(
220 request, forwardToRippledClientIp, xUserValue, yield
std::expected< std::vector< std::string >, InitialLedgerLoadError > InitialLedgerLoadResult
The result type of the initial ledger load.
Definition LoadBalancerInterface.hpp:35
Provides an implementation of an ETL source.
Definition Source.hpp:33
Definition GrpcSource.hpp:21
std::string toString() const final
Definition SourceImpl.hpp:148
InitialLedgerLoadResult loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, InitialLoadObserverInterface &loader) final
Download a ledger in full.
Definition SourceImpl.hpp:193
SourceImpl(std::string ip, std::string wsPort, std::string grpcPort, SomeGrpcSourceType &&grpcSource, SubscriptionSourceTypePtr subscriptionSource, SomeForwardingSourceType &&forwardingSource)
Construct a new SourceImpl object.
Definition SourceImpl.hpp:62
boost::json::object toJson() const final
Represent the source as a JSON object.
Definition SourceImpl.hpp:123
void run() final
Run the subscriptions loop of the source.
Definition SourceImpl.hpp:83
std::expected< boost::json::object, rpc::ClioError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &forwardToRippledClientIp, std::string_view xUserValue, boost::asio::yield_context yield) const final
Forward a request to rippled.
Definition SourceImpl.hpp:212
bool isConnected() const final
Check if source is connected.
Definition SourceImpl.hpp:101
std::pair< grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t sequence, bool getObjects=true, bool getObjectNeighbors=false) final
Fetch data for a specific ledger.
Definition SourceImpl.hpp:179
bool hasLedger(uint32_t sequence) const final
Check if ledger is known by this source.
Definition SourceImpl.hpp:161
void setForwarding(bool isForwarding) final
Set the forwarding state of the source.
Definition SourceImpl.hpp:112
void stop(boost::asio::yield_context yield) final
Stop Source.
Definition SourceImpl.hpp:89
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:17