Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WsBase.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "rpc/Errors.hpp"
23#include "rpc/common/Types.hpp"
24#include "util/Taggable.hpp"
25#include "util/log/Logger.hpp"
26#include "web/SubscriptionContext.hpp"
27#include "web/SubscriptionContextInterface.hpp"
28#include "web/dosguard/DOSGuardInterface.hpp"
29#include "web/interface/Concepts.hpp"
30#include "web/interface/ConnectionBase.hpp"
31
32#include <boost/asio/buffer.hpp>
33#include <boost/asio/error.hpp>
34#include <boost/beast/core.hpp>
35#include <boost/beast/core/error.hpp>
36#include <boost/beast/core/flat_buffer.hpp>
37#include <boost/beast/core/role.hpp>
38#include <boost/beast/http/field.hpp>
39#include <boost/beast/http/message.hpp>
40#include <boost/beast/http/status.hpp>
41#include <boost/beast/http/string_body.hpp>
42#include <boost/beast/version.hpp>
43#include <boost/beast/websocket/error.hpp>
44#include <boost/beast/websocket/rfc6455.hpp>
45#include <boost/beast/websocket/stream_base.hpp>
46#include <boost/core/ignore_unused.hpp>
47#include <boost/json/array.hpp>
48#include <boost/json/parse.hpp>
49#include <boost/json/serialize.hpp>
50#include <xrpl/protocol/ErrorCodes.h>
51
52#include <cstddef>
53#include <cstdint>
54#include <exception>
55#include <functional>
56#include <memory>
57#include <queue>
58#include <string>
59#include <utility>
60
61namespace web::impl {
62
74template <template <typename> typename Derived, SomeServerHandler HandlerType>
75class WsBase : public ConnectionBase, public std::enable_shared_from_this<WsBase<Derived, HandlerType>> {
76 using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this;
77
78 boost::beast::flat_buffer buffer_;
79 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
80 bool sending_ = false;
81 std::queue<std::shared_ptr<std::string>> messages_;
82 std::shared_ptr<HandlerType> const handler_;
83
84 SubscriptionContextPtr subscriptionContext_;
85 std::uint32_t maxSendingQueueSize_;
86
87protected:
88 util::Logger log_{"WebServer"};
89 util::Logger perfLog_{"Performance"};
90
91 void
92 wsFail(boost::beast::error_code ec, char const* what)
93 {
94 // Don't log if the WebSocket stream was gracefully closed at both endpoints
95 if (ec != boost::beast::websocket::error::closed)
96 LOG(log_.error()) << tag() << ": " << what << ": " << ec.message() << ": " << ec.value();
97
98 if (!ec_ && ec != boost::asio::error::operation_aborted) {
99 ec_ = ec;
100 boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
101 }
102 }
103
104public:
105 explicit WsBase(
106 std::string ip,
107 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
108 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
109 std::shared_ptr<HandlerType> const& handler,
110 boost::beast::flat_buffer&& buffer,
111 std::uint32_t maxSendingQueueSize
112 )
113 : ConnectionBase(tagFactory, ip)
114 , buffer_(std::move(buffer))
115 , dosGuard_(dosGuard)
116 , handler_(handler)
117 , maxSendingQueueSize_(maxSendingQueueSize)
118 {
119 upgraded = true; // NOLINT (cppcoreguidelines-pro-type-member-init)
120
121 LOG(perfLog_.debug()) << tag() << "session created";
122 }
123
124 ~WsBase() override
125 {
126 LOG(perfLog_.debug()) << tag() << "session closed";
127 dosGuard_.get().decrement(clientIp);
128 }
129
130 Derived<HandlerType>&
131 derived()
132 {
133 return static_cast<Derived<HandlerType>&>(*this);
134 }
135
136 void
137 doWrite()
138 {
139 sending_ = true;
140 derived().ws().async_write(
141 boost::asio::buffer(messages_.front()->data(), messages_.front()->size()),
142 boost::beast::bind_front_handler(&WsBase::onWrite, derived().shared_from_this())
143 );
144 }
145
146 void
147 onWrite(boost::system::error_code ec, std::size_t)
148 {
149 messages_.pop();
150 sending_ = false;
151 if (ec) {
152 wsFail(ec, "Failed to write");
153 } else {
154 maybeSendNext();
155 }
156 }
157
158 void
159 maybeSendNext()
160 {
161 if (ec_ || sending_ || messages_.empty())
162 return;
163
164 doWrite();
165 }
166
167 void
168 sendSlowDown(std::string const& request) override
169 {
170 sendError(rpc::RippledError::rpcSLOW_DOWN, request);
171 }
172
179 void
180 send(std::shared_ptr<std::string> msg) override
181 {
182 // Note: post used instead of dispatch to guarantee async behavior of wsFail and maybeSendNext
183 boost::asio::post(
184 derived().ws().get_executor(), [this, self = derived().shared_from_this(), msg = std::move(msg)]() {
185 if (messages_.size() > maxSendingQueueSize_) {
186 wsFail(boost::asio::error::timed_out, "Client is too slow");
187 return;
188 }
189
190 messages_.push(msg);
191 maybeSendNext();
192 }
193 );
194 }
195
204 {
205 if (subscriptionContext_ == nullptr) {
206 subscriptionContext_ = std::make_shared<SubscriptionContext>(factory, shared_from_this());
207 }
208 return subscriptionContext_;
209 }
210
217 void
218 send(std::string&& msg, http::status) override
219 {
220 if (!dosGuard_.get().add(clientIp, msg.size())) {
221 auto jsonResponse = boost::json::parse(msg).as_object();
222 jsonResponse["warning"] = "load";
223
224 if (jsonResponse.contains("warnings") && jsonResponse["warnings"].is_array()) {
225 jsonResponse["warnings"].as_array().push_back(rpc::makeWarning(rpc::WarnRpcRateLimit));
226 } else {
227 jsonResponse["warnings"] = boost::json::array{rpc::makeWarning(rpc::WarnRpcRateLimit)};
228 }
229
230 // Reserialize when we need to include this warning
231 msg = boost::json::serialize(jsonResponse);
232 }
233 auto sharedMsg = std::make_shared<std::string>(std::move(msg));
234 send(std::move(sharedMsg));
235 }
236
240 void
241 run(http::request<http::string_body> req)
242 {
243 using namespace boost::beast;
244
245 derived().ws().set_option(websocket::stream_base::timeout::suggested(role_type::server));
246
247 // Set a decorator to change the Server of the handshake
248 derived().ws().set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
249 res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async");
250 }));
251
252 derived().ws().async_accept(req, bind_front_handler(&WsBase::onAccept, this->shared_from_this()));
253 }
254
255 void
256 onAccept(boost::beast::error_code ec)
257 {
258 if (ec)
259 return wsFail(ec, "accept");
260
261 LOG(perfLog_.info()) << tag() << "accepting new connection";
262
263 doRead();
264 }
265
266 void
267 doRead()
268 {
269 if (dead())
270 return;
271
272 // Note: use entirely new buffer so previously used, potentially large, capacity is deallocated
273 buffer_ = boost::beast::flat_buffer{};
274
275 derived().ws().async_read(buffer_, boost::beast::bind_front_handler(&WsBase::onRead, this->shared_from_this()));
276 }
277
278 void
279 onRead(boost::beast::error_code ec, std::size_t bytesTransferred)
280 {
281 boost::ignore_unused(bytesTransferred);
282
283 if (ec)
284 return wsFail(ec, "read");
285
286 LOG(perfLog_.info()) << tag() << "Received request from ip = " << this->clientIp;
287
288 std::string requestStr{static_cast<char const*>(buffer_.data().data()), buffer_.size()};
289
290 try {
291 (*handler_)(requestStr, shared_from_this());
292 } catch (std::exception const&) {
293 sendError(rpc::RippledError::rpcINTERNAL, std::move(requestStr));
294 }
295
296 doRead();
297 }
298
299private:
300 void
301 sendError(rpc::RippledError error, std::string requestStr)
302 {
303 auto e = rpc::makeError(error);
304
305 try {
306 auto request = boost::json::parse(requestStr);
307 if (request.is_object() && request.as_object().contains("id"))
308 e["id"] = request.as_object().at("id");
309 e["request"] = std::move(request);
310 } catch (std::exception const&) {
311 e["request"] = requestStr;
312 }
313
314 this->send(std::make_shared<std::string>(boost::json::serialize(e)));
315 }
316};
317} // namespace web::impl
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:322
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:307
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:312
A factory for TagDecorator instantiation.
Definition Taggable.hpp:182
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:280
Web socket implementation. This class is the base class of the web socket session,...
Definition WsBase.hpp:75
void sendSlowDown(std::string const &request) override
Send a "slow down" error response to the client.
Definition WsBase.hpp:168
SubscriptionContextPtr makeSubscriptionContext(util::TagDecoratorFactory const &factory) override
Get the subscription context for this connection.
Definition WsBase.hpp:203
void run(http::request< http::string_body > req)
Accept the session asynchronously.
Definition WsBase.hpp:241
void send(std::shared_ptr< std::string > msg) override
Send a message to the client.
Definition WsBase.hpp:180
void send(std::string &&msg, http::status) override
Send a message to the client.
Definition WsBase.hpp:218
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:120
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:65
ripple::error_code_i RippledError
Clio uses compatible Rippled error codes for most RPC errors.
Definition Errors.hpp:72
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:86
Base class for all connections.
Definition ConnectionBase.hpp:44
ConnectionBase(util::TagDecoratorFactory const &tagFactory, std::string ip)
Create a new connection base.
Definition ConnectionBase.hpp:59
bool dead()
Indicates whether the connection had an error and is considered dead.
Definition ConnectionBase.hpp:107