rippled
Loading...
Searching...
No Matches
RPCSub.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/rpc/RPCCall.h>
21#include <xrpld/rpc/RPCSub.h>
22
23#include <xrpl/basics/Log.h>
24#include <xrpl/basics/StringUtilities.h>
25#include <xrpl/basics/contract.h>
26#include <xrpl/json/to_string.h>
27
28#include <deque>
29
30namespace ripple {
31
32// Subscription object for JSON-RPC
33class RPCSubImp : public RPCSub
34{
35public:
37 InfoSub::Source& source,
38 boost::asio::io_context& io_context,
39 JobQueue& jobQueue,
40 std::string const& strUrl,
41 std::string const& strUsername,
42 std::string const& strPassword,
43 Logs& logs)
44 : RPCSub(source)
45 , m_io_context(io_context)
46 , m_jobQueue(jobQueue)
47 , mUrl(strUrl)
48 , mSSL(false)
49 , mUsername(strUsername)
50 , mPassword(strPassword)
51 , mSending(false)
52 , j_(logs.journal("RPCSub"))
53 , logs_(logs)
54 {
55 parsedURL pUrl;
56
57 if (!parseUrl(pUrl, strUrl))
58 Throw<std::runtime_error>("Failed to parse url.");
59 else if (pUrl.scheme == "https")
60 mSSL = true;
61 else if (pUrl.scheme != "http")
62 Throw<std::runtime_error>("Only http and https is supported.");
63
64 mSeq = 1;
65
66 mIp = pUrl.domain;
67 mPort = (!pUrl.port) ? (mSSL ? 443 : 80) : *pUrl.port;
68 mPath = pUrl.path;
69
70 JLOG(j_.info()) << "RPCCall::fromNetwork sub: ip=" << mIp
71 << " port=" << mPort
72 << " ssl= " << (mSSL ? "yes" : "no") << " path='"
73 << mPath << "'";
74 }
75
76 ~RPCSubImp() = default;
77
78 void
79 send(Json::Value const& jvObj, bool broadcast) override
80 {
82
83 auto jm = broadcast ? j_.debug() : j_.info();
84 JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
85
87
88 if (!mSending)
89 {
90 // Start a sending thread.
91 JLOG(j_.info()) << "RPCCall::fromNetwork start";
92
94 jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
95 sendThread();
96 });
97 }
98 }
99
100 void
101 setUsername(std::string const& strUsername) override
102 {
104
105 mUsername = strUsername;
106 }
107
108 void
109 setPassword(std::string const& strPassword) override
110 {
112
113 mPassword = strPassword;
114 }
115
116private:
117 // XXX Could probably create a bunch of send jobs in a single get of the
118 // lock.
119 void
121 {
122 Json::Value jvEvent;
123 bool bSend;
124
125 do
126 {
127 {
128 // Obtain the lock to manipulate the queue and change sending.
130
131 if (mDeque.empty())
132 {
133 mSending = false;
134 bSend = false;
135 }
136 else
137 {
138 auto const [seq, env] = mDeque.front();
139
141
142 jvEvent = env;
143 jvEvent["seq"] = seq;
144
145 bSend = true;
146 }
147 }
148
149 // Send outside of the lock.
150 if (bSend)
151 {
152 // XXX Might not need this in a try.
153 try
154 {
155 JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
156
159 mIp,
160 mPort,
161 mUsername,
162 mPassword,
163 mPath,
164 "event",
165 jvEvent,
166 mSSL,
167 true,
168 logs_);
169 }
170 catch (std::exception const& e)
171 {
172 JLOG(j_.info())
173 << "RPCCall::fromNetwork exception: " << e.what();
174 }
175 }
176 } while (bSend);
177 }
178
179private:
180 boost::asio::io_context& m_io_context;
182
186 bool mSSL;
190
191 int mSeq; // Next id to allocate.
192
193 bool mSending; // Sending threead is active.
194
196
199};
200
201//------------------------------------------------------------------------------
202
204{
205}
206
209 InfoSub::Source& source,
210 boost::asio::io_context& io_context,
211 JobQueue& jobQueue,
212 std::string const& strUrl,
213 std::string const& strUsername,
214 std::string const& strPassword,
215 Logs& logs)
216{
218 std::ref(source),
219 std::ref(io_context),
220 std::ref(jobQueue),
221 strUrl,
222 strUsername,
223 strPassword,
224 logs);
225}
226
227} // namespace ripple
Represents a JSON value.
Definition json_value.h:149
A generic endpoint for log messages.
Definition Journal.h:60
Stream debug() const
Definition Journal.h:328
Stream info() const
Definition Journal.h:334
Abstracts the source of subscription data.
Definition InfoSub.h:68
Manages a client's subscription to data feeds.
Definition InfoSub.h:52
std::mutex mLock
Definition InfoSub.h:239
A pool of threads to perform work.
Definition JobQueue.h:58
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:168
Manages partitions for logging.
Definition Log.h:52
~RPCSubImp()=default
std::string mIp
Definition RPCSub.cpp:184
std::deque< std::pair< int, Json::Value > > mDeque
Definition RPCSub.cpp:195
void setPassword(std::string const &strPassword) override
Definition RPCSub.cpp:109
std::string mUsername
Definition RPCSub.cpp:187
beast::Journal const j_
Definition RPCSub.cpp:197
std::uint16_t mPort
Definition RPCSub.cpp:185
JobQueue & m_jobQueue
Definition RPCSub.cpp:181
boost::asio::io_context & m_io_context
Definition RPCSub.cpp:180
void setUsername(std::string const &strUsername) override
Definition RPCSub.cpp:101
std::string mPath
Definition RPCSub.cpp:189
void send(Json::Value const &jvObj, bool broadcast) override
Definition RPCSub.cpp:79
RPCSubImp(InfoSub::Source &source, boost::asio::io_context &io_context, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, Logs &logs)
Definition RPCSub.cpp:36
std::string mPassword
Definition RPCSub.cpp:188
std::string mUrl
Definition RPCSub.cpp:183
Subscription object for JSON RPC.
Definition RPCSub.h:32
RPCSub(InfoSub::Source &source)
Definition RPCSub.cpp:203
An endpoint that consumes resources.
Definition Consumer.h:36
T empty(T... args)
T front(T... args)
T is_same_v
T make_pair(T... args)
void fromNetwork(boost::asio::io_context &io_context, std::string const &strIp, std::uint16_t const iPort, std::string const &strUsername, std::string const &strPassword, std::string const &strPath, std::string const &strMethod, Json::Value const &jvParams, bool const bSSL, bool const quiet, Logs &logs, std::function< void(Json::Value const &jvInput)> callbackFuncP, std::unordered_map< std::string, std::string > headers)
Definition RPCCall.cpp:1661
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
bool parseUrl(parsedURL &pUrl, std::string const &strUrl)
std::shared_ptr< RPCSub > make_RPCSub(InfoSub::Source &source, boost::asio::io_context &io_context, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, Logs &logs)
Definition RPCSub.cpp:208
@ jtCLIENT_SUBSCRIBE
Definition Job.h:46
T pop_front(T... args)
T push_back(T... args)
T ref(T... args)
std::optional< std::uint16_t > port
T what(T... args)