Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SourceImpl.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "etl/impl/SubscriptionSource.hpp"
23#include "etlng/InitialLoadObserverInterface.hpp"
25#include "etlng/Source.hpp"
26#include "etlng/impl/ForwardingSource.hpp"
27#include "etlng/impl/GrpcSource.hpp"
28#include "rpc/Errors.hpp"
29
30#include <boost/asio/spawn.hpp>
31#include <boost/json/object.hpp>
32#include <grpcpp/support/status.h>
33#include <org/xrpl/rpc/v1/get_ledger.pb.h>
34
35#include <chrono>
36#include <cstdint>
37#include <expected>
38#include <memory>
39#include <optional>
40#include <string>
41#include <string_view>
42#include <utility>
43#include <vector>
44
namespace etlng::impl {

/**
 * @brief Provides an implementation of an ETL source.
 *
 * The class is a thin facade: every operation is delegated to one of three collaborators
 * (a gRPC source, a subscription source and a forwarding source) that are supplied at construction.
 *
 * @tparam GrpcSourceType Type of the gRPC source; used for stop(), fetchLedger() and loadInitialLedger()
 * @tparam SubscriptionSourceTypePtr Pointer-like type of the subscription source (it is accessed through `->`)
 * @tparam ForwardingSourceType Type of the forwarding source; used for forwardToRippled()
 */
template <
    typename GrpcSourceType = GrpcSource,
    typename SubscriptionSourceTypePtr = std::unique_ptr<etl::impl::SubscriptionSource>,
    typename ForwardingSourceType = etlng::impl::ForwardingSource>
class SourceImpl : public SourceBase {
    // Connection details; only reported back by toJson() and toString().
    std::string ip_;
    std::string wsPort_;
    std::string grpcPort_;

    GrpcSourceType grpcSource_;
    SubscriptionSourceTypePtr subscriptionSource_;
    ForwardingSourceType forwardingSource_;

public:
    /**
     * @brief Construct a new SourceImpl object.
     *
     * The requires-clause pins the deduced argument types to the class's own GrpcSourceType and
     * ForwardingSourceType, so no other type can be passed for these two collaborators.
     *
     * @param ip The IP of the source; stored and reported by toJson() / toString()
     * @param wsPort The web socket port of the source; stored and reported by toJson() / toString()
     * @param grpcPort The gRPC port of the source; stored and reported by toJson() / toString()
     * @param grpcSource The gRPC source; forwarded into the member
     * @param subscriptionSource The subscription source; moved into the member
     * @param forwardingSource The forwarding source; forwarded into the member
     */
    template <typename SomeGrpcSourceType, typename SomeForwardingSourceType>
        requires std::is_same_v<GrpcSourceType, SomeGrpcSourceType> and
                     std::is_same_v<ForwardingSourceType, SomeForwardingSourceType>
    SourceImpl(
        std::string ip,
        std::string wsPort,
        std::string grpcPort,
        SomeGrpcSourceType&& grpcSource,
        SubscriptionSourceTypePtr subscriptionSource,
        SomeForwardingSourceType&& forwardingSource
    )
        : ip_(std::move(ip))
        , wsPort_(std::move(wsPort))
        , grpcPort_(std::move(grpcPort))
        , grpcSource_(std::forward<SomeGrpcSourceType>(grpcSource))
        , subscriptionSource_(std::move(subscriptionSource))
        , forwardingSource_(std::forward<SomeForwardingSourceType>(forwardingSource))
    {
    }

    /**
     * @brief Run subscriptions loop of the source.
     */
    void
    run() final
    {
        subscriptionSource_->run();
    }

    /**
     * @brief Stop Source.
     *
     * The subscription source is stopped first, then the gRPC source.
     *
     * @param yield The coroutine context passed on to both stop() calls
     */
    void
    stop(boost::asio::yield_context yield) final
    {
        subscriptionSource_->stop(yield);
        grpcSource_.stop(yield);
    }

    /**
     * @brief Check if source is connected.
     *
     * @return Whatever the subscription source reports for isConnected()
     */
    bool
    isConnected() const final
    {
        return subscriptionSource_->isConnected();
    }

    /**
     * @brief Set the forwarding state of the source.
     *
     * @param isForwarding The new forwarding state; handed to the subscription source
     */
    void
    setForwarding(bool isForwarding) final
    {
        subscriptionSource_->setForwarding(isForwarding);
    }

