57 public std::enable_shared_from_this<WsBase<Derived, HandlerType>> {
58 using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this;
60 boost::beast::flat_buffer buffer_;
61 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
62 bool sending_ =
false;
63 std::queue<std::shared_ptr<std::string>> messages_;
64 std::shared_ptr<HandlerType>
const handler_;
67 std::uint32_t maxSendingQueueSize_;
74 wsFail(boost::beast::error_code ec,
char const* what)
77 if (ec != boost::beast::websocket::error::closed) {
78 LOG(log_.error()) <<
tag() <<
": " << what <<
": " << ec.message() <<
": "
82 if (!ec_ && ec != boost::asio::error::operation_aborted) {
84 boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
91 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
92 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
93 std::shared_ptr<HandlerType>
const& handler,
94 boost::beast::flat_buffer&& buffer,
95 std::uint32_t maxSendingQueueSize
98 , buffer_(std::move(buffer))
101 , maxSendingQueueSize_(maxSendingQueueSize)
105 LOG(perfLog_.debug()) <<
tag() <<
"session created";
110 LOG(perfLog_.debug()) <<
tag() <<
"session closed";
111 dosGuard_.get().decrement(clientIp_);
114 Derived<HandlerType>&
117 return static_cast<Derived<HandlerType>&
>(*this);
124 derived().ws().async_write(
125 boost::asio::buffer(messages_.front()->data(), messages_.front()->size()),
126 boost::beast::bind_front_handler(&WsBase::onWrite, derived().shared_from_this())
131 onWrite(boost::system::error_code ec, std::size_t)
136 wsFail(ec,
"Failed to write");
145 if (ec_ || sending_ || messages_.empty())
154 sendError(rpc::RippledError::rpcSLOW_DOWN, request);
164 send(std::shared_ptr<std::string> msg)
override
169 derived().ws().get_executor(),
170 [
this, self = derived().shared_from_this(), msg = std::move(msg)]() {
171 if (messages_.size() > maxSendingQueueSize_) {
172 wsFail(boost::asio::error::timed_out,
"Client is too slow");
191 if (subscriptionContext_ ==
nullptr) {
192 subscriptionContext_ =
193 std::make_shared<SubscriptionContext>(factory, shared_from_this());
195 return subscriptionContext_;
205 send(std::string&& msg, http::status)
override
207 if (!dosGuard_.get().add(clientIp_, msg.size())) {
208 auto jsonResponse = boost::json::parse(msg).as_object();
209 jsonResponse[
"warning"] =
"load";
211 if (jsonResponse.contains(
"warnings") && jsonResponse[
"warnings"].is_array()) {
212 jsonResponse[
"warnings"].as_array().push_back(
216 jsonResponse[
"warnings"] =
221 msg = boost::json::serialize(jsonResponse);
223 auto sharedMsg = std::make_shared<std::string>(std::move(msg));
224 send(std::move(sharedMsg));
231 run(http::request<http::string_body> req)
233 using namespace boost::beast;
235 derived().ws().set_option(websocket::stream_base::timeout::suggested(role_type::server));
238 derived().ws().set_option(
239 websocket::stream_base::decorator([](websocket::response_type& res) {
242 std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async"
247 derived().ws().async_accept(
248 req, bind_front_handler(&WsBase::onAccept, this->shared_from_this())
253 onAccept(boost::beast::error_code ec)
256 return wsFail(ec,
"accept");
258 LOG(perfLog_.
info()) <<
tag() <<
"accepting new connection";
271 buffer_ = boost::beast::flat_buffer{};
273 derived().ws().async_read(
274 buffer_, boost::beast::bind_front_handler(&WsBase::onRead, this->shared_from_this())
279 onRead(boost::beast::error_code ec, std::size_t bytesTransferred)
281 boost::ignore_unused(bytesTransferred);
284 return wsFail(ec,
"read");
286 LOG(perfLog_.info()) <<
tag() <<
"Received request from ip = " << clientIp_;
288 std::string requestStr{
static_cast<char const*
>(buffer_.data().data()), buffer_.size()};
291 (*handler_)(requestStr, shared_from_this());
292 }
catch (std::exception
const&) {
293 sendError(rpc::RippledError::rpcINTERNAL, std::move(requestStr));
306 auto request = boost::json::parse(requestStr);
307 if (request.is_object() && request.as_object().contains(
"id"))
308 e[
"id"] = request.as_object().at(
"id");
309 e[
"request"] = std::move(request);
310 }
catch (std::exception
const&) {
311 e[
"request"] = requestStr;
314 this->
send(std::make_shared<std::string>(boost::json::serialize(e)));
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from an rpc::RippledError.
Definition Errors.cpp:160