Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SubscriptionManager.hpp
1#pragma once
2
3#include "data/AmendmentCenterInterface.hpp"
4#include "data/BackendInterface.hpp"
5#include "data/Types.hpp"
6#include "feed/SubscriptionManagerInterface.hpp"
7#include "feed/Types.hpp"
8#include "feed/impl/BookChangesFeed.hpp"
9#include "feed/impl/ForwardFeed.hpp"
10#include "feed/impl/LedgerFeed.hpp"
11#include "feed/impl/ProposedTransactionFeed.hpp"
12#include "feed/impl/TransactionFeed.hpp"
13#include "util/async/AnyExecutionContext.hpp"
14#include "util/async/context/BasicExecutionContext.hpp"
15#include "util/config/ConfigDefinition.hpp"
16#include "util/log/Logger.hpp"
17
18#include <boost/asio/executor_work_guard.hpp>
19#include <boost/asio/io_context.hpp>
20#include <boost/asio/spawn.hpp>
21#include <boost/json/object.hpp>
22#include <xrpl/protocol/AccountID.h>
23#include <xrpl/protocol/Book.h>
24#include <xrpl/protocol/Fees.h>
25#include <xrpl/protocol/LedgerHeader.h>
26
27#include <cstdint>
28#include <memory>
29#include <string>
30#include <utility>
31#include <vector>
32
38namespace feed {
39
45 std::shared_ptr<data::BackendInterface const> backend_;
46 std::shared_ptr<data::AmendmentCenterInterface const> amendmentCenter_;
48 impl::ForwardFeed manifestFeed_;
49 impl::ForwardFeed validationsFeed_;
50 impl::LedgerFeed ledgerFeed_;
51 impl::BookChangesFeed bookChangesFeed_;
52 impl::TransactionFeed transactionFeed_;
53 impl::ProposedTransactionFeed proposedTransactionFeed_;
54 uint32_t networkID_{0};
55
56public:
65 static std::shared_ptr<SubscriptionManager>
68 std::shared_ptr<data::BackendInterface const> const& backend,
69 std::shared_ptr<data::AmendmentCenterInterface const> const& amendmentCenter
70 )
71 {
72 auto const workersNum = config.get<uint64_t>("subscription_workers");
73
74 util::Logger const logger{"Subscriptions"};
75 LOG(logger.info()) << "Starting subscription manager with " << workersNum << " workers";
76
77 return std::make_shared<feed::SubscriptionManager>(
78 util::async::PoolExecutionContext(workersNum), backend, amendmentCenter
79 );
80 }
81
91 std::shared_ptr<data::BackendInterface const> const& backend,
92 std::shared_ptr<data::AmendmentCenterInterface const> const& amendmentCenter
93 )
94 : backend_(backend)
95 , amendmentCenter_(amendmentCenter)
96 , ctx_(std::move(executor))
97 , manifestFeed_(ctx_, "manifest")
98 , validationsFeed_(ctx_, "validations")
99 , ledgerFeed_(ctx_)
100 , bookChangesFeed_(ctx_)
101 , transactionFeed_(ctx_)
102 , proposedTransactionFeed_(ctx_)
103 {
104 }
105
111 {
112 stop();
113 }
114
/**
 * @brief Stop the SubscriptionManager and wait for all jobs to finish.
 *
 * Stops the execution context first and then joins it. The destructor calls this as well.
 */
118 void
119 stop() override
120 {
121 ctx_.stop();
122 ctx_.join();
123 }
124
/**
 * @brief Subscribe to the book changes feed.
 *
 * @param subscriber The subscriber to subscribe
 */
129 void
130 subBookChanges(SubscriberSharedPtr const& subscriber) final;
131
/**
 * @brief Unsubscribe from the book changes feed.
 *
 * @param subscriber The subscriber to unsubscribe
 */
136 void
137 unsubBookChanges(SubscriberSharedPtr const& subscriber) final;
138
144 void
146 ripple::LedgerHeader const& lgrInfo,
147 std::vector<data::TransactionAndMetadata> const& transactions
148 ) final;
149
/**
 * @brief Subscribe to the proposed transactions feed.
 *
 * @param subscriber The subscriber to subscribe
 */
154 void
155 subProposedTransactions(SubscriberSharedPtr const& subscriber) final;
156
/**
 * @brief Unsubscribe from the proposed transactions feed.
 *
 * @param subscriber The subscriber to unsubscribe
 */
161 void
162 unsubProposedTransactions(SubscriberSharedPtr const& subscriber) final;
163
170 void
172 ripple::AccountID const& account,
173 SubscriberSharedPtr const& subscriber
174 ) final;
175
181 void
183 ripple::AccountID const& account,
184 SubscriberSharedPtr const& subscriber
185 ) final;
186
/**
 * @brief Forward the proposed transactions feed.
 *
 * @param receivedTxJson The received proposed transaction as a JSON object
 */
