22#include "data/AmendmentCenterInterface.hpp"
23#include "data/BackendInterface.hpp"
24#include "data/Types.hpp"
25#include "feed/SubscriptionManagerInterface.hpp"
26#include "feed/Types.hpp"
27#include "feed/impl/BookChangesFeed.hpp"
28#include "feed/impl/ForwardFeed.hpp"
29#include "feed/impl/LedgerFeed.hpp"
30#include "feed/impl/ProposedTransactionFeed.hpp"
31#include "feed/impl/TransactionFeed.hpp"
32#include "util/async/AnyExecutionContext.hpp"
33#include "util/async/context/BasicExecutionContext.hpp"
34#include "util/config/ConfigDefinition.hpp"
35#include "util/log/Logger.hpp"
37#include <boost/asio/executor_work_guard.hpp>
38#include <boost/asio/io_context.hpp>
39#include <boost/asio/spawn.hpp>
40#include <boost/json/object.hpp>
41#include <xrpl/protocol/AccountID.h>
42#include <xrpl/protocol/Book.h>
43#include <xrpl/protocol/Fees.h>
44#include <xrpl/protocol/LedgerHeader.h>
64 std::shared_ptr<data::BackendInterface const> backend_;
65 std::shared_ptr<data::AmendmentCenterInterface const> amendmentCenter_;
73 uint32_t networkID_{0};
84 static std::shared_ptr<SubscriptionManager>
87 std::shared_ptr<data::BackendInterface const>
const& backend,
88 std::shared_ptr<data::AmendmentCenterInterface const>
const& amendmentCenter
91 auto const workersNum = config.
get<uint64_t>(
"subscription_workers");
94 LOG(logger.info()) <<
"Starting subscription manager with " << workersNum <<
" workers";
96 return std::make_shared<feed::SubscriptionManager>(
110 std::shared_ptr<data::BackendInterface const>
const& backend,
111 std::shared_ptr<data::AmendmentCenterInterface const>
const& amendmentCenter
114 , amendmentCenter_(amendmentCenter)
115 , ctx_(std::move(executor))
116 , manifestFeed_(ctx_,
"manifest")
117 , validationsFeed_(ctx_,
"validations")
119 , bookChangesFeed_(ctx_)
120 , transactionFeed_(ctx_)
121 , proposedTransactionFeed_(ctx_)
165 ripple::LedgerHeader
const& lgrInfo,
166 std::vector<data::TransactionAndMetadata>
const& transactions
191 ripple::AccountID
const& account,
192 SubscriberSharedPtr
const& subscriber
202 ripple::AccountID
const& account,
203 SubscriberSharedPtr
const& subscriber
220 subLedger(boost::asio::yield_context yield, SubscriberSharedPtr
const& subscriber)
final;
227 unsubLedger(SubscriberSharedPtr
const& subscriber)
final;
238 ripple::LedgerHeader
const& lgrInfo,
239 ripple::Fees
const& fees,
240 std::string
const& ledgerRange,
241 std::uint32_t txnCount
249 subManifest(SubscriberSharedPtr
const& subscriber)
final;
307 subAccount(ripple::AccountID
const& account, SubscriberSharedPtr
const& subscriber)
final;
315 unsubAccount(ripple::AccountID
const& account, SubscriberSharedPtr
const& subscriber)
final;
324 subBook(ripple::Book
const& book, SubscriberSharedPtr
const& subscriber)
final;
332 unsubBook(ripple::Book
const& book, SubscriberSharedPtr
const& subscriber)
final;
342 ripple::LedgerHeader
const& lgrInfo
Interface of subscription manager. A subscription manager is responsible for managing the subscriptio...
Definition SubscriptionManagerInterface.hpp:44
void forwardValidation(boost::json::object const &validationJson) final
Forward the validation feed.
Definition SubscriptionManager.cpp:161
void subBookChanges(SubscriberSharedPtr const &subscriber) final
Subscribe to the book changes feed.
Definition SubscriptionManager.cpp:38
void unsubProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the proposed transactions feed for particular account.
Definition SubscriptionManager.cpp:89
void unsubProposedTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:69
void subManifest(SubscriberSharedPtr const &subscriber) final
Subscribe to the manifest feed.
Definition SubscriptionManager.cpp:131
void subAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive the feed when particular account is affected.
Definition SubscriptionManager.cpp:179
void unsubBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed for particular order book.
Definition SubscriptionManager.cpp:203
void setNetworkID(uint32_t networkID) final
Set the networkID.
Definition SubscriptionManager.cpp:234
boost::json::object report() const final
Get the number of subscribers.
Definition SubscriptionManager.cpp:218
void forwardManifest(boost::json::object const &manifestJson) final
Forward the manifest feed.
Definition SubscriptionManager.cpp:143
void pubTransaction(data::TransactionAndMetadata const &txMeta, ripple::LedgerHeader const &lgrInfo) final
Forward the transactions feed.
Definition SubscriptionManager.cpp:209
uint32_t getNetworkID() const final
Get the networkID.
Definition SubscriptionManager.cpp:240
void pubLedger(ripple::LedgerHeader const &lgrInfo, ripple::Fees const &fees, std::string const &ledgerRange, std::uint32_t txnCount) final
Publish the ledger feed.
Definition SubscriptionManager.cpp:120
void unsubBookChanges(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the book changes feed.
Definition SubscriptionManager.cpp:44
void pubBookChanges(ripple::LedgerHeader const &lgrInfo, std::vector< data::TransactionAndMetadata > const &transactions) final
Publish the book changes feed.
Definition SubscriptionManager.cpp:50
void unsubTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed.
Definition SubscriptionManager.cpp:173
static std::shared_ptr< SubscriptionManager > makeSubscriptionManager(util::config::ClioConfigDefinition const &config, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Factory function to create a new SubscriptionManager with a PoolExecutionContext.
Definition SubscriptionManager.hpp:85
SubscriptionManager(util::async::AnyExecutionContext &&executor, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Construct a new Subscription Manager object.
Definition SubscriptionManager.hpp:108
boost::json::object subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const &subscriber) final
Subscribe to the ledger feed.
Definition SubscriptionManager.cpp:105
void subBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive feed when particular order book is affected.
Definition SubscriptionManager.cpp:197
void subProposedTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:59
void subValidation(SubscriberSharedPtr const &subscriber) final
Subscribe to the validation feed.
Definition SubscriptionManager.cpp:149
void unsubManifest(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the manifest feed.
Definition SubscriptionManager.cpp:137
void unsubAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed for particular account.
Definition SubscriptionManager.cpp:188
void forwardProposedTransaction(boost::json::object const &receivedTxJson) final
Forward the proposed transactions feed.
Definition SubscriptionManager.cpp:99
void unsubValidation(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the validation feed.
Definition SubscriptionManager.cpp:155
void stop() override
Stop the SubscriptionManager and wait for all jobs to finish.
Definition SubscriptionManager.hpp:138
void subProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed, only receive the feed when particular account is affecte...
Definition SubscriptionManager.cpp:76
void unsubLedger(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the ledger feed.
Definition SubscriptionManager.cpp:114
void subTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed.
Definition SubscriptionManager.cpp:167
~SubscriptionManager() override
Destructor of the SubscriptionManager object. It will block until all running jobs finished.
Definition SubscriptionManager.hpp:129
Feed that publishes the ledger info. Example : {'type': 'ledgerClosed', 'ledger_index': 2647935,...
Definition LedgerFeed.hpp:47
Feed that publishes the Proposed Transactions.
Definition ProposedTransactionFeed.hpp:51
Definition TransactionFeed.hpp:50
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:50
T get(std::string_view fullKey) const
Returns the specified value of given string if value exists.
Definition ConfigDefinition.hpp:104
This namespace implements everything related to subscriptions.
Definition BookChangesFeed.hpp:33
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
A asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:477
Feed that publishes book changes. This feed will be published every ledger, even if there are no chan...
Definition BookChangesFeed.hpp:41
Feed that publishes the json object as it is.
Definition ForwardFeed.hpp:32