22#include "etl/Source.hpp"
23#include "etl/impl/ForwardingSource.hpp"
24#include "etl/impl/GrpcSource.hpp"
25#include "etl/impl/SubscriptionSource.hpp"
28#include <boost/asio/spawn.hpp>
29#include <boost/json/object.hpp>
30#include <grpcpp/support/status.h>
31#include <org/xrpl/rpc/v1/get_ledger.pb.h>
53 typename GrpcSourceType = GrpcSource,
54 typename SubscriptionSourceTypePtr = std::unique_ptr<SubscriptionSource>,
55 typename ForwardingSourceType = ForwardingSource>
59 std::string grpcPort_;
61 GrpcSourceType grpcSource_;
62 SubscriptionSourceTypePtr subscriptionSource_;
63 ForwardingSourceType forwardingSource_;
76 template <
typename SomeGrpcSourceType,
typename SomeForwardingSourceType>
77 requires std::is_same_v<GrpcSourceType, SomeGrpcSourceType> and
78 std::is_same_v<ForwardingSourceType, SomeForwardingSourceType>
83 SomeGrpcSourceType&& grpcSource,
84 SubscriptionSourceTypePtr subscriptionSource,
85 SomeForwardingSourceType&& forwardingSource
88 , wsPort_(std::move(wsPort))
89 , grpcPort_(std::move(grpcPort))
90 , grpcSource_(std::forward<SomeGrpcSourceType>(grpcSource))
91 , subscriptionSource_(std::move(subscriptionSource))
92 , forwardingSource_(std::forward<SomeForwardingSourceType>(forwardingSource))
102 subscriptionSource_->run();
106 stop(boost::asio::yield_context yield)
final
108 subscriptionSource_->stop(yield);
119 return subscriptionSource_->isConnected();
130 subscriptionSource_->setForwarding(isForwarding);
141 boost::json::object res;
143 res[
"validated_range"] = subscriptionSource_->validatedRange();
144 res[
"is_connected"] = std::to_string(
static_cast<int>(subscriptionSource_->isConnected()));
146 res[
"ws_port"] = wsPort_;
147 res[
"grpc_port"] = grpcPort_;
149 auto last = subscriptionSource_->lastMessageTime();
150 if (last.time_since_epoch().count() != 0) {
151 res[
"last_msg_age_seconds"] = std::to_string(
152 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - last).count()
163 return "{validated range: " + subscriptionSource_->validatedRange() +
", ip: " + ip_ +
164 ", web socket port: " + wsPort_ +
", grpc port: " + grpcPort_ +
"}";
176 return subscriptionSource_->hasLedger(sequence);
190 std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
191 fetchLedger(uint32_t sequence,
bool getObjects =
true,
bool getObjectNeighbors =
false) final
193 return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors);
204 std::pair<std::vector<std::string>,
bool>
207 return grpcSource_.loadInitialLedger(sequence, numMarkers, cacheOnly);
219 std::expected<boost::json::object, rpc::ClioError>
221 boost::json::object
const& request,
222 std::optional<std::string>
const& forwardToRippledClientIp,
223 std::string_view xUserValue,
224 boost::asio::yield_context yield
227 return forwardingSource_.forwardToRippled(request, forwardToRippledClientIp, xUserValue, yield);
Provides an implementation of an ETL source.
Definition Source.hpp:54
Provides an implementation of an ETL source.
Definition SourceImpl.hpp:56
std::string toString() const final
Definition SourceImpl.hpp:161
boost::json::object toJson() const final
Represent the source as a JSON object.
Definition SourceImpl.hpp:139
void run() final
Run subscriptions loop of the source.
Definition SourceImpl.hpp:100
std::expected< boost::json::object, rpc::ClioError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &forwardToRippledClientIp, std::string_view xUserValue, boost::asio::yield_context yield) const final
Forward a request to rippled.
Definition SourceImpl.hpp:220
bool isConnected() const final
Check if source is connected.
Definition SourceImpl.hpp:117
std::pair< grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t sequence, bool getObjects=true, bool getObjectNeighbors=false) final
Fetch data for a specific ledger.
Definition SourceImpl.hpp:191
SourceImpl(std::string ip, std::string wsPort, std::string grpcPort, SomeGrpcSourceType &&grpcSource, SubscriptionSourceTypePtr subscriptionSource, SomeForwardingSourceType &&forwardingSource)
Construct a new SourceImpl object.
Definition SourceImpl.hpp:79
std::pair< std::vector< std::string >, bool > loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly=false) final
Download a ledger in full.
Definition SourceImpl.hpp:205
bool hasLedger(uint32_t sequence) const final
Check if ledger is known by this source.
Definition SourceImpl.hpp:174
void setForwarding(bool isForwarding) final
Set the forwarding state of the source.
Definition SourceImpl.hpp:128
void stop(boost::asio::yield_context yield) final
Stop Source.
Definition SourceImpl.hpp:106