    /**
     * @brief Represent the source as a JSON object.
     *
     * @return An object with the keys "validated_range", "is_connected", "ip", "ws_port" and "grpc_port";
     * "last_msg_age_seconds" is added only when the last message time is not the clock's epoch
     */
    boost::json::object
    toJson() const final
    {
        boost::json::object res;

        res["validated_range"] = subscriptionSource_->validatedRange();
        // The connection flag is emitted as the string "0" or "1", not as a JSON boolean.
        res["is_connected"] = std::to_string(static_cast<int>(subscriptionSource_->isConnected()));
        res["ip"] = ip_;
        res["ws_port"] = wsPort_;
        res["grpc_port"] = grpcPort_;

        // A zero time-since-epoch is treated as "no message time recorded"; the age is omitted in that case.
        auto last = subscriptionSource_->lastMessageTime();
        if (last.time_since_epoch().count() != 0) {
            res["last_msg_age_seconds"] = std::to_string(
                std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - last).count()
            );
        }

        return res;
    }

    /**
     * @brief Represent the source as a human-readable string.
     *
     * @return A string containing the validated range, the IP, the web socket port and the gRPC port
     */
    std::string
    toString() const final
    {
        return "{validated range: " + subscriptionSource_->validatedRange() + ", ip: " + ip_ +
            ", web socket port: " + wsPort_ + ", grpc port: " + grpcPort_ + "}";
    }

    /**
     * @brief Check if ledger is known by this source.
     *
     * @param sequence The ledger sequence to check
     * @return Whatever the subscription source reports for hasLedger(sequence)
     */
    bool
    hasLedger(uint32_t sequence) const final
    {
        return subscriptionSource_->hasLedger(sequence);
    }

    /**
     * @brief Fetch data for a specific ledger.
     *
     * @param sequence Sequence of the ledger to fetch
     * @param getObjects Passed through to the gRPC source unchanged
     * @param getObjectNeighbors Passed through to the gRPC source unchanged
     * @return The gRPC status paired with the GetLedgerResponse, as returned by the gRPC source
     */
    std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
    fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) final
    {
        return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors);
    }

    /**
     * @brief Download a ledger in full.
     *
     * @param sequence Sequence of the ledger to download
     * @param numMarkers Passed through to the gRPC source unchanged
     * @param loader The observer handed to the gRPC source for the duration of the load
     * @return The result of the gRPC source's loadInitialLedger()
     */
    InitialLedgerLoadResult
    loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface& loader) final
    {
        return grpcSource_.loadInitialLedger(sequence, numMarkers, loader);
    }

    /**
     * @brief Forward a request to rippled.
     *
     * @param request The request to forward
     * @param forwardToRippledClientIp Optional client IP; passed through to the forwarding source
     * @param xUserValue Passed through to the forwarding source unchanged
     * @param yield The coroutine context
     * @return The response object on success or an rpc::ClioError, as produced by the forwarding source
     */
    std::expected<boost::json::object, rpc::ClioError>
    forwardToRippled(
        boost::json::object const& request,
        std::optional<std::string> const& forwardToRippledClientIp,
        std::string_view xUserValue,
        boost::asio::yield_context yield
    ) const final
    {
        return forwardingSource_.forwardToRippled(request, forwardToRippledClientIp, xUserValue, yield);
    }
};

}  // namespace etlng::impl
std::expected< std::vector< std::string >, InitialLedgerLoadError > InitialLedgerLoadResult
The result type of the initial ledger load.
Definition LoadBalancerInterface.hpp:54
Provides an implementation of an ETL source.
Definition Source.hpp:52
Definition ForwardingSource.hpp:37
Provides an implementation of an ETL source.
Definition SourceImpl.hpp:58
bool hasLedger(uint32_t sequence) const final
Check if ledger is known by this source.
Definition SourceImpl.hpp:177
std::string toString() const final
Definition SourceImpl.hpp:164
void stop(boost::asio::yield_context yield) final
Stop Source.
Definition SourceImpl.hpp:108
InitialLedgerLoadResult loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface &loader) final
Download a ledger in full.
Definition SourceImpl.hpp:208
void setForwarding(bool isForwarding) final
Set the forwarding state of the source.
Definition SourceImpl.hpp:131
boost::json::object toJson() const final
Represent the source as a JSON object.
Definition SourceImpl.hpp:142
bool isConnected() const final
Check if source is connected.
Definition SourceImpl.hpp:120
void run() final
Run subscriptions loop of the source.
Definition SourceImpl.hpp:102
std::expected< boost::json::object, rpc::ClioError > forwardToRippled(boost::json::object const &request, std::optional< std::string > const &forwardToRippledClientIp, std::string_view xUserValue, boost::asio::yield_context yield) const final
Forward a request to rippled.
Definition SourceImpl.hpp:223
std::pair< grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t sequence, bool getObjects=true, bool getObjectNeighbors=false) final
Fetch data for a specific ledger.
Definition SourceImpl.hpp:194
SourceImpl(std::string ip, std::string wsPort, std::string grpcPort, SomeGrpcSourceType &&grpcSource, SubscriptionSourceTypePtr subscriptionSource, SomeForwardingSourceType &&forwardingSource)
Construct a new SourceImpl object.
Definition SourceImpl.hpp:81
The interface for observing the initial ledger load.
Definition InitialLoadObserverInterface.hpp:36