1#ifndef XRPL_SERVER_BASEWSPEER_H_INCLUDED 
    2#define XRPL_SERVER_BASEWSPEER_H_INCLUDED 
    4#include <xrpl/basics/safe_cast.h> 
    5#include <xrpl/beast/utility/instrumentation.h> 
    6#include <xrpl/beast/utility/rngfill.h> 
    7#include <xrpl/crypto/csprng.h> 
    8#include <xrpl/protocol/BuildInfo.h> 
    9#include <xrpl/server/WSSession.h> 
   10#include <xrpl/server/detail/BasePeer.h> 
   11#include <xrpl/server/detail/LowestLayer.h> 
   13#include <boost/asio/error.hpp> 
   14#include <boost/beast/core/multi_buffer.hpp> 
   15#include <boost/beast/http/message.hpp> 
   16#include <boost/beast/websocket.hpp> 
   17#include <boost/logic/tribool.hpp> 
   25template <
class Handler, 
class Impl>
 
   36    friend class BasePeer<Handler, Impl>;
 
   39    boost::beast::multi_buffer 
rb_;
 
   40    boost::beast::multi_buffer 
wb_;
 
   46    boost::beast::websocket::close_reason 
cr_;
 
   53        void(boost::beast::websocket::frame_type, boost::beast::string_view)>
 
   57    template <
class Body, 
class Headers>
 
   61        boost::asio::executor 
const& executor,
 
   64        boost::beast::http::request<Body, Headers>&& 
request,
 
   86    boost::asio::ip::tcp::endpoint 
const&
 
   99    close(boost::beast::websocket::close_reason 
const& reason) 
override;
 
  108        return *
static_cast<Impl*
>(
this);
 
 
  143        boost::beast::websocket::frame_type kind,
 
  144        boost::beast::string_view payload);
 
  149    template <
class String>
 
  156template <
class Handler, 
class Impl>
 
  157template <
class Body, 
class Headers>
 
  161    boost::asio::executor 
const& executor,
 
  164    boost::beast::http::request<Body, Headers>&& request,
 
  166    : 
BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
 
  167    , request_(
std::move(request))
 
  168    , timer_(
std::move(timer))
 
  169    , payload_(
"12345678")  
 
 
  173template <
class Handler, 
class Impl>
 
  177    if (!strand_.running_in_this_thread())
 
  180    impl().ws_.set_option(port().pmd_options);
 
  186        std::placeholders::_1,
 
  187        std::placeholders::_2);
 
  188    impl().ws_.control_callback(control_callback_);
 
  190    close_on_timer_ = 
true;
 
  191    impl().ws_.set_option(
 
  192        boost::beast::websocket::stream_base::decorator([](
auto& res) {
 
  194                boost::beast::http::field::server,
 
  197    impl().ws_.async_accept(
 
  203                impl().shared_from_this(),
 
  204                std::placeholders::_1)));
 
 
  207template <
class Handler, 
class Impl>
 
  211    if (!strand_.running_in_this_thread())
 
  218    if (wq_.size() > port().ws_queue_limit)
 
  220        cr_.code = 
safe_cast<
decltype(cr_.code)>(
 
  221            boost::beast::websocket::close_code::policy_error);
 
  222        cr_.reason = 
"Policy error: client is too slow.";
 
  223        JLOG(this->j_.
info()) << cr_.reason;
 
  224        wq_.erase(
std::next(wq_.begin()), wq_.end());
 
  228    wq_.emplace_back(std::move(w));
 
 
  233template <
class Handler, 
class Impl>
 
  237    close(boost::beast::websocket::close_reason{});
 
 
  240template <
class Handler, 
class Impl>
 
  243    boost::beast::websocket::close_reason 
const& reason)
 
  245    if (!strand_.running_in_this_thread())
 
  246        return post(strand_, [self = impl().shared_from_this(), reason] {
 
  254        impl().ws_.async_close(
 
  258                [self = impl().shared_from_this()](
 
  259                    boost::beast::error_code 
const& ec) {
 
 
  269template <
class Handler, 
class Impl>
 
  273    if (!strand_.running_in_this_thread())
 
 
  280template <
class Handler, 
class Impl>
 
  285        return fail(ec, 
"on_ws_handshake");
 
  286    close_on_timer_ = 
false;
 
 
  290template <
class Handler, 
class Impl>
 
  294    if (!strand_.running_in_this_thread())
 
 
  301template <
class Handler, 
class Impl>
 
  306        return fail(ec, 
"write");
 
  307    auto& w = *wq_.front();
 
  308    auto const result = w.prepare(
 
  310    if (boost::indeterminate(result.first))
 
  314        impl().ws_.async_write_some(
 
  315            static_cast<bool>(result.first),
 
  321                    impl().shared_from_this(),
 
  322                    std::placeholders::_1)));
 
  324        impl().ws_.async_write_some(
 
  325            static_cast<bool>(result.first),
 
  331                    impl().shared_from_this(),
 
  332                    std::placeholders::_1)));
 
 
  335template <
class Handler, 
class Impl>
 
  340        return fail(ec, 
"write_fin");
 
  344        impl().ws_.async_close(
 
  350                    impl().shared_from_this(),
 
  351                    std::placeholders::_1)));
 
  353    else if (!wq_.empty())
 
 
  357template <
