Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SubscriptionSource.hpp
1#pragma once
2
4#include "etl/Source.hpp"
5#include "feed/SubscriptionManagerInterface.hpp"
6#include "util/Mutex.hpp"
7#include "util/Retry.hpp"
8#include "util/StopHelper.hpp"
9#include "util/log/Logger.hpp"
10#include "util/prometheus/Gauge.hpp"
11#include "util/requests/Types.hpp"
12#include "util/requests/WsConnection.hpp"
13
14#include <boost/asio/io_context.hpp>
15#include <boost/asio/spawn.hpp>
16#include <boost/asio/strand.hpp>
17#include <boost/beast/http/field.hpp>
18#include <fmt/format.h>
19
20#include <atomic>
21#include <chrono>
22#include <cstdint>
23#include <functional>
24#include <memory>
25#include <optional>
26#include <string>
27#include <utility>
28#include <vector>
29
30namespace etl::impl {
31
37class SubscriptionSource {
38public:
39 using OnConnectHook = SourceBase::OnConnectHook;
40 using OnDisconnectHook = SourceBase::OnDisconnectHook;
41 using OnLedgerClosedHook = SourceBase::OnLedgerClosedHook;
42
43private:
44 util::Logger log_;
45 util::requests::WsConnectionBuilder wsConnectionBuilder_;
46 util::requests::WsConnectionPtr wsConnection_;
47
48 struct ValidatedLedgersData {
49 std::vector<std::pair<uint32_t, uint32_t>> validatedLedgers;
50 std::string validatedLedgersRaw{"N/A"};
51 };
52 util::Mutex<ValidatedLedgersData> validatedLedgersData_;
53
54 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers_;
55 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
56
57 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
58
59 std::chrono::steady_clock::duration wsTimeout_;
60
61 util::Retry retry_;
62
63 OnConnectHook onConnect_;
64 OnDisconnectHook onDisconnect_;
65 OnLedgerClosedHook onLedgerClosed_;
66
67 std::atomic_bool isConnected_{false};
68 std::atomic_bool stop_{false};
69 std::atomic_bool isForwarding_{false};
70
72
73 std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;
74
75 util::StopHelper stopHelper_;
76
77 static constexpr std::chrono::seconds kWS_TIMEOUT{30};
78 static constexpr std::chrono::seconds kRETRY_MAX_DELAY{30};
79 static constexpr std::chrono::seconds kRETRY_DELAY{1};
80
81public:
98 SubscriptionSource(
99 boost::asio::io_context& ioContext,
100 std::string const& ip,
101 std::string const& wsPort,
102 std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
103 std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
104 OnConnectHook onConnect,
105 OnDisconnectHook onDisconnect,
106 OnLedgerClosedHook onLedgerClosed,
107 std::chrono::steady_clock::duration const wsTimeout = SubscriptionSource::kWS_TIMEOUT,
108 std::chrono::steady_clock::duration const retryDelay = SubscriptionSource::kRETRY_DELAY
109 );
110
114 void
115 run();
116
123 bool
124 hasLedger(uint32_t sequence) const;
125
131 bool
132 isConnected() const;
133
139 bool
140 isForwarding() const;
141
149 void
150 setForwarding(bool isForwarding);
151
157 std::chrono::steady_clock::time_point
158 lastMessageTime() const;
159
165 std::string const&
166 validatedRange() const;
167
172 void
173 stop(boost::asio::yield_context yield);
174
175private:
176 void
177 subscribe();
178
179 std::optional<util::requests::RequestError>
180 handleMessage(std::string const& message);
181
182 void
183 handleError(util::requests::RequestError const& error, boost::asio::yield_context yield);
184
185 void
186 logError(util::requests::RequestError const& error) const;
187
188 void
189 setLastMessageTime();
190
191 void
192 setValidatedRange(std::string range);
193
194 static std::string const&
195 getSubscribeCommandJson();
196};
197
198} // namespace etl::impl
bool hasLedger(uint32_t sequence) const
Check if the source has a ledger.
Definition SubscriptionSource.cpp:85
void setForwarding(bool isForwarding)
Set source forwarding.
Definition SubscriptionSource.cpp:115
void stop(boost::asio::yield_context yield)
Stop the source. The source will complete already scheduled operations but will not schedule new ones...
Definition SubscriptionSource.cpp:134
void run()
Run the source.
Definition SubscriptionSource.cpp:79
SubscriptionSource(boost::asio::io_context &ioContext, std::string const &ip, std::string const &wsPort, std::shared_ptr< NetworkValidatedLedgersInterface > validatedLedgers, std::shared_ptr< feed::SubscriptionManagerInterface > subscriptions, OnConnectHook onConnect, OnDisconnectHook onDisconnect, OnLedgerClosedHook onLedgerClosed, std::chrono::steady_clock::duration const wsTimeout=SubscriptionSource::kWS_TIMEOUT, std::chrono::steady_clock::duration const retryDelay=SubscriptionSource::kRETRY_DELAY)
Construct a new Subscription Source object.
Definition SubscriptionSource.cpp:43
std::chrono::steady_clock::time_point lastMessageTime() const
Get the last message time (even if the last message had an error).
Definition SubscriptionSource.cpp:122
bool isForwarding() const
Get whether the source is forwarding.
Definition SubscriptionSource.cpp:109
bool isConnected() const
Check if the source is connected.
Definition SubscriptionSource.cpp:103
std::string const & validatedRange() const
Get the last received raw string of the validated ledgers.
Definition SubscriptionSource.cpp:128
util::Logger
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:78
util::Mutex
A container for data that is protected by a mutex. Inspired by Mutex in Rust.
Definition Mutex.hpp:82
util::Retry
A retry mechanism.
Definition Retry.hpp:61
util::StopHelper
Helper class to stop a class asynchronously.
Definition StopHelper.hpp:15
util::requests::RequestError
Error type for HTTP requests.
Definition Types.hpp:15
util::requests::WsConnectionBuilder
Builder for WebSocket connections.
Definition WsConnection.hpp:84