Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SubscriptionManager.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/AmendmentCenterInterface.hpp"
23#include "data/BackendInterface.hpp"
24#include "data/Types.hpp"
25#include "feed/SubscriptionManagerInterface.hpp"
26#include "feed/Types.hpp"
27#include "feed/impl/BookChangesFeed.hpp"
28#include "feed/impl/ForwardFeed.hpp"
29#include "feed/impl/LedgerFeed.hpp"
30#include "feed/impl/ProposedTransactionFeed.hpp"
31#include "feed/impl/TransactionFeed.hpp"
32#include "util/async/AnyExecutionContext.hpp"
33#include "util/async/context/BasicExecutionContext.hpp"
34#include "util/config/ConfigDefinition.hpp"
35#include "util/log/Logger.hpp"
36
37#include <boost/asio/executor_work_guard.hpp>
38#include <boost/asio/io_context.hpp>
39#include <boost/asio/spawn.hpp>
40#include <boost/json/object.hpp>
41#include <xrpl/protocol/AccountID.h>
42#include <xrpl/protocol/Book.h>
43#include <xrpl/protocol/Fees.h>
44#include <xrpl/protocol/LedgerHeader.h>
45
46#include <cstdint>
47#include <memory>
48#include <string>
49#include <utility>
50#include <vector>
51
57namespace feed {
58
63 std::shared_ptr<data::BackendInterface const> backend_;
64 std::shared_ptr<data::AmendmentCenterInterface const> amendmentCenter_;
66 impl::ForwardFeed manifestFeed_;
67 impl::ForwardFeed validationsFeed_;
68 impl::LedgerFeed ledgerFeed_;
69 impl::BookChangesFeed bookChangesFeed_;
70 impl::TransactionFeed transactionFeed_;
71 impl::ProposedTransactionFeed proposedTransactionFeed_;
72 uint32_t networkID_{0};
73
74public:
83 static std::shared_ptr<SubscriptionManager>
86 std::shared_ptr<data::BackendInterface const> const& backend,
87 std::shared_ptr<data::AmendmentCenterInterface const> const& amendmentCenter
88 )
89 {
90 auto const workersNum = config.get<uint64_t>("subscription_workers");
91
92 util::Logger const logger{"Subscriptions"};
93 LOG(logger.info()) << "Starting subscription manager with " << workersNum << " workers";
94
95 return std::make_shared<feed::SubscriptionManager>(
96 util::async::PoolExecutionContext(workersNum), backend, amendmentCenter
97 );
98 }
99
109 std::shared_ptr<data::BackendInterface const> const& backend,
110 std::shared_ptr<data::AmendmentCenterInterface const> const& amendmentCenter
111 )
112 : backend_(backend)
113 , amendmentCenter_(amendmentCenter)
114 , ctx_(std::move(executor))
115 , manifestFeed_(ctx_, "manifest")
116 , validationsFeed_(ctx_, "validations")
117 , ledgerFeed_(ctx_)
118 , bookChangesFeed_(ctx_)
119 , transactionFeed_(ctx_)
120 , proposedTransactionFeed_(ctx_)
121 {
122 }
123
128 {
129 stop();
130 }
131
135 void
136 stop() override
137 {
138 ctx_.stop();
139 ctx_.join();
140 }
141
146 void
147 subBookChanges(SubscriberSharedPtr const& subscriber) final;
148
153 void
154 unsubBookChanges(SubscriberSharedPtr const& subscriber) final;
155
161 void
163 ripple::LedgerHeader const& lgrInfo,
164 std::vector<data::TransactionAndMetadata> const& transactions
165 ) final;
166
171 void
172 subProposedTransactions(SubscriberSharedPtr const& subscriber) final;
173
178 void
179 unsubProposedTransactions(SubscriberSharedPtr const& subscriber) final;
180
186 void
187 subProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
188
194 void
195 unsubProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
196
201 void
202 forwardProposedTransaction(boost::json::object const& receivedTxJson) final;
203
210 boost::json::object
211 subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const& subscriber) final;
212
217 void
218 unsubLedger(SubscriberSharedPtr const& subscriber) final;
219
227 void
228 pubLedger(
229 ripple::LedgerHeader const& lgrInfo,
230 ripple::Fees const& fees,
231 std::string const& ledgerRange,
232 std::uint32_t txnCount
233 ) final;
234
239 void
240 subManifest(SubscriberSharedPtr const& subscriber) final;
241
246 void
247 unsubManifest(SubscriberSharedPtr const& subscriber) final;
248
253 void
254 forwardManifest(boost::json::object const& manifestJson) final;
255
260 void
261 subValidation(SubscriberSharedPtr const& subscriber) final;
262
267 void
268 unsubValidation(SubscriberSharedPtr const& subscriber) final;
269
274 void
275 forwardValidation(boost::json::object const& validationJson) final;
276
281 void
282 subTransactions(SubscriberSharedPtr const& subscriber) final;
283
288 void
289 unsubTransactions(SubscriberSharedPtr const& subscriber) final;
290
296 void
297 subAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
298
304 void
305 unsubAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
306
312 void
313 subBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final;
314
320 void
321 unsubBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final;
322
328 void
329 pubTransaction(data::TransactionAndMetadata const& txMeta, ripple::LedgerHeader const& lgrInfo) final;
330
336 boost::json::object
337 report() const final;
338
343 void
344 setNetworkID(uint32_t networkID) final;
345
351 uint32_t
352 getNetworkID() const final;
353};
354
355} // namespace feed
Interface of subscription manager. A subscription manager is responsible for managing the subscriptio...
Definition SubscriptionManagerInterface.hpp:44
A subscription manager is responsible for managing the subscriptions and publishing the feeds.
Definition SubscriptionManager.hpp:62
void forwardValidation(boost::json::object const &validationJson) final
Forward the validation feed.
Definition SubscriptionManager.cpp:150
void subBookChanges(SubscriberSharedPtr const &subscriber) final
Subscribe to the book changes feed.
Definition SubscriptionManager.cpp:38
void unsubProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the proposed transactions feed for particular account.
Definition SubscriptionManager.cpp:84
void unsubProposedTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:68
void subManifest(SubscriberSharedPtr const &subscriber) final
Subscribe to the manifest feed.
Definition SubscriptionManager.cpp:120
void subAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive the feed when particular account is affected.
Definition SubscriptionManager.cpp:168
void unsubBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed for particular order book.
Definition SubscriptionManager.cpp:186
void setNetworkID(uint32_t networkID) final
Set the networkID.
Definition SubscriptionManager.cpp:214
boost::json::object report() const final
Get the number of subscribers.
Definition SubscriptionManager.cpp:198
void forwardManifest(boost::json::object const &manifestJson) final
Forward the manifest feed.
Definition SubscriptionManager.cpp:132
void pubTransaction(data::TransactionAndMetadata const &txMeta, ripple::LedgerHeader const &lgrInfo) final
Forward the transactions feed.
Definition SubscriptionManager.cpp:192
uint32_t getNetworkID() const final
Get the networkID.
Definition SubscriptionManager.cpp:220
void pubLedger(ripple::LedgerHeader const &lgrInfo, ripple::Fees const &fees, std::string const &ledgerRange, std::uint32_t txnCount) final
Publish the ledger feed.
Definition SubscriptionManager.cpp:109
void unsubBookChanges(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the book changes feed.
Definition SubscriptionManager.cpp:44
void pubBookChanges(ripple::LedgerHeader const &lgrInfo, std::vector< data::TransactionAndMetadata > const &transactions) final
Publish the book changes feed.
Definition SubscriptionManager.cpp:50
void unsubTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed.
Definition SubscriptionManager.cpp:162
static std::shared_ptr< SubscriptionManager > makeSubscriptionManager(util::config::ClioConfigDefinition const &config, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Factory function to create a new SubscriptionManager with a PoolExecutionContext.
Definition SubscriptionManager.hpp:84
SubscriptionManager(util::async::AnyExecutionContext &&executor, std::shared_ptr< data::BackendInterface const > const &backend, std::shared_ptr< data::AmendmentCenterInterface const > const &amendmentCenter)
Construct a new Subscription Manager object.
Definition SubscriptionManager.hpp:107
boost::json::object subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const &subscriber) final
Subscribe to the ledger feed.
Definition SubscriptionManager.cpp:97
void subBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive feed when particular order book is affected.
Definition SubscriptionManager.cpp:180
void subProposedTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:59
void subValidation(SubscriberSharedPtr const &subscriber) final
Subscribe to the validation feed.
Definition SubscriptionManager.cpp:138
void unsubManifest(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the manifest feed.
Definition SubscriptionManager.cpp:126
void unsubAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed for particular account.
Definition SubscriptionManager.cpp:174
void forwardProposedTransaction(boost::json::object const &receivedTxJson) final
Forward the proposed transactions feed.
Definition SubscriptionManager.cpp:91
void unsubValidation(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the validation feed.
Definition SubscriptionManager.cpp:144
void stop() override
Stop the SubscriptionManager and wait for all jobs to finish.
Definition SubscriptionManager.hpp:136
void subProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed, only receive the feed when particular account is affecte...
Definition SubscriptionManager.cpp:75
void unsubLedger(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the ledger feed.
Definition SubscriptionManager.cpp:103
void subTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed.
Definition SubscriptionManager.cpp:156
~SubscriptionManager() override
Destructor of the SubscriptionManager object. It will block until all running jobs finished.
Definition SubscriptionManager.hpp:127
Feed that publishes the ledger info. Example : {'type': 'ledgerClosed', 'ledger_index': 2647935,...
Definition LedgerFeed.hpp:46
Feed that publishes the Proposed Transactions.
Definition ProposedTransactionFeed.hpp:51
Definition TransactionFeed.hpp:50
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
void stop() const
Stop the execution context.
Definition AnyExecutionContext.hpp:250
void join() const
Join the execution context.
Definition AnyExecutionContext.hpp:259
A highly configurable execution context.
Definition BasicExecutionContext.hpp:132
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
T get(std::string_view fullKey) const
Returns the specified value of given string if value exists.
Definition ConfigDefinition.hpp:108
This namespace implements everything related to subscriptions.
Definition BookChangesFeed.hpp:33
Represents a transaction and its metadata bundled together.
Definition Types.hpp:68
Feed that publishes book changes. This feed will be published every ledger, even if there are no chan...
Definition BookChangesFeed.hpp:40
Feed that publishes the json object as it is.
Definition ForwardFeed.hpp:32