class Handler, 
class Impl>
 
  361    if (!strand_.running_in_this_thread())
 
  365    impl().ws_.async_read(
 
  371                impl().shared_from_this(),
 
  372                std::placeholders::_1)));
 
 
  375template <
class Handler, 
class Impl>
 
  379    if (ec == boost::beast::websocket::error::closed)
 
  382        return fail(ec, 
"read");
 
  383    auto const& data = rb_.data();
 
  387    this->handler_.onWSMessage(impl().shared_from_this(), b);
 
  388    rb_.consume(rb_.size());
 
 
  391template <
class Handler, 
class Impl>
 
  398template <
class Handler, 
class Impl>
 
  408        timer_.expires_after(
 
  409            remote_endpoint().address().is_loopback() ? timeoutLocal : timeout);
 
  411    catch (boost::system::system_error 
const& e)
 
  413        return fail(e.code(), 
"start_timer");
 
  416    timer_.async_wait(bind_executor(
 
  420            impl().shared_from_this(),
 
  421            std::placeholders::_1)));
 
 
  425template <
class Handler, 
class Impl>
 
  433    catch (boost::system::system_error 
const&)
 
 
  439template <
class Handler, 
class Impl>
 
  443    if (ec == boost::asio::error::operation_aborted)
 
  445    ping_active_ = 
false;
 
 
  451template <
class Handler, 
class Impl>
 
  454    boost::beast::websocket::frame_type kind,
 
  455    boost::beast::string_view payload)
 
  457    if (kind == boost::beast::websocket::frame_type::pong)
 
  459        boost::beast::string_view p(payload_.begin());
 
  462            close_on_timer_ = 
false;
 
  463            JLOG(this->j_.
trace()) << 
"got matching pong";
 
  467            JLOG(this->j_.
trace()) << 
"got pong";
 
 
  472template <
class Handler, 
class Impl>
 
  476    if (ec == boost::asio::error::operation_aborted)
 
  480        if (!close_on_timer_ || !ping_active_)
 
  483            close_on_timer_ = 
true;
 
  487            impl().ws_.async_ping(
 
  493                        impl().shared_from_this(),
 
  494                        std::placeholders::_1)));
 
  495            JLOG(this->j_.
trace()) << 
"sent ping";
 
  498        ec = boost::system::errc::make_error_code(
 
  499            boost::system::errc::timed_out);
 
 
  504template <
class Handler, 
class Impl>
 
  505template <
class String>
 
  510        strand_.running_in_this_thread(),
 
  511        "ripple::BaseWSPeer::fail : strand in this thread");
 
  514    if (!ec_ && ec != boost::asio::error::operation_aborted)
 
  517        JLOG(this->j_.
trace()) << what << 
": " << ec.message();
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
T back_inserter(T... args)
 
A generic endpoint for log messages.
 
Stream trace() const
Severity stream access functions.
 
endpoint_type remote_address_
 
boost::asio::strand< boost::asio::executor > strand_
 
Represents an active WebSocket connection.
 
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
 
boost::asio::ip::tcp::endpoint endpoint_type
 
boost::system::error_code error_code
 
http_request_type const & request() const override
 
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
 
void fail(error_code ec, String const &what)
 
bool do_close_
The socket has been closed, or will close after the next write finishes.
 
void close(boost::beast::websocket::close_reason const &reason) override
 
http_request_type request_
 
void on_read(error_code const &ec)
 
void on_ws_handshake(error_code const &ec)
 
boost::beast::multi_buffer wb_
 
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
 
boost::asio::basic_waitable_timer< clock_type > waitable_timer
 
void on_close(error_code const &ec)
 
void on_write_fin(error_code const &ec)
 
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
 
Port const & port() const override
 
void complete() override
Indicate that the response is complete.
 
boost::beast::websocket::close_reason cr_
 
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
 
std::list< std::shared_ptr< WSMsg > > wq_
 
void on_timer(error_code ec)
 
void on_ping(error_code const &ec)
 
boost::beast::multi_buffer rb_
 
void on_write(error_code const &ec)
 
boost::beast::websocket::ping_data payload_
 
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
 
std::string const & getFullVersionString()
Full server version string.
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safe_cast(Src s) noexcept
 
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
 
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
 
decltype(auto) get_lowest_layer(T &t) noexcept
 
Configuration information for a Server listening port.