Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WsBase.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "rpc/Errors.hpp"
23#include "rpc/common/Types.hpp"
24#include "util/Taggable.hpp"
25#include "util/log/Logger.hpp"
26#include "web/SubscriptionContext.hpp"
27#include "web/SubscriptionContextInterface.hpp"
28#include "web/dosguard/DOSGuardInterface.hpp"
29#include "web/interface/Concepts.hpp"
30#include "web/interface/ConnectionBase.hpp"
31
32#include <boost/asio/buffer.hpp>
33#include <boost/asio/error.hpp>
34#include <boost/beast/core.hpp>
35#include <boost/beast/core/error.hpp>
36#include <boost/beast/core/flat_buffer.hpp>
37#include <boost/beast/core/role.hpp>
38#include <boost/beast/http/field.hpp>
39#include <boost/beast/http/message.hpp>
40#include <boost/beast/http/status.hpp>
41#include <boost/beast/http/string_body.hpp>
42#include <boost/beast/version.hpp>
43#include <boost/beast/websocket/error.hpp>
44#include <boost/beast/websocket/rfc6455.hpp>
45#include <boost/beast/websocket/stream_base.hpp>
46#include <boost/core/ignore_unused.hpp>
47#include <boost/json/array.hpp>
48#include <boost/json/parse.hpp>
49#include <boost/json/serialize.hpp>
50#include <xrpl/protocol/ErrorCodes.h>
51
52#include <cstddef>
53#include <cstdint>
54#include <exception>
55#include <functional>
56#include <memory>
57#include <queue>
58#include <string>
59#include <utility>
60
61namespace web::impl {
62
74template <template <typename> typename Derived, SomeServerHandler HandlerType>
75class WsBase : public ConnectionBase, public std::enable_shared_from_this<WsBase<Derived, HandlerType>> {
76 using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this;
77
78 boost::beast::flat_buffer buffer_;
79 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
80 bool sending_ = false;
81 std::queue<std::shared_ptr<std::string>> messages_;
82 std::shared_ptr<HandlerType> const handler_;
83
84 SubscriptionContextPtr subscriptionContext_;
85 std::uint32_t maxSendingQueueSize_;
86
87protected:
88 util::Logger log_{"WebServer"};
89 util::Logger perfLog_{"Performance"};
90
91 void
92 wsFail(boost::beast::error_code ec, char const* what)
93 {
94 // Don't log if the WebSocket stream was gracefully closed at both endpoints
95 if (ec != boost::beast::websocket::error::closed)
96 LOG(log_.error()) << tag() << ": " << what << ": " << ec.message() << ": " << ec.value();
97
98 if (!ec_ && ec != boost::asio::error::operation_aborted) {
99 ec_ = ec;
100 boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
101 }
102 }
103
104public:
105 explicit WsBase(
106 std::string ip,
107 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
108 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
109 std::shared_ptr<HandlerType> const& handler,
110 boost::beast::flat_buffer&& buffer,
111 std::uint32_t maxSendingQueueSize
112 )
113 : ConnectionBase(tagFactory, ip)
114 , buffer_(std::move(buffer))
115 , dosGuard_(dosGuard)
116 , handler_(handler)
117 , maxSendingQueueSize_(maxSendingQueueSize)
118 {
119 upgraded = true; // NOLINT (cppcoreguidelines-pro-type-member-init)
120
121 LOG(perfLog_.debug()) << tag() << "session created";
122 }
123
124 ~WsBase() override
125 {
126 LOG(perfLog_.debug()) << tag() << "session closed";
127 dosGuard_.get().decrement(clientIp);
128 }
129
130 Derived<HandlerType>&
131 derived()
132 {
133 return static_cast<Derived<HandlerType>&>(*this);
134 }
135
136 void
137 doWrite()
138 {
139 sending_ = true;
140 derived().ws().async_write(
141 boost::asio::buffer(messages_.front()->data(), messages_.front()->size()),
142 boost::beast::bind_front_handler(&WsBase::onWrite, derived().shared_from_this())
143 );
144 }
145
146 void
147 onWrite(boost::system::error_code ec, std::size_t)
148 {
149 messages_.pop();
150 sending_ = false;
151 if (ec) {
152 wsFail(ec, "Failed to write");
153 } else {
154 maybeSendNext();
155 }
156 }
157
158 void
159 maybeSendNext()
160 {
161 if (ec_ || sending_ || messages_.empty())
162 return;
163
164 doWrite();
165 }
166
173 void
174 send(std::shared_ptr<std::string> msg) override
175 {
176 boost::asio::dispatch(
177 derived().ws().get_executor(),
178 [this, self = derived().shared_from_this(), msg = std::move(msg)]() {
179 if (messages_.size() > maxSendingQueueSize_) {
180 wsFail(boost::asio::error::timed_out, "Client is too slow");
181 return;
182 }
183
184 messages_.push(msg);
185 maybeSendNext();
186 }
187 );
188 }
189
198 {
199 if (subscriptionContext_ == nullptr) {
200 subscriptionContext_ = std::make_shared<SubscriptionContext>(factory, shared_from_this());
201 }
202 return subscriptionContext_;
203 }
204
211 void
212 send(std::string&& msg, http::status) override
213 {
214 if (!dosGuard_.get().add(clientIp, msg.size())) {
215 auto jsonResponse = boost::json::parse(msg).as_object();
216 jsonResponse["warning"] = "load";
217
218 if (jsonResponse.contains("warnings") && jsonResponse["warnings"].is_array()) {
219 jsonResponse["warnings"].as_array().push_back(rpc::makeWarning(rpc::WarnRpcRateLimit));
220 } else {
221 jsonResponse["warnings"] = boost::json::array{rpc::makeWarning(rpc::WarnRpcRateLimit)};
222 }
223
224 // Reserialize when we need to include this warning
225 msg = boost::json::serialize(jsonResponse);
226 }
227 auto sharedMsg = std::make_shared<std::string>(std::move(msg));
228 send(std::move(sharedMsg));
229 }
230
234 void
235 run(http::request<http::string_body> req)
236 {
237 using namespace boost::beast;
238
239 derived().ws().set_option(websocket::stream_base::timeout::suggested(role_type::server));
240
241 // Set a decorator to change the Server of the handshake
242 derived().ws().set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
243 res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async");
244 }));
245
246 derived().ws().async_accept(req, bind_front_handler(&WsBase::onAccept, this->shared_from_this()));
247 }
248
249 void
250 onAccept(boost::beast::error_code ec)
251 {
252 if (ec)
253 return wsFail(ec, "accept");
254
255 LOG(perfLog_.info()) << tag() << "accepting new connection";
256
257 doRead();
258 }
259
260 void
261 doRead()
262 {
263 if (dead())
264 return;
265
266 // Note: use entirely new buffer so previously used, potentially large, capacity is deallocated
267 buffer_ = boost::beast::flat_buffer{};
268
269 derived().ws().async_read(buffer_, boost::beast::bind_front_handler(&WsBase::onRead, this->shared_from_this()));
270 }
271
272 void
273 onRead(boost::beast::error_code ec, std::size_t bytesTransferred)
274 {
275 boost::ignore_unused(bytesTransferred);
276
277 if (ec)
278 return wsFail(ec, "read");
279
280 LOG(perfLog_.info()) << tag() << "Received request from ip = " << this->clientIp;
281
282 auto sendError = [this](auto error, std::string&& requestStr) {
283 auto e = rpc::makeError(error);
284
285 try {
286 auto request = boost::json::parse(requestStr);
287 if (request.is_object() && request.as_object().contains("id"))
288 e["id"] = request.as_object().at("id");
289 e["request"] = std::move(request);
290 } catch (std::exception const&) {
291 e["request"] = std::move(requestStr);
292 }
293
294 this->send(std::make_shared<std::string>(boost::json::serialize(e)));
295 };
296
297 std::string requestStr{static_cast<char const*>(buffer_.data().data()), buffer_.size()};
298
299 // dosGuard served request++ and check ip address
300 if (!dosGuard_.get().request(clientIp)) {
301 // TODO: could be useful to count in counters in the future too
302 sendError(rpc::RippledError::rpcSLOW_DOWN, std::move(requestStr));
303 } else {
304 try {
305 (*handler_)(requestStr, shared_from_this());
306 } catch (std::exception const&) {
307 sendError(rpc::RippledError::rpcINTERNAL, std::move(requestStr));
308 }
309 }
310
311 doRead();
312 }
313};
314} // namespace web::impl
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:215
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:200
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:205
A factory for TagDecorator instantiation.
Definition Taggable.hpp:169
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:267
Web socket implementation. This class is the base class of the web socket session,...
Definition WsBase.hpp:75
SubscriptionContextPtr makeSubscriptionContext(util::TagDecoratorFactory const &factory) override
Get the subscription context for this connection.
Definition WsBase.hpp:197
void run(http::request< http::string_body > req)
Accept the session asynchronously.
Definition WsBase.hpp:235
void send(std::shared_ptr< std::string > msg) override
Send a message to the client.
Definition WsBase.hpp:174
void send(std::string &&msg, http::status) override
Send a message to the client.
Definition WsBase.hpp:212
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:120
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:65
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:86
Base class for all connections.
Definition ConnectionBase.hpp:44
ConnectionBase(util::TagDecoratorFactory const &tagFactory, std::string ip)
Create a new connection base.
Definition ConnectionBase.hpp:59
bool dead()
Indicates whether the connection had an error and is considered dead.
Definition ConnectionBase.hpp:100