22#include "etl/impl/SubscriptionSource.hpp"
23#include "etlng/InitialLoadObserverInterface.hpp"
25#include "etlng/Source.hpp"
26#include "etlng/impl/ForwardingSource.hpp"
27#include "etlng/impl/GrpcSource.hpp"
30#include <boost/asio/spawn.hpp>
31#include <boost/json/object.hpp>
32#include <grpcpp/support/status.h>
33#include <org/xrpl/rpc/v1/get_ledger.pb.h>
45namespace etlng::impl {
55 typename GrpcSourceType = GrpcSource,
56 typename SubscriptionSourceTypePtr = std::unique_ptr<etl::impl::SubscriptionSource>,
61 std::string grpcPort_;
63 GrpcSourceType grpcSource_;
64 SubscriptionSourceTypePtr subscriptionSource_;
65 ForwardingSourceType forwardingSource_;
78 template <
typename SomeGrpcSourceType,
typename SomeForwardingSourceType>
79 requires std::is_same_v<GrpcSourceType, SomeGrpcSourceType> and
80 std::is_same_v<ForwardingSourceType, SomeForwardingSourceType>
85 SomeGrpcSourceType&& grpcSource,
86 SubscriptionSourceTypePtr subscriptionSource,
87 SomeForwardingSourceType&& forwardingSource
90 , wsPort_(std::move(wsPort))
91 , grpcPort_(std::move(grpcPort))
92 , grpcSource_(std::forward<SomeGrpcSourceType>(grpcSource))
93 , subscriptionSource_(std::move(subscriptionSource))
94 , forwardingSource_(std::forward<SomeForwardingSourceType>(forwardingSource))
104 subscriptionSource_->run();
108 stop(boost::asio::yield_context yield)
final
110 subscriptionSource_->stop(yield);
111 grpcSource_.stop(yield);
122 return subscriptionSource_->isConnected();
133 subscriptionSource_->setForwarding(isForwarding);
144 boost::json::object res;
146 res[
"validated_range"] = subscriptionSource_->validatedRange();
147 res[
"is_connected"] = std::to_string(
static_cast<int>(subscriptionSource_->isConnected()));
149 res[
"ws_port"] = wsPort_;
150 res[
"grpc_port"] = grpcPort_;
152 auto last = subscriptionSource_->lastMessageTime();
153 if (last.time_since_epoch().count() != 0) {
154 res[
"last_msg_age_seconds"] = std::to_string(
155 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - last).count()
166 return "{validated range: " + subscriptionSource_->validatedRange() +
", ip: " + ip_ +
167 ", web socket port: " + wsPort_ +
", grpc port: " + grpcPort_ +
"}";
179 return subscriptionSource_->hasLedger(sequence);
193 std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
194 fetchLedger(uint32_t sequence,
bool getObjects =
true,
bool getObjectNeighbors =
false) final
196 return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors);
210 return grpcSource_.loadInitialLedger(sequence, numMarkers, loader);
222 std::expected<boost::json::object, rpc::ClioError>
224 boost::json::object
const& request,
225 std::optional<std::string>
const& forwardToRippledClientIp,
226 std::string_view xUserValue,
227 boost::asio::yield_context yield
230 return forwardingSource_.forwardToRippled(request, forwardToRippledClientIp, xUserValue, yield);
std::expected< std::vector< std::string >, InitialLedgerLoadError > InitialLedgerLoadResult
The result type of the initial ledger load.
Definition LoadBalancerInterface.hpp:54
Provides an implementation of an ETL source.
Definition Source.hpp:52
Definition ForwardingSource.hpp:37
Provides an implementation of an ETL source.
Definition SourceImpl.hpp:58
bool hasLedger(uint32_t sequence) const final
Check if ledger is known by this source.
Definition SourceImpl.hpp:177
std::string toString() const final
Definition SourceImpl.hpp:164
void stop(boost::asio::yield_context yield) final
Stop Source.
Definition SourceImpl.hpp:108
InitialLedgerLoadResult loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface &loader) final
Download a ledger in full.
Definition SourceImpl.hpp:208
void setForwarding(bool isForwarding) final
Set the forwarding state of the source.
Definition SourceImpl.hpp:131
boost::json::object toJson() const final
Represent the source as a JSON object.
Definition SourceImpl.hpp:142
bool isConnected() const final
Check if source is connected.
Definition SourceImpl.hpp:120
void run() final
Run subscriptions loop of the source.
Definition SourceImpl.hpp:102
std::expected< boost::json::object, rpc::ClioError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &forwardToRippledClientIp, std::string_view xUserValue, boost::asio::yield_context yield) const final
Forward a request to rippled.
Definition SourceImpl.hpp:223
std::pair< grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t sequence, bool getObjects=true, bool getObjectNeighbors=false) final
Fetch data for a specific ledger.
Definition SourceImpl.hpp:194
SourceImpl(std::string ip, std::string wsPort, std::string grpcPort, SomeGrpcSourceType &&grpcSource, SubscriptionSourceTypePtr subscriptionSource, SomeForwardingSourceType &&forwardingSource)
Construct a new SourceImpl object.
Definition SourceImpl.hpp:81
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:36