3#include <xrpl/basics/safe_cast.h>
4#include <xrpl/beast/utility/instrumentation.h>
5#include <xrpl/beast/utility/rngfill.h>
6#include <xrpl/crypto/csprng.h>
7#include <xrpl/protocol/BuildInfo.h>
8#include <xrpl/server/WSSession.h>
9#include <xrpl/server/detail/BasePeer.h>
10#include <xrpl/server/detail/LowestLayer.h>
12#include <boost/asio/error.hpp>
13#include <boost/beast/core/multi_buffer.hpp>
14#include <boost/beast/http/message.hpp>
15#include <boost/beast/websocket.hpp>
16#include <boost/logic/tribool.hpp>
24template <
class Handler,
class Impl>
35 friend class BasePeer<Handler, Impl>;
38 boost::beast::multi_buffer
rb_;
39 boost::beast::multi_buffer
wb_;
45 boost::beast::websocket::close_reason
cr_;
54 template <
class Body,
class Headers>
58 boost::asio::executor
const& executor,
61 boost::beast::http::request<Body, Headers>&&
request,
83 boost::asio::ip::tcp::endpoint
const&
96 close(boost::beast::websocket::close_reason
const& reason)
override;
105 return *
static_cast<Impl*
>(
this);
139 on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload);
144 template <
class String>
151template <
class Handler,
class Impl>
152template <
class Body,
class Headers>
156 boost::asio::executor
const& executor,
159 boost::beast::http::request<Body, Headers>&& request,
161 :
BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
162 , request_(
std::move(request))
163 , timer_(
std::move(timer))
164 , payload_(
"12345678")
168template <
class Handler,
class Impl>
172 if (!strand_.running_in_this_thread())
174 impl().ws_.set_option(port().pmd_options);
178 impl().ws_.control_callback(control_callback_);
180 close_on_timer_ =
true;
181 impl().ws_.set_option(boost::beast::websocket::stream_base::decorator(
183 impl().ws_.async_accept(
189template <
class Handler,
class Impl>
193 if (!strand_.running_in_this_thread())
197 if (wq_.size() > port().ws_queue_limit)
199 cr_.code =
safe_cast<
decltype(cr_.code)>(boost::beast::websocket::close_code::policy_error);
200 cr_.reason =
"Policy error: client is too slow.";
201 JLOG(this->j_.
info()) << cr_.reason;
202 wq_.erase(
std::next(wq_.begin()), wq_.end());
206 wq_.emplace_back(std::move(w));
211template <
class Handler,
class Impl>
215 close(boost::beast::websocket::close_reason{});
218template <
class Handler,
class Impl>
222 if (!strand_.running_in_this_thread())
223 return post(strand_, [self = impl().shared_from_this(), reason] { self->close(reason); });
229 impl().ws_.async_close(
230 reason, bind_executor(strand_, [self = impl().shared_from_this()](boost::beast::error_code
const& ec) {
240template <
class Handler,
class Impl>
244 if (!strand_.running_in_this_thread())
249template <
class Handler,
class Impl>
254 return fail(ec,
"on_ws_handshake");
255 close_on_timer_ =
false;
259template <
class Handler,
class Impl>
263 if (!strand_.running_in_this_thread())
268template <
class Handler,
class Impl>
273 return fail(ec,
"write");
274 auto& w = *wq_.front();
276 if (boost::indeterminate(result.first))
280 impl().ws_.async_write_some(
281 static_cast<bool>(result.first),
285 impl().ws_.async_write_some(
286 static_cast<bool>(result.first),
292template <
class Handler,
class Impl>
297 return fail(ec,
"write_fin");
301 impl().ws_.async_close(
305 else if (!wq_.empty())
309template <
class Handler,
class Impl>
313 if (!strand_.running_in_this_thread())
315 impl().ws_.async_read(
319template <
class Handler,
class Impl>
323 if (ec == boost::beast::websocket::error::closed)
326 return fail(ec,
"read");
327 auto const& data = rb_.data();
331 this->handler_.onWSMessage(impl().shared_from_this(), b);
332 rb_.consume(rb_.size());
335template <
class Handler,
class Impl>
342template <
class Handler,
class Impl>
352 timer_.expires_after(remote_endpoint().address().is_loopback() ? timeoutLocal : timeout);
354 catch (boost::system::system_error
const& e)
356 return fail(e.code(),
"start_timer");
359 timer_.async_wait(bind_executor(
364template <
class Handler,
class Impl>
372 catch (boost::system::system_error
const&)
378template <
class Handler,
class Impl>
382 if (ec == boost::asio::error::operation_aborted)
384 ping_active_ =
false;
390template <
class Handler,
class Impl>
394 if (kind == boost::beast::websocket::frame_type::pong)
396 boost::beast::string_view p(payload_.begin());
399 close_on_timer_ =
false;
400 JLOG(this->j_.
trace()) <<
"got matching pong";
404 JLOG(this->j_.
trace()) <<
"got pong";
409template <
class Handler,
class Impl>
413 if (ec == boost::asio::error::operation_aborted)
417 if (!close_on_timer_ || !ping_active_)
420 close_on_timer_ =
true;
424 impl().ws_.async_ping(
428 JLOG(this->j_.
trace()) <<
"sent ping";
431 ec = boost::system::errc::make_error_code(boost::system::errc::timed_out);
436template <
class Handler,
class Impl>
437template <
class String>
441 XRPL_ASSERT(strand_.running_in_this_thread(),
"xrpl::BaseWSPeer::fail : strand in this thread");
444 if (!ec_ && ec != boost::asio::error::operation_aborted)
447 JLOG(this->j_.
trace()) << what <<
": " << ec.message();
T back_inserter(T... args)
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
boost::asio::strand< boost::asio::executor > strand_
endpoint_type remote_address_
Represents an active WebSocket connection.
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
boost::beast::websocket::close_reason cr_
void on_write_fin(error_code const &ec)
boost::beast::multi_buffer rb_
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
void on_ping(error_code const &ec)
boost::asio::ip::tcp::endpoint endpoint_type
boost::beast::multi_buffer wb_
void fail(error_code ec, String const &what)
void on_close(error_code const &ec)
void close(boost::beast::websocket::close_reason const &reason) override
Port const & port() const override
bool do_close_
The socket has been closed, or will close after the next write finishes.
void on_read(error_code const &ec)
boost::system::error_code error_code
boost::asio::basic_waitable_timer< clock_type > waitable_timer
boost::beast::websocket::ping_data payload_
void on_ws_handshake(error_code const &ec)
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
http_request_type request_
void on_write(error_code const &ec)
std::list< std::shared_ptr< WSMsg > > wq_
void complete() override
Indicate that the response is complete.
http_request_type const & request() const override
void on_timer(error_code ec)
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
std::string const & getFullVersionString()
Full server version string.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
decltype(auto) get_lowest_layer(T &t) noexcept
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safe_cast(Src s) noexcept
Configuration information for a Server listening port.