Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
TransactionFeed.hpp
1#pragma once
2
3#include "data/AmendmentCenterInterface.hpp"
4#include "data/BackendInterface.hpp"
5#include "data/Types.hpp"
6#include "feed/Types.hpp"
7#include "feed/impl/TrackableSignal.hpp"
8#include "feed/impl/TrackableSignalMap.hpp"
9#include "feed/impl/Util.hpp"
10#include "util/async/AnyExecutionContext.hpp"
11#include "util/async/AnyStrand.hpp"
12#include "util/log/Logger.hpp"
13#include "util/prometheus/Gauge.hpp"
14
15#include <boost/asio/io_context.hpp>
16#include <boost/asio/strand.hpp>
17#include <fmt/format.h>
18#include <xrpl/protocol/AccountID.h>
19#include <xrpl/protocol/Book.h>
20#include <xrpl/protocol/LedgerHeader.h>
21
22#include <cstdint>
23#include <functional>
24#include <memory>
25#include <string>
26#include <unordered_set>
27
28namespace feed::impl {
29
31 // Bundles two versions of a transaction message, one string per version (v1 and v2).
32 struct AllVersionsMsgsType {
33 std::string v1;  // message text for version 1
34 std::string v2;  // message text for version 2
35 };
36
37 struct TransactionSlot {  // Callable object pairing a TransactionFeed reference with a weakly-held Subscriber.
38 std::reference_wrapper<TransactionFeed> feed;  // non-owning reference to a TransactionFeed
39 std::weak_ptr<Subscriber> subscriptionContextWeakPtr;  // weak pointer, so this struct does not keep the Subscriber alive
40
41 TransactionSlot(TransactionFeed& feed, SubscriberSharedPtr const& connection)  // stores the feed reference and a weak pointer made from `connection`
42 : feed(feed), subscriptionContextWeakPtr(connection)
43 {
44 }
45
46 void
47 operator()(std::shared_ptr<AllVersionsMsgsType> const& allVersionMsgs) const;  // declared only; takes the shared v1/v2 message bundle (definition not in this header)
48 };
49
50 util::Logger logger_{"Subscriptions"};
51
53 std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
54 std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
55 std::reference_wrapper<util::prometheus::GaugeInt> subBookCount_;
56
58 accountSignal_;
60 bookSignal_;
62
63 // Signals for proposed tx subscribers
65 accountProposedSignal_;
67
68 std::unordered_set<SubscriberPtr> notified_; // Used by slots to prevent double notifications
69 // if tx contains multiple subscribed accounts
70
71public:
77 : strand_(executionCtx.makeStrand())
78 , subAllCount_(getSubscriptionsGaugeInt("tx"))
79 , subAccountCount_(getSubscriptionsGaugeInt("account"))
80 , subBookCount_(getSubscriptionsGaugeInt("book"))
81 {
82 }
83
88
93 void
94 sub(SubscriberSharedPtr const& subscriber);
95
102 void
103 sub(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber);
104
111 void
112 sub(ripple::Book const& book, SubscriberSharedPtr const& subscriber);
113
118 void
119 subProposed(SubscriberSharedPtr const& subscriber);
120
127 void
128 subProposed(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber);
129
134 void
135 unsub(SubscriberSharedPtr const& subscriber);
136
142 void
143 unsub(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber);
144
149 void
150 unsubProposed(SubscriberSharedPtr const& subscriber);
151
157 void
158 unsubProposed(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber);
159
165 void
166 unsub(ripple::Book const& book, SubscriberSharedPtr const& subscriber);
167
175 void
176 pub(data::TransactionAndMetadata const& txMeta,
177 ripple::LedgerHeader const& lgrInfo,
178 std::shared_ptr<data::BackendInterface const> const& backend,
179 std::shared_ptr<data::AmendmentCenterInterface const> const& amendmentCenter,
180 uint32_t networkID);
181
185 std::uint64_t
186 transactionSubCount() const;
187
191 std::uint64_t
192 accountSubCount() const;
193
197 std::uint64_t
198 bookSubCount() const;
199
200private:
201 void
202 unsubInternal(SubscriberPtr subscriber);
203
204 void
205 unsubInternal(ripple::AccountID const& account, SubscriberPtr subscriber);
206
207 void
208 unsubProposedInternal(SubscriberPtr subscriber);
209
210 void
211 unsubProposedInternal(ripple::AccountID const& account, SubscriberPtr subscriber);
212
213 void
214 unsubInternal(ripple::Book const& book, SubscriberPtr subscriber);
215};
216} // namespace feed::impl
Class to manage a map from keys to their associated signals.
Definition TrackableSignalMap.hpp:30
A thread-safe class to manage a signal and its tracking connections.
Definition TrackableSignal.hpp:26
void unsub(SubscriberSharedPtr const &subscriber)
Unsubscribe from the transaction feed.
Definition TransactionFeed.cpp:128
void sub(SubscriberSharedPtr const &subscriber)
Subscribe to the transaction feed.
Definition TransactionFeed.cpp:60
void unsubProposed(SubscriberSharedPtr const &subscriber)
Unsubscribe from the transaction feed for the proposed transaction stream.
Definition TransactionFeed.cpp:140
std::uint64_t bookSubCount() const
Get the number of book subscribers.
Definition TransactionFeed.cpp:173
std::uint64_t transactionSubCount() const
Get the number of subscribers of the transaction feed.
Definition TransactionFeed.cpp:161
TransactionFeed(TransactionFeed &&)=delete
Move constructor is deleted because TransactionSlot takes TransactionFeed by reference.
std::uint64_t accountSubCount() const
Get the number of account subscribers.
Definition TransactionFeed.cpp:167
TransactionFeed(util::async::AnyExecutionContext &executionCtx)
Construct a new Transaction Feed object.
Definition TransactionFeed.hpp:76
void pub(data::TransactionAndMetadata const &txMeta, ripple::LedgerHeader const &lgrInfo, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter, uint32_t networkID)
Publishes the transaction feed.
Definition TransactionFeed.cpp:179
void subProposed(SubscriberSharedPtr const &subscriber)
Subscribe to the transaction feed for the proposed transaction stream.
Definition TransactionFeed.cpp:86
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
A type-erased execution context.
Definition AnyExecutionContext.hpp:22
A type-erased execution strand.
Definition AnyStrand.hpp:21
This namespace implements everything related to subscriptions.
Definition BookChangesFeed.hpp:14
Represents a transaction and its metadata bundled together.
Definition Types.hpp:49