xrpld
Loading...
Searching...
No Matches
RPCSub.cpp
1#include <xrpld/rpc/RPCSub.h>
2
3#include <xrpld/rpc/RPCCall.h>
4
5#include <xrpl/basics/Log.h>
6#include <xrpl/basics/StringUtilities.h>
7#include <xrpl/basics/contract.h>
8#include <xrpl/core/Job.h>
9#include <xrpl/core/JobQueue.h>
10#include <xrpl/core/ServiceRegistry.h>
11#include <xrpl/json/json_value.h>
12#include <xrpl/json/to_string.h> // IWYU pragma: keep
13#include <xrpl/server/InfoSub.h>
14
15#include <boost/asio/io_context.hpp>
16
17#include <cstdint>
18#include <deque>
19#include <exception>
20#include <functional>
21#include <memory>
22#include <mutex>
23#include <stdexcept>
24#include <string>
25#include <utility>
26
27namespace xrpl {
28
29// Subscription object for JSON-RPC
30class RPCSubImp : public RPCSub
31{
32public:
34 InfoSub::Source& source,
35 boost::asio::io_context& ioContext,
36 JobQueue& jobQueue,
37 std::string const& strUrl,
38 std::string strUsername,
39 std::string strPassword,
40 ServiceRegistry& registry)
41 : RPCSub(source)
42 , ioContext_(ioContext)
43 , jobQueue_(jobQueue)
44 , url_(strUrl)
45 , username_(std::move(strUsername))
46 , password_(std::move(strPassword))
47 , j_(registry.getJournal("RPCSub"))
48 , logs_(registry.getLogs())
49 {
50 ParsedUrl pUrl;
51
52 if (!parseUrl(pUrl, strUrl))
53 {
54 Throw<std::runtime_error>("Failed to parse url.");
55 }
56 else if (pUrl.scheme == "https")
57 {
58 ssl_ = true;
59 }
60 else if (pUrl.scheme != "http")
61 {
62 Throw<std::runtime_error>("Only http and https is supported.");
63 }
64
65 seq_ = 1;
66
67 ip_ = pUrl.domain;
68 if (!pUrl.port)
69 {
70 port_ = ssl_ ? 443 : 80;
71 }
72 else
73 {
74 port_ = *pUrl.port;
75 }
76 path_ = pUrl.path;
77
78 JLOG(j_.info()) << "RPCCall::fromNetwork sub: ip=" << ip_ << " port=" << port_
79 << " ssl= " << (ssl_ ? "yes" : "no") << " path='" << path_ << "'";
80 }
81
82 ~RPCSubImp() override = default;
83
84 void
85 send(json::Value const& jvObj, bool broadcast) override
86 {
87 std::scoped_lock const sl(lock_);
88
89 auto jm = broadcast ? j_.debug() : j_.info();
90 JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
91
92 deque_.emplace_back(seq_++, jvObj);
93
94 if (!sending_)
95 {
96 // Start a sending thread.
97 JLOG(j_.info()) << "RPCCall::fromNetwork start";
98
99 sending_ =
100 jobQueue_.addJob(JtClientSubscribe, "RPCSubSendThr", [this]() { sendThread(); });
101 }
102 }
103
104 void
105 setUsername(std::string const& strUsername) override
106 {
107 std::scoped_lock const sl(lock_);
108
109 username_ = strUsername;
110 }
111
112 void
113 setPassword(std::string const& strPassword) override
114 {
115 std::scoped_lock const sl(lock_);
116
117 password_ = strPassword;
118 }
119
120private:
121 // XXX Could probably create a bunch of send jobs in a single get of the
122 // lock.
123 void
125 {
126 json::Value jvEvent;
127 bool bSend = false;
128
129 do
130 {
131 {
132 // Obtain the lock to manipulate the queue and change sending.
133 std::scoped_lock const sl(lock_);
134
135 if (deque_.empty())
136 {
137 sending_ = false;
138 bSend = false;
139 }
140 else
141 {
142 auto const [seq, env] = deque_.front();
143
144 deque_.pop_front();
145
146 jvEvent = env;
147 jvEvent["seq"] = seq;
148
149 bSend = true;
150 }
151 }
152
153 // Send outside of the lock.
154 if (bSend)
155 {
156 // XXX Might not need this in a try.
157 try
158 {
159 JLOG(j_.info()) << "RPCCall::fromNetwork: " << ip_;
160
163 ip_,
164 port_,
165 username_,
166 password_,
167 path_,
168 "event",
169 jvEvent,
170 ssl_,
171 true,
172 logs_);
173 }
174 catch (std::exception const& e)
175 {
176 JLOG(j_.info()) << "RPCCall::fromNetwork exception: " << e.what();
177 }
178 }
179 } while (bSend);
180 }
181
182private:
183 boost::asio::io_context& ioContext_;
185
189 bool ssl_{false};
193
194 int seq_; // Next id to allocate.
195
196 bool sending_{false}; // Sending thread is active.
197
199
202};
203
204//------------------------------------------------------------------------------
205
207{
208}
209
212 InfoSub::Source& source,
213 boost::asio::io_context& ioContext,
214 JobQueue& jobQueue,
215 std::string const& strUrl,
216 std::string const& strUsername,
217 std::string const& strPassword,
218 ServiceRegistry& registry)
219{
221 std::ref(source),
222 std::ref(ioContext),
223 std::ref(jobQueue),
224 strUrl,
225 strUsername,
226 strPassword,
227 registry);
228}
229
230} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:38
Represents a JSON value.
Definition json_value.h:130
Abstracts the source of subscription data.
Definition InfoSub.h:61
InfoSub(Source &source)
Definition InfoSub.cpp:56
Resource::Consumer Consumer
Definition InfoSub.h:55
std::mutex lock_
Definition InfoSub.h:291
A pool of threads to perform work.
Definition JobQueue.h:43
Manages partitions for logging.
Definition Log.h:20
beast::Journal const j_
Definition RPCSub.cpp:200
void send(json::Value const &jvObj, bool broadcast) override
Definition RPCSub.cpp:85
std::string ip_
Definition RPCSub.cpp:187
~RPCSubImp() override=default
std::string url_
Definition RPCSub.cpp:186
RPCSubImp(InfoSub::Source &source, boost::asio::io_context &ioContext, JobQueue &jobQueue, std::string const &strUrl, std::string strUsername, std::string strPassword, ServiceRegistry &registry)
Definition RPCSub.cpp:33
void setUsername(std::string const &strUsername) override
Definition RPCSub.cpp:105
std::string path_
Definition RPCSub.cpp:192
void setPassword(std::string const &strPassword) override
Definition RPCSub.cpp:113
boost::asio::io_context & ioContext_
Definition RPCSub.cpp:183
std::string password_
Definition RPCSub.cpp:191
std::uint16_t port_
Definition RPCSub.cpp:188
std::string username_
Definition RPCSub.cpp:190
void sendThread()
Definition RPCSub.cpp:124
std::deque< std::pair< int, json::Value > > deque_
Definition RPCSub.cpp:198
JobQueue & jobQueue_
Definition RPCSub.cpp:184
RPCSub(InfoSub::Source &source)
Definition RPCSub.cpp:206
Service registry for dependency injection.
T make_shared(T... args)
STL namespace.
void fromNetwork(boost::asio::io_context &ioContext, std::string const &strIp, std::uint16_t const iPort, std::string const &strUsername, std::string const &strPassword, std::string const &strPath, std::string const &strMethod, json::Value const &jvParams, bool const bSSL, bool const quiet, Logs &logs, std::function< void(json::Value const &jvInput)> callbackFuncP, std::unordered_map< std::string, std::string > headers)
Definition RPCCall.cpp:1831
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::shared_ptr< RPCSub > makeRPCSub(InfoSub::Source &source, boost::asio::io_context &ioContext, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, ServiceRegistry &registry)
Definition RPCSub.cpp:211
@ JtClientSubscribe
Definition Job.h:27
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Definition contract.h:49
bool parseUrl(ParsedUrl &pUrl, std::string const &strUrl)
T ref(T... args)
std::optional< std::uint16_t > port
T what(T... args)