xrpld
Loading...
Searching...
No Matches
Subscribe.cpp
1#include <xrpld/app/ledger/LedgerMaster.h>
2#include <xrpld/app/main/Application.h>
3#include <xrpld/rpc/Context.h>
4#include <xrpld/rpc/RPCSub.h>
5#include <xrpld/rpc/Role.h>
6#include <xrpld/rpc/detail/RPCHelpers.h>
7#include <xrpld/rpc/detail/Tuning.h>
8
9#include <xrpl/basics/Log.h>
10#include <xrpl/basics/base_uint.h>
11#include <xrpl/json/json_value.h>
12#include <xrpl/ledger/ReadView.h>
13#include <xrpl/protocol/AccountID.h>
14#include <xrpl/protocol/Book.h>
15#include <xrpl/protocol/ErrorCodes.h>
16#include <xrpl/protocol/RPCErr.h>
17#include <xrpl/protocol/jss.h>
18#include <xrpl/resource/Fees.h>
19#include <xrpl/server/InfoSub.h>
20#include <xrpl/server/NetworkOPs.h>
21
22#include <memory>
23#include <optional>
24#include <stdexcept>
25#include <string>
26
27namespace xrpl {
28
// doSubscribe: RPC handler that registers a subscriber (ispSub) for the
// feeds requested in context.params and returns jvResult.
//
// Request keys examined below: url (with url_username / url_password and the
// deprecated username / password), streams, accounts_proposed (or the
// deprecated rt_accounts), accounts, account_history_tx_stream, and books.
//
// NOTE(review): this text is a rendered listing and a number of source lines
// are absent from it (the embedded listing numbers skip, e.g. 30, 33, 39, 45).
// Wherever a guarded statement or declaration is not visible, the comment
// says so instead of guessing. Per the symbol index that follows the listing,
// the missing signature on listing line 30 is
// json::Value doSubscribe(RPC::JsonContext &).
29json::Value
31{
32 InfoSub::pointer ispSub;
// NOTE(review): listing line 33 is missing; jvResult is used throughout but
// its declaration is not visible here - presumably declared on that line.
34
// A request that carries neither an existing InfoSub nor a url is logged here.
// NOTE(review): listing line 39 is missing; presumably it returns an error
// for this case - confirm against the upstream file.
35 if (!context.infoSub && !context.params.isMember(jss::url))
36 {
37 // Must be a JSON-RPC call.
38 JLOG(context.j.info()) << "doSubscribe: RPC subscribe requires a url";
40 }
41
// url mode: look up, or build, the subscriber object associated with the url.
42 if (context.params.isMember(jss::url))
43 {
// NOTE(review): the statement guarded by this role check (listing line 45)
// is missing; presumably non-admin callers are rejected - confirm.
44 if (context.role != Role::ADMIN)
46
// Credentials default to empty strings when the url_* keys are absent.
47 std::string const strUrl = context.params[jss::url].asString();
48 std::string strUsername = context.params.isMember(jss::url_username)
49 ? context.params[jss::url_username].asString()
50 : "";
51 std::string strPassword = context.params.isMember(jss::url_password)
52 ? context.params[jss::url_password].asString()
53 : "";
54
// The legacy keys below, when present, override the url_* values read above.
55 // DEPRECATED
56 if (context.params.isMember(jss::username))
57 strUsername = context.params[jss::username].asString();
58
59 // DEPRECATED
60 if (context.params.isMember(jss::password))
61 strPassword = context.params[jss::password].asString();
62
// Reuse a subscriber already registered for this url, otherwise create one.
63 ispSub = context.netOps.findRpcSub(strUrl);
64 if (!ispSub)
65 {
66 JLOG(context.j.debug()) << "doSubscribe: building: " << strUrl;
// A std::runtime_error thrown while building is reported to the caller as a
// parameter error carrying the exception message (see the catch below).
67 try
68 {
69 auto rspSub = makeRPCSub(
70 context.app.getOPs(),
71 context.app.getIOContext(),
72 context.app.getJobQueue(),
73 strUrl,
74 strUsername,
75 strPassword,
76 context.app);
// NOTE(review): the right-hand side of this assignment (listing line 78) is
// missing; presumably the new subscriber is registered via
// context.netOps.addRpcSub - confirm.
77 ispSub =
79 }
80 catch (std::runtime_error const& ex)
81 {
82 return RPC::makeParamError(ex.what());
83 }
84 }
85 else
86 {
87 JLOG(context.j.trace()) << "doSubscribe: reusing: " << strUrl;
88
// Credentials on a reused subscriber are only updated when the deprecated
// username / password keys are present in the request.
89 if (auto rpcSub = std::dynamic_pointer_cast<RPCSub>(ispSub))
90 {
91 // Why do we need to check isMember against jss::username and
92 // jss::password here instead of just setting the username and
93 // the password? What about url_username and url_password?
94 if (context.params.isMember(jss::username))
95 rpcSub->setUsername(strUsername);
96
97 if (context.params.isMember(jss::password))
98 rpcSub->setPassword(strPassword);
99 }
100 }
101 }
102 else
103 {
// No url given: use the InfoSub already carried by the request context.
104 ispSub = context.infoSub;
105 }
// Record the request's API version on the subscriber.
106 ispSub->setApiVersion(context.apiVersion);
107
// streams: an array of stream names, each dispatched to the matching
// NetworkOPs sub* call.
108 if (context.params.isMember(jss::streams))
109 {
110 if (!context.params[jss::streams].isArray())
111 {
112 JLOG(context.j.info()) << "doSubscribe: streams requires an array.";
// NOTE(review): listing line 113 is missing; presumably an error return.
114 }
115
116 for (auto const& it : context.params[jss::streams])
117 {
// NOTE(review): the statement guarded by this check (listing line 119) is
// missing; presumably a malformed-stream error return - confirm.
118 if (!it.isString())
120
121 std::string const streamName = it.asString();
122 if (streamName == "server")
123 {
// The last argument tells subServer whether the caller is an admin.
124 context.netOps.subServer(ispSub, jvResult, context.role == Role::ADMIN);
125 }
126 else if (streamName == "ledger")
127 {
128 context.netOps.subLedger(ispSub, jvResult);
129 }
130 else if (streamName == "book_changes")
131 {
132 context.netOps.subBookChanges(ispSub);
133 }
134 else if (streamName == "manifests")
135 {
136 context.netOps.subManifests(ispSub);
137 }
138 else if (streamName == "transactions")
139 {
140 context.netOps.subTransactions(ispSub);
141 }
142 else if (
143 streamName == "transactions_proposed" ||
144 streamName == "rt_transactions") // DEPRECATED
145 {
146 context.netOps.subRTTransactions(ispSub);
147 }
148 else if (streamName == "validations")
149 {
150 context.netOps.subValidations(ispSub);
151 }
152 else if (streamName == "peer_status")
153 {
// NOTE(review): the statement guarded by this role check (listing line 155)
// is missing; presumably peer_status is refused for non-admin callers.
154 if (context.role != Role::ADMIN)
156 context.netOps.subPeerStatus(ispSub);
157 }
158 else if (streamName == "consensus")
159 {
160 context.netOps.subConsensus(ispSub);
161 }
162 else
163 {
// NOTE(review): the body of this fallback branch (listing line 164) is
// missing; presumably an error return for an unrecognized stream name.
165 }
166 }
167 }
168
// Proposed-transaction account subscriptions: use accounts_proposed when the
// request has it, otherwise fall back to the legacy rt_accounts key.
169 auto accountsProposed = context.params.isMember(jss::accounts_proposed)
170 ? jss::accounts_proposed
171 : jss::rt_accounts; // DEPRECATED
172 if (context.params.isMember(accountsProposed))
173 {
// NOTE(review): the statements guarded by the two checks below (listing
// lines 175 and 179) are missing; presumably error returns for a non-array
// value and for an empty parsed account set.
174 if (!context.params[accountsProposed].isArray())
176
177 auto ids = RPC::parseAccountIds(context.params[accountsProposed]);
178 if (ids.empty())
// The third argument is the realTime flag of subAccount.
180 context.netOps.subAccount(ispSub, ids, true);
181 }
182
// accounts: same shape as above, subscribed with realTime set to false.
183 if (context.params.isMember(jss::accounts))
184 {
// NOTE(review): guarded statements on listing lines 186 and 190 are missing.
185 if (!context.params[jss::accounts].isArray())
187
188 auto ids = RPC::parseAccountIds(context.params[jss::accounts]);
189 if (ids.empty())
191 context.netOps.subAccount(ispSub, ids, false);
192 JLOG(context.j.debug()) << "doSubscribe: accounts: " << ids.size();
193 }
194
// account_history_tx_stream: refused with RpcNotEnabled when the
// configuration reports that transaction tables are not in use.
195 if (context.params.isMember(jss::account_history_tx_stream))
196 {
197 if (!context.app.config().useTxTables())
198 return rpcError(RpcNotEnabled);
199
// NOTE(review): listing line 200 is missing from the extract.
201 auto const& req = context.params[jss::account_history_tx_stream];
// The request object must carry a string account field that parses as a
// base58 AccountID.
// NOTE(review): the statements guarded by the next two checks (listing lines
// 203 and 207) are missing; presumably error returns.
202 if (!req.isMember(jss::account) || !req[jss::account].isString())
204
205 auto const id = parseBase58<AccountID>(req[jss::account].asString());
206 if (!id)
208
// Any non-success code from subAccountHistory is returned as an RPC error.
209 if (auto result = context.netOps.subAccountHistory(ispSub, *id); result != RpcSuccess)
210 {
211 return rpcError(result);
212 }
213
214 jvResult[jss::warning] =
215 "account_history_tx_stream is an experimental feature and likely "
216 "to be removed in the future";
217 JLOG(context.j.debug()) << "doSubscribe: account_history_tx_stream: " << toBase58(*id);
218 }
219
// books: an array of order-book descriptions. Each entry is validated,
// subscribed, and can optionally contribute current offers to jvResult.
220 if (context.params.isMember(jss::books))
221 {
// NOTE(review): the statement guarded by this check (listing line 223) is
// missing; presumably an error return for a non-array value.
222 if (!context.params[jss::books].isArray())
224
225 for (auto& j : context.params[jss::books])
226 {
// Each entry must be an object whose taker_pays and taker_gets members are
// objects (or null).
// NOTE(review): the guarded statement (listing line 229) is missing.
227 if (!j.isObject() || !j.isMember(jss::taker_pays) || !j.isMember(jss::taker_gets) ||
228 !j[jss::taker_pays].isObjectOrNull() || !j[jss::taker_gets].isObjectOrNull())
230
231 Book book;
232
// Parse both sides of the book; a parse failure is returned as-is.
233 if (auto const err = RPC::parseSubUnsubJson(book.in, j, jss::taker_pays, context.j);
234 err != RpcSuccess)
235 return rpcError(err);
236
237 if (auto const err = RPC::parseSubUnsubJson(book.out, j, jss::taker_gets, context.j);
238 err != RpcSuccess)
239 return rpcError(err);
240
// A book whose two sides are identical is rejected.
241 if (book.in == book.out)
242 {
243 JLOG(context.j.info()) << "taker_gets same as taker_pays.";
244 return rpcError(RpcBadMarket);
245 }
246
// NOTE(review): listing line 247 is missing; takerID is used below but its
// declaration is not visible - presumably an optional AccountID declared here.
248
// Optional taker: must be a string that parses as a base58 AccountID.
// NOTE(review): guarded statements on listing lines 252 and 255 are missing.
249 if (j.isMember(jss::taker))
250 {
251 if (!j[jss::taker].isString())
253 takerID = parseBase58<AccountID>(j[jss::taker].asString());
254 if (!takerID)
256 }
257
// Optional domain: must be a string that parses as a 256-bit hex value.
258 if (j.isMember(jss::domain))
259 {
260 uint256 domain;
261 if (!j[jss::domain].isString() || !domain.parseHex(j[jss::domain].asString()))
262 {
// NOTE(review): listing line 263 is missing; presumably an error return for
// a malformed domain.
264 }
265
266 book.domain = domain;
267 }
268
269 if (!isConsistent(book))
270 {
271 JLOG(context.j.warn()) << "Bad market: " << book;
272 return rpcError(RpcBadMarket);
273 }
274
275 context.netOps.subBook(ispSub, book);
276
277 // both_sides is deprecated.
278 bool const both = (j.isMember(jss::both) && j[jss::both].asBool()) ||
279 (j.isMember(jss::both_sides) && j[jss::both_sides].asBool());
280
// When both sides are requested the reversed book is subscribed as well.
281 if (both)
282 context.netOps.subBook(ispSub, reversed(book));
283
284 // state_now is deprecated.
285 if ((j.isMember(jss::snapshot) && j[jss::snapshot].asBool()) ||
286 (j.isMember(jss::state_now) && j[jss::state_now].asBool()))
287 {
// NOTE(review): listing lines 288-290 are missing; lpLedger is not declared
// in the visible text - presumably obtained from the ledger master there.
291 if (lpLedger)
292 {
// NOTE(review): listing lines 293-294 are missing; jvMarker and jvOffers are
// used inside the lambda but declared outside the visible text.
295
// add: fetch a page of offers for this book (the reversed book when the
// field is asks) and merge them into jvResult[field], appending if that
// field already holds results from an earlier book entry.
296 auto add = [&](json::StaticString field) {
297 context.netOps.getBookPage(
298 lpLedger,
299 field == jss::asks ? reversed(book) : book,
300 takerID ? *takerID : noAccount(),
301 false,
// NOTE(review): listing line 302 is missing; per the getBookPage signature in
// the symbol index the argument at this position is iLimit.
303 jvMarker,
304 jvOffers);
305
306 if (jvResult.isMember(field))
307 {
308 json::Value& results(jvResult[field]);
309 for (auto const& e : jvOffers[jss::offers])
310 results.append(e);
311 }
312 else
313 {
314 jvResult[field] = jvOffers[jss::offers];
315 }
316 };
317
// Two-sided requests report bids and asks separately; one-sided requests
// report a single offers list.
318 if (both)
319 {
320 add(jss::bids);
321 add(jss::asks);
322 }
323 else
324 {
325 add(jss::offers);
326 }
327 }
328 }
329 }
330 }
331
332 return jvResult;
333}
334
335} // namespace xrpl
Stream debug() const
Definition Journal.h:297
Stream info() const
Definition Journal.h:303
Stream trace() const
Severity stream access functions.
Definition Journal.h:291
Stream warn() const
Definition Journal.h:309
Lightweight wrapper to tag static string.
Definition json_value.h:44
Represents a JSON value.
Definition json_value.h:130
bool isArray() const
Value & append(Value const &value)
Append value to array at the end.
std::string asString() const
Returns the unquoted string value.
bool isMember(char const *key) const
Return true if the object has a member named key.
virtual Config & config()=0
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
Definition base_uint.h:507
Specifies an order book.
Definition Book.h:16
bool useTxTables() const
Definition Config.h:322
virtual ErrorCodeI subAccountHistory(ref ispListener, AccountID const &account)=0
subscribe an account's new transactions and retrieve the account's historical transactions
virtual bool subTransactions(ref ispListener)=0
virtual bool subPeerStatus(ref ispListener)=0
virtual bool subServer(ref ispListener, json::Value &jvResult, bool admin)=0
virtual bool subConsensus(ref ispListener)=0
virtual void subAccount(ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool realTime)=0
virtual bool subBook(ref ispListener, Book const &)=0
virtual bool subValidations(ref ispListener)=0
virtual bool subRTTransactions(ref ispListener)=0
virtual bool subLedger(ref ispListener, json::Value &jvResult)=0
virtual bool subBookChanges(ref ispListener)=0
virtual pointer addRpcSub(std::string const &strUrl, ref rspEntry)=0
virtual bool subManifests(ref ispListener)=0
virtual pointer findRpcSub(std::string const &strUrl)=0
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:47
void setApiVersion(unsigned int apiVersion)
Definition InfoSub.cpp:201
std::shared_ptr< ReadView const > getPublishedLedger()
virtual void getBookPage(std::shared_ptr< ReadView const > &lpLedger, Book const &book, AccountID const &uTakerID, bool const bProof, unsigned int iLimit, json::Value const &jvMarker, json::Value &jvResult)=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual boost::asio::io_context & getIOContext()=0
@ Object
object value (collection of name/value pairs).
Definition json_value.h:26
@ Null
'null' value
Definition json_value.h:19
static constexpr LimitRange kBookOffers
Limits for the book_offers command.
hash_set< AccountID > parseAccountIds(json::Value const &jvArray)
Parses an array of account IDs from a JSON value.
ErrorCodeI parseSubUnsubJson(Asset &asset, json::Value const &params, json::StaticString const &name, beast::Journal j)
Parse subscribe/unsubscribe parameters.
json::Value makeParamError(std::string const &message)
Returns a new json object that indicates invalid parameters.
Definition ErrorCodes.h:219
Charge const kFeeMediumBurdenRpc
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ RpcStreamMalformed
Definition ErrorCodes.h:108
@ RpcBadMarket
Definition ErrorCodes.h:79
@ RpcSuccess
Definition ErrorCodes.h:26
@ RpcActMalformed
Definition ErrorCodes.h:72
@ RpcDomainMalformed
Definition ErrorCodes.h:140
@ RpcNotEnabled
Definition ErrorCodes.h:41
@ RpcInvalidParams
Definition ErrorCodes.h:66
@ RpcNoPermission
Definition ErrorCodes.h:35
std::optional< AccountID > parseBase58(std::string const &s)
Parse AccountID from checked, base58 string.
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:93
std::shared_ptr< RPCSub > makeRPCSub(InfoSub::Source &source, boost::asio::io_context &ioContext, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, ServiceRegistry &registry)
Definition RPCSub.cpp:211
json::Value rpcError(ErrorCodeI iError)
Definition RPCErr.cpp:13
@ ADMIN
Definition Role.h:24
Book reversed(Book const &book)
Definition Book.cpp:30
json::Value doSubscribe(RPC::JsonContext &)
Definition Subscribe.cpp:30
AccountID const & noAccount()
A placeholder for empty accounts.
bool isConsistent(Asset const &asset)
Definition Asset.h:312
BaseUInt< 256 > uint256
Definition base_uint.h:562
T dynamic_pointer_cast(T... args)
beast::Journal const j
Definition Context.h:20
Application & app
Definition Context.h:21
InfoSub::pointer infoSub
Definition Context.h:28
Resource::Charge & loadType
Definition Context.h:22
unsigned int apiVersion
Definition Context.h:29
NetworkOPs & netOps
Definition Context.h:23
json::Value params
Definition Context.h:43
T what(T... args)