rippled
Loading...
Searching...
No Matches
RPCSub.cpp
1#include <xrpld/rpc/RPCCall.h>
2#include <xrpld/rpc/RPCSub.h>
3
4#include <xrpl/basics/Log.h>
5#include <xrpl/basics/StringUtilities.h>
6#include <xrpl/basics/contract.h>
7#include <xrpl/json/to_string.h>
8
9#include <deque>
10
11namespace xrpl {
12
13// Subscription object for JSON-RPC
14class RPCSubImp : public RPCSub
15{
16public:
18 InfoSub::Source& source,
19 boost::asio::io_context& io_context,
20 JobQueue& jobQueue,
21 std::string const& strUrl,
22 std::string const& strUsername,
23 std::string const& strPassword,
24 ServiceRegistry& registry)
25 : RPCSub(source)
26 , m_io_context(io_context)
27 , m_jobQueue(jobQueue)
28 , mUrl(strUrl)
29 , mUsername(strUsername)
30 , mPassword(strPassword)
31 , j_(registry.getJournal("RPCSub"))
32 , logs_(registry.getLogs())
33 {
34 parsedURL pUrl;
35
36 if (!parseUrl(pUrl, strUrl))
37 {
38 Throw<std::runtime_error>("Failed to parse url.");
39 }
40 else if (pUrl.scheme == "https")
41 {
42 mSSL = true;
43 }
44 else if (pUrl.scheme != "http")
45 {
46 Throw<std::runtime_error>("Only http and https is supported.");
47 }
48
49 mSeq = 1;
50
51 mIp = pUrl.domain;
52 if (!pUrl.port)
53 {
54 mPort = mSSL ? 443 : 80;
55 }
56 else
57 {
58 mPort = *pUrl.port;
59 }
60 mPath = pUrl.path;
61
62 JLOG(j_.info()) << "RPCCall::fromNetwork sub: ip=" << mIp << " port=" << mPort
63 << " ssl= " << (mSSL ? "yes" : "no") << " path='" << mPath << "'";
64 }
65
66 ~RPCSubImp() = default;
67
68 void
69 send(Json::Value const& jvObj, bool broadcast) override
70 {
71 std::lock_guard const sl(mLock);
72
73 auto jm = broadcast ? j_.debug() : j_.info();
74 JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
75
77
78 if (!mSending)
79 {
80 // Start a sending thread.
81 JLOG(j_.info()) << "RPCCall::fromNetwork start";
82
83 mSending =
84 m_jobQueue.addJob(jtCLIENT_SUBSCRIBE, "RPCSubSendThr", [this]() { sendThread(); });
85 }
86 }
87
88 void
89 setUsername(std::string const& strUsername) override
90 {
91 std::lock_guard const sl(mLock);
92
93 mUsername = strUsername;
94 }
95
96 void
97 setPassword(std::string const& strPassword) override
98 {
99 std::lock_guard const sl(mLock);
100
101 mPassword = strPassword;
102 }
103
104private:
105 // XXX Could probably create a bunch of send jobs in a single get of the
106 // lock.
107 void
109 {
110 Json::Value jvEvent;
111 bool bSend = false;
112
113 do
114 {
115 {
116 // Obtain the lock to manipulate the queue and change sending.
117 std::lock_guard const sl(mLock);
118
119 if (mDeque.empty())
120 {
121 mSending = false;
122 bSend = false;
123 }
124 else
125 {
126 auto const [seq, env] = mDeque.front();
127
129
130 jvEvent = env;
131 jvEvent["seq"] = seq;
132
133 bSend = true;
134 }
135 }
136
137 // Send outside of the lock.
138 if (bSend)
139 {
140 // XXX Might not need this in a try.
141 try
142 {
143 JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
144
147 mIp,
148 mPort,
149 mUsername,
150 mPassword,
151 mPath,
152 "event",
153 jvEvent,
154 mSSL,
155 true,
156 logs_);
157 }
158 catch (std::exception const& e)
159 {
160 JLOG(j_.info()) << "RPCCall::fromNetwork exception: " << e.what();
161 }
162 }
163 } while (bSend);
164 }
165
166private:
167 boost::asio::io_context& m_io_context;
169
173 bool mSSL{false};
177
178 int mSeq; // Next id to allocate.
179
180 bool mSending{false}; // Sending thread is active.
181
183
186};
187
188//------------------------------------------------------------------------------
189
191{
192}
193
196 InfoSub::Source& source,
197 boost::asio::io_context& io_context,
198 JobQueue& jobQueue,
199 std::string const& strUrl,
200 std::string const& strUsername,
201 std::string const& strPassword,
202 ServiceRegistry& registry)
203{
205 std::ref(source),
206 std::ref(io_context),
207 std::ref(jobQueue),
208 strUrl,
209 strUsername,
210 strPassword,
211 registry);
212}
213
214} // namespace xrpl
Represents a JSON value.
Definition json_value.h:130
A generic endpoint for log messages.
Definition Journal.h:40
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Abstracts the source of subscription data.
Definition InfoSub.h:47
Manages a client's subscription to data feeds.
Definition InfoSub.h:31
std::mutex mLock
Definition InfoSub.h:210
A pool of threads to perform work.
Definition JobQueue.h:38
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:147
Manages partitions for logging.
Definition Log.h:32
beast::Journal const j_
Definition RPCSub.cpp:184
void send(Json::Value const &jvObj, bool broadcast) override
Definition RPCSub.cpp:69
RPCSubImp(InfoSub::Source &source, boost::asio::io_context &io_context, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, ServiceRegistry &registry)
Definition RPCSub.cpp:17
std::string mIp
Definition RPCSub.cpp:171
std::string mUsername
Definition RPCSub.cpp:174
std::deque< std::pair< int, Json::Value > > mDeque
Definition RPCSub.cpp:182
std::string mPath
Definition RPCSub.cpp:176
void setUsername(std::string const &strUsername) override
Definition RPCSub.cpp:89
void setPassword(std::string const &strPassword) override
Definition RPCSub.cpp:97
boost::asio::io_context & m_io_context
Definition RPCSub.cpp:167
std::string mUrl
Definition RPCSub.cpp:170
std::string mPassword
Definition RPCSub.cpp:175
JobQueue & m_jobQueue
Definition RPCSub.cpp:168
~RPCSubImp()=default
void sendThread()
Definition RPCSub.cpp:108
std::uint16_t mPort
Definition RPCSub.cpp:172
Subscription object for JSON RPC.
Definition RPCSub.h:13
RPCSub(InfoSub::Source &source)
Definition RPCSub.cpp:190
An endpoint that consumes resources.
Definition Consumer.h:16
Service registry for dependency injection.
T empty(T... args)
T front(T... args)
T is_same_v
T make_pair(T... args)
void fromNetwork(boost::asio::io_context &io_context, std::string const &strIp, std::uint16_t const iPort, std::string const &strUsername, std::string const &strPassword, std::string const &strPath, std::string const &strMethod, Json::Value const &jvParams, bool const bSSL, bool const quiet, Logs &logs, std::function< void(Json::Value const &jvInput)> callbackFuncP, std::unordered_map< std::string, std::string > headers)
Definition RPCCall.cpp:1668
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ jtCLIENT_SUBSCRIBE
Definition Job.h:25
bool parseUrl(parsedURL &pUrl, std::string const &strUrl)
std::shared_ptr< RPCSub > make_RPCSub(InfoSub::Source &source, boost::asio::io_context &io_context, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, ServiceRegistry &registry)
Definition RPCSub.cpp:195
T pop_front(T... args)
T push_back(T... args)
T ref(T... args)
std::optional< std::uint16_t > port
T what(T... args)