Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
SubscriptionManager.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "data/Types.hpp"
24#include "feed/SubscriptionManagerInterface.hpp"
25#include "feed/Types.hpp"
26#include "feed/impl/BookChangesFeed.hpp"
27#include "feed/impl/ForwardFeed.hpp"
28#include "feed/impl/LedgerFeed.hpp"
29#include "feed/impl/ProposedTransactionFeed.hpp"
30#include "feed/impl/TransactionFeed.hpp"
31#include "util/async/AnyExecutionContext.hpp"
32#include "util/async/context/BasicExecutionContext.hpp"
33#include "util/log/Logger.hpp"
34#include "util/newconfig/ConfigDefinition.hpp"
35
36#include <boost/asio/executor_work_guard.hpp>
37#include <boost/asio/io_context.hpp>
38#include <boost/asio/spawn.hpp>
39#include <boost/json/object.hpp>
40#include <xrpl/protocol/AccountID.h>
41#include <xrpl/protocol/Book.h>
42#include <xrpl/protocol/Fees.h>
43#include <xrpl/protocol/LedgerHeader.h>
44
45#include <cstdint>
46#include <memory>
47#include <string>
48#include <utility>
49#include <vector>
50
56namespace feed {
57
62 std::shared_ptr<data::BackendInterface const> backend_;
64 impl::ForwardFeed manifestFeed_;
65 impl::ForwardFeed validationsFeed_;
66 impl::LedgerFeed ledgerFeed_;
67 impl::BookChangesFeed bookChangesFeed_;
68 impl::TransactionFeed transactionFeed_;
69 impl::ProposedTransactionFeed proposedTransactionFeed_;
70
71public:
79 static std::shared_ptr<SubscriptionManager>
82 std::shared_ptr<data::BackendInterface const> const& backend
83 )
84 {
85 auto const workersNum = config.get<uint64_t>("subscription_workers");
86
87 util::Logger const logger{"Subscriptions"};
88 LOG(logger.info()) << "Starting subscription manager with " << workersNum << " workers";
89
90 return std::make_shared<feed::SubscriptionManager>(util::async::PoolExecutionContext(workersNum), backend);
91 }
92
101 std::shared_ptr<data::BackendInterface const> const& backend
102 )
103 : backend_(backend)
104 , ctx_(std::move(executor))
105 , manifestFeed_(ctx_, "manifest")
106 , validationsFeed_(ctx_, "validations")
107 , ledgerFeed_(ctx_)
108 , bookChangesFeed_(ctx_)
109 , transactionFeed_(ctx_)
110 , proposedTransactionFeed_(ctx_)
111 {
112 }
113
118 {
119 stop();
120 }
121
125 void
126 stop() override
127 {
128 ctx_.stop();
129 ctx_.join();
130 }
131
136 void
137 subBookChanges(SubscriberSharedPtr const& subscriber) final;
138
143 void
144 unsubBookChanges(SubscriberSharedPtr const& subscriber) final;
145
151 void
152 pubBookChanges(ripple::LedgerHeader const& lgrInfo, std::vector<data::TransactionAndMetadata> const& transactions)
153 final;
154
159 void
160 subProposedTransactions(SubscriberSharedPtr const& subscriber) final;
161
166 void
167 unsubProposedTransactions(SubscriberSharedPtr const& subscriber) final;
168
174 void
175 subProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
176
182 void
183 unsubProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
184
189 void
190 forwardProposedTransaction(boost::json::object const& receivedTxJson) final;
191
198 boost::json::object
199 subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const& subscriber) final;
200
205 void
206 unsubLedger(SubscriberSharedPtr const& subscriber) final;
207
215 void
216 pubLedger(
217 ripple::LedgerHeader const& lgrInfo,
218 ripple::Fees const& fees,
219 std::string const& ledgerRange,
220 std::uint32_t txnCount
221 ) final;
222
227 void
228 subManifest(SubscriberSharedPtr const& subscriber) final;
229
234 void
235 unsubManifest(SubscriberSharedPtr const& subscriber) final;
236
241 void
242 forwardManifest(boost::json::object const& manifestJson) final;
243
248 void
249 subValidation(SubscriberSharedPtr const& subscriber) final;
250
255 void
256 unsubValidation(SubscriberSharedPtr const& subscriber) final;
257
262 void
263 forwardValidation(boost::json::object const& validationJson) final;
264
269 void
270 subTransactions(SubscriberSharedPtr const& subscriber) final;
271
276 void
277 unsubTransactions(SubscriberSharedPtr const& subscriber) final;
278
284 void
285 subAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
286
292 void
293 unsubAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final;
294
300 void
301 subBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final;
302
308 void
309 unsubBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final;
310
316 void
317 pubTransaction(data::TransactionAndMetadata const& txMeta, ripple::LedgerHeader const& lgrInfo) final;
318
324 boost::json::object
325 report() const final;
326};
327
328} // namespace feed
Interface of subscription manager. A subscription manager is responsible for managing the subscriptio...
Definition SubscriptionManagerInterface.hpp:44
A subscription manager is responsible for managing the subscriptions and publishing the feeds.
Definition SubscriptionManager.hpp:61
void forwardValidation(boost::json::object const &validationJson) final
Forward the validation feed.
Definition SubscriptionManager.cpp:150
void subBookChanges(SubscriberSharedPtr const &subscriber) final
Subscribe to the book changes feed.
Definition SubscriptionManager.cpp:38
void unsubProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the proposed transactions feed for particular account.
Definition SubscriptionManager.cpp:84
void unsubProposedTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:68
void subManifest(SubscriberSharedPtr const &subscriber) final
Subscribe to the manifest feed.
Definition SubscriptionManager.cpp:120
void subAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive the feed when particular account is affected.
Definition SubscriptionManager.cpp:168
void unsubBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed for particular order book.
Definition SubscriptionManager.cpp:186
boost::json::object report() const final
Get the number of subscribers.
Definition SubscriptionManager.cpp:198
void forwardManifest(boost::json::object const &manifestJson) final
Forward the manifest feed.
Definition SubscriptionManager.cpp:132
void pubTransaction(data::TransactionAndMetadata const &txMeta, ripple::LedgerHeader const &lgrInfo) final
Forward the transactions feed.
Definition SubscriptionManager.cpp:192
void pubLedger(ripple::LedgerHeader const &lgrInfo, ripple::Fees const &fees, std::string const &ledgerRange, std::uint32_t txnCount) final
Publish the ledger feed.
Definition SubscriptionManager.cpp:109
void unsubBookChanges(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the book changes feed.
Definition SubscriptionManager.cpp:44
SubscriptionManager(util::async::AnyExecutionContext &&executor, std::shared_ptr< data::BackendInterface const > const &backend)
Construct a new Subscription Manager object.
Definition SubscriptionManager.hpp:99
void pubBookChanges(ripple::LedgerHeader const &lgrInfo, std::vector< data::TransactionAndMetadata > const &transactions) final
Publish the book changes feed.
Definition SubscriptionManager.cpp:50
void unsubTransactions(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed.
Definition SubscriptionManager.cpp:162
static std::shared_ptr< SubscriptionManager > makeSubscriptionManager(util::config::ClioConfigDefinition const &config, std::shared_ptr< data::BackendInterface const > const &backend)
Factory function to create a new SubscriptionManager with a PoolExecutionContext.
Definition SubscriptionManager.hpp:80
boost::json::object subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const &subscriber) final
Subscribe to the ledger feed.
Definition SubscriptionManager.cpp:97
void subBook(ripple::Book const &book, SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed, only receive feed when particular order book is affected.
Definition SubscriptionManager.cpp:180
void subProposedTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed.
Definition SubscriptionManager.cpp:59
void subValidation(SubscriberSharedPtr const &subscriber) final
Subscribe to the validation feed.
Definition SubscriptionManager.cpp:138
void unsubManifest(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the manifest feed.
Definition SubscriptionManager.cpp:126
void unsubAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Unsubscribe to the transactions feed for particular account.
Definition SubscriptionManager.cpp:174
void forwardProposedTransaction(boost::json::object const &receivedTxJson) final
Forward the proposed transactions feed.
Definition SubscriptionManager.cpp:91
void unsubValidation(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the validation feed.
Definition SubscriptionManager.cpp:144
void stop() override
Stop the SubscriptionManager and wait for all jobs to finish.
Definition SubscriptionManager.hpp:126
void subProposedAccount(ripple::AccountID const &account, SubscriberSharedPtr const &subscriber) final
Subscribe to the proposed transactions feed, only receive the feed when particular account is affecte...
Definition SubscriptionManager.cpp:75
void unsubLedger(SubscriberSharedPtr const &subscriber) final
Unsubscribe to the ledger feed.
Definition SubscriptionManager.cpp:103
void subTransactions(SubscriberSharedPtr const &subscriber) final
Subscribe to the transactions feed.
Definition SubscriptionManager.cpp:156
~SubscriptionManager() override
Destructor of the SubscriptionManager object. It will block until all running jobs finished.
Definition SubscriptionManager.hpp:117
Feed that publishes the ledger info. Example : {'type': 'ledgerClosed', 'ledger_index': 2647935,...
Definition LedgerFeed.hpp:46
Feed that publishes the Proposed Transactions.
Definition ProposedTransactionFeed.hpp:51
Definition TransactionFeed.hpp:49
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
A type-erased execution context.
Definition AnyExecutionContext.hpp:41
void stop() const
Stop the execution context.
Definition AnyExecutionContext.hpp:255
void join() const
Join the execution context.
Definition AnyExecutionContext.hpp:264
A highly configurable execution context.
Definition BasicExecutionContext.hpp:132
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
T get(std::string_view fullKey) const
Returns the specified value of given string if value exists.
Definition ConfigDefinition.hpp:108
This namespace implements everything related to subscriptions.
Definition BookChangesFeed.hpp:33
Represents a transaction and its metadata bundled together.
Definition Types.hpp:68
Feed that publishes book changes. This feed will be published every ledger, even if there are no chan...
Definition BookChangesFeed.hpp:40
Feed that publishes the json object as it is.
Definition ForwardFeed.hpp:32