Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SourceImpl.hpp
1#pragma once
2
3#include "etl/InitialLoadObserverInterface.hpp"
5#include "etl/Source.hpp"
6#include "etl/impl/ForwardingSource.hpp"
7#include "etl/impl/GrpcSource.hpp"
8#include "etl/impl/SubscriptionSource.hpp"
9#include "rpc/Errors.hpp"
10
11#include <boost/asio/spawn.hpp>
12#include <boost/json/object.hpp>
13#include <grpcpp/support/status.h>
14#include <org/xrpl/rpc/v1/get_ledger.pb.h>
15
16#include <chrono>
17#include <cstdint>
18#include <expected>
19#include <memory>
20#include <optional>
21#include <string>
22#include <string_view>
23#include <utility>
24#include <vector>
25
26namespace etl::impl {
27
35template <
36 typename GrpcSourceType = GrpcSource,
37 typename SubscriptionSourceTypePtr = std::unique_ptr<impl::SubscriptionSource>,
38 typename ForwardingSourceType = impl::ForwardingSource>
39class SourceImpl : public SourceBase {
40 std::string ip_;
41 std::string wsPort_;
42 std::string grpcPort_;
43
44 GrpcSourceType grpcSource_;
45 SubscriptionSourceTypePtr subscriptionSource_;
46 ForwardingSourceType forwardingSource_;
47
48public:
59 template <typename SomeGrpcSourceType, typename SomeForwardingSourceType>
60 requires std::is_same_v<GrpcSourceType, SomeGrpcSourceType> and
61 std::is_same_v<ForwardingSourceType, SomeForwardingSourceType>
63 std::string ip,
64 std::string wsPort,
65 std::string grpcPort,
66 SomeGrpcSourceType&& grpcSource,
67 SubscriptionSourceTypePtr subscriptionSource,
68 SomeForwardingSourceType&& forwardingSource
69 )
70 : ip_(std::move(ip))
71 , wsPort_(std::move(wsPort))
72 , grpcPort_(std::move(grpcPort))
73 , grpcSource_(std::forward<SomeGrpcSourceType>(grpcSource))
74 , subscriptionSource_(std::move(subscriptionSource))
75 , forwardingSource_(std::forward<SomeForwardingSourceType>(forwardingSource))
76 {
77 }
78
82 void
83 run() final
84 {
85 subscriptionSource_->run();
86 }
87
88 void
89 stop(boost::asio::yield_context yield) final
90 {
91 subscriptionSource_->stop(yield);
92 grpcSource_.stop(yield);
93 }
94
100 [[nodiscard]] bool
101 isConnected() const final
102 {
103 return subscriptionSource_->isConnected();
104 }
105
111 void
112 setForwarding(bool isForwarding) final
113 {
114 subscriptionSource_->setForwarding(isForwarding);
115 }
116
122 [[nodiscard]] boost::json::object
123 toJson() const final
124 {
125 boost::json::object res;
126
127 res["validated_range"] = subscriptionSource_->validatedRange();
128 res["is_connected"] = std::to_string(static_cast<int>(subscriptionSource_->isConnected()));
129 res["ip"] = ip_;
130 res["ws_port"] = wsPort_;
131 res["grpc_port"] = grpcPort_;
132
133 auto last = subscriptionSource_->lastMessageTime();
134 if (last.time_since_epoch().count() != 0) {
135 res["last_msg_age_seconds"] = std::to_string(
136 std::chrono::duration_cast<std::chrono::seconds>(
137 std::chrono::steady_clock::now() - last
138 )
139 .count()
140 );
141 }
142
143 return res;
144 }
145
147 [[nodiscard]] std::string
148 toString() const final
149 {
150 return "{validated range: " + subscriptionSource_->validatedRange() + ", ip: " + ip_ +
151 ", web socket port: " + wsPort_ + ", grpc port: " + grpcPort_ + "}";
152 }
153
160 [[nodiscard]] bool
161 hasLedger(uint32_t sequence) const final
162 {
163 return subscriptionSource_->hasLedger(sequence);
164 }
165
178 std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
179 fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) final
180 {
181 return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors);
182 }
183
194 uint32_t sequence,
195 std::uint32_t numMarkers,
197 ) final
198 {
199 return grpcSource_.loadInitialLedger(sequence, numMarkers, loader);
200 }
201
211 [[nodiscard]] std::expected<boost::json::object, rpc::ClioError>
213 boost::json::object const& request,
214 std::optional<std::string> const& forwardToRippledClientIp,
215 std::string_view xUserValue,
216 boost::asio::yield_context yield
217 ) const final
218 {
219 return forwardingSource_.forwardToRippled(
220 request, forwardToRippledClientIp, xUserValue, yield
221 );
222 }
223};
224
225} // namespace etl::impl
std::expected< std::vector< std::string >, InitialLedgerLoadError > InitialLedgerLoadResult
The result type of the initial ledger load.
Definition LoadBalancerInterface.hpp:35
Provides an implementation of a ETL source.
Definition Source.hpp:33
Definition GrpcSource.hpp:21
std::string toString() const final
Definition SourceImpl.hpp:148
InitialLedgerLoadResult loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, InitialLoadObserverInterface &loader) final
Download a ledger in full.
Definition SourceImpl.hpp:193
SourceImpl(std::string ip, std::string wsPort, std::string grpcPort, SomeGrpcSourceType &&grpcSource, SubscriptionSourceTypePtr subscriptionSource, SomeForwardingSourceType &&forwardingSource)
Construct a new SourceImpl object.
Definition SourceImpl.hpp:62
boost::json::object toJson() const final
Represent the source as a JSON object.
Definition SourceImpl.hpp:123
void run() final
Run subscriptions loop of the source.
Definition SourceImpl.hpp:83
std::expected< boost::json::object, rpc::ClioError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &forwardToRippledClientIp, std::string_view xUserValue, boost::asio::yield_context yield) const final
Forward a request to rippled.
Definition SourceImpl.hpp:212
bool isConnected() const final
Check if source is connected.
Definition SourceImpl.hpp:101
std::pair< grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t sequence, bool getObjects=true, bool getObjectNeighbors=false) final
Fetch data for a specific ledger.
Definition SourceImpl.hpp:179
bool hasLedger(uint32_t sequence) const final
Check if ledger is known by this source.
Definition SourceImpl.hpp:161
void setForwarding(bool isForwarding) final
Set the forwarding state of the source.
Definition SourceImpl.hpp:112
void stop(boost::asio::yield_context yield) final
Stop Source.
Definition SourceImpl.hpp:89
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:17