Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SubscriptionSource.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2024, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
23#include "etl/Source.hpp"
24#include "feed/SubscriptionManagerInterface.hpp"
25#include "util/Mutex.hpp"
26#include "util/Retry.hpp"
27#include "util/StopHelper.hpp"
28#include "util/log/Logger.hpp"
29#include "util/prometheus/Gauge.hpp"
30#include "util/requests/Types.hpp"
31#include "util/requests/WsConnection.hpp"
32
33#include <boost/asio/io_context.hpp>
34#include <boost/asio/spawn.hpp>
35#include <boost/asio/strand.hpp>
36#include <boost/beast/http/field.hpp>
37#include <fmt/format.h>
38
39#include <atomic>
40#include <chrono>
41#include <cstdint>
42#include <functional>
43#include <memory>
44#include <optional>
45#include <string>
46#include <utility>
47#include <vector>
48
49namespace etl::impl {
50
57public:
// Hook types re-exported from SourceBase so that the constructor parameters and the
// stored callbacks below can name them without qualification.
58 using OnConnectHook = SourceBase::OnConnectHook;
59 using OnDisconnectHook = SourceBase::OnDisconnectHook;
60 using OnLedgerClosedHook = SourceBase::OnLedgerClosedHook;
61
62private:
// Logger used by this source.
63 util::Logger log_;
// Builder for WebSocket connections.
64 util::requests::WsConnectionBuilder wsConnectionBuilder_;
// NOTE(review): presumably the connection produced by wsConnectionBuilder_ — confirm in the .cpp.
65 util::requests::WsConnectionPtr wsConnection_;
66
// Validated-ledger information kept in two forms: a list of uint32_t pairs and the raw string.
67 struct ValidatedLedgersData {
// NOTE(review): each pair is presumably an inclusive (first, last) sequence range — confirm in the .cpp.
68 std::vector<std::pair<uint32_t, uint32_t>> validatedLedgers;
// Raw string of the validated ledgers; "N/A" until a value is stored.
69 std::string validatedLedgersRaw{"N/A"};
70 };
// The data above wrapped in util::Mutex (a container for data that is protected by a mutex).
71 util::Mutex<ValidatedLedgersData> validatedLedgersData_;
72
// Shared collaborators passed in through the constructor.
73 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers_;
74 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
75
// Strand over the io_context executor.
76 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
77
// NOTE(review): presumably initialised from the constructor's wsTimeout argument — confirm in the .cpp.
78 std::chrono::steady_clock::duration wsTimeout_;
79
// Retry mechanism.
80 util::Retry retry_;
81
// Callbacks supplied by the owner of this source.
82 OnConnectHook onConnect_;
83 OnDisconnectHook onDisconnect_;
84 OnLedgerClosedHook onLedgerClosed_;
85
// Atomic state flags, all initially false.
86 std::atomic_bool isConnected_{false};
87 std::atomic_bool stop_{false};
88 std::atomic_bool isForwarding_{false};
89
91
// Reference to an integer Prometheus gauge.
// NOTE(review): the name suggests it holds the last message time in seconds since epoch — confirm.
92 std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;
93
// Helper to stop this class asynchronously.
94 util::StopHelper stopHelper_;
95
// Default value of the constructor's wsTimeout parameter (30 seconds).
96 static constexpr std::chrono::seconds kWS_TIMEOUT{30};
// NOTE(review): presumably the upper bound for the delay used by retry_ — confirm in the .cpp.
97 static constexpr std::chrono::seconds kRETRY_MAX_DELAY{30};
// Default value of the constructor's retryDelay parameter (1 second).
98 static constexpr std::chrono::seconds kRETRY_DELAY{1};
99
100public:
/**
 * @brief Construct a new Subscription Source object.
 *
 * @param ioContext The io_context to use
 * @param ip The ip address of the source
 * @param wsPort The WebSocket port of the source
 * @param validatedLedgers The network validated ledgers object
 * @param subscriptions The subscription manager object
 * @param onConnect The onConnect hook
 * @param onDisconnect The onDisconnect hook
 * @param onLedgerClosed The onLedgerClosed hook
 * @param wsTimeout A timeout for WebSocket operations; defaults to kWS_TIMEOUT (30 seconds)
 * @param retryDelay The retry delay; defaults to kRETRY_DELAY (1 second)
 */
117 SubscriptionSource(
118 boost::asio::io_context& ioContext,
119 std::string const& ip,
120 std::string const& wsPort,
121 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
122 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
123 OnConnectHook onConnect,
124 OnDisconnectHook onDisconnect,
125 OnLedgerClosedHook onLedgerClosed,
126 std::chrono::steady_clock::duration const wsTimeout = SubscriptionSource::kWS_TIMEOUT,
127 std::chrono::steady_clock::duration const retryDelay = SubscriptionSource::kRETRY_DELAY
128 );
129
/**
 * @brief Run the source.
 */
133 void
134 run();
135
/**
 * @brief Check if the source has a ledger.
 *
 * @param sequence The sequence of the ledger to look for
 * @return true if the source has the ledger; false otherwise
 */
142 bool
143 hasLedger(uint32_t sequence) const;
144
/**
 * @brief Check if the source is connected.
 *
 * @return true if the source is connected; false otherwise
 */
150 bool
151 isConnected() const;
152
/**
 * @brief Get whether the source is forwarding.
 *
 * @return true if the source is forwarding; false otherwise
 */
158 bool
159 isForwarding() const;
160
/**
 * @brief Set source forwarding.
 *
 * @param isForwarding The new forwarding state of the source
 */
168 void
169 setForwarding(bool isForwarding);
170
/**
 * @brief Get the last message time (even if the last message had an error).
 *
 * @return The last message time as a steady_clock time point
 */
176 std::chrono::steady_clock::time_point
177 lastMessageTime() const;
178
/**
 * @brief Get the last received raw string of the validated ledgers.
 *
 * @return A const reference to the raw string
 */
184 std::string const&
185 validatedRange() const;
186
/**
 * @brief Stop the source. The source will complete already scheduled operations but will not
 * schedule new ones.
 *
 * @param yield The coroutine context
 */
191 void
192 stop(boost::asio::yield_context yield);
193
194private:
// NOTE(review): the comments below are inferred from the signatures only; confirm against
// SubscriptionSource.cpp.

// Takes no arguments and returns nothing. Presumably starts the subscription to the source.
195 void
196 subscribe();
197
// Processes one received message; returns a RequestError on failure, otherwise an empty optional
// (presumably).
198 std::optional<util::requests::RequestError>
199 handleMessage(std::string const& message);
200
// Reacts to a request error; takes a yield_context.
201 void
202 handleError(util::requests::RequestError const& error, boost::asio::yield_context yield);
203
// Const helper taking a request error; presumably logs it.
204 void
205 logError(util::requests::RequestError const& error) const;
206
// Presumably records the value later reported by lastMessageTime().
207 void
208 setLastMessageTime();
209
// Presumably stores a new validated-ledger range string; takes the string by value.
210 void
211 setValidatedRange(std::string range);
212
// Static accessor returning a const reference to a string; presumably the subscribe command as
// JSON text.
213 static std::string const&
214 getSubscribeCommandJson();
215};
216
217} // namespace etl::impl
bool hasLedger(uint32_t sequence) const
Check if the source has a ledger.
Definition SubscriptionSource.cpp:104
void setForwarding(bool isForwarding)
Set source forwarding.
Definition SubscriptionSource.cpp:134
void stop(boost::asio::yield_context yield)
Stop the source. The source will complete already scheduled operations but will not schedule new ones...
Definition SubscriptionSource.cpp:153
void run()
Run the source.
Definition SubscriptionSource.cpp:98
SubscriptionSource(boost::asio::io_context &ioContext, std::string const &ip, std::string const &wsPort, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, OnConnectHook onConnect, OnDisconnectHook onDisconnect, OnLedgerClosedHook onLedgerClosed, std::chrono::steady_clock::duration const wsTimeout=SubscriptionSource::kWS_TIMEOUT, std::chrono::steady_clock::duration const retryDelay=SubscriptionSource::kRETRY_DELAY)
Construct a new Subscription Source object.
Definition SubscriptionSource.cpp:62
std::chrono::steady_clock::time_point lastMessageTime() const
Get the last message time (even if the last message had an error).
Definition SubscriptionSource.cpp:141
bool isForwarding() const
Get whether the source is forwarding.
Definition SubscriptionSource.cpp:128
bool isConnected() const
Check if the source is connected.
Definition SubscriptionSource.cpp:122
std::string const & validatedRange() const
Get the last received raw string of the validated ledgers.
Definition SubscriptionSource.cpp:147
util::Logger
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
util::Mutex
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:101
util::Retry
A retry mechanism.
Definition Retry.hpp:80
util::StopHelper
Helper class to stop a class asynchronously.
Definition StopHelper.hpp:34
util::requests::RequestError
Error type for HTTP requests.
Definition Types.hpp:34
util::requests::WsConnectionBuilder
Builder for WebSocket connections.
Definition WsConnection.hpp:103