Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WsBase.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "rpc/Errors.hpp"
23#include "rpc/common/Types.hpp"
24#include "util/Taggable.hpp"
25#include "util/log/Logger.hpp"
26#include "web/SubscriptionContext.hpp"
27#include "web/SubscriptionContextInterface.hpp"
28#include "web/dosguard/DOSGuardInterface.hpp"
29#include "web/interface/Concepts.hpp"
30#include "web/interface/ConnectionBase.hpp"
31
32#include <boost/asio/buffer.hpp>
33#include <boost/asio/error.hpp>
34#include <boost/beast/core.hpp>
35#include <boost/beast/core/error.hpp>
36#include <boost/beast/core/flat_buffer.hpp>
37#include <boost/beast/core/role.hpp>
38#include <boost/beast/http/field.hpp>
39#include <boost/beast/http/message.hpp>
40#include <boost/beast/http/status.hpp>
41#include <boost/beast/http/string_body.hpp>
42#include <boost/beast/version.hpp>
43#include <boost/beast/websocket/error.hpp>
44#include <boost/beast/websocket/rfc6455.hpp>
45#include <boost/beast/websocket/stream_base.hpp>
46#include <boost/core/ignore_unused.hpp>
47#include <boost/json/array.hpp>
48#include <boost/json/parse.hpp>
49#include <boost/json/serialize.hpp>
50#include <xrpl/protocol/ErrorCodes.h>
51
52#include <cstddef>
53#include <cstdint>
54#include <exception>
55#include <functional>
56#include <memory>
57#include <queue>
58#include <string>
59#include <utility>
60
61namespace web::impl {
62
74template <template <typename> typename Derived, SomeServerHandler HandlerType>
75class WsBase : public ConnectionBase, public std::enable_shared_from_this<WsBase<Derived, HandlerType>> {
76 using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this;
77
78 boost::beast::flat_buffer buffer_;
79 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
80 bool sending_ = false;
81 std::queue<std::shared_ptr<std::string>> messages_;
82 std::shared_ptr<HandlerType> const handler_;
83
84 SubscriptionContextPtr subscriptionContext_;
85 std::uint32_t maxSendingQueueSize_;
86
87protected:
88 util::Logger log_{"WebServer"};
89 util::Logger perfLog_{"Performance"};
90
91 void
92 wsFail(boost::beast::error_code ec, char const* what)
93 {
94 // Don't log if the WebSocket stream was gracefully closed at both endpoints
95 if (ec != boost::beast::websocket::error::closed)
96 LOG(log_.error()) << tag() << ": " << what << ": " << ec.message() << ": " << ec.value();
97
98 if (!ec_ && ec != boost::asio::error::operation_aborted) {
99 ec_ = ec;
100 boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
101 }
102 }
103
104public:
105 explicit WsBase(
106 std::string ip,
107 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
108 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
109 std::shared_ptr<HandlerType> const& handler,
110 boost::beast::flat_buffer&& buffer,
111 std::uint32_t maxSendingQueueSize
112 )
113 : ConnectionBase(tagFactory, ip)
114 , buffer_(std::move(buffer))
115 , dosGuard_(dosGuard)
116 , handler_(handler)
117 , maxSendingQueueSize_(maxSendingQueueSize)
118 {
119 upgraded = true; // NOLINT (cppcoreguidelines-pro-type-member-init)
120
121 LOG(perfLog_.debug()) << tag() << "session created";
122 }
123
124 ~WsBase() override
125 {
126 LOG(perfLog_.debug()) << tag() << "session closed";
127 dosGuard_.get().decrement(clientIp);
128 }
129
130 Derived<HandlerType>&
131 derived()
132 {
133 return static_cast<Derived<HandlerType>&>(*this);
134 }
135
136 void
137 doWrite()
138 {
139 sending_ = true;
140 derived().ws().async_write(
141 boost::asio::buffer(messages_.front()->data(), messages_.front()->size()),
142 boost::beast::bind_front_handler(&WsBase::onWrite, derived().shared_from_this())
143 );
144 }
145
146 void
147 onWrite(boost::system::error_code ec, std::size_t)
148 {
149 messages_.pop();
150 sending_ = false;
151 if (ec) {
152 wsFail(ec, "Failed to write");
153 } else {
154 maybeSendNext();
155 }
156 }
157
158 void
159 maybeSendNext()
160 {
161 if (ec_ || sending_ || messages_.empty())
162 return;
163
164 doWrite();
165 }
166
167 void
168 sendSlowDown(std::string const& request) override
169 {
170 sendError(rpc::RippledError::rpcSLOW_DOWN, request);
171 }
172
179 void
180 send(std::shared_ptr<std::string> msg) override
181 {
182 // Note: post used instead of dispatch to guarantee async behavior of wsFail and maybeSendNext
183 boost::asio::post(
184 derived().ws().get_executor(),
185 [this, self = derived().shared_from_this(), msg = std::move(msg)]() {
186 if (messages_.size() > maxSendingQueueSize_) {
187 wsFail(boost::asio::error::timed_out, "Client is too slow");
188 return;
189 }
190
191 messages_.push(msg);
192 maybeSendNext();
193 }
194 );
195 }
196
205 {
206 if (subscriptionContext_ == nullptr) {
207 subscriptionContext_ = std::make_shared<SubscriptionContext>(factory, shared_from_this());
208 }
209 return subscriptionContext_;
210 }
211
218 void
219 send(std::string&& msg, http::status) override
220 {
221 if (!dosGuard_.get().add(clientIp, msg.size())) {
222 auto jsonResponse = boost::json::parse(msg).as_object();
223 jsonResponse["warning"] = "load";
224
225 if (jsonResponse.contains("warnings") && jsonResponse["warnings"].is_array()) {
226 jsonResponse["warnings"].as_array().push_back(rpc::makeWarning(rpc::WarnRpcRateLimit));
227 } else {
228 jsonResponse["warnings"] = boost::json::array{rpc::makeWarning(rpc::WarnRpcRateLimit)};
229 }
230
231 // Reserialize when we need to include this warning
232 msg = boost::json::serialize(jsonResponse);
233 }
234 auto sharedMsg = std::make_shared<std::string>(std::move(msg));
235 send(std::move(sharedMsg));
236 }
237
241 void
242 run(http::request<http::string_body> req)
243 {
244 using namespace boost::beast;
245
246 derived().ws().set_option(websocket::stream_base::timeout::suggested(role_type::server));
247
248 // Set a decorator to change the Server of the handshake
249 derived().ws().set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
250 res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async");
251 }));
252
253 derived().ws().async_accept(req, bind_front_handler(&WsBase::onAccept, this->shared_from_this()));
254 }
255
256 void
257 onAccept(boost::beast::error_code ec)
258 {
259 if (ec)
260 return wsFail(ec, "accept");
261
262 LOG(perfLog_.info()) << tag() << "accepting new connection";
263
264 doRead();
265 }
266
267 void
268 doRead()
269 {
270 if (dead())
271 return;
272
273 // Note: use entirely new buffer so previously used, potentially large, capacity is deallocated
274 buffer_ = boost::beast::flat_buffer{};
275
276 derived().ws().async_read(buffer_, boost::beast::bind_front_handler(&WsBase::onRead, this->shared_from_this()));
277 }
278
279 void
280 onRead(boost::beast::error_code ec, std::size_t bytesTransferred)
281 {
282 boost::ignore_unused(bytesTransferred);
283
284 if (ec)
285 return wsFail(ec, "read");
286
287 LOG(perfLog_.info()) << tag() << "Received request from ip = " << this->clientIp;
288
289 std::string requestStr{static_cast<char const*>(buffer_.data().data()), buffer_.size()};
290
291 try {
292 (*handler_)(requestStr, shared_from_this());
293 } catch (std::exception const&) {
294 sendError(rpc::RippledError::rpcINTERNAL, std::move(requestStr));
295 }
296
297 doRead();
298 }
299
300private:
301 void
302 sendError(rpc::RippledError error, std::string requestStr)
303 {
304 auto e = rpc::makeError(error);
305
306 try {
307 auto request = boost::json::parse(requestStr);
308 if (request.is_object() && request.as_object().contains("id"))
309 e["id"] = request.as_object().at("id");
310 e["request"] = std::move(request);
311 } catch (std::exception const&) {
312 e["request"] = requestStr;
313 }
314
315 this->send(std::make_shared<std::string>(boost::json::serialize(e)));
316 }
317};
318} // namespace web::impl
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:229
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
A factory for TagDecorator instantiation.
Definition Taggable.hpp:182
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:280
Web socket implementation. This class is the base class of the web socket session,...
Definition WsBase.hpp:75
void sendSlowDown(std::string const &request) override
Send a "slow down" error response to the client.
Definition WsBase.hpp:168
SubscriptionContextPtr makeSubscriptionContext(util::TagDecoratorFactory const &factory) override
Get the subscription context for this connection.
Definition WsBase.hpp:204
void run(http::request< http::string_body > req)
Accept the session asynchronously.
Definition WsBase.hpp:242
void send(std::shared_ptr< std::string > msg) override
Send a message to the client.
Definition WsBase.hpp:180
void send(std::string &&msg, http::status) override
Send a message to the client.
Definition WsBase.hpp:219
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:120
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:65
ripple::error_code_i RippledError
Clio uses compatible Rippled error codes for most RPC errors.
Definition Errors.hpp:71
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:86
Base class for all connections.
Definition ConnectionBase.hpp:44
ConnectionBase(util::TagDecoratorFactory const &tagFactory, std::string ip)
Create a new connection base.
Definition ConnectionBase.hpp:59
bool dead()
Indicates whether the connection had an error and is considered dead.
Definition ConnectionBase.hpp:107