23#include "rpc/common/Types.hpp"
24#include "util/Taggable.hpp"
25#include "util/log/Logger.hpp"
26#include "web/SubscriptionContext.hpp"
27#include "web/SubscriptionContextInterface.hpp"
28#include "web/dosguard/DOSGuardInterface.hpp"
29#include "web/interface/Concepts.hpp"
30#include "web/interface/ConnectionBase.hpp"
32#include <boost/asio/buffer.hpp>
33#include <boost/asio/error.hpp>
34#include <boost/beast/core.hpp>
35#include <boost/beast/core/error.hpp>
36#include <boost/beast/core/flat_buffer.hpp>
37#include <boost/beast/core/role.hpp>
38#include <boost/beast/http/field.hpp>
39#include <boost/beast/http/message.hpp>
40#include <boost/beast/http/status.hpp>
41#include <boost/beast/http/string_body.hpp>
42#include <boost/beast/version.hpp>
43#include <boost/beast/websocket/error.hpp>
44#include <boost/beast/websocket/rfc6455.hpp>
45#include <boost/beast/websocket/stream_base.hpp>
46#include <boost/core/ignore_unused.hpp>
47#include <boost/json/array.hpp>
48#include <boost/json/parse.hpp>
49#include <boost/json/serialize.hpp>
50#include <xrpl/protocol/ErrorCodes.h>
74template <
template <
typename>
typename Derived, SomeServerHandler HandlerType>
78 boost::beast::flat_buffer buffer_;
79 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
80 bool sending_ =
false;
81 std::queue<std::shared_ptr<std::string>> messages_;
82 std::shared_ptr<HandlerType>
const handler_;
85 std::uint32_t maxSendingQueueSize_;
92 wsFail(boost::beast::error_code ec,
char const* what)
95 if (ec != boost::beast::websocket::error::closed)
96 LOG(log_.
error()) <<
tag() <<
": " << what <<
": " << ec.message() <<
": " << ec.value();
98 if (!ec_ && ec != boost::asio::error::operation_aborted) {
100 boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
107 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
108 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
109 std::shared_ptr<HandlerType>
const& handler,
110 boost::beast::flat_buffer&& buffer,
111 std::uint32_t maxSendingQueueSize
114 , buffer_(std::move(buffer))
115 , dosGuard_(dosGuard)
117 , maxSendingQueueSize_(maxSendingQueueSize)
121 LOG(perfLog_.
debug()) <<
tag() <<
"session created";
126 LOG(perfLog_.
debug()) <<
tag() <<
"session closed";
127 dosGuard_.get().decrement(clientIp);
130 Derived<HandlerType>&
133 return static_cast<Derived<HandlerType>&
>(*this);
140 derived().ws().async_write(
141 boost::asio::buffer(messages_.front()->data(), messages_.front()->size()),
142 boost::beast::bind_front_handler(&WsBase::onWrite, derived().shared_from_this())
147 onWrite(boost::system::error_code ec, std::size_t)
152 wsFail(ec,
"Failed to write");
161 if (ec_ || sending_ || messages_.empty())
170 sendError(rpc::RippledError::rpcSLOW_DOWN, request);
180 send(std::shared_ptr<std::string> msg)
override
184 derived().ws().get_executor(),
185 [
this, self = derived().shared_from_this(), msg = std::move(msg)]() {
186 if (messages_.size() > maxSendingQueueSize_) {
187 wsFail(boost::asio::error::timed_out,
"Client is too slow");
206 if (subscriptionContext_ ==
nullptr) {
207 subscriptionContext_ = std::make_shared<SubscriptionContext>(factory, shared_from_this());
209 return subscriptionContext_;
219 send(std::string&& msg, http::status)
override
221 if (!dosGuard_.get().add(clientIp, msg.size())) {
222 auto jsonResponse = boost::json::parse(msg).as_object();
223 jsonResponse[
"warning"] =
"load";
225 if (jsonResponse.contains(
"warnings") && jsonResponse[
"warnings"].is_array()) {
226 jsonResponse[
"warnings"].as_array().push_back(
rpc::makeWarning(rpc::WarnRpcRateLimit));
228 jsonResponse[
"warnings"] = boost::json::array{
rpc::makeWarning(rpc::WarnRpcRateLimit)};
232 msg = boost::json::serialize(jsonResponse);
234 auto sharedMsg = std::make_shared<std::string>(std::move(msg));
235 send(std::move(sharedMsg));
242 run(http::request<http::string_body> req)
244 using namespace boost::beast;
246 derived().ws().set_option(websocket::stream_base::timeout::suggested(role_type::server));
249 derived().ws().set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
250 res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
253 derived().ws().async_accept(req, bind_front_handler(&WsBase::onAccept, this->shared_from_this()));
257 onAccept(boost::beast::error_code ec)
260 return wsFail(ec,
"accept");
262 LOG(perfLog_.
info()) <<
tag() <<
"accepting new connection";
274 buffer_ = boost::beast::flat_buffer{};
276 derived().ws().async_read(buffer_, boost::beast::bind_front_handler(&WsBase::onRead, this->shared_from_this()));
280 onRead(boost::beast::error_code ec, std::size_t bytesTransferred)
282 boost::ignore_unused(bytesTransferred);
285 return wsFail(ec,
"read");
287 LOG(perfLog_.
info()) <<
tag() <<
"Received request from ip = " << this->clientIp;
289 std::string requestStr{
static_cast<char const*
>(buffer_.data().
data()), buffer_.size()};
292 (*handler_)(requestStr, shared_from_this());
293 }
catch (std::exception
const&) {
294 sendError(rpc::RippledError::rpcINTERNAL, std::move(requestStr));
307 auto request = boost::json::parse(requestStr);
308 if (request.is_object() && request.as_object().contains(
"id"))
309 e[
"id"] = request.as_object().at(
"id");
310 e[
"request"] = std::move(request);
311 }
catch (std::exception
const&) {
312 e[
"request"] = requestStr;
315 this->
send(std::make_shared<std::string>(boost::json::serialize(e)));
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:229
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
A factory for TagDecorator instantiation.
Definition Taggable.hpp:182
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:280
Web socket implementation. This class is the base class of the web socket session,...
Definition WsBase.hpp:75
void sendSlowDown(std::string const &request) override
Send a "slow down" error response to the client.
Definition WsBase.hpp:168
SubscriptionContextPtr makeSubscriptionContext(util::TagDecoratorFactory const &factory) override
Get the subscription context for this connection, creating it on the first call.
Definition WsBase.hpp:204
void run(http::request< http::string_body > req)
Accept the session asynchronously.
Definition WsBase.hpp:242
void send(std::shared_ptr< std::string > msg) override
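Send a shared message to the client via the websocket's executor; if more than maxSendingQueueSize_ messages are already pending, the session is failed with "Client is too slow".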
Definition WsBase.hpp:180
void send(std::string &&msg, http::status) override
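Send a message to the client; the HTTP status argument is ignored. If the DOS guard rejects the added bytes, a "load" warning and a rate-limit entry in "warnings" are added to the JSON response before it is sent.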
Definition WsBase.hpp:219
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:120
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:65
ripple::error_code_i RippledError
Clio uses rippled-compatible error codes for most RPC errors.
Definition Errors.hpp:71
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:86
Base class for all connections.
Definition ConnectionBase.hpp:44
ConnectionBase(util::TagDecoratorFactory const &tagFactory, std::string ip)
Create a new connection base.
Definition ConnectionBase.hpp:59
bool dead()
Indicates whether the connection had an error and is considered dead.
Definition ConnectionBase.hpp:107