xrpld
Loading...
Searching...
No Matches
WSClient.cpp
1#include <test/jtx/WSClient.h>
2
3#include <xrpld/core/Config.h>
4
5#include <xrpl/basics/contract.h>
6#include <xrpl/config/BasicConfig.h>
7#include <xrpl/config/Constants.h>
8#include <xrpl/json/json_reader.h>
9#include <xrpl/json/json_value.h>
10#include <xrpl/json/to_string.h>
11#include <xrpl/protocol/jss.h>
12#include <xrpl/server/Port.h>
13
14#include <boost/asio/bind_executor.hpp>
15#include <boost/asio/buffer.hpp>
16#include <boost/asio/executor_work_guard.hpp>
17#include <boost/asio/io_context.hpp>
18#include <boost/asio/ip/address_v4.hpp>
19#include <boost/asio/ip/address_v6.hpp>
20#include <boost/asio/ip/tcp.hpp>
21#include <boost/asio/post.hpp>
22#include <boost/asio/strand.hpp>
23#include <boost/beast/core/multi_buffer.hpp>
24#include <boost/beast/websocket/error.hpp>
25#include <boost/beast/websocket/rfc6455.hpp>
26#include <boost/beast/websocket/stream.hpp>
27#include <boost/beast/websocket/stream_base.hpp>
28#include <boost/system/detail/error_code.hpp>
29#include <boost/system/system_error.hpp>
30
31#include <chrono>
32#include <condition_variable>
33#include <exception>
34#include <functional>
35#include <iostream>
36#include <list>
37#include <memory>
38#include <mutex>
39#include <optional>
40#include <stdexcept>
41#include <string>
42#include <thread>
43#include <unordered_map>
44#include <utility>
45
46namespace xrpl::test {
47
48class WSClientImpl : public WSClient
49{
50 using error_code = boost::system::error_code;
51
52 struct Msg
53 {
55
56 explicit Msg(json::Value&& jv) : jv(std::move(jv))
57 {
58 }
59 };
60
61 static boost::asio::ip::tcp::endpoint
62 getEndpoint(BasicConfig const& cfg, bool v2)
63 {
64 auto& log = std::cerr;
65 ParsedPort common;
66 parsePort(common, cfg[Sections::kServer], log);
67 auto const ps = v2 ? "ws2" : "ws";
68 for (auto const& name : cfg.section(Sections::kServer).values())
69 {
70 if (!cfg.exists(name))
71 continue;
72 ParsedPort pp;
73 parsePort(pp, cfg[name], log);
74 if (!pp.protocol.contains(ps))
75 continue;
76 using namespace boost::asio::ip;
77 if (pp.ip && pp.ip->is_unspecified())
78 {
79 *pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
80 : address{address_v4::loopback()};
81 }
82
83 if (!pp.port)
84 Throw<std::runtime_error>("Use fixConfigPorts with auto ports");
85
86 return {*pp.ip, *pp.port}; // NOLINT(bugprone-unchecked-optional-access)
87 }
88 Throw<std::runtime_error>("Missing WebSocket port");
89 return {}; // Silence compiler control paths return value warning
90 }
91
92 template <class ConstBuffers>
93 static std::string
94 bufferString(ConstBuffers const& b)
95 {
96 using boost::asio::buffer;
97 using boost::asio::buffer_size;
99 s.resize(buffer_size(b));
100 buffer_copy(buffer(&s[0], s.size()), b);
101 return s;
102 }
103
104 boost::asio::io_context ios_;
106 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
108 boost::asio::ip::tcp::socket stream_;
109 boost::beast::websocket::stream<boost::asio::ip::tcp::socket&> ws_;
110 boost::beast::multi_buffer rb_;
111
112 bool peerClosed_ = false;
113
114 // synchronize destructor
115 bool b0_ = false;
118
119 // synchronize message queue
123
124 unsigned rpcVersion_;
125
126 void
128 {
129 boost::asio::post(ios_, boost::asio::bind_executor(strand_, [this] {
130 if (!peerClosed_)
131 {
132 ws_.async_close(
133 {}, boost::asio::bind_executor(strand_, [&](error_code) {
134 try
135 {
136 stream_.cancel();
137 }
138 // NOLINTNEXTLINE(bugprone-empty-catch)
139 catch (boost::system::system_error const&)
140 {
141 // ignored
142 }
143 }));
144 }
145 }));
146 work_ = std::nullopt;
147 thread_.join();
148 }
149
150public:
152 Config const& cfg,
153 bool v2,
154 unsigned rpcVersion,
156 : work_(std::in_place, boost::asio::make_work_guard(ios_))
157 , strand_(boost::asio::make_strand(ios_))
158 , thread_([&] { ios_.run(); })
159 , stream_(ios_)
160 , ws_(stream_)
161 , rpcVersion_(rpcVersion)
162 {
163 try
164 {
165 auto const ep = getEndpoint(cfg, v2);
166 stream_.connect(ep);
167 ws_.set_option(
168 boost::beast::websocket::stream_base::decorator(
169 [&](boost::beast::websocket::request_type& req) {
170 for (auto const& h : headers)
171 req.set(h.first, h.second);
172 }));
173 ws_.handshake(ep.address().to_string() + ":" + std::to_string(ep.port()), "/");
174 ws_.async_read(
175 rb_,
176 boost::asio::bind_executor(
177 strand_, std::bind(&WSClientImpl::onReadMsg, this, std::placeholders::_1)));
178 }
179 catch (std::exception&)
180 {
181 cleanup();
182 rethrow();
183 }
184 }
185
186 ~WSClientImpl() override
187 {
188 cleanup();
189 }
190
192 invoke(std::string const& cmd, json::Value const& params) override
193 {
194 using boost::asio::buffer;
195 using namespace std::chrono_literals;
196
197 {
198 json::Value jp;
199 if (params)
200 jp = params;
201 if (rpcVersion_ == 2)
202 {
203 jp[jss::method] = cmd;
204 jp[jss::jsonrpc] = "2.0";
205 jp[jss::ripplerpc] = "2.0";
206 jp[jss::id] = 5;
207 }
208 else
209 {
210 jp[jss::command] = cmd;
211 }
212 auto const s = to_string(jp);
213
214 // Use the error_code overload to avoid an unhandled exception
215 // when the server closes the WebSocket connection (e.g. after
216 // booting a client that exceeded resource thresholds).
217 error_code ec;
218 ws_.write_some(true, buffer(s), ec);
219 if (ec)
220 return {};
221 }
222
223 auto jv =
224 findMsg(5s, [&](json::Value const& jval) { return jval[jss::type] == jss::response; });
225 if (jv)
226 {
227 // Normalize JSON output
228 jv->removeMember(jss::type);
229 if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
230 {
231 json::Value ret;
232 ret[jss::result] = *jv;
233 if ((*jv).isMember(jss::error))
234 ret[jss::error] = (*jv)[jss::error];
235 ret[jss::status] = jss::error;
236 return ret;
237 }
238 if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
239 (*jv)[jss::result][jss::status] = (*jv)[jss::status];
240 return *jv;
241 }
242 return {};
243 }
244
246 getMsg(std::chrono::milliseconds const& timeout) override
247 {
249 {
251 if (!cv_.wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
252 return std::nullopt;
253 m = std::move(msgs_.back());
254 msgs_.pop_back();
255 }
256 return std::move(m->jv);
257 }
258
260 findMsg(std::chrono::milliseconds const& timeout, std::function<bool(json::Value const&)> pred)
261 override
262 {
264 {
266 if (!cv_.wait_for(lock, timeout, [&] {
267 for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
268 {
269 if (pred((*it)->jv))
270 {
271 m = std::move(*it);
272 msgs_.erase(it);
273 return true;
274 }
275 }
276 return false;
277 }))
278 {
279 return std::nullopt;
280 }
281 }
282 return std::move(m->jv);
283 }
284
285 [[nodiscard]] unsigned
286 version() const override
287 {
288 return rpcVersion_;
289 }
290
291private:
292 void
294 {
295 if (ec)
296 {
297 if (ec == boost::beast::websocket::error::closed)
298 peerClosed_ = true;
299 return;
300 }
301
302 json::Value jv;
303 json::Reader jr;
304 jr.parse(bufferString(rb_.data()), jv);
305 rb_.consume(rb_.size());
306 auto m = std::make_shared<Msg>(std::move(jv));
307 {
308 std::scoped_lock const lock(m_);
309 msgs_.push_front(m);
310 cv_.notify_all();
311 }
312 ws_.async_read(
313 rb_,
314 boost::asio::bind_executor(
315 strand_, std::bind(&WSClientImpl::onReadMsg, this, std::placeholders::_1)));
316 }
317
318 // Called when the read op terminates
319 void
321 {
322 std::scoped_lock const lock(m0_);
323 b0_ = true;
324 cv0_.notify_all();
325 }
326};
327
330 Config const& cfg,
331 bool v2,
332 unsigned rpcVersion,
334{
335 return std::make_unique<WSClientImpl>(cfg, v2, rpcVersion, headers);
336}
337
338} // namespace xrpl::test
T bind(T... args)
Unserialize a JSON document into a Value.
Definition json_reader.h:17
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:130
Holds unparsed configuration information.
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:58
std::list< std::shared_ptr< Msg > > msgs_
Definition WSClient.cpp:122
void onReadMsg(error_code const &ec)
Definition WSClient.cpp:293
boost::beast::multi_buffer rb_
Definition WSClient.cpp:110
boost::asio::io_context ios_
Definition WSClient.cpp:104
unsigned version() const override
Get RPC 1.0 or RPC 2.0.
Definition WSClient.cpp:286
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
Definition WSClient.cpp:105
boost::asio::ip::tcp::socket stream_
Definition WSClient.cpp:108
std::optional< json::Value > getMsg(std::chrono::milliseconds const &timeout) override
Retrieve a message.
Definition WSClient.cpp:246
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > ws_
Definition WSClient.cpp:109
json::Value invoke(std::string const &cmd, json::Value const &params) override
Submit a command synchronously.
Definition WSClient.cpp:192
std::optional< json::Value > findMsg(std::chrono::milliseconds const &timeout, std::function< bool(json::Value const &)> pred) override
Retrieve a message that meets the predicate criteria.
Definition WSClient.cpp:260
std::condition_variable cv0_
Definition WSClient.cpp:117
WSClientImpl(Config const &cfg, bool v2, unsigned rpcVersion, std::unordered_map< std::string, std::string > const &headers={})
Definition WSClient.cpp:151
std::condition_variable cv_
Definition WSClient.cpp:121
boost::system::error_code error_code
Definition WSClient.cpp:50
static std::string bufferString(ConstBuffers const &b)
Definition WSClient.cpp:94
static boost::asio::ip::tcp::endpoint getEndpoint(BasicConfig const &cfg, bool v2)
Definition WSClient.cpp:62
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition WSClient.cpp:106
T contains(T... args)
T make_shared(T... args)
T make_unique(T... args)
STL namespace.
std::unique_ptr< WSClient > makeWSClient(Config const &cfg, bool v2, unsigned rpcVersion, std::unordered_map< std::string, std::string > const &headers)
Returns a client operating through WebSockets/S.
Definition WSClient.cpp:329
void parsePort(ParsedPort &port, Section const &section, std::ostream &log)
Definition Port.cpp:195
std::string to_string(BaseUInt< Bits, Tag > const &a)
Definition base_uint.h:633
XRPL_NO_SANITIZE_ADDRESS void rethrow()
Rethrow the exception currently being handled.
Definition contract.h:33
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Definition contract.h:49
T resize(T... args)
T size(T... args)
std::set< std::string, boost::beast::iless > protocol
Definition Port.h:80
std::optional< boost::asio::ip::address > ip
Definition Port.h:93
std::optional< std::uint16_t > port
Definition Port.h:94
static constexpr auto kServer
Definition Constants.h:55
T to_string(T... args)