Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SubscriptionManager.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/AmendmentCenterInterface.hpp"
23#include "data/BackendInterface.hpp"
24#include "data/Types.hpp"
25#include "feed/SubscriptionManagerInterface.hpp"
26#include "feed/Types.hpp"
27#include "feed/impl/BookChangesFeed.hpp"
28#include "feed/impl/ForwardFeed.hpp"
29#include "feed/impl/LedgerFeed.hpp"
30#include "feed/impl/ProposedTransactionFeed.hpp"
31#include "feed/impl/TransactionFeed.hpp"
32#include "util/async/AnyExecutionContext.hpp"
33#include "util/async/context/BasicExecutionContext.hpp"
34#include "util/config/ConfigDefinition.hpp"
35#include "util/log/Logger.hpp"
36
37#include <boost/asio/executor_work_guard.hpp>
38#include <boost/asio/io_context.hpp>
39#include <boost/asio/spawn.hpp>
40#include <boost/json/object.hpp>
41#include <xrpl/protocol/AccountID.h>
42#include <xrpl/protocol/Book.h>
43#include <xrpl/protocol/Fees.h>
44#include <xrpl/protocol/LedgerHeader.h>
45
46#include <cstdint>
47#include <memory>
48#include <string>
49#include <utility>
50#include <vector>
51
57namespace feed {
58
63 std::shared_ptr<data::BackendInterface const> backend_;
64 std::shared_ptr<data::AmendmentCenterInterface const> amendmentCenter_;
66 impl::ForwardFeed manifestFeed_;
67 impl::ForwardFeed validationsFeed_;
68 impl::LedgerFeed ledgerFeed_;
69 impl::BookChangesFeed bookChangesFeed_;
70 impl::TransactionFeed transactionFeed_;
71 impl::ProposedTransactionFeed proposedTransactionFeed_;
72 uint32_t networkID_{0};
73
74public:
83 static std::shared_ptr<SubscriptionManager>
86 std::shared_ptr<data::BackendInterface const> const& backend,
87 std::shared_ptr<data::AmendmentCenterInterface const> const& amendmentCenter
88 )
89 {
90 auto const workersNum = config.get<uint64_t>("subscription_workers");
91
92 util::Logger const logger{"Subscriptions"};
93 LOG(logger.info()) << "Starting subscription manager with " << workersNum << " workers";
94
95 return std::make_shared<feed::SubscriptionManager>(
96 util::async::PoolExecutionContext(workersNum), backend, amendmentCenter
97 );
98 }
99
109 std::shared_ptr<data::BackendInterface const> const& backend,
110 std::shared_ptr<data::AmendmentCenterInterface const> const& amendmentCenter
111 )
112 : backend_(backend)
113 , amendmentCenter_(amendmentCenter)
114 , ctx_(std::move(executor))
115 , manifestFeed_(ctx_, "manifest")
116 , validationsFeed_(ctx_, "validations")
117 , ledgerFeed_(ctx_)
118 , bookChangesFeed_(ctx_)
119 , transactionFeed_(ctx_)
120 , proposedTransactionFeed_(ctx_)
121 {
122 }
123
128 {
129 stop();
130 }
131
135 void
136 stop() override
137 {
138 ctx_.stop();
139 ctx_.join();
140 }
141
146 void
147 subBookChanges(SubscriberSharedPtr const& subscriber) final;
148
153 void
154 unsubBookChanges(SubscriberSharedPtr const& subscriber) final;
155
161 void
162 pubBookChanges(ripple::LedgerHeader const& lgrInfo, std::vector<data::TransactionAndMetadata> const& transactions)
163 final;
164
169 void
170 subProposedTransactions(SubscriberSharedPtr const& subscriber) final;
171
176 void
177 unsubProposedTransactions(SubscriberSharedPtr const& subscriber) final;
178
184 void
185 subProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
186
192 void
193 unsubProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
194
199 void
200 forwardProposedTransaction(boost::json::object const& receivedTxJson) final;
201
208 boost::json::object
209 subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const& subscriber) final;
210
215 void
216 unsubLedger(SubscriberSharedPtr const& subscriber) final;
217
225 void
226 pubLedger(
227 ripple::LedgerHeader const& lgrInfo,
228 ripple::Fees const& fees,
229 std::string const& ledgerRange,
230 std::uint32_t txnCount
231 ) final;
232
237 void
238 subManifest(SubscriberSharedPtr const& subscriber) final;
239
244 void
245 unsubManifest(SubscriberSharedPtr const& subscriber) final;
246
251 void
252 forwardManifest(boost::json::object const& manifestJson) final;
253
258 void
259 subValidation(SubscriberSharedPtr const& subscriber) final;
260
265 void
266 unsubValidation(SubscriberSharedPtr const& subscriber) final;
267
272 void
273 forwardValidation(boost::json::object const& validationJson) final;
274
279 void
280 subTransactions(SubscriberSharedPtr const& subscriber) final;
281
286 void
287 unsubTransactions(SubscriberSharedPtr const& subscriber) final;
288
294 void
295 subAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
296
302 void
303 unsubAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
304
310 void
311 subBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final;
312
318 void
319 unsubBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final;
320
326 void
327 pubTransaction(data::TransactionAndMetadata const& txMeta, ripple::LedgerHeader const& lgrInfo) final;
328
334 boost::json::object
335 report() const final;
336
341 void
342 setNetworkID(uint32_t networkID) final;
343
349 uint32_t
350 getNetworkID() const final;
351};
352
353} // namespace feed
Interface of subscription manager. A subscription manager is responsible for managing the subscriptio...
Definition SubscriptionManagerInterface.hpp:44
A subscription manager is responsible for managing the subscriptions and publishing the feeds.
Definition SubscriptionManager.hpp:62
void forwardValidation(boost::json::object const &validationJson) final
Forward the validation feed.
Definition SubscriptionManager.cpp:150
void subBookChanges(SubscriberSharedPtr const &subscriber) final
Subscribe to the book changes feed.
Definition SubscriptionManager.cpp:38
void unsubProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the proposed transactions feed for particular account.
Definition SubscriptionManager.cpp:84
void unsubProposedTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:68
void subManifest(SubscriberSharedPtr const &subscriber) final
Subscribe to the manifest feed.
Definition SubscriptionManager.cpp:120
void subAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive the feed when particular account is affected.
Definition SubscriptionManager.cpp:168
void unsubBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed for particular order book.
Definition SubscriptionManager.cpp:186
void setNetworkID(uint32_t networkID) final
Set the networkID.
Definition SubscriptionManager.cpp:214
boost::json::object report() const final
Get the number of subscribers.
Definition SubscriptionManager.cpp:198
void forwardManifest(boost::json::object const &manifestJson) final
Forward the manifest feed.
Definition SubscriptionManager.cpp:132
void pubTransaction(data::TransactionAndMetadata const &txMeta, ripple::LedgerHeader const &lgrInfo) final
Forward the transactions feed.
Definition SubscriptionManager.cpp:192
uint32_t getNetworkID() const final
Get the networkID.
Definition SubscriptionManager.cpp:220
void pubLedger(ripple::LedgerHeader const &lgrInfo, ripple::Fees const &fees, std::string const &ledgerRange, std::uint32_t txnCount) final
Publish the ledger feed.
Definition SubscriptionManager.cpp:109
void unsubBookChanges(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the book changes feed.
Definition SubscriptionManager.cpp:44
void pubBookChanges(ripple::LedgerHeader const &lgrInfo, std::vector< data::TransactionAndMetadata > const &transactions) final
Publish the book changes feed.
Definition SubscriptionManager.cpp:50
void unsubTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed.
Definition SubscriptionManager.cpp:162
static std::shared_ptr< SubscriptionManager > makeSubscriptionManager(util::config::ClioConfigDefinition const &config, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Factory function to create a new SubscriptionManager with a PoolExecutionContext.
Definition SubscriptionManager.hpp:84
SubscriptionManager(util::async::AnyExecutionContext &&executor, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Construct a new Subscription Manager object.
Definition SubscriptionManager.hpp:107
boost::json::object subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const &subscriber) final
Subscribe to the ledger feed.
Definition SubscriptionManager.cpp:97
void subBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive feed when particular order book is affected.
Definition SubscriptionManager.cpp:180
void subProposedTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:59
void subValidation(SubscriberSharedPtr const &subscriber) final
Subscribe to the validation feed.
Definition SubscriptionManager.cpp:138
void unsubManifest(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the manifest feed.
Definition SubscriptionManager.cpp:126
void unsubAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed for particular account.
Definition SubscriptionManager.cpp:174
void forwardProposedTransaction(boost::json::object const &receivedTxJson) final
Forward the proposed transactions feed.
Definition SubscriptionManager.cpp:91
void unsubValidation(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the validation feed.
Definition SubscriptionManager.cpp:144
void stop() override
Stop the SubscriptionManager and wait for all jobs to finish.
Definition SubscriptionManager.hpp:136
void subProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed, only receive the feed when particular account is affecte...
Definition SubscriptionManager.cpp:75
void unsubLedger(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the ledger feed.
Definition SubscriptionManager.cpp:103
void subTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed.
Definition SubscriptionManager.cpp:156
~SubscriptionManager() override
Destructor of the SubscriptionManager object. It will block until all running jobs finished.
Definition SubscriptionManager.hpp:127
Feed that publishes the ledger info. Example : {'type': 'ledgerClosed', 'ledger_index': 2647935,...
Definition LedgerFeed.hpp:46
Feed that publishes the Proposed Transactions.
Definition ProposedTransactionFeed.hpp:51
Definition TransactionFeed.hpp:50
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
void stop() const
Stop the execution context.
Definition AnyExecutionContext.hpp:255
void join() const
Join the execution context.
Definition AnyExecutionContext.hpp:264
A highly configurable execution context.
Definition BasicExecutionContext.hpp:132
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
T get(std::string_view fullKey) const
Returns the specified value of given string if value exists.
Definition ConfigDefinition.hpp:108
This namespace implements everything related to subscriptions.
Definition BookChangesFeed.hpp:33
Represents a transaction and its metadata bundled together.
Definition Types.hpp:68
Feed that publishes book changes. This feed will be published every ledger, even if there are no chan...
Definition BookChangesFeed.hpp:40
Feed that publishes the json object as it is.
Definition ForwardFeed.hpp:32