3#include <xrpl/basics/safe_cast.h>
4#include <xrpl/beast/utility/instrumentation.h>
5#include <xrpl/beast/utility/rngfill.h>
6#include <xrpl/crypto/csprng.h>
7#include <xrpl/protocol/BuildInfo.h>
8#include <xrpl/server/WSSession.h>
9#include <xrpl/server/detail/BasePeer.h>
10#include <xrpl/server/detail/LowestLayer.h>
12#include <boost/asio/error.hpp>
13#include <boost/beast/core/multi_buffer.hpp>
14#include <boost/beast/http/message.hpp>
15#include <boost/beast/websocket.hpp>
16#include <boost/logic/tribool.hpp>
24template <
class Handler,
class Impl>
35 friend class BasePeer<Handler, Impl>;
38 boost::beast::multi_buffer
rb_;
39 boost::beast::multi_buffer
wb_;
45 boost::beast::websocket::close_reason
cr_;
51 std::function<void(boost::beast::websocket::frame_type, boost::beast::string_view)>
55 template <
class Body,
class Headers>
59 boost::asio::executor
const& executor,
62 boost::beast::http::request<Body, Headers>&&
request,
84 boost::asio::ip::tcp::endpoint
const&
97 close(boost::beast::websocket::close_reason
const& reason)
override;
106 return *
static_cast<Impl*
>(
this);
140 on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload);
145 template <
class String>
152template <
class Handler,
class Impl>
153template <
class Body,
class Headers>
157 boost::asio::executor
const& executor,
160 boost::beast::http::request<Body, Headers>&& request,
162 :
BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
163 , request_(
std::move(request))
164 , timer_(
std::move(timer))
165 , payload_(
"12345678")
169template <
class Handler,
class Impl>
173 if (!strand_.running_in_this_thread())
175 impl().ws_.set_option(port().pmd_options);
180 impl().ws_.control_callback(control_callback_);
182 close_on_timer_ =
true;
183 impl().ws_.set_option(boost::beast::websocket::stream_base::decorator([](
auto& res) {
186 impl().ws_.async_accept(
194template <
class Handler,
class Impl>
198 if (!strand_.running_in_this_thread())
202 if (wq_.size() > port().ws_queue_limit)
204 cr_.code =
safe_cast<
decltype(cr_.code)>(boost::beast::websocket::close_code::policy_error);
205 cr_.reason =
"Policy error: client is too slow.";
206 JLOG(this->j_.
info()) << cr_.reason;
207 wq_.erase(
std::next(wq_.begin()), wq_.end());
211 wq_.emplace_back(std::move(w));
216template <
class Handler,
class Impl>
220 close(boost::beast::websocket::close_reason{});
223template <
class Handler,
class Impl>
227 if (!strand_.running_in_this_thread())
228 return post(strand_, [self = impl().shared_from_this(), reason] { self->close(reason); });
234 impl().ws_.async_close(
237 strand_, [self = impl().shared_from_this()](boost::beast::error_code
const& ec) {
247template <
class Handler,
class Impl>
251 if (!strand_.running_in_this_thread())
256template <
class Handler,
class Impl>
261 return fail(ec,
"on_ws_handshake");
262 close_on_timer_ =
false;
266template <
class Handler,
class Impl>
270 if (!strand_.running_in_this_thread())
275template <
class Handler,
class Impl>
280 return fail(ec,
"write");
281 auto& w = *wq_.front();
284 if (boost::indeterminate(result.first))
288 impl().ws_.async_write_some(
289 static_cast<bool>(result.first),
296 impl().ws_.async_write_some(
297 static_cast<bool>(result.first),
305template <
class Handler,
class Impl>
310 return fail(ec,
"write_fin");
314 impl().ws_.async_close(
321 else if (!wq_.empty())
325template <
class Handler,
class Impl>
329 if (!strand_.running_in_this_thread())
331 impl().ws_.async_read(
338template <
class Handler,
class Impl>
342 if (ec == boost::beast::websocket::error::closed)
345 return fail(ec,
"read");
346 auto const& data = rb_.data();
350 this->handler_.onWSMessage(impl().shared_from_this(), b);
351 rb_.consume(rb_.size());
354template <
class Handler,
class Impl>
361template <
class Handler,
class Impl>
371 timer_.expires_after(remote_endpoint().address().is_loopback() ? timeoutLocal : timeout);
373 catch (boost::system::system_error
const& e)
375 return fail(e.code(),
"start_timer");
378 timer_.async_wait(bind_executor(
382 impl().shared_from_this(),
383 std::placeholders::_1)));
387template <
class Handler,
class Impl>
395 catch (boost::system::system_error
const&)
401template <
class Handler,
class Impl>
405 if (ec == boost::asio::error::operation_aborted)
407 ping_active_ =
false;
413template <
class Handler,
class Impl>
416 boost::beast::websocket::frame_type kind,
417 boost::beast::string_view payload)
419 if (kind == boost::beast::websocket::frame_type::pong)
421 boost::beast::string_view
const p(payload_.begin());
424 close_on_timer_ =
false;
425 JLOG(this->j_.
trace()) <<
"got matching pong";
429 JLOG(this->j_.
trace()) <<
"got pong";
434template <
class Handler,
class Impl>
438 if (ec == boost::asio::error::operation_aborted)
442 if (!close_on_timer_ || !ping_active_)
445 close_on_timer_ =
true;
449 impl().ws_.async_ping(
455 JLOG(this->j_.
trace()) <<
"sent ping";
458 ec = boost::system::errc::make_error_code(boost::system::errc::timed_out);
463template <
class Handler,
class Impl>
464template <
class String>
468 XRPL_ASSERT(strand_.running_in_this_thread(),
"xrpl::BaseWSPeer::fail : strand in this thread");
471 if (!ec_ && ec != boost::asio::error::operation_aborted)
474 JLOG(this->j_.
trace()) << what <<
": " << ec.message();
T back_inserter(T... args)
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
boost::asio::strand< boost::asio::executor > strand_
endpoint_type remote_address_
Represents an active WebSocket connection.
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
boost::beast::websocket::close_reason cr_
void on_write_fin(error_code const &ec)
boost::beast::multi_buffer rb_
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
void on_ping(error_code const &ec)
boost::asio::ip::tcp::endpoint endpoint_type
boost::beast::multi_buffer wb_
void fail(error_code ec, String const &what)
void on_close(error_code const &ec)
void close(boost::beast::websocket::close_reason const &reason) override
Port const & port() const override
bool do_close_
The socket has been closed, or will close after the next write finishes.
void on_read(error_code const &ec)
boost::system::error_code error_code
boost::asio::basic_waitable_timer< clock_type > waitable_timer
boost::beast::websocket::ping_data payload_
void on_ws_handshake(error_code const &ec)
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
http_request_type request_
void on_write(error_code const &ec)
std::list< std::shared_ptr< WSMsg > > wq_
void complete() override
Indicate that the response is complete.
http_request_type const & request() const override
void on_timer(error_code ec)
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
std::string const & getFullVersionString()
Full server version string.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
decltype(auto) get_lowest_layer(T &t) noexcept
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safe_cast(Src s) noexcept
Configuration information for a Server listening port.