23#include "rpc/common/Types.hpp"
24#include "util/Taggable.hpp"
25#include "util/log/Logger.hpp"
26#include "web/SubscriptionContext.hpp"
27#include "web/SubscriptionContextInterface.hpp"
28#include "web/dosguard/DOSGuardInterface.hpp"
29#include "web/interface/Concepts.hpp"
30#include "web/interface/ConnectionBase.hpp"
32#include <boost/asio/buffer.hpp>
33#include <boost/asio/error.hpp>
34#include <boost/beast/core.hpp>
35#include <boost/beast/core/error.hpp>
36#include <boost/beast/core/flat_buffer.hpp>
37#include <boost/beast/core/role.hpp>
38#include <boost/beast/http/field.hpp>
39#include <boost/beast/http/message.hpp>
40#include <boost/beast/http/status.hpp>
41#include <boost/beast/http/string_body.hpp>
42#include <boost/beast/version.hpp>
43#include <boost/beast/websocket/error.hpp>
44#include <boost/beast/websocket/rfc6455.hpp>
45#include <boost/beast/websocket/stream_base.hpp>
46#include <boost/core/ignore_unused.hpp>
47#include <boost/json/array.hpp>
48#include <boost/json/parse.hpp>
49#include <boost/json/serialize.hpp>
50#include <xrpl/protocol/ErrorCodes.h>
74template <
template <
typename>
typename Derived, SomeServerHandler HandlerType>
78 boost::beast::flat_buffer buffer_;
79 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
80 bool sending_ =
false;
81 std::queue<std::shared_ptr<std::string>> messages_;
82 std::shared_ptr<HandlerType>
const handler_;
85 std::uint32_t maxSendingQueueSize_;
92 wsFail(boost::beast::error_code ec,
char const* what)
95 if (ec != boost::beast::websocket::error::closed)
96 LOG(log_.
error()) <<
tag() <<
": " << what <<
": " << ec.message() <<
": " << ec.value();
98 if (!ec_ && ec != boost::asio::error::operation_aborted) {
100 boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
107 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
108 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
109 std::shared_ptr<HandlerType>
const& handler,
110 boost::beast::flat_buffer&& buffer,
111 std::uint32_t maxSendingQueueSize
114 , buffer_(std::move(buffer))
115 , dosGuard_(dosGuard)
117 , maxSendingQueueSize_(maxSendingQueueSize)
121 LOG(perfLog_.
debug()) <<
tag() <<
"session created";
126 LOG(perfLog_.
debug()) <<
tag() <<
"session closed";
127 dosGuard_.get().decrement(clientIp);
130 Derived<HandlerType>&
133 return static_cast<Derived<HandlerType>&
>(*this);
140 derived().ws().async_write(
141 boost::asio::buffer(messages_.front()->data(), messages_.front()->size()),
142 boost::beast::bind_front_handler(&WsBase::onWrite, derived().shared_from_this())
147 onWrite(boost::system::error_code ec, std::size_t)
152 wsFail(ec,
"Failed to write");
161 if (ec_ || sending_ || messages_.empty())
174 send(std::shared_ptr<std::string> msg)
override
176 boost::asio::dispatch(
177 derived().ws().get_executor(),
178 [
this, self = derived().shared_from_this(), msg = std::move(msg)]() {
179 if (messages_.size() > maxSendingQueueSize_) {
180 wsFail(boost::asio::error::timed_out,
"Client is too slow");
199 if (subscriptionContext_ ==
nullptr) {
200 subscriptionContext_ = std::make_shared<SubscriptionContext>(factory, shared_from_this());
202 return subscriptionContext_;
212 send(std::string&& msg, http::status)
override
214 if (!dosGuard_.get().add(clientIp, msg.size())) {
215 auto jsonResponse = boost::json::parse(msg).as_object();
216 jsonResponse[
"warning"] =
"load";
218 if (jsonResponse.contains(
"warnings") && jsonResponse[
"warnings"].is_array()) {
219 jsonResponse[
"warnings"].as_array().push_back(
rpc::makeWarning(rpc::WarnRpcRateLimit));
221 jsonResponse[
"warnings"] = boost::json::array{
rpc::makeWarning(rpc::WarnRpcRateLimit)};
225 msg = boost::json::serialize(jsonResponse);
227 auto sharedMsg = std::make_shared<std::string>(std::move(msg));
228 send(std::move(sharedMsg));
235 run(http::request<http::string_body> req)
237 using namespace boost::beast;
239 derived().ws().set_option(websocket::stream_base::timeout::suggested(role_type::server));
242 derived().ws().set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
243 res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
246 derived().ws().async_accept(req, bind_front_handler(&WsBase::onAccept, this->shared_from_this()));
250 onAccept(boost::beast::error_code ec)
253 return wsFail(ec,
"accept");
255 LOG(perfLog_.
info()) <<
tag() <<
"accepting new connection";
267 buffer_ = boost::beast::flat_buffer{};
269 derived().ws().async_read(buffer_, boost::beast::bind_front_handler(&WsBase::onRead, this->shared_from_this()));
273 onRead(boost::beast::error_code ec, std::size_t bytesTransferred)
275 boost::ignore_unused(bytesTransferred);
278 return wsFail(ec,
"read");
280 LOG(perfLog_.
info()) <<
tag() <<
"Received request from ip = " << this->clientIp;
282 auto sendError = [
this](
auto error, std::string&& requestStr) {
286 auto request = boost::json::parse(requestStr);
287 if (request.is_object() && request.as_object().contains(
"id"))
288 e[
"id"] = request.as_object().at(
"id");
289 e[
"request"] = std::move(request);
290 }
catch (std::exception
const&) {
291 e[
"request"] = std::move(requestStr);
294 this->
send(std::make_shared<std::string>(boost::json::serialize(e)));
297 std::string requestStr{
static_cast<char const*
>(buffer_.data().
data()), buffer_.size()};
300 if (!dosGuard_.get().request(clientIp)) {
302 sendError(rpc::RippledError::rpcSLOW_DOWN, std::move(requestStr));
305 (*handler_)(requestStr, shared_from_this());
306 }
catch (std::exception
const&) {
307 sendError(rpc::RippledError::rpcINTERNAL, std::move(requestStr));
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:215
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
A factory for TagDecorator instantiation.
Definition Taggable.hpp:169
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:267
Web socket implementation. This class is the base class of the web socket session,...
Definition WsBase.hpp:75
SubscriptionContextPtr makeSubscriptionContext(util::TagDecoratorFactory const &factory) override
Get the subscription context for this connection.
Definition WsBase.hpp:197
void run(http::request< http::string_body > req)
Accept the session asynchroniously.
Definition WsBase.hpp:235
void send(std::shared_ptr< std::string > msg) override
Send a message to the client.
Definition WsBase.hpp:174
void send(std::string &&msg, http::status) override
Send a message to the client.
Definition WsBase.hpp:212
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:120
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:65
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:86
Base class for all connections.
Definition ConnectionBase.hpp:44
ConnectionBase(util::TagDecoratorFactory const &tagFactory, std::string ip)
Create a new connection base.
Definition ConnectionBase.hpp:59
bool dead()
Indicates whether the connection had an error and is considered dead.
Definition ConnectionBase.hpp:100