22#include "etl/impl/ForwardingSource.hpp"
23#include "etl/impl/SubscriptionSource.hpp"
24#include "etlng/InitialLoadObserverInterface.hpp"
25#include "etlng/Source.hpp"
26#include "etlng/impl/GrpcSource.hpp"
29#include <boost/asio/spawn.hpp>
30#include <boost/json/object.hpp>
31#include <grpcpp/support/status.h>
32#include <org/xrpl/rpc/v1/get_ledger.pb.h>
44namespace etlng::impl {
54 typename GrpcSourceType = GrpcSource,
55 typename SubscriptionSourceTypePtr = std::unique_ptr<etl::impl::SubscriptionSource>,
60 std::string grpcPort_;
62 GrpcSourceType grpcSource_;
63 SubscriptionSourceTypePtr subscriptionSource_;
64 ForwardingSourceType forwardingSource_;
77 template <
typename SomeGrpcSourceType,
typename SomeForwardingSourceType>
78 requires std::is_same_v<GrpcSourceType, SomeGrpcSourceType> and
79 std::is_same_v<ForwardingSourceType, SomeForwardingSourceType>
84 SomeGrpcSourceType&& grpcSource,
85 SubscriptionSourceTypePtr subscriptionSource,
86 SomeForwardingSourceType&& forwardingSource
89 , wsPort_(std::move(wsPort))
90 , grpcPort_(std::move(grpcPort))
91 , grpcSource_(std::forward<SomeGrpcSourceType>(grpcSource))
92 , subscriptionSource_(std::move(subscriptionSource))
93 , forwardingSource_(std::forward<SomeForwardingSourceType>(forwardingSource))
103 subscriptionSource_->run();
107 stop(boost::asio::yield_context yield)
final
109 subscriptionSource_->stop(yield);
120 return subscriptionSource_->isConnected();
131 subscriptionSource_->setForwarding(isForwarding);
142 boost::json::object res;
144 res[
"validated_range"] = subscriptionSource_->validatedRange();
145 res[
"is_connected"] = std::to_string(
static_cast<int>(subscriptionSource_->isConnected()));
147 res[
"ws_port"] = wsPort_;
148 res[
"grpc_port"] = grpcPort_;
150 auto last = subscriptionSource_->lastMessageTime();
151 if (last.time_since_epoch().count() != 0) {
152 res[
"last_msg_age_seconds"] = std::to_string(
153 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - last).count()
164 return "{validated range: " + subscriptionSource_->validatedRange() +
", ip: " + ip_ +
165 ", web socket port: " + wsPort_ +
", grpc port: " + grpcPort_ +
"}";
177 return subscriptionSource_->hasLedger(sequence);
191 std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
192 fetchLedger(uint32_t sequence,
bool getObjects =
true,
bool getObjectNeighbors =
false) final
194 return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors);
205 std::pair<std::vector<std::string>,
bool>
208 return grpcSource_.loadInitialLedger(sequence, numMarkers, loader);
220 std::expected<boost::json::object, rpc::ClioError>
222 boost::json::object
const& request,
223 std::optional<std::string>
const& forwardToRippledClientIp,
224 std::string_view xUserValue,
225 boost::asio::yield_context yield
228 return forwardingSource_.forwardToRippled(request, forwardToRippledClientIp, xUserValue, yield);
Definition ForwardingSource.hpp:37
Provides an implementation of an ETL source.
Definition Source.hpp:52
Provides an implementation of an ETL source.
Definition SourceImpl.hpp:57
bool hasLedger(uint32_t sequence) const final
Check if ledger is known by this source.
Definition SourceImpl.hpp:175
std::pair< std::vector< std::string >, bool > loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface &loader) final
Download a ledger in full.
Definition SourceImpl.hpp:206
std::string toString() const final
Definition SourceImpl.hpp:162
void stop(boost::asio::yield_context yield) final
Stop Source.
Definition SourceImpl.hpp:107
void setForwarding(bool isForwarding) final
Set the forwarding state of the source.
Definition SourceImpl.hpp:129
boost::json::object toJson() const final
Represent the source as a JSON object.
Definition SourceImpl.hpp:140
bool isConnected() const final
Check if source is connected.
Definition SourceImpl.hpp:118
void run() final
Run subscriptions loop of the source.
Definition SourceImpl.hpp:101
std::expected< boost::json::object, rpc::ClioError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &forwardToRippledClientIp, std::string_view xUserValue, boost::asio::yield_context yield) const final
Forward a request to rippled.
Definition SourceImpl.hpp:221
std::pair< grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t sequence, bool getObjects=true, bool getObjectNeighbors=false) final
Fetch data for a specific ledger.
Definition SourceImpl.hpp:192
SourceImpl(std::string ip, std::string wsPort, std::string grpcPort, SomeGrpcSourceType &&grpcSource, SubscriptionSourceTypePtr subscriptionSource, SomeForwardingSourceType &&forwardingSource)
Construct a new SourceImpl object.
Definition SourceImpl.hpp:80
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:36