Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SubscriptionManager.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/AmendmentCenterInterface.hpp"
23#include "data/BackendInterface.hpp"
24#include "data/Types.hpp"
25#include "feed/SubscriptionManagerInterface.hpp"
26#include "feed/Types.hpp"
27#include "feed/impl/BookChangesFeed.hpp"
28#include "feed/impl/ForwardFeed.hpp"
29#include "feed/impl/LedgerFeed.hpp"
30#include "feed/impl/ProposedTransactionFeed.hpp"
31#include "feed/impl/TransactionFeed.hpp"
32#include "util/async/AnyExecutionContext.hpp"
33#include "util/async/context/BasicExecutionContext.hpp"
34#include "util/config/ConfigDefinition.hpp"
35#include "util/log/Logger.hpp"
36
37#include <boost/asio/executor_work_guard.hpp>
38#include <boost/asio/io_context.hpp>
39#include <boost/asio/spawn.hpp>
40#include <boost/json/object.hpp>
41#include <xrpl/protocol/AccountID.h>
42#include <xrpl/protocol/Book.h>
43#include <xrpl/protocol/Fees.h>
44#include <xrpl/protocol/LedgerHeader.h>
45
46#include <cstdint>
47#include <memory>
48#include <string>
49#include <utility>
50#include <vector>
51
57namespace feed {
58
64 std::shared_ptr<data::BackendInterface const> backend_;
65 std::shared_ptr<data::AmendmentCenterInterface const> amendmentCenter_;
67 impl::ForwardFeed manifestFeed_;
68 impl::ForwardFeed validationsFeed_;
69 impl::LedgerFeed ledgerFeed_;
70 impl::BookChangesFeed bookChangesFeed_;
71 impl::TransactionFeed transactionFeed_;
72 impl::ProposedTransactionFeed proposedTransactionFeed_;
73 uint32_t networkID_{0};
74
75public:
84 static std::shared_ptr<SubscriptionManager>
87 std::shared_ptr<data::BackendInterface const> const& backend,
88 std::shared_ptr<data::AmendmentCenterInterface const> const& amendmentCenter
89 )
90 {
91 auto const workersNum = config.get<uint64_t>("subscription_workers");
92
93 util::Logger const logger{"Subscriptions"};
94 LOG(logger.info()) << "Starting subscription manager with " << workersNum << " workers";
95
96 return std::make_shared<feed::SubscriptionManager>(
97 util::async::PoolExecutionContext(workersNum), backend, amendmentCenter
98 );
99 }
100
110 std::shared_ptr<data::BackendInterface const> const& backend,
111 std::shared_ptr<data::AmendmentCenterInterface const> const& amendmentCenter
112 )
113 : backend_(backend)
114 , amendmentCenter_(amendmentCenter)
115 , ctx_(std::move(executor))
116 , manifestFeed_(ctx_, "manifest")
117 , validationsFeed_(ctx_, "validations")
118 , ledgerFeed_(ctx_)
119 , bookChangesFeed_(ctx_)
120 , transactionFeed_(ctx_)
121 , proposedTransactionFeed_(ctx_)
122 {
123 }
124
130 {
131 stop();
132 }
133
137 void
138 stop() override
139 {
140 ctx_.stop();
141 ctx_.join();
142 }
143
148 void
149 subBookChanges(SubscriberSharedPtr const& subscriber) final;
150
155 void
156 unsubBookChanges(SubscriberSharedPtr const& subscriber) final;
157
163 void
165 ripple::LedgerHeader const& lgrInfo,
166 std::vector<data::TransactionAndMetadata> const& transactions
167 ) final;
168
173 void
174 subProposedTransactions(SubscriberSharedPtr const& subscriber) final;
175
180 void
181 unsubProposedTransactions(SubscriberSharedPtr const& subscriber) final;
182
189 void
191 ripple::AccountID const& account,
192 SubscriberSharedPtr const& subscriber
193 ) final;
194
200 void
202 ripple::AccountID const& account,
203 SubscriberSharedPtr const& subscriber
204 ) final;
205
210 void
211 forwardProposedTransaction(boost::json::object const& receivedTxJson) final;
212
219 boost::json::object
220 subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const& subscriber) final;
221
226 void
227 unsubLedger(SubscriberSharedPtr const& subscriber) final;
228
236 void
237 pubLedger(
238 ripple::LedgerHeader const& lgrInfo,
239 ripple::Fees const& fees,
240 std::string const& ledgerRange,
241 std::uint32_t txnCount
242 ) final;
243
248 void
249 subManifest(SubscriberSharedPtr const& subscriber) final;
250
255 void
256 unsubManifest(SubscriberSharedPtr const& subscriber) final;
257
262 void
263 forwardManifest(boost::json::object const& manifestJson) final;
264
269 void
270 subValidation(SubscriberSharedPtr const& subscriber) final;
271
276 void
277 unsubValidation(SubscriberSharedPtr const& subscriber) final;
278
283 void
284 forwardValidation(boost::json::object const& validationJson) final;
285
290 void
291 subTransactions(SubscriberSharedPtr const& subscriber) final;
292
297 void
298 unsubTransactions(SubscriberSharedPtr const& subscriber) final;
299
306 void
307 subAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
308
314 void
315 unsubAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
316
323 void
324 subBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final;
325
331 void
332 unsubBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final;
333
339 void
341 data::TransactionAndMetadata const& txMeta,
342 ripple::LedgerHeader const& lgrInfo
343 ) final;
344
350 boost::json::object
351 report() const final;
352
357 void
358 setNetworkID(uint32_t networkID) final;
359
365 uint32_t
366 getNetworkID() const final;
367};
368
369} // namespace feed
Interface of subscription manager. A subscription manager is responsible for managing the subscriptio...
Definition SubscriptionManagerInterface.hpp:44
void forwardValidation(boost::json::object const &validationJson) final
Forward the validation feed.
Definition SubscriptionManager.cpp:161
void subBookChanges(SubscriberSharedPtr const &subscriber) final
Subscribe to the book changes feed.
Definition SubscriptionManager.cpp:38
void unsubProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe from the proposed transactions feed for a particular account.
Definition SubscriptionManager.cpp:89
void unsubProposedTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the proposed transactions feed.
Definition SubscriptionManager.cpp:69
void subManifest(SubscriberSharedPtr const &subscriber) final
Subscribe to the manifest feed.
Definition SubscriptionManager.cpp:131
void subAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive the feed when particular account is affected.
Definition SubscriptionManager.cpp:179
void unsubBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Unsubscribe from the transactions feed for a particular order book.
Definition SubscriptionManager.cpp:203
void setNetworkID(uint32_t networkID) final
Set the networkID.
Definition SubscriptionManager.cpp:234
boost::json::object report() const final
Get the number of subscribers.
Definition SubscriptionManager.cpp:218
void forwardManifest(boost::json::object const &manifestJson) final
Forward the manifest feed.
Definition SubscriptionManager.cpp:143
void pubTransaction(data::TransactionAndMetadata const &txMeta, ripple::LedgerHeader const &lgrInfo) final
Forward the transactions feed.
Definition SubscriptionManager.cpp:209
uint32_t getNetworkID() const final
Get the networkID.
Definition SubscriptionManager.cpp:240
void pubLedger(ripple::LedgerHeader const &lgrInfo, ripple::Fees const &fees, std::string const &ledgerRange, std::uint32_t txnCount) final
Publish the ledger feed.
Definition SubscriptionManager.cpp:120
void unsubBookChanges(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the book changes feed.
Definition SubscriptionManager.cpp:44
void pubBookChanges(ripple::LedgerHeader const &lgrInfo, std::vector< data::TransactionAndMetadata > const &transactions) final
Publish the book changes feed.
Definition SubscriptionManager.cpp:50
void unsubTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the transactions feed.
Definition SubscriptionManager.cpp:173
static std::shared_ptr< SubscriptionManager > makeSubscriptionManager(util::config::ClioConfigDefinition const &config, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Factory function to create a new SubscriptionManager with a PoolExecutionContext.
Definition SubscriptionManager.hpp:85
SubscriptionManager(util::async::AnyExecutionContext &&executor, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Construct a new Subscription Manager object.
Definition SubscriptionManager.hpp:108
boost::json::object subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const &subscriber) final
Subscribe to the ledger feed.
Definition SubscriptionManager.cpp:105
void subBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive feed when particular order book is affected.
Definition SubscriptionManager.cpp:197
void subProposedTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:59
void subValidation(SubscriberSharedPtr const &subscriber) final
Subscribe to the validation feed.
Definition SubscriptionManager.cpp:149
void unsubManifest(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the manifest feed.
Definition SubscriptionManager.cpp:137
void unsubAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe from the transactions feed for a particular account.
Definition SubscriptionManager.cpp:188
void forwardProposedTransaction(boost::json::object const &receivedTxJson) final
Forward the proposed transactions feed.
Definition SubscriptionManager.cpp:99
void unsubValidation(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the validation feed.
Definition SubscriptionManager.cpp:155
void stop() override
Stop the SubscriptionManager and wait for all jobs to finish.
Definition SubscriptionManager.hpp:138
void subProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed, only receive the feed when a particular account is affected.
Definition SubscriptionManager.cpp:76
void unsubLedger(SubscriberSharedPtr const &subscriber) final
Unsubscribe from the ledger feed.
Definition SubscriptionManager.cpp:114
void subTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed.
Definition SubscriptionManager.cpp:167
~SubscriptionManager() override
Destructor of the SubscriptionManager object. It will block until all running jobs have finished.
Definition SubscriptionManager.hpp:129
Feed that publishes the ledger info. Example : {'type': 'ledgerClosed', 'ledger_index': 2647935,...
Definition LedgerFeed.hpp:47
Feed that publishes the Proposed Transactions.
Definition ProposedTransactionFeed.hpp:51
Definition TransactionFeed.hpp:50
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:96
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:50
T get(std::string_view fullKey) const
Returns the specified value of given string if value exists.
Definition ConfigDefinition.hpp:104
This namespace implements everything related to subscriptions.
Definition BookChangesFeed.hpp:33
BasicExecutionContext< impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy > PoolExecutionContext
An asio::thread_pool-based execution context.
Definition BasicExecutionContext.hpp:477
Represents a transaction and its metadata bundled together.
Definition Types.hpp:68
Feed that publishes book changes. This feed will be published every ledger, even if there are no changes.
Definition BookChangesFeed.hpp:41
Feed that publishes the json object as it is.
Definition ForwardFeed.hpp:32