1#include <test/jtx/WSClient.h>
3#include <xrpld/core/Config.h>
5#include <xrpl/basics/contract.h>
6#include <xrpl/config/BasicConfig.h>
7#include <xrpl/config/Constants.h>
8#include <xrpl/json/json_reader.h>
9#include <xrpl/json/json_value.h>
10#include <xrpl/json/to_string.h>
11#include <xrpl/protocol/jss.h>
12#include <xrpl/server/Port.h>
14#include <boost/asio/bind_executor.hpp>
15#include <boost/asio/buffer.hpp>
16#include <boost/asio/executor_work_guard.hpp>
17#include <boost/asio/io_context.hpp>
18#include <boost/asio/ip/address_v4.hpp>
19#include <boost/asio/ip/address_v6.hpp>
20#include <boost/asio/ip/tcp.hpp>
21#include <boost/asio/post.hpp>
22#include <boost/asio/strand.hpp>
23#include <boost/beast/core/multi_buffer.hpp>
24#include <boost/beast/websocket/error.hpp>
25#include <boost/beast/websocket/rfc6455.hpp>
26#include <boost/beast/websocket/stream.hpp>
27#include <boost/beast/websocket/stream_base.hpp>
28#include <boost/system/detail/error_code.hpp>
29#include <boost/system/system_error.hpp>
61 static boost::asio::ip::tcp::endpoint
67 auto const ps = v2 ?
"ws2" :
"ws";
76 using namespace boost::asio::ip;
77 if (pp.
ip && pp.
ip->is_unspecified())
79 *pp.
ip = pp.
ip->is_v6() ? address{address_v6::loopback()}
80 : address{address_v4::loopback()};
92 template <
class ConstBuffers>
96 using boost::asio::buffer;
97 using boost::asio::buffer_size;
100 buffer_copy(buffer(&s[0], s.
size()), b);
106 boost::asio::strand<boost::asio::io_context::executor_type>
strand_;
109 boost::beast::websocket::stream<boost::asio::ip::tcp::socket&>
ws_;
110 boost::beast::multi_buffer
rb_;
129 boost::asio::post(
ios_, boost::asio::bind_executor(
strand_, [
this] {
139 catch (boost::system::system_error
const&)
146 work_ = std::nullopt;
168 boost::beast::websocket::stream_base::decorator(
169 [&](boost::beast::websocket::request_type& req) {
170 for (
auto const& h : headers)
171 req.set(h.first, h.second);
176 boost::asio::bind_executor(
179 catch (std::exception&)
194 using boost::asio::buffer;
195 using namespace std::chrono_literals;
203 jp[jss::method] = cmd;
204 jp[jss::jsonrpc] =
"2.0";
205 jp[jss::ripplerpc] =
"2.0";
210 jp[jss::command] = cmd;
218 ws_.write_some(
true, buffer(s), ec);
224 findMsg(5s, [&](
json::Value const& jval) {
return jval[jss::type] == jss::response; });
228 jv->removeMember(jss::type);
229 if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
232 ret[jss::result] = *jv;
233 if ((*jv).isMember(jss::error))
234 ret[jss::error] = (*jv)[jss::error];
235 ret[jss::status] = jss::error;
238 if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
239 (*jv)[jss::result][jss::status] = (*jv)[jss::status];
251 if (!
cv_.wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
253 m = std::move(
msgs_.back());
256 return std::move(m->jv);
266 if (!
cv_.wait_for(lock, timeout, [&] {
267 for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
282 return std::move(m->jv);
285 [[nodiscard]]
unsigned
297 if (ec == boost::beast::websocket::error::closed)
314 boost::asio::bind_executor(
Unserialize a JSON document into a Value.
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Holds unparsed configuration information.
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
std::vector< std::string > const & values() const
Returns all the values in the section.
std::list< std::shared_ptr< Msg > > msgs_
void onReadMsg(error_code const &ec)
boost::beast::multi_buffer rb_
boost::asio::io_context ios_
unsigned version() const override
Get RPC 1.0 or RPC 2.0.
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
boost::asio::ip::tcp::socket stream_
std::optional< json::Value > getMsg(std::chrono::milliseconds const &timeout) override
Retrieve a message.
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > ws_
json::Value invoke(std::string const &cmd, json::Value const &params) override
Submit a command synchronously.
std::optional< json::Value > findMsg(std::chrono::milliseconds const &timeout, std::function< bool(json::Value const &)> pred) override
Retrieve a message that meets the predicate criteria.
std::condition_variable cv0_
WSClientImpl(Config const &cfg, bool v2, unsigned rpcVersion, std::unordered_map< std::string, std::string > const &headers={})
std::condition_variable cv_
boost::system::error_code error_code
static std::string bufferString(ConstBuffers const &b)
static boost::asio::ip::tcp::endpoint getEndpoint(BasicConfig const &cfg, bool v2)
boost::asio::strand< boost::asio::io_context::executor_type > strand_
std::unique_ptr< WSClient > makeWSClient(Config const &cfg, bool v2, unsigned rpcVersion, std::unordered_map< std::string, std::string > const &headers)
Returns a client operating through WebSockets/S.
void parsePort(ParsedPort &port, Section const &section, std::ostream &log)
std::string to_string(BaseUInt< Bits, Tag > const &a)
XRPL_NO_SANITIZE_ADDRESS void rethrow()
Rethrow the exception currently being handled.
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
std::set< std::string, boost::beast::iless > protocol
std::optional< boost::asio::ip::address > ip
std::optional< std::uint16_t > port
static constexpr auto kServer