Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SourceImpl.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "etl/InitialLoadObserverInterface.hpp"
24#include "etl/Source.hpp"
25#include "etl/impl/ForwardingSource.hpp"
26#include "etl/impl/GrpcSource.hpp"
27#include "etl/impl/SubscriptionSource.hpp"
28#include "rpc/Errors.hpp"
29
30#include <boost/asio/spawn.hpp>
31#include <boost/json/object.hpp>
32#include <grpcpp/support/status.h>
33#include <org/xrpl/rpc/v1/get_ledger.pb.h>
34
35#include <chrono>
36#include <cstdint>
37#include <expected>
38#include <memory>
39#include <optional>
40#include <string>
41#include <string_view>
42#include <utility>
43#include <vector>
44
45namespace etl::impl {
46
/**
 * @brief An ETL source assembled from three collaborating parts.
 *
 * Every operation of the SourceBase interface is delegated to exactly one of the owned parts:
 * the subscription source (connection state, validated range, forwarding flag), the gRPC source
 * (ledger fetching and initial ledger load) or the forwarding source (request forwarding).
 *
 * @tparam GrpcSourceType Type used to fetch ledgers and to perform the initial ledger load
 * @tparam SubscriptionSourceTypePtr Pointer-like type (used via `->`) to the subscription source
 * @tparam ForwardingSourceType Type used to forward requests to rippled
 */
template <
    typename GrpcSourceType = GrpcSource,
    typename SubscriptionSourceTypePtr = std::unique_ptr<impl::SubscriptionSource>,
    typename ForwardingSourceType = impl::ForwardingSource>
class SourceImpl : public SourceBase {
    std::string ip_;
    std::string wsPort_;
    std::string grpcPort_;

    GrpcSourceType grpcSource_;
    SubscriptionSourceTypePtr subscriptionSource_;
    ForwardingSourceType forwardingSource_;

public:
    /**
     * @brief Construct a new SourceImpl object.
     *
     * The requires-clause demands that the deduced types are exactly GrpcSourceType and
     * ForwardingSourceType. Because they are deduced from forwarding references, an lvalue
     * argument deduces a reference type and does not satisfy the constraint.
     *
     * @param ip The ip of the source; only used for reporting in toJson() and toString()
     * @param wsPort The web socket port of the source; only used for reporting
     * @param grpcPort The gRPC port of the source; only used for reporting
     * @param grpcSource The gRPC source, forwarded into this object
     * @param subscriptionSource The subscription source, moved into this object
     * @param forwardingSource The forwarding source, forwarded into this object
     */
    template <typename SomeGrpcSourceType, typename SomeForwardingSourceType>
        requires std::is_same_v<GrpcSourceType, SomeGrpcSourceType> and
                     std::is_same_v<ForwardingSourceType, SomeForwardingSourceType>
    SourceImpl(
        std::string ip,
        std::string wsPort,
        std::string grpcPort,
        SomeGrpcSourceType&& grpcSource,
        SubscriptionSourceTypePtr subscriptionSource,
        SomeForwardingSourceType&& forwardingSource
    )
        : ip_(std::move(ip))
        , wsPort_(std::move(wsPort))
        , grpcPort_(std::move(grpcPort))
        , grpcSource_(std::forward<SomeGrpcSourceType>(grpcSource))
        , subscriptionSource_(std::move(subscriptionSource))
        , forwardingSource_(std::forward<SomeForwardingSourceType>(forwardingSource))
    {
    }

    /**
     * @brief Run subscriptions loop of the source.
     *
     * Delegates to the subscription source.
     */
    void
    run() final
    {
        subscriptionSource_->run();
    }

    /**
     * @brief Stop Source.
     *
     * Stops the subscription source first and the gRPC source second.
     *
     * @param yield The coroutine context handed to both stop calls
     */
    void
    stop(boost::asio::yield_context yield) final
    {
        subscriptionSource_->stop(yield);
        grpcSource_.stop(yield);
    }

    /**
     * @brief Check if source is connected.
     *
     * @return Whatever the subscription source reports as its connection state
     */
    bool
    isConnected() const final
    {
        return subscriptionSource_->isConnected();
    }

    /**
     * @brief Set the forwarding state of the source.
     *
     * @param isForwarding The new forwarding state, passed on to the subscription source
     */
    void
    setForwarding(bool isForwarding) final
    {
        subscriptionSource_->setForwarding(isForwarding);
    }

    /**
     * @brief Represent the source as a JSON object.
     *
     * @return An object with the keys `validated_range`, `is_connected` (the string "0" or "1"),
     * `ip`, `ws_port` and `grpc_port`. `last_msg_age_seconds` (whole seconds, as a string) is
     * present only if the subscription source reports a non-zero last message time.
     */
    boost::json::object
    toJson() const final
    {
        boost::json::object res;

        res["validated_range"] = subscriptionSource_->validatedRange();
        res["is_connected"] = std::to_string(static_cast<int>(subscriptionSource_->isConnected()));
        res["ip"] = ip_;
        res["ws_port"] = wsPort_;
        res["grpc_port"] = grpcPort_;

        // A time point equal to the clock's epoch is treated as "no message seen yet",
        // in which case the age field is left out altogether.
        auto const last = subscriptionSource_->lastMessageTime();
        if (last.time_since_epoch().count() != 0) {
            auto const age =
                std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - last);
            res["last_msg_age_seconds"] = std::to_string(age.count());
        }

        return res;
    }

    /**
     * @brief Represent the source as a human-readable string.
     *
     * @return A string containing the validated range, ip, web socket port and gRPC port
     */
    std::string
    toString() const final
    {
        return "{validated range: " + subscriptionSource_->validatedRange() + ", ip: " + ip_ +
            ", web socket port: " + wsPort_ + ", grpc port: " + grpcPort_ + "}";
    }

    /**
     * @brief Check if ledger is known by this source.
     *
     * @param sequence The ledger sequence to check
     * @return The answer given by the subscription source
     */
    bool
    hasLedger(uint32_t sequence) const final
    {
        return subscriptionSource_->hasLedger(sequence);
    }

    /**
     * @brief Fetch data for a specific ledger.
     *
     * Delegates to the gRPC source with the arguments unchanged.
     *
     * @param sequence Sequence of the ledger to fetch
     * @param getObjects Passed through to the gRPC source
     * @param getObjectNeighbors Passed through to the gRPC source
     * @return A std::pair of the gRPC status and the GetLedger response
     */
    std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
    fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) final
    {
        return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors);
    }

    /**
     * @brief Download a ledger in full.
     *
     * Delegates to the gRPC source with the arguments unchanged.
     *
     * @param sequence Sequence of the ledger to download
     * @param numMarkers Passed through to the gRPC source
     * @param loader The observer of the initial ledger load, passed through to the gRPC source
     * @return The result of the initial ledger load as produced by the gRPC source
     */
    InitialLedgerLoadResult
    loadInitialLedger(
        uint32_t sequence,
        std::uint32_t numMarkers,
        InitialLoadObserverInterface& loader
    ) final
    {
        return grpcSource_.loadInitialLedger(sequence, numMarkers, loader);
    }

    /**
     * @brief Forward a request to rippled.
     *
     * Delegates to the forwarding source with the arguments unchanged.
     *
     * @param request The request to forward
     * @param forwardToRippledClientIp Optional client ip, passed through to the forwarding source
     * @param xUserValue Passed through to the forwarding source
     * @param yield The coroutine context
     * @return The response object on success or a rpc::ClioError on failure
     */
    std::expected<boost::json::object, rpc::ClioError>
    forwardToRippled(
        boost::json::object const& request,
        std::optional<std::string> const& forwardToRippledClientIp,
        std::string_view xUserValue,
        boost::asio::yield_context yield
    ) const final
    {
        return forwardingSource_.forwardToRippled(
            request, forwardToRippledClientIp, xUserValue, yield
        );
    }
};
243
244} // namespace etl::impl
std::expected< std::vector< std::string >, InitialLedgerLoadError > InitialLedgerLoadResult
The result type of the initial ledger load.
Definition LoadBalancerInterface.hpp:54
Provides an implementation of an ETL source.
Definition Source.hpp:52
Definition GrpcSource.hpp:40
std::string toString() const final
Definition SourceImpl.hpp:167
InitialLedgerLoadResult loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, InitialLoadObserverInterface &loader) final
Download a ledger in full.
Definition SourceImpl.hpp:212
SourceImpl(std::string ip, std::string wsPort, std::string grpcPort, SomeGrpcSourceType &&grpcSource, SubscriptionSourceTypePtr subscriptionSource, SomeForwardingSourceType &&forwardingSource)
Construct a new SourceImpl object.
Definition SourceImpl.hpp:81
boost::json::object toJson() const final
Represent the source as a JSON object.
Definition SourceImpl.hpp:142
void run() final
Run subscriptions loop of the source.
Definition SourceImpl.hpp:102
std::expected< boost::json::object, rpc::ClioError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &forwardToRippledClientIp, std::string_view xUserValue, boost::asio::yield_context yield) const final
Forward a request to rippled.
Definition SourceImpl.hpp:231
bool isConnected() const final
Check if source is connected.
Definition SourceImpl.hpp:120
std::pair< grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t sequence, bool getObjects=true, bool getObjectNeighbors=false) final
Fetch data for a specific ledger.
Definition SourceImpl.hpp:198
bool hasLedger(uint32_t sequence) const final
Check if ledger is known by this source.
Definition SourceImpl.hpp:180
void setForwarding(bool isForwarding) final
Set the forwarding state of the source.
Definition SourceImpl.hpp:131
void stop(boost::asio::yield_context yield) final
Stop Source.
Definition SourceImpl.hpp:108
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:36