rippled
Loading...
Searching...
No Matches
WSClient.cpp
1#include <test/jtx.h>
2#include <test/jtx/WSClient.h>
3
4#include <xrpl/json/json_reader.h>
5#include <xrpl/json/to_string.h>
6#include <xrpl/protocol/jss.h>
7#include <xrpl/server/Port.h>
8
9#include <boost/asio/executor_work_guard.hpp>
10#include <boost/asio/io_context.hpp>
11#include <boost/asio/strand.hpp>
12#include <boost/beast/core/multi_buffer.hpp>
13#include <boost/beast/websocket.hpp>
14
15#include <iostream>
16#include <string>
17#include <unordered_map>
18
19namespace xrpl {
20namespace test {
21
22class WSClientImpl : public WSClient
23{
24 using error_code = boost::system::error_code;
25
26 struct msg
27 {
29
30 explicit msg(Json::Value&& jv_) : jv(std::move(jv_))
31 {
32 }
33 };
34
35 static boost::asio::ip::tcp::endpoint
36 getEndpoint(BasicConfig const& cfg, bool v2)
37 {
38 auto& log = std::cerr;
39 ParsedPort common;
40 parse_Port(common, cfg["server"], log);
41 auto const ps = v2 ? "ws2" : "ws";
42 for (auto const& name : cfg.section("server").values())
43 {
44 if (!cfg.exists(name))
45 continue;
46 ParsedPort pp;
47 parse_Port(pp, cfg[name], log);
48 if (pp.protocol.count(ps) == 0)
49 continue;
50 using namespace boost::asio::ip;
51 if (pp.ip && pp.ip->is_unspecified())
52 {
53 *pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
54 : address{address_v4::loopback()};
55 }
56
57 if (!pp.port)
58 Throw<std::runtime_error>("Use fixConfigPorts with auto ports");
59
60 return {*pp.ip, *pp.port}; // NOLINT(bugprone-unchecked-optional-access)
61 }
62 Throw<std::runtime_error>("Missing WebSocket port");
63 return {}; // Silence compiler control paths return value warning
64 }
65
66 template <class ConstBuffers>
67 static std::string
68 buffer_string(ConstBuffers const& b)
69 {
70 using boost::asio::buffer;
71 using boost::asio::buffer_size;
73 s.resize(buffer_size(b));
74 buffer_copy(buffer(&s[0], s.size()), b);
75 return s;
76 }
77
78 boost::asio::io_context ios_;
80 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
82 boost::asio::ip::tcp::socket stream_;
83 boost::beast::websocket::stream<boost::asio::ip::tcp::socket&> ws_;
84 boost::beast::multi_buffer rb_;
85
86 bool peerClosed_ = false;
87
88 // synchronize destructor
89 bool b0_ = false;
92
93 // synchronize message queue
97
98 unsigned rpc_version_;
99
100 void
102 {
103 boost::asio::post(ios_, boost::asio::bind_executor(strand_, [this] {
104 if (!peerClosed_)
105 {
106 ws_.async_close(
107 {}, boost::asio::bind_executor(strand_, [&](error_code) {
108 try
109 {
110 stream_.cancel();
111 }
112 // NOLINTNEXTLINE(bugprone-empty-catch)
113 catch (boost::system::system_error const&)
114 {
115 // ignored
116 }
117 }));
118 }
119 }));
121 thread_.join();
122 }
123
124public:
126 Config const& cfg,
127 bool v2,
128 unsigned rpc_version,
130 : work_(std::in_place, boost::asio::make_work_guard(ios_))
131 , strand_(boost::asio::make_strand(ios_))
132 , thread_([&] { ios_.run(); })
133 , stream_(ios_)
134 , ws_(stream_)
135 , rpc_version_(rpc_version)
136 {
137 try
138 {
139 auto const ep = getEndpoint(cfg, v2);
140 stream_.connect(ep);
141 ws_.set_option(
142 boost::beast::websocket::stream_base::decorator(
143 [&](boost::beast::websocket::request_type& req) {
144 for (auto const& h : headers)
145 req.set(h.first, h.second);
146 }));
147 ws_.handshake(ep.address().to_string() + ":" + std::to_string(ep.port()), "/");
148 ws_.async_read(
149 rb_,
150 boost::asio::bind_executor(
151 strand_, std::bind(&WSClientImpl::on_read_msg, this, std::placeholders::_1)));
152 }
153 catch (std::exception&)
154 {
155 cleanup();
156 Rethrow();
157 }
158 }
159
160 ~WSClientImpl() override
161 {
162 cleanup();
163 }
164
166 invoke(std::string const& cmd, Json::Value const& params) override
167 {
168 using boost::asio::buffer;
169 using namespace std::chrono_literals;
170
171 {
172 Json::Value jp;
173 if (params)
174 jp = params;
175 if (rpc_version_ == 2)
176 {
177 jp[jss::method] = cmd;
178 jp[jss::jsonrpc] = "2.0";
179 jp[jss::ripplerpc] = "2.0";
180 jp[jss::id] = 5;
181 }
182 else
183 {
184 jp[jss::command] = cmd;
185 }
186 auto const s = to_string(jp);
187 ws_.write_some(true, buffer(s));
188 }
189
190 auto jv =
191 findMsg(5s, [&](Json::Value const& jval) { return jval[jss::type] == jss::response; });
192 if (jv)
193 {
194 // Normalize JSON output
195 jv->removeMember(jss::type);
196 if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
197 {
198 Json::Value ret;
199 ret[jss::result] = *jv;
200 if ((*jv).isMember(jss::error))
201 ret[jss::error] = (*jv)[jss::error];
202 ret[jss::status] = jss::error;
203 return ret;
204 }
205 if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
206 (*jv)[jss::result][jss::status] = (*jv)[jss::status];
207 return *jv;
208 }
209 return {};
210 }
211
213 getMsg(std::chrono::milliseconds const& timeout) override
214 {
216 {
218 if (!cv_.wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
219 return std::nullopt;
220 m = std::move(msgs_.back());
221 msgs_.pop_back();
222 }
223 return std::move(m->jv);
224 }
225
227 findMsg(std::chrono::milliseconds const& timeout, std::function<bool(Json::Value const&)> pred)
228 override
229 {
231 {
233 if (!cv_.wait_for(lock, timeout, [&] {
234 for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
235 {
236 if (pred((*it)->jv))
237 {
238 m = std::move(*it);
239 msgs_.erase(it);
240 return true;
241 }
242 }
243 return false;
244 }))
245 {
246 return std::nullopt;
247 }
248 }
249 return std::move(m->jv);
250 }
251
252 unsigned
253 version() const override
254 {
255 return rpc_version_;
256 }
257
258private:
259 void
261 {
262 if (ec)
263 {
264 if (ec == boost::beast::websocket::error::closed)
265 peerClosed_ = true;
266 return;
267 }
268
269 Json::Value jv;
270 Json::Reader jr;
271 jr.parse(buffer_string(rb_.data()), jv);
272 rb_.consume(rb_.size());
273 auto m = std::make_shared<msg>(std::move(jv));
274 {
275 std::lock_guard const lock(m_);
276 msgs_.push_front(m);
277 cv_.notify_all();
278 }
279 ws_.async_read(
280 rb_,
281 boost::asio::bind_executor(
282 strand_, std::bind(&WSClientImpl::on_read_msg, this, std::placeholders::_1)));
283 }
284
285 // Called when the read op terminates
286 void
288 {
289 std::lock_guard const lock(m0_);
290 b0_ = true;
291 cv0_.notify_all();
292 }
293};
294
297 Config const& cfg,
298 bool v2,
299 unsigned rpc_version,
301{
302 return std::make_unique<WSClientImpl>(cfg, v2, rpc_version, headers);
303}
304
305} // namespace test
306} // namespace xrpl
T bind(T... args)
Unserialize a JSON document into a Value.
Definition json_reader.h:17
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:130
Holds unparsed configuration information.
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:58
std::list< std::shared_ptr< msg > > msgs_
Definition WSClient.cpp:96
WSClientImpl(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers={})
Definition WSClient.cpp:125
boost::beast::multi_buffer rb_
Definition WSClient.cpp:84
Json::Value invoke(std::string const &cmd, Json::Value const &params) override
Submit a command synchronously.
Definition WSClient.cpp:166
std::optional< Json::Value > getMsg(std::chrono::milliseconds const &timeout) override
Retrieve a message.
Definition WSClient.cpp:213
boost::asio::io_context ios_
Definition WSClient.cpp:78
unsigned version() const override
Get RPC 1.0 or RPC 2.0.
Definition WSClient.cpp:253
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
Definition WSClient.cpp:79
static std::string buffer_string(ConstBuffers const &b)
Definition WSClient.cpp:68
boost::asio::ip::tcp::socket stream_
Definition WSClient.cpp:82
std::optional< Json::Value > findMsg(std::chrono::milliseconds const &timeout, std::function< bool(Json::Value const &)> pred) override
Retrieve a message that meets the predicate criteria.
Definition WSClient.cpp:227
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > ws_
Definition WSClient.cpp:83
std::condition_variable cv0_
Definition WSClient.cpp:91
std::condition_variable cv_
Definition WSClient.cpp:95
void on_read_msg(error_code const &ec)
Definition WSClient.cpp:260
boost::system::error_code error_code
Definition WSClient.cpp:24
static boost::asio::ip::tcp::endpoint getEndpoint(BasicConfig const &cfg, bool v2)
Definition WSClient.cpp:36
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition WSClient.cpp:80
T count(T... args)
T is_same_v
T join(T... args)
STL namespace.
std::unique_ptr< WSClient > makeWSClient(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers)
Returns a client operating through WebSockets/S.
Definition WSClient.cpp:296
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
void parse_Port(ParsedPort &port, Section const &section, std::ostream &log)
Definition Port.cpp:194
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
XRPL_NO_SANITIZE_ADDRESS void Rethrow()
Rethrow the exception currently being handled.
Definition contract.h:33
T push_front(T... args)
T resize(T... args)
T size(T... args)
std::set< std::string, boost::beast::iless > protocol
Definition Port.h:82
std::optional< boost::asio::ip::address > ip
Definition Port.h:95
std::optional< std::uint16_t > port
Definition Port.h:96
msg(Json::Value &&jv_)
Definition WSClient.cpp:30
T to_string(T... args)