Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SubscriptionSource.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
23#include "etl/Source.hpp"
24#include "feed/SubscriptionManagerInterface.hpp"
25#include "util/Mutex.hpp"
26#include "util/Retry.hpp"
27#include "util/StopHelper.hpp"
28#include "util/log/Logger.hpp"
29#include "util/prometheus/Gauge.hpp"
30#include "util/requests/Types.hpp"
31#include "util/requests/WsConnection.hpp"
32
33#include <boost/asio/io_context.hpp>
34#include <boost/asio/spawn.hpp>
35#include <boost/asio/strand.hpp>
36#include <boost/beast/http/field.hpp>
37#include <fmt/core.h>
38
39#include <atomic>
40#include <chrono>
41#include <cstdint>
42#include <functional>
43#include <memory>
44#include <optional>
45#include <string>
46#include <utility>
47#include <vector>
48
49namespace etl::impl {
50
56public:
// Public aliases of the hook types declared on SourceBase. The hook members stored by this
// class and the hook parameters of its constructor are declared with these names.
57 using OnConnectHook = SourceBase::OnConnectHook;
58 using OnDisconnectHook = SourceBase::OnDisconnectHook;
59 using OnLedgerClosedHook = SourceBase::OnLedgerClosedHook;
60
61private:
// Logger used by this source.
62 util::Logger log_;
// Builder for WebSocket connections, and a pointer to a WebSocket connection.
63 util::requests::WsConnectionBuilder wsConnectionBuilder_;
64 util::requests::WsConnectionPtr wsConnection_;
65
// Validated-ledger information grouped into one struct so that it can be stored behind a
// single util::Mutex (validatedLedgersData_ below).
66 struct ValidatedLedgersData {
// Pairs of uint32_t values.
// NOTE(review): presumably [min, max] ledger sequence ranges - confirm against the .cpp.
67 std::vector<std::pair<uint32_t, uint32_t>> validatedLedgers;
// Raw string form of the validated ledgers; holds "N/A" until something else is assigned.
68 std::string validatedLedgersRaw{"N/A"};
69 };
70 util::Mutex<ValidatedLedgersData> validatedLedgersData_;
71
// Shared handles received through the constructor parameters of the same types.
72 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers_;
73 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
74
// Strand over the io_context executor.
// NOTE(review): presumably serialises this source's asynchronous work - confirm.
75 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
76
// WebSocket timeout; the constructor's wsTimeout parameter defaults to kWS_TIMEOUT.
77 std::chrono::steady_clock::duration wsTimeout_;
78
// Retry mechanism (util::Retry).
// NOTE(review): presumably drives reconnection attempts using the retry delays below - confirm.
79 util::Retry retry_;
80
// Hooks supplied by the owner via the constructor.
81 OnConnectHook onConnect_;
82 OnDisconnectHook onDisconnect_;
83 OnLedgerClosedHook onLedgerClosed_;
84
// Atomic state flags, all initially false.
// NOTE(review): presumably isConnected_/isForwarding_ back isConnected()/isForwarding() and
// stop_ is raised by stop() - confirm against the .cpp.
85 std::atomic_bool isConnected_{false};
86 std::atomic_bool stop_{false};
87 std::atomic_bool isForwarding_{false};
88
90
// Reference to an integer Prometheus gauge.
// NOTE(review): by its name it reports the last message time as seconds since epoch - confirm.
91 std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;
92
// Helper used to stop this class asynchronously (util::StopHelper).
93 util::StopHelper stopHelper_;
94
// Default value of the constructor's wsTimeout parameter.
95 static constexpr std::chrono::seconds kWS_TIMEOUT{30};
// NOTE(review): not referenced anywhere in this header; presumably the upper bound of the
// retry delay - confirm against the .cpp.
96 static constexpr std::chrono::seconds kRETRY_MAX_DELAY{30};
// Default value of the constructor's retryDelay parameter.
97 static constexpr std::chrono::seconds kRETRY_DELAY{1};
98
99public:
116 boost::asio::io_context& ioContext,
117 std::string const& ip,
118 std::string const& wsPort,
119 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
120 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
121 OnConnectHook onConnect,
122 OnDisconnectHook onDisconnect,
123 OnLedgerClosedHook onLedgerClosed,
124 std::chrono::steady_clock::duration const wsTimeout = SubscriptionSource::kWS_TIMEOUT,
125 std::chrono::steady_clock::duration const retryDelay = SubscriptionSource::kRETRY_DELAY
126 );
127
/**
 * @brief Run the source.
 */
131 void
132 run();
133
/**
 * @brief Check if the source has a ledger.
 *
 * @param sequence The ledger sequence to check
 * @return The result of the check (presumably true when the ledger is available - confirm in
 * SubscriptionSource.cpp)
 */
