22#include "data/BackendInterface.hpp"
23#include "data/Types.hpp"
24#include "feed/SubscriptionManagerInterface.hpp"
25#include "feed/Types.hpp"
26#include "feed/impl/BookChangesFeed.hpp"
27#include "feed/impl/ForwardFeed.hpp"
28#include "feed/impl/LedgerFeed.hpp"
29#include "feed/impl/ProposedTransactionFeed.hpp"
30#include "feed/impl/TransactionFeed.hpp"
31#include "util/async/AnyExecutionContext.hpp"
32#include "util/async/context/BasicExecutionContext.hpp"
33#include "util/log/Logger.hpp"
34#include "util/newconfig/ConfigDefinition.hpp"
36#include <boost/asio/executor_work_guard.hpp>
37#include <boost/asio/io_context.hpp>
38#include <boost/asio/spawn.hpp>
39#include <boost/json/object.hpp>
40#include <xrpl/protocol/AccountID.h>
41#include <xrpl/protocol/Book.h>
42#include <xrpl/protocol/Fees.h>
43#include <xrpl/protocol/LedgerHeader.h>
62 std::shared_ptr<data::BackendInterface const> backend_;
79 static std::shared_ptr<SubscriptionManager>
82 std::shared_ptr<data::BackendInterface const>
const& backend
85 auto const workersNum = config.
get<uint64_t>(
"subscription_workers");
88 LOG(logger.info()) <<
"Starting subscription manager with " << workersNum <<
" workers";
101 std::shared_ptr<data::BackendInterface const>
const& backend
104 , ctx_(std::move(executor))
105 , manifestFeed_(ctx_,
"manifest")
106 , validationsFeed_(ctx_,
"validations")
108 , bookChangesFeed_(ctx_)
109 , transactionFeed_(ctx_)
110 , proposedTransactionFeed_(ctx_)
152 pubBookChanges(ripple::LedgerHeader
const& lgrInfo, std::vector<data::TransactionAndMetadata>
const& transactions)
175 subProposedAccount(ripple::AccountID
const& account, SubscriberSharedPtr
const& subscriber)
final;
183 unsubProposedAccount(ripple::AccountID
const& account, SubscriberSharedPtr
const& subscriber)
final;
199 subLedger(boost::asio::yield_context yield, SubscriberSharedPtr
const& subscriber)
final;
206 unsubLedger(SubscriberSharedPtr
const& subscriber)
final;
217 ripple::LedgerHeader
const& lgrInfo,
218 ripple::Fees
const& fees,
219 std::string
const& ledgerRange,
220 std::uint32_t txnCount
228 subManifest(SubscriberSharedPtr
const& subscriber)
final;
285 subAccount(ripple::AccountID
const& account, SubscriberSharedPtr
const& subscriber)
final;
293 unsubAccount(ripple::AccountID
const& account, SubscriberSharedPtr
const& subscriber)
final;
301 subBook(ripple::Book
const& book, SubscriberSharedPtr
const& subscriber)
final;
309 unsubBook(ripple::Book
const& book, SubscriberSharedPtr
const& subscriber)
final;
Interface of the subscription manager. A subscription manager is responsible for managing the subscriptions and publishing the feeds.
Definition SubscriptionManagerInterface.hpp:44
A subscription manager is responsible for managing the subscriptions and publishing the feeds.
Definition SubscriptionManager.hpp:61
void forwardValidation(boost::json::object const &validationJson) final
Forward the validation feed.
Definition SubscriptionManager.cpp:150
void subBookChanges(SubscriberSharedPtr const &subscriber) final
Subscribe to the book changes feed.
Definition SubscriptionManager.cpp:38
void unsubProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe from the proposed transactions feed for a particular account.
Definition SubscriptionManager.cpp:84
void unsubProposedTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the proposed transactions feed.
Definition SubscriptionManager.cpp:68
void subManifest(SubscriberSharedPtr const &subscriber) final
Subscribe to the manifest feed.
Definition SubscriptionManager.cpp:120
void subAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed; the feed is only received when a particular account is affected.
Definition SubscriptionManager.cpp:168
void unsubBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Unsubscribe from the transactions feed for a particular order book.
Definition SubscriptionManager.cpp:186
boost::json::object report() const final
Get the number of subscribers.
Definition SubscriptionManager.cpp:198
void forwardManifest(boost::json::object const &manifestJson) final
Forward the manifest feed.
Definition SubscriptionManager.cpp:132
void pubTransaction(data::TransactionAndMetadata const &txMeta, ripple::LedgerHeader const &lgrInfo) final
Forward the transactions feed.
Definition SubscriptionManager.cpp:192
void pubLedger(ripple::LedgerHeader const &lgrInfo, ripple::Fees const &fees, std::string const &ledgerRange, std::uint32_t txnCount) final
Publish the ledger feed.
Definition SubscriptionManager.cpp:109
void unsubBookChanges(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the book changes feed.
Definition SubscriptionManager.cpp:44
SubscriptionManager(util::async::AnyExecutionContext &&executor, std::shared_ptr< data::BackendInterface const > const &backend)
Construct a new Subscription Manager object.
Definition SubscriptionManager.hpp:99
void pubBookChanges(ripple::LedgerHeader const &lgrInfo, std::vector< data::TransactionAndMetadata > const &transactions) final
Publish the book changes feed.
Definition SubscriptionManager.cpp:50
void unsubTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the transactions feed.
Definition SubscriptionManager.cpp:162
static std::shared_ptr< SubscriptionManager > makeSubscriptionManager(util::config::ClioConfigDefinition const &config, std::shared_ptr< data::BackendInterface const > const &backend)
Factory function to create a new SubscriptionManager with a PoolExecutionContext.
Definition SubscriptionManager.hpp:80
boost::json::object subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const &subscriber) final
Subscribe to the ledger feed.
Definition SubscriptionManager.cpp:97
void subBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed; the feed is only received when a particular order book is affected.
Definition SubscriptionManager.cpp:180
void subProposedTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:59
void subValidation(SubscriberSharedPtr const &subscriber) final
Subscribe to the validation feed.
Definition SubscriptionManager.cpp:138
void unsubManifest(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the manifest feed.
Definition SubscriptionManager.cpp:126
void unsubAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe from the transactions feed for a particular account.
Definition SubscriptionManager.cpp:174
void forwardProposedTransaction(boost::json::object const &receivedTxJson) final
Forward the proposed transactions feed.
Definition SubscriptionManager.cpp:91
void unsubValidation(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the validation feed.
Definition SubscriptionManager.cpp:144
void stop() override
Stop the SubscriptionManager and wait for all jobs to finish.
Definition SubscriptionManager.hpp:126
void subProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed; the feed is only received when a particular account is affected.
Definition SubscriptionManager.cpp:75
void unsubLedger(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the ledger feed.
Definition SubscriptionManager.cpp:103
void subTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed.
Definition SubscriptionManager.cpp:156
~SubscriptionManager() override
Destructor of the SubscriptionManager object. It will block until all running jobs have finished.
Definition SubscriptionManager.hpp:117
Feed that publishes the ledger info. Example: {'type': 'ledgerClosed', 'ledger_index': 2647935,...
Definition LedgerFeed.hpp:46
Feed that publishes the Proposed Transactions.
Definition ProposedTransactionFeed.hpp:51
Definition TransactionFeed.hpp:49
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
void stop() const
Stop the execution context.
Definition AnyExecutionContext.hpp:255
void join() const
Join the execution context.
Definition AnyExecutionContext.hpp:264
A highly configurable execution context.
Definition BasicExecutionContext.hpp:132
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
T get(std::string_view fullKey) const
Returns the value stored under the given key string, if that value exists.
Definition ConfigDefinition.hpp:108
This namespace implements everything related to subscriptions.
Definition BookChangesFeed.hpp:33
Feed that publishes book changes. This feed will be published every ledger, even if there are no changes.
Definition BookChangesFeed.hpp:40
Feed that publishes the json object as it is.
Definition ForwardFeed.hpp:32