191 void
192 forwardProposedTransaction(boost::json::object const& receivedTxJson) final;
193
/**
 * @brief Subscribe to the ledger feed.
 *
 * @param yield The boost::asio coroutine yield context
 * @param subscriber The subscriber to subscribe
 * @return A JSON object (its contents are defined by the implementation in SubscriptionManager.cpp)
 */
200 boost::json::object
201 subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const& subscriber) final;
202
/**
 * @brief Unsubscribe from the ledger feed.
 *
 * @param subscriber The subscriber to unsubscribe
 */
207 void
208 unsubLedger(SubscriberSharedPtr const& subscriber) final;
209
/**
 * @brief Publish the ledger feed.
 *
 * @param lgrInfo The ledger header
 * @param fees The fees
 * @param ledgerRange The ledger range, passed as a string
 * @param txnCount The transaction count
 */
217 void
218 pubLedger(
219 ripple::LedgerHeader const& lgrInfo,
220 ripple::Fees const& fees,
221 std::string const& ledgerRange,
222 std::uint32_t txnCount
223 ) final;
224
/**
 * @brief Subscribe to the manifest feed.
 *
 * @param subscriber The subscriber to subscribe
 */
229 void
230 subManifest(SubscriberSharedPtr const& subscriber) final;
231
/**
 * @brief Unsubscribe from the manifest feed.
 *
 * @param subscriber The subscriber to unsubscribe
 */
236 void
237 unsubManifest(SubscriberSharedPtr const& subscriber) final;
238
/**
 * @brief Forward the manifest feed.
 *
 * @param manifestJson The manifest as a JSON object
 */
243 void
244 forwardManifest(boost::json::object const& manifestJson) final;
245
/**
 * @brief Subscribe to the validation feed.
 *
 * @param subscriber The subscriber to subscribe
 */
250 void
251 subValidation(SubscriberSharedPtr const& subscriber) final;
252
/**
 * @brief Unsubscribe from the validation feed.
 *
 * @param subscriber The subscriber to unsubscribe
 */
257 void
258 unsubValidation(SubscriberSharedPtr const& subscriber) final;
259
/**
 * @brief Forward the validation feed.
 *
 * @param validationJson The validation as a JSON object
 */
264 void
265 forwardValidation(boost::json::object const& validationJson) final;
266
/**
 * @brief Subscribe to the transactions feed.
 *
 * @param subscriber The subscriber to subscribe
 */
271 void
272 subTransactions(SubscriberSharedPtr const& subscriber) final;
273
/**
 * @brief Unsubscribe from the transactions feed.
 *
 * @param subscriber The subscriber to unsubscribe
 */
278 void
279 unsubTransactions(SubscriberSharedPtr const& subscriber) final;
280
/**
 * @brief Subscribe to the transactions feed, only receive the feed when a particular account is affected.
 *
 * @param account The account to watch
 * @param subscriber The subscriber to subscribe
 */
287 void
288 subAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
289
/**
 * @brief Unsubscribe from the transactions feed for a particular account.
 *
 * @param account The account to stop watching
 * @param subscriber The subscriber to unsubscribe
 */
295 void
296 unsubAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
297
/**
 * @brief Subscribe to the transactions feed, only receive the feed when a particular order book is affected.
 *
 * @param book The order book to watch
 * @param subscriber The subscriber to subscribe
 */
304 void
305 subBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final;
306
/**
 * @brief Unsubscribe from the transactions feed for a particular order book.
 *
 * @param book The order book to stop watching
 * @param subscriber The subscriber to unsubscribe
 */
312 void
313 unsubBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final;
314
320 void
322 data::TransactionAndMetadata const& txMeta,
323 ripple::LedgerHeader const& lgrInfo
324 ) final;
325
/**
 * @brief Get the number of subscribers.
 *
 * @return The report as a JSON object
 */
331 boost::json::object
332 report() const final;
333
/**
 * @brief Set the networkID.
 *
 * @param networkID The networkID to set
 */
338 void
339 setNetworkID(uint32_t networkID) final;
340
/**
 * @brief Get the networkID.
 *
 * @return The networkID
 */
