23#include "rpc/common/Types.hpp"
24#include "util/Taggable.hpp"
25#include "util/log/Logger.hpp"
26#include "web/SubscriptionContext.hpp"
27#include "web/SubscriptionContextInterface.hpp"
28#include "web/dosguard/DOSGuardInterface.hpp"
29#include "web/interface/Concepts.hpp"
30#include "web/interface/ConnectionBase.hpp"
32#include <boost/asio/buffer.hpp>
33#include <boost/asio/error.hpp>
34#include <boost/beast/core.hpp>
35#include <boost/beast/core/error.hpp>
36#include <boost/beast/core/flat_buffer.hpp>
37#include <boost/beast/core/role.hpp>
38#include <boost/beast/http/field.hpp>
39#include <boost/beast/http/message.hpp>
40#include <boost/beast/http/status.hpp>
41#include <boost/beast/http/string_body.hpp>
42#include <boost/beast/version.hpp>
43#include <boost/beast/websocket/error.hpp>
44#include <boost/beast/websocket/rfc6455.hpp>
45#include <boost/beast/websocket/stream_base.hpp>
46#include <boost/core/ignore_unused.hpp>
47#include <boost/json/array.hpp>
48#include <boost/json/parse.hpp>
49#include <boost/json/serialize.hpp>
50#include <xrpl/protocol/ErrorCodes.h>
74template <
template <
typename>
typename Derived, SomeServerHandler HandlerType>
78 boost::beast::flat_buffer buffer_;
79 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
80 bool sending_ =
false;
81 std::queue<std::shared_ptr<std::string>> messages_;
82 std::shared_ptr<HandlerType>
const handler_;
85 std::uint32_t maxSendingQueueSize_;
92 wsFail(boost::beast::error_code ec,
char const* what)
95 if (ec != boost::beast::websocket::error::closed)
96 LOG(log_.
error()) <<
tag() <<
": " << what <<
": " << ec.message() <<
": " << ec.value();
98 if (!ec_ && ec != boost::asio::error::operation_aborted) {
100 boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
107 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
108 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
109 std::shared_ptr<HandlerType>
const& handler,
110 boost::beast::flat_buffer&& buffer,
111 std::uint32_t maxSendingQueueSize
114 , buffer_(std::move(buffer))
115 , dosGuard_(dosGuard)
117 , maxSendingQueueSize_(maxSendingQueueSize)
121 LOG(perfLog_.
debug()) <<
tag() <<
"session created";
126 LOG(perfLog_.
debug()) <<
tag() <<
"session closed";
127 dosGuard_.get().decrement(clientIp);
130 Derived<HandlerType>&
133 return static_cast<Derived<HandlerType>&
>(*this);
140 derived().ws().async_write(
141 boost::asio::buffer(messages_.front()->data(), messages_.front()->size()),
142 boost::beast::bind_front_handler(&WsBase::onWrite, derived().shared_from_this())
147 onWrite(boost::system::error_code ec, std::size_t)
152 wsFail(ec,
"Failed to write");
161 if (ec_ || sending_ || messages_.empty())
170 sendError(rpc::RippledError::rpcSLOW_DOWN, request);
180 send(std::shared_ptr<std::string> msg)
override
184 derived().ws().get_executor(), [
this, self = derived().shared_from_this(), msg = std::move(msg)]() {
185 if (messages_.size() > maxSendingQueueSize_) {
186 wsFail(boost::asio::error::timed_out,
"Client is too slow");
205 if (subscriptionContext_ ==
nullptr) {
206 subscriptionContext_ = std::make_shared<SubscriptionContext>(factory, shared_from_this());
208 return subscriptionContext_;
218 send(std::string&& msg, http::status)
override
220 if (!dosGuard_.get().add(clientIp, msg.size())) {
221 auto jsonResponse = boost::json::parse(msg).as_object();
222 jsonResponse[
"warning"] =
"load";
224 if (jsonResponse.contains(
"warnings") && jsonResponse[
"warnings"].is_array()) {
225 jsonResponse[
"warnings"].as_array().push_back(
rpc::makeWarning(rpc::WarnRpcRateLimit));
227 jsonResponse[
"warnings"] = boost::json::array{
rpc::makeWarning(rpc::WarnRpcRateLimit)};
231 msg = boost::json::serialize(jsonResponse);
233 auto sharedMsg = std::make_shared<std::string>(std::move(msg));
234 send(std::move(sharedMsg));
241 run(http::request<http::string_body> req)
243 using namespace boost::beast;
245 derived().ws().set_option(websocket::stream_base::timeout::suggested(role_type::server));
248 derived().ws().set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
249 res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
252 derived().ws().async_accept(req, bind_front_handler(&WsBase::onAccept, this->shared_from_this()));
256 onAccept(boost::beast::error_code ec)
259 return wsFail(ec,
"accept");
261 LOG(perfLog_.
info()) <<
tag() <<
"accepting new connection";
273 buffer_ = boost::beast::flat_buffer{};
275 derived().ws().async_read(buffer_, boost::beast::bind_front_handler(&WsBase::onRead, this->shared_from_this()));
279 onRead(boost::beast::error_code ec, std::size_t bytesTransferred)
281 boost::ignore_unused(bytesTransferred);
284 return wsFail(ec,
"read");
286 LOG(perfLog_.
info()) <<
tag() <<
"Received request from ip = " << this->clientIp;
288 std::string requestStr{
static_cast<char const*
>(buffer_.data().
data()), buffer_.size()};
291 (*handler_)(requestStr, shared_from_this());
292 }
catch (std::exception
const&) {
293 sendError(rpc::RippledError::rpcINTERNAL, std::move(requestStr));
306 auto request = boost::json::parse(requestStr);
307 if (request.is_object() && request.as_object().contains(
"id"))
308 e[
"id"] = request.as_object().at(
"id");
309 e[
"request"] = std::move(request);
310 }
catch (std::exception
const&) {
311 e[
"request"] = requestStr;
314 this->
send(std::make_shared<std::string>(boost::json::serialize(e)));
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:322
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:307
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:312
A factory for TagDecorator instantiation.
Definition Taggable.hpp:182
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:280
Web socket implementation. This class is the base class of the web socket session,...
Definition WsBase.hpp:75
void sendSlowDown(std::string const &request) override
Send a "slow down" error response to the client.
Definition WsBase.hpp:168
SubscriptionContextPtr makeSubscriptionContext(util::TagDecoratorFactory const &factory) override
Get the subscription context for this connection.
Definition WsBase.hpp:203
void run(http::request< http::string_body > req)
Accept the session asynchronously.
Definition WsBase.hpp:241
void send(std::shared_ptr< std::string > msg) override
Send a message to the client.
Definition WsBase.hpp:180
void send(std::string &&msg, http::status) override
Send a message to the client.
Definition WsBase.hpp:218
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:120
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:65
ripple::error_code_i RippledError
Clio uses compatible Rippled error codes for most RPC errors.
Definition Errors.hpp:72
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:86
Base class for all connections.
Definition ConnectionBase.hpp:44
ConnectionBase(util::TagDecoratorFactory const &tagFactory, std::string ip)
Create a new connection base.
Definition ConnectionBase.hpp:59
bool dead()
Indicates whether the connection had an error and is considered dead.
Definition ConnectionBase.hpp:107