75class WsBase :
public ConnectionBase,
public std::enable_shared_from_this<WsBase<Derived, HandlerType>> {
76 using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this;
78 boost::beast::flat_buffer buffer_;
79 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
80 bool sending_ =
false;
81 std::queue<std::shared_ptr<std::string>> messages_;
82 std::shared_ptr<HandlerType>
const handler_;
85 std::uint32_t maxSendingQueueSize_;
92 wsFail(boost::beast::error_code ec,
char const* what)
95 if (ec != boost::beast::websocket::error::closed)
96 LOG(log_.error()) <<
tag() <<
": " << what <<
": " << ec.message() <<
": " << ec.value();
98 if (!ec_ && ec != boost::asio::error::operation_aborted) {
100 boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
107 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
108 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
109 std::shared_ptr<HandlerType>
const& handler,
110 boost::beast::flat_buffer&& buffer,
111 std::uint32_t maxSendingQueueSize
114 , buffer_(std::move(buffer))
115 , dosGuard_(dosGuard)
117 , maxSendingQueueSize_(maxSendingQueueSize)
121 LOG(perfLog_.debug()) <<
tag() <<
"session created";
126 LOG(perfLog_.debug()) <<
tag() <<
"session closed";
127 dosGuard_.get().decrement(clientIp_);
130 Derived<HandlerType>&
133 return static_cast<Derived<HandlerType>&
>(*this);
140 derived().ws().async_write(
141 boost::asio::buffer(messages_.front()->data(), messages_.front()->size()),
142 boost::beast::bind_front_handler(&WsBase::onWrite, derived().shared_from_this())
147 onWrite(boost::system::error_code ec, std::size_t)
152 wsFail(ec,
"Failed to write");
161 if (ec_ || sending_ || messages_.empty())
170 sendError(rpc::RippledError::rpcSLOW_DOWN, request);
180 send(std::shared_ptr<std::string> msg)
override
184 derived().ws().get_executor(), [
this, self = derived().shared_from_this(), msg = std::move(msg)]() {
185 if (messages_.size() > maxSendingQueueSize_) {
186 wsFail(boost::asio::error::timed_out,
"Client is too slow");
205 if (subscriptionContext_ ==
nullptr) {
206 subscriptionContext_ = std::make_shared<SubscriptionContext>(factory, shared_from_this());
208 return subscriptionContext_;
218 send(std::string&& msg, http::status)
override
220 if (!dosGuard_.get().add(clientIp_, msg.size())) {
221 auto jsonResponse = boost::json::parse(msg).as_object();
222 jsonResponse[
"warning"] =
"load";
224 if (jsonResponse.contains(
"warnings") && jsonResponse[
"warnings"].is_array()) {
225 jsonResponse[
"warnings"].as_array().push_back(
rpc::makeWarning(rpc::WarnRpcRateLimit));
227 jsonResponse[
"warnings"] = boost::json::array{
rpc::makeWarning(rpc::WarnRpcRateLimit)};
231 msg = boost::json::serialize(jsonResponse);
233 auto sharedMsg = std::make_shared<std::string>(std::move(msg));
234 send(std::move(sharedMsg));
241 run(http::request<http::string_body> req)
243 using namespace boost::beast;
245 derived().ws().set_option(websocket::stream_base::timeout::suggested(role_type::server));
248 derived().ws().set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
249 res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
252 derived().ws().async_accept(req, bind_front_handler(&WsBase::onAccept, this->shared_from_this()));
256 onAccept(boost::beast::error_code ec)
259 return wsFail(ec,
"accept");
261 LOG(perfLog_.
info()) <<
tag() <<
"accepting new connection";
273 buffer_ = boost::beast::flat_buffer{};
275 derived().ws().async_read(buffer_, boost::beast::bind_front_handler(&WsBase::onRead, this->shared_from_this()));
279 onRead(boost::beast::error_code ec, std::size_t bytesTransferred)
281 boost::ignore_unused(bytesTransferred);
284 return wsFail(ec,
"read");
286 LOG(perfLog_.info()) <<
tag() <<
"Received request from ip = " << clientIp_;
288 std::string requestStr{
static_cast<char const*
>(buffer_.data().data()), buffer_.size()};
291 (*handler_)(requestStr, shared_from_this());
292 }
catch (std::exception
const&) {
293 sendError(rpc::RippledError::rpcINTERNAL, std::move(requestStr));
306 auto request = boost::json::parse(requestStr);
307 if (request.is_object() && request.as_object().contains(
"id"))
308 e[
"id"] = request.as_object().at(
"id");
309 e[
"request"] = std::move(request);
310 }
catch (std::exception
const&) {
311 e[
"request"] = requestStr;
314 this->
send(std::make_shared<std::string>(boost::json::serialize(e)));
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:157