346 uint32_t
347 getNetworkID() const final;
348};
349
350} // namespace feed
Interface of subscription manager. A subscription manager is responsible for managing the subscriptio...
Definition SubscriptionManagerInterface.hpp:25
void forwardValidation(boost::json::object const &validationJson) final
Forward the validation feed.
Definition SubscriptionManager.cpp:142
void subBookChanges(SubscriberSharedPtr const &subscriber) final
Subscribe to the book changes feed.
Definition SubscriptionManager.cpp:19
void unsubProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe from the proposed transactions feed for a particular account.
Definition SubscriptionManager.cpp:70
void unsubProposedTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the proposed transactions feed.
Definition SubscriptionManager.cpp:50
void subManifest(SubscriberSharedPtr const &subscriber) final
Subscribe to the manifest feed.
Definition SubscriptionManager.cpp:112
void subAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive the feed when a particular account is affected.
Definition SubscriptionManager.cpp:160
void unsubBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Unsubscribe from the transactions feed for a particular order book.
Definition SubscriptionManager.cpp:184
void setNetworkID(uint32_t networkID) final
Set the networkID.
Definition SubscriptionManager.cpp:215
boost::json::object report() const final
Get the number of subscribers.
Definition SubscriptionManager.cpp:199
void forwardManifest(boost::json::object const &manifestJson) final
Forward the manifest feed.
Definition SubscriptionManager.cpp:124
void pubTransaction(data::TransactionAndMetadata const &txMeta, ripple::LedgerHeader const &lgrInfo) final
Forward the transactions feed.
Definition SubscriptionManager.cpp:190
uint32_t getNetworkID() const final
Get the networkID.
Definition SubscriptionManager.cpp:221
void pubLedger(ripple::LedgerHeader const &lgrInfo, ripple::Fees const &fees, std::string const &ledgerRange, std::uint32_t txnCount) final
Publish the ledger feed.
Definition SubscriptionManager.cpp:101
void unsubBookChanges(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the book changes feed.
Definition SubscriptionManager.cpp:25
void pubBookChanges(ripple::LedgerHeader const &lgrInfo, std::vector< data::TransactionAndMetadata > const &transactions) final
Publish the book changes feed.
Definition SubscriptionManager.cpp:31
void unsubTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the transactions feed.
Definition SubscriptionManager.cpp:154
static std::shared_ptr< SubscriptionManager > makeSubscriptionManager(util::config::ClioConfigDefinition const &config, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Factory function to create a new SubscriptionManager with a PoolExecutionContext.
Definition SubscriptionManager.hpp:66
SubscriptionManager(util::async::AnyExecutionContext &&executor, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Construct a new Subscription Manager object.
Definition SubscriptionManager.hpp:89
boost::json::object subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const &subscriber) final
Subscribe to the ledger feed.
Definition SubscriptionManager.cpp:86
void subBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive the feed when a particular order book is affected.
Definition SubscriptionManager.cpp:178
void subProposedTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:40
void subValidation(SubscriberSharedPtr const &subscriber) final
Subscribe to the validation feed.
Definition SubscriptionManager.cpp:130
void unsubManifest(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the manifest feed.
Definition SubscriptionManager.cpp:118
void unsubAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe from the transactions feed for a particular account.
Definition SubscriptionManager.cpp:169
void forwardProposedTransaction(boost::json::object const &receivedTxJson) final
Forward the proposed transactions feed.
Definition SubscriptionManager.cpp:80
void unsubValidation(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the validation feed.
Definition SubscriptionManager.cpp:136
void stop() override
Stop the SubscriptionManager and wait for all jobs to finish.
Definition SubscriptionManager.hpp:119
void subProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed, only receive the feed when a particular account is affected.
Definition SubscriptionManager.cpp:57
void unsubLedger(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the ledger feed.
Definition SubscriptionManager.cpp:95
void subTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed.
Definition SubscriptionManager.cpp:148
~SubscriptionManager() override
Destructor of the SubscriptionManager object. It will block until all running jobs finished.
Definition SubscriptionManager.hpp:110
Feed that publishes the ledger info. Example : {'type': 'ledgerClosed', 'ledger_index': 2647935,...
Definition LedgerFeed.hpp:28
Feed that publishes the Proposed Transactions.
Definition ProposedTransactionFeed.hpp:31
Definition TransactionFeed.hpp:30
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
A type-erased execution context.
Definition AnyExecutionContext.hpp:22
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:31
T get(std::string_view fullKey) const
Returns the value for the given key string, if the value exists.
Definition ConfigDefinition.hpp:85
This namespace implements everything related to subscriptions.
Definition BookChangesFeed.hpp:14
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
An asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:458
Represents a transaction and its metadata bundled together.
Definition Types.hpp:49
Feed that publishes book changes. This feed will be published every ledger, even if there are no chan...
Definition BookChangesFeed.hpp:22
Feed that publishes the json object as it is.
Definition ForwardFeed.hpp:13