3#include <xrpl/basics/safe_cast.h>
4#include <xrpl/beast/utility/instrumentation.h>
5#include <xrpl/beast/utility/rngfill.h>
6#include <xrpl/crypto/csprng.h>
7#include <xrpl/protocol/BuildInfo.h>
8#include <xrpl/server/WSSession.h>
9#include <xrpl/server/detail/BasePeer.h>
10#include <xrpl/server/detail/LowestLayer.h>
12#include <boost/asio/error.hpp>
13#include <boost/beast/core/multi_buffer.hpp>
14#include <boost/beast/http/message.hpp>
15#include <boost/beast/websocket.hpp>
16#include <boost/logic/tribool.hpp>
25template <
class Handler,
class Impl>
36 friend class BasePeer<Handler, Impl>;
39 boost::beast::multi_buffer
rb_;
40 boost::beast::multi_buffer
wb_;
46 boost::beast::websocket::close_reason
cr_;
52 std::function<void(boost::beast::websocket::frame_type, boost::beast::string_view)>
56 template <
class Body,
class Headers>
60 boost::asio::executor
const& executor,
63 boost::beast::http::request<Body, Headers>&&
request,
73 [[nodiscard]]
Port const&
82 return this->request_;
85 [[nodiscard]] boost::asio::ip::tcp::endpoint
const&
98 close(boost::beast::websocket::close_reason
const& reason)
override;
107 return *
static_cast<Impl*
>(
this);
141 onPingPong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload);
146 template <
class String>
153template <
class Handler,
class Impl>
154template <
class Body,
class Headers>
158 boost::asio::executor
const& executor,
161 boost::beast::http::request<Body, Headers>&&
request,
163 :
BasePeer<Handler, Impl>(
port, handler, executor, remoteAddress, journal)
170template <
class Handler,
class Impl>
174 if (!
strand_.running_in_this_thread())
176 impl().ws_.set_option(
port().pmdOptions);
184 impl().ws_.set_option(boost::beast::websocket::stream_base::decorator([](
auto& res) {
187 impl().ws_.async_accept(
195template <
class Handler,
class Impl>
199 if (!
strand_.running_in_this_thread())
203 if (
wq_.size() >
port().wsQueueLimit)
205 cr_.code =
safeCast<
decltype(
cr_.code)>(boost::beast::websocket::close_code::policy_error);
206 cr_.reason =
"Policy error: client is too slow.";
207 JLOG(this->
j_.info()) <<
cr_.reason;
212 wq_.emplace_back(std::move(w));
217template <
class Handler,
class Impl>
221 close(boost::beast::websocket::close_reason{});
224template <
class Handler,
class Impl>
228 if (!
strand_.running_in_this_thread())
229 return post(
strand_, [self =
impl().shared_from_this(), reason] { self->close(reason); });
235 impl().ws_.async_close(
238 strand_, [self =
impl().shared_from_this()](boost::beast::error_code
const& ec) {
248template <
class Handler,
class Impl>
252 if (!
strand_.running_in_this_thread())
257template <
class Handler,
class Impl>
262 return fail(ec,
"on_ws_handshake");
267template <
class Handler,
class Impl>
271 if (!
strand_.running_in_this_thread())
276template <
class Handler,
class Impl>
281 return fail(ec,
"write");
282 auto& w = *
wq_.front();
285 if (boost::indeterminate(result.first))
290 impl().ws_.async_write_some(
291 static_cast<bool>(result.first),
299 impl().ws_.async_write_some(
300 static_cast<bool>(result.first),
309template <
class Handler,
class Impl>
314 return fail(ec,
"write_fin");
318 impl().ws_.async_close(
324 else if (!
wq_.empty())
330template <
class Handler,
class Impl>
334 if (!
strand_.running_in_this_thread())
336 impl().ws_.async_read(
343template <
class Handler,
class Impl>
347 if (ec == boost::beast::websocket::error::closed)
350 return fail(ec,
"read");
351 auto const& data =
rb_.data();
355 this->
handler_.onWSMessage(
impl().shared_from_this(), b);
359template <
class Handler,
class Impl>
366template <
class Handler,
class Impl>
378 catch (boost::system::system_error
const& e)
380 return fail(e.code(),
"start_timer");
383 timer_.async_wait(bind_executor(
387 impl().shared_from_this(),
388 std::placeholders::_1)));
392template <
class Handler,
class Impl>
400 catch (boost::system::system_error
const&)
406template <
class Handler,
class Impl>
410 if (ec == boost::asio::error::operation_aborted)
418template <
class Handler,
class Impl>
421 boost::beast::websocket::frame_type kind,
422 boost::beast::string_view payload)
424 if (kind == boost::beast::websocket::frame_type::pong)
426 boost::beast::string_view
const p(
payload_.begin());
430 JLOG(this->
j_.trace()) <<
"got matching pong";
434 JLOG(this->
j_.trace()) <<
"got pong";
439template <
class Handler,
class Impl>
443 if (ec == boost::asio::error::operation_aborted)
454 impl().ws_.async_ping(
460 JLOG(this->
j_.trace()) <<
"sent ping";
463 ec = boost::system::errc::make_error_code(boost::system::errc::timed_out);
468template <
class Handler,
class Impl>
469template <
class String>
473 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::BaseWSPeer::fail : strand in this thread");
476 if (!
ec_ && ec != boost::asio::error::operation_aborted)
479 JLOG(this->
j_.trace()) << what <<
": " << ec.message();
T back_inserter(T... args)
A generic endpoint for log messages.
boost::asio::strand< boost::asio::executor > strand_
endpoint_type remoteAddress_
BasePeer(Port const &port, Handler &handler, boost::asio::executor const &executor, endpoint_type remoteAddress, beast::Journal journal)
void onPingPong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
boost::beast::websocket::close_reason cr_
boost::beast::multi_buffer rb_
void onTimer(error_code ec)
boost::asio::ip::tcp::endpoint endpoint_type
boost::beast::multi_buffer wb_
void fail(error_code ec, String const &what)
void onWriteFin(error_code const &ec)
Port const & port() const override
boost::system::error_code error_code
boost::asio::basic_waitable_timer< clock_type > waitable_timer
void onPing(error_code const &ec)
boost::beast::websocket::ping_data payload_
void onWsHandshake(error_code const &ec)
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remoteAddress, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> controlCallback_
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
void onClose(error_code const &ec)
http_request_type request_
std::chrono::system_clock clock_type
void onRead(error_code const &ec)
bool doClose_
The socket has been closed, or will close after the next write finishes.
boost::asio::ip::tcp::endpoint const & remoteEndpoint() const override
std::list< std::shared_ptr< WSMsg > > wq_
void onWrite(error_code const &ec)
void complete() override
Indicate that the response is complete.
http_request_type const & request() const override
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
std::string const & getFullVersionString()
Full server version string.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safeCast(Src s) noexcept
CsprngEngine & cryptoPrng()
The default cryptographically secure PRNG.
decltype(auto) getLowestLayer(T &t) noexcept
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Configuration information for a Server listening port.