rippled
Loading...
Searching...
No Matches
WSClient.cpp
1#include <test/jtx.h>
2#include <test/jtx/WSClient.h>
3
4#include <xrpl/json/json_reader.h>
5#include <xrpl/json/to_string.h>
6#include <xrpl/protocol/jss.h>
7#include <xrpl/server/Port.h>
8
9#include <boost/asio/executor_work_guard.hpp>
10#include <boost/asio/io_context.hpp>
11#include <boost/asio/strand.hpp>
12#include <boost/beast/core/multi_buffer.hpp>
13#include <boost/beast/websocket.hpp>
14
15#include <iostream>
16#include <string>
17#include <unordered_map>
18
19namespace ripple {
20namespace test {
21
22class WSClientImpl : public WSClient
23{
24 using error_code = boost::system::error_code;
25
26 struct msg
27 {
29
30 explicit msg(Json::Value&& jv_) : jv(jv_)
31 {
32 }
33 };
34
35 static boost::asio::ip::tcp::endpoint
36 getEndpoint(BasicConfig const& cfg, bool v2)
37 {
38 auto& log = std::cerr;
39 ParsedPort common;
40 parse_Port(common, cfg["server"], log);
41 auto const ps = v2 ? "ws2" : "ws";
42 for (auto const& name : cfg.section("server").values())
43 {
44 if (!cfg.exists(name))
45 continue;
46 ParsedPort pp;
47 parse_Port(pp, cfg[name], log);
48 if (pp.protocol.count(ps) == 0)
49 continue;
50 using namespace boost::asio::ip;
51 if (pp.ip && pp.ip->is_unspecified())
52 *pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
53 : address{address_v4::loopback()};
54
55 if (!pp.port)
56 Throw<std::runtime_error>("Use fixConfigPorts with auto ports");
57
58 return {*pp.ip, *pp.port};
59 }
60 Throw<std::runtime_error>("Missing WebSocket port");
61 return {}; // Silence compiler control paths return value warning
62 }
63
64 template <class ConstBuffers>
65 static std::string
66 buffer_string(ConstBuffers const& b)
67 {
68 using boost::asio::buffer;
69 using boost::asio::buffer_size;
71 s.resize(buffer_size(b));
72 buffer_copy(buffer(&s[0], s.size()), b);
73 return s;
74 }
75
76 boost::asio::io_context ios_;
77 std::optional<boost::asio::executor_work_guard<
78 boost::asio::io_context::executor_type>>
80 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
82 boost::asio::ip::tcp::socket stream_;
83 boost::beast::websocket::stream<boost::asio::ip::tcp::socket&> ws_;
84 boost::beast::multi_buffer rb_;
85
86 bool peerClosed_ = false;
87
88 // synchronize destructor
89 bool b0_ = false;
92
93 // synchronize message queue
97
98 unsigned rpc_version_;
99
100 void
102 {
103 boost::asio::post(
104 ios_, boost::asio::bind_executor(strand_, [this] {
105 if (!peerClosed_)
106 {
107 ws_.async_close(
108 {},
109 boost::asio::bind_executor(strand_, [&](error_code) {
110 try
111 {
112 stream_.cancel();
113 }
114 catch (boost::system::system_error const&)
115 {
116 // ignored
117 }
118 }));
119 }
120 }));
122 thread_.join();
123 }
124
125public:
127 Config const& cfg,
128 bool v2,
129 unsigned rpc_version,
131 : work_(std::in_place, boost::asio::make_work_guard(ios_))
132 , strand_(boost::asio::make_strand(ios_))
133 , thread_([&] { ios_.run(); })
134 , stream_(ios_)
135 , ws_(stream_)
136 , rpc_version_(rpc_version)
137 {
138 try
139 {
140 auto const ep = getEndpoint(cfg, v2);
141 stream_.connect(ep);
142 ws_.set_option(boost::beast::websocket::stream_base::decorator(
143 [&](boost::beast::websocket::request_type& req) {
144 for (auto const& h : headers)
145 req.set(h.first, h.second);
146 }));
147 ws_.handshake(
148 ep.address().to_string() + ":" + std::to_string(ep.port()),
149 "/");
150 ws_.async_read(
151 rb_,
152 boost::asio::bind_executor(
153 strand_,
154 std::bind(
156 this,
157 std::placeholders::_1)));
158 }
159 catch (std::exception&)
160 {
161 cleanup();
162 Rethrow();
163 }
164 }
165
166 ~WSClientImpl() override
167 {
168 cleanup();
169 }
170
172 invoke(std::string const& cmd, Json::Value const& params) override
173 {
174 using boost::asio::buffer;
175 using namespace std::chrono_literals;
176
177 {
178 Json::Value jp;
179 if (params)
180 jp = params;
181 if (rpc_version_ == 2)
182 {
183 jp[jss::method] = cmd;
184 jp[jss::jsonrpc] = "2.0";
185 jp[jss::ripplerpc] = "2.0";
186 jp[jss::id] = 5;
187 }
188 else
189 jp[jss::command] = cmd;
190 auto const s = to_string(jp);
191 ws_.write_some(true, buffer(s));
192 }
193
194 auto jv = findMsg(5s, [&](Json::Value const& jval) {
195 return jval[jss::type] == jss::response;
196 });
197 if (jv)
198 {
199 // Normalize JSON output
200 jv->removeMember(jss::type);
201 if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
202 {
203 Json::Value ret;
204 ret[jss::result] = *jv;
205 if ((*jv).isMember(jss::error))
206 ret[jss::error] = (*jv)[jss::error];
207 ret[jss::status] = jss::error;
208 return ret;
209 }
210 if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
211 (*jv)[jss::result][jss::status] = (*jv)[jss::status];
212 return *jv;
213 }
214 return {};
215 }
216
218 getMsg(std::chrono::milliseconds const& timeout) override
219 {
221 {
223 if (!cv_.wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
224 return std::nullopt;
225 m = std::move(msgs_.back());
226 msgs_.pop_back();
227 }
228 return std::move(m->jv);
229 }
230
233 std::chrono::milliseconds const& timeout,
234 std::function<bool(Json::Value const&)> pred) override
235 {
237 {
239 if (!cv_.wait_for(lock, timeout, [&] {
240 for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
241 {
242 if (pred((*it)->jv))
243 {
244 m = std::move(*it);
245 msgs_.erase(it);
246 return true;
247 }
248 }
249 return false;
250 }))
251 {
252 return std::nullopt;
253 }
254 }
255 return std::move(m->jv);
256 }
257
258 unsigned
259 version() const override
260 {
261 return rpc_version_;
262 }
263
264private:
265 void
267 {
268 if (ec)
269 {
270 if (ec == boost::beast::websocket::error::closed)
271 peerClosed_ = true;
272 return;
273 }
274
275 Json::Value jv;
276 Json::Reader jr;
277 jr.parse(buffer_string(rb_.data()), jv);
278 rb_.consume(rb_.size());
279 auto m = std::make_shared<msg>(std::move(jv));
280 {
281 std::lock_guard lock(m_);
282 msgs_.push_front(m);
283 cv_.notify_all();
284 }
285 ws_.async_read(
286 rb_,
287 boost::asio::bind_executor(
288 strand_,
289 std::bind(
290 &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
291 }
292
293 // Called when the read op terminates
294 void
296 {
297 std::lock_guard lock(m0_);
298 b0_ = true;
299 cv0_.notify_all();
300 }
301};
302
305 Config const& cfg,
306 bool v2,
307 unsigned rpc_version,
309{
310 return std::make_unique<WSClientImpl>(cfg, v2, rpc_version, headers);
311}
312
313} // namespace test
314} // namespace ripple
T bind(T... args)
Unserialize a JSON document into a Value.
Definition json_reader.h:20
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:130
Holds unparsed configuration information.
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:60
boost::asio::io_context ios_
Definition WSClient.cpp:76
unsigned version() const override
Get RPC 1.0 or RPC 2.0.
Definition WSClient.cpp:259
WSClientImpl(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers={})
Definition WSClient.cpp:126
boost::system::error_code error_code
Definition WSClient.cpp:24
std::condition_variable cv0_
Definition WSClient.cpp:91
static std::string buffer_string(ConstBuffers const &b)
Definition WSClient.cpp:66
boost::beast::multi_buffer rb_
Definition WSClient.cpp:84
std::optional< Json::Value > getMsg(std::chrono::milliseconds const &timeout) override
Retrieve a message.
Definition WSClient.cpp:218
Json::Value invoke(std::string const &cmd, Json::Value const &params) override
Submit a command synchronously.
Definition WSClient.cpp:172
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition WSClient.cpp:80
static boost::asio::ip::tcp::endpoint getEndpoint(BasicConfig const &cfg, bool v2)
Definition WSClient.cpp:36
void on_read_msg(error_code const &ec)
Definition WSClient.cpp:266
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > ws_
Definition WSClient.cpp:83
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
Definition WSClient.cpp:79
std::list< std::shared_ptr< msg > > msgs_
Definition WSClient.cpp:96
boost::asio::ip::tcp::socket stream_
Definition WSClient.cpp:82
std::condition_variable cv_
Definition WSClient.cpp:95
std::optional< Json::Value > findMsg(std::chrono::milliseconds const &timeout, std::function< bool(Json::Value const &)> pred) override
Retrieve a message that meets the predicate criteria.
Definition WSClient.cpp:232
T count(T... args)
T is_same_v
T join(T... args)
std::unique_ptr< WSClient > makeWSClient(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers)
Returns a client operating through WebSockets/S.
Definition WSClient.cpp:304
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
void parse_Port(ParsedPort &port, Section const &section, std::ostream &log)
Definition Port.cpp:195
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
void Rethrow()
Rethrow the exception currently being handled.
Definition contract.h:29
STL namespace.
T push_front(T... args)
T resize(T... args)
T size(T... args)
std::optional< std::uint16_t > port
Definition Port.h:97
std::set< std::string, boost::beast::iless > protocol
Definition Port.h:83
std::optional< boost::asio::ip::address > ip
Definition Port.h:96
T to_string(T... args)