140 bool
141 hasLedger(uint32_t sequence) const;
142
/**
 * @brief Check if the source is connected.
 *
 * @return Whether the source is connected
 */
148 bool
149 isConnected() const;
150
/**
 * @brief Get whether the source is forwarding.
 *
 * @return Whether the source is forwarding
 */
156 bool
157 isForwarding() const;
158
166 void
168
/**
 * @brief Get the last message time (even if the last message had an error).
 *
 * @return The last message time as a std::chrono::steady_clock time point
 */
174 std::chrono::steady_clock::time_point
175 lastMessageTime() const;
176
/**
 * @brief Get the last received raw string of the validated ledgers.
 *
 * NOTE(review): this returns a reference, while the raw string member (validatedLedgersRaw) is
 * declared inside the util::Mutex-protected ValidatedLedgersData. If the reference points into
 * that data it can be read after the lock is released - confirm against the definition in
 * SubscriptionSource.cpp and consider returning by value.
 *
 * @return Const reference to the raw validated ledgers string
 */
182 std::string const&
183 validatedRange() const;
184
/**
 * @brief Stop the source. The source will complete already scheduled operations but will not
 * schedule new ones.
 *
 * @param yield The boost::asio yield context of the calling coroutine
 */
188 void
189 stop(boost::asio::yield_context yield);
190
191private:
// Private helper with no parameters and no return value; its definition is not part of this
// header.
// NOTE(review): presumably establishes the WebSocket subscription to the source - confirm.
192 void
193 subscribe();
194
// Private helper that takes one message as a string and returns an optional
// util::requests::RequestError.
// NOTE(review): presumably std::nullopt means the message was handled without error - confirm.
195 std::optional<util::requests::RequestError>
196 handleMessage(std::string const& message);
197
// Private helper that takes a request error together with the yield context of the calling
// coroutine.
// NOTE(review): presumably performs disconnect handling and schedules a retry - confirm.
198 void
199 handleError(util::requests::RequestError const& error, boost::asio::yield_context yield);
200
// Private const helper that takes a request error.
// NOTE(review): presumably writes the error to log_ - confirm.
201 void
202 logError(util::requests::RequestError const& error) const;
203
// Private helper with no parameters.
// NOTE(review): presumably records "now" as the value reported by lastMessageTime() and by the
// lastMessageTimeSecondsSinceEpoch_ gauge - confirm.
204 void
205 setLastMessageTime();
206
// Private helper that takes the range string by value.
// NOTE(review): presumably parses it and updates validatedLedgersData_ - confirm.
207 void
208 setValidatedRange(std::string range);
209
// Static private helper returning a const reference to a std::string.
// NOTE(review): by its name this is the JSON text of the subscribe command, presumably built
// once and cached - confirm.
210 static std::string const&
211 getSubscribeCommandJson();
212};
213
214} // namespace etl::impl
This class is used to subscribe to a source of ledger data and forward it to the subscription manager...
Definition SubscriptionSource.hpp:55
bool hasLedger(uint32_t sequence) const
Check if the source has a ledger.
Definition SubscriptionSource.cpp:100
void setForwarding(bool isForwarding)
Set source forwarding.
Definition SubscriptionSource.cpp:130
void stop(boost::asio::yield_context yield)
Stop the source. The source will complete already scheduled operations but will not schedule new ones...
Definition SubscriptionSource.cpp:149
void run()
Run the source.
Definition SubscriptionSource.cpp:94
SubscriptionSource(boost::asio::io_context &ioContext, std::string const &ip, std::string const &wsPort, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, OnConnectHook onConnect, OnDisconnectHook onDisconnect, OnLedgerClosedHook onLedgerClosed, std::chrono::steady_clock::duration const wsTimeout=SubscriptionSource::kWS_TIMEOUT, std::chrono::steady_clock::duration const retryDelay=SubscriptionSource::kRETRY_DELAY)
Construct a new Subscription Source object.
Definition SubscriptionSource.cpp:60
std::chrono::steady_clock::time_point lastMessageTime() const
Get the last message time (even if the last message had an error)
Definition SubscriptionSource.cpp:137
bool isForwarding() const
Get whether the source is forwarding.
Definition SubscriptionSource.cpp:124
bool isConnected() const
Check if the source is connected.
Definition SubscriptionSource.cpp:118
std::string const & validatedRange() const
Get the last received raw string of the validated ledgers.
Definition SubscriptionSource.cpp:143
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:96
A retry mechanism.
Definition Retry.hpp:80
Helper class to stop a class asynchronously.
Definition StopHelper.hpp:34
Error type for HTTP requests.
Definition Types.hpp:34
Builder for WebSocket connections.
Definition WsConnection.hpp:97