3#include "data/AmendmentCenterInterface.hpp"
4#include "data/BackendInterface.hpp"
5#include "data/Types.hpp"
6#include "feed/SubscriptionManagerInterface.hpp"
7#include "feed/Types.hpp"
8#include "feed/impl/BookChangesFeed.hpp"
9#include "feed/impl/ForwardFeed.hpp"
10#include "feed/impl/LedgerFeed.hpp"
11#include "feed/impl/ProposedTransactionFeed.hpp"
12#include "feed/impl/TransactionFeed.hpp"
13#include "util/async/AnyExecutionContext.hpp"
14#include "util/async/context/BasicExecutionContext.hpp"
15#include "util/config/ConfigDefinition.hpp"
16#include "util/log/Logger.hpp"
18#include <boost/asio/executor_work_guard.hpp>
19#include <boost/asio/io_context.hpp>
20#include <boost/asio/spawn.hpp>
21#include <boost/json/object.hpp>
22#include <xrpl/protocol/AccountID.h>
23#include <xrpl/protocol/Book.h>
24#include <xrpl/protocol/Fees.h>
25#include <xrpl/protocol/LedgerHeader.h>
45 std::shared_ptr<data::BackendInterface const> backend_;
46 std::shared_ptr<data::AmendmentCenterInterface const> amendmentCenter_;
54 uint32_t networkID_{0};
65 static std::shared_ptr<SubscriptionManager>
68 std::shared_ptr<data::BackendInterface const>
const& backend,
69 std::shared_ptr<data::AmendmentCenterInterface const>
const& amendmentCenter
72 auto const workersNum = config.
get<uint64_t>(
"subscription_workers");
75 LOG(logger.info()) <<
"Starting subscription manager with " << workersNum <<
" workers";
77 return std::make_shared<feed::SubscriptionManager>(
91 std::shared_ptr<data::BackendInterface const>
const& backend,
92 std::shared_ptr<data::AmendmentCenterInterface const>
const& amendmentCenter
95 , amendmentCenter_(amendmentCenter)
96 , ctx_(std::move(executor))
97 , manifestFeed_(ctx_,
"manifest")
98 , validationsFeed_(ctx_,
"validations")
100 , bookChangesFeed_(ctx_)
101 , transactionFeed_(ctx_)
102 , proposedTransactionFeed_(ctx_)
146 ripple::LedgerHeader
const& lgrInfo,
147 std::vector<data::TransactionAndMetadata>
const& transactions
172 ripple::AccountID
const& account,
173 SubscriberSharedPtr
const& subscriber
183 ripple::AccountID
const& account,
184 SubscriberSharedPtr
const& subscriber
201 subLedger(boost::asio::yield_context yield, SubscriberSharedPtr
const& subscriber)
final;
208 unsubLedger(SubscriberSharedPtr
const& subscriber)
final;
219 ripple::LedgerHeader
const& lgrInfo,
220 ripple::Fees
const& fees,
221 std::string
const& ledgerRange,
222 std::uint32_t txnCount
230 subManifest(SubscriberSharedPtr
const& subscriber)
final;
288 subAccount(ripple::AccountID
const& account, SubscriberSharedPtr
const& subscriber)
final;
296 unsubAccount(ripple::AccountID
const& account, SubscriberSharedPtr
const& subscriber)
final;
305 subBook(ripple::Book
const& book, SubscriberSharedPtr
const& subscriber)
final;
313 unsubBook(ripple::Book
const& book, SubscriberSharedPtr
const& subscriber)
final;
323 ripple::LedgerHeader
const& lgrInfo
Interface of subscription manager. A subscription manager is responsible for managing the subscriptio...
Definition SubscriptionManagerInterface.hpp:25
void forwardValidation(boost::json::object const &validationJson) final
Forward the validation feed.
Definition SubscriptionManager.cpp:142
void subBookChanges(SubscriberSharedPtr const &subscriber) final
Subscribe to the book changes feed.
Definition SubscriptionManager.cpp:19
void unsubProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the proposed transactions feed for particular account.
Definition SubscriptionManager.cpp:70
void unsubProposedTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:50
void subManifest(SubscriberSharedPtr const &subscriber) final
Subscribe to the manifest feed.
Definition SubscriptionManager.cpp:112
void subAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive the feed when particular account is affected.
Definition SubscriptionManager.cpp:160
void unsubBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed for particular order book.
Definition SubscriptionManager.cpp:184
void setNetworkID(uint32_t networkID) final
Set the networkID.
Definition SubscriptionManager.cpp:215
boost::json::object report() const final
Get the number of subscribers.
Definition SubscriptionManager.cpp:199
void forwardManifest(boost::json::object const &manifestJson) final
Forward the manifest feed.
Definition SubscriptionManager.cpp:124
void pubTransaction(data::TransactionAndMetadata const &txMeta, ripple::LedgerHeader const &lgrInfo) final
Forward the transactions feed.
Definition SubscriptionManager.cpp:190
uint32_t getNetworkID() const final
Get the networkID.
Definition SubscriptionManager.cpp:221
void pubLedger(ripple::LedgerHeader const &lgrInfo, ripple::Fees const &fees, std::string const &ledgerRange, std::uint32_t txnCount) final
Publish the ledger feed.
Definition SubscriptionManager.cpp:101
void unsubBookChanges(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the book changes feed.
Definition SubscriptionManager.cpp:25
void pubBookChanges(ripple::LedgerHeader const &lgrInfo, std::vector< data::TransactionAndMetadata > const &transactions) final
Publish the book changes feed.
Definition SubscriptionManager.cpp:31
void unsubTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed.
Definition SubscriptionManager.cpp:154
static std::shared_ptr< SubscriptionManager > makeSubscriptionManager(util::config::ClioConfigDefinition const &config, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Factory function to create a new SubscriptionManager with a PoolExecutionContext.
Definition SubscriptionManager.hpp:66
SubscriptionManager(util::async::AnyExecutionContext &&executor, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Construct a new Subscription Manager object.
Definition SubscriptionManager.hpp:89
boost::json::object subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const &subscriber) final
Subscribe to the ledger feed.
Definition SubscriptionManager.cpp:86
void subBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive feed when particular order book is affected.
Definition SubscriptionManager.cpp:178
void subProposedTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:40
void subValidation(SubscriberSharedPtr const &subscriber) final
Subscribe to the validation feed.
Definition SubscriptionManager.cpp:130
void unsubManifest(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the manifest feed.
Definition SubscriptionManager.cpp:118
void unsubAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed for particular account.
Definition SubscriptionManager.cpp:169
void forwardProposedTransaction(boost::json::object const &receivedTxJson) final
Forward the proposed transactions feed.
Definition SubscriptionManager.cpp:80
void unsubValidation(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the validation feed.
Definition SubscriptionManager.cpp:136
void stop() override
Stop the SubscriptionManager and wait for all jobs to finish.
Definition SubscriptionManager.hpp:119
void subProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed, only receive the feed when particular account is affecte...
Definition SubscriptionManager.cpp:57
void unsubLedger(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the ledger feed.
Definition SubscriptionManager.cpp:95
void subTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed.
Definition SubscriptionManager.cpp:148
~SubscriptionManager() override
Destructor of the SubscriptionManager object. It will block until all running jobs finished.
Definition SubscriptionManager.hpp:110
Feed that publishes the ledger info. Example : {'type': 'ledgerClosed', 'ledger_index': 2647935,...
Definition LedgerFeed.hpp:28
Feed that publishes the Proposed Transactions.
Definition ProposedTransactionFeed.hpp:31
Definition TransactionFeed.hpp:30
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
A type-erased execution context.
Definition AnyExecutionContext.hpp:22
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:31
T get(std::string_view fullKey) const
Returns the specified value of given string if value exists.
Definition ConfigDefinition.hpp:85
This namespace implements everything related to subscriptions.
Definition BookChangesFeed.hpp:14
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
A asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:458
Feed that publishes book changes. This feed will be published every ledger, even if there are no chan...
Definition BookChangesFeed.hpp:22
Feed that publishes the json object as it is.
Definition ForwardFeed.hpp:13