Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WsBase.hpp
1#pragma once
2
3#include "rpc/Errors.hpp"
4#include "rpc/common/Types.hpp"
5#include "util/Taggable.hpp"
6#include "util/log/Logger.hpp"
7#include "web/SubscriptionContext.hpp"
8#include "web/SubscriptionContextInterface.hpp"
9#include "web/dosguard/DOSGuardInterface.hpp"
10#include "web/interface/Concepts.hpp"
11#include "web/interface/ConnectionBase.hpp"
12
13#include <boost/asio/buffer.hpp>
14#include <boost/asio/error.hpp>
15#include <boost/beast/core.hpp>
16#include <boost/beast/core/error.hpp>
17#include <boost/beast/core/flat_buffer.hpp>
18#include <boost/beast/core/role.hpp>
19#include <boost/beast/http/field.hpp>
20#include <boost/beast/http/message.hpp>
21#include <boost/beast/http/status.hpp>
22#include <boost/beast/http/string_body.hpp>
23#include <boost/beast/version.hpp>
24#include <boost/beast/websocket/error.hpp>
25#include <boost/beast/websocket/rfc6455.hpp>
26#include <boost/beast/websocket/stream_base.hpp>
27#include <boost/core/ignore_unused.hpp>
28#include <boost/json/array.hpp>
29#include <boost/json/parse.hpp>
30#include <boost/json/serialize.hpp>
31#include <xrpl/protocol/ErrorCodes.h>
32
33#include <cstddef>
34#include <cstdint>
35#include <exception>
36#include <functional>
37#include <memory>
38#include <queue>
39#include <string>
40#include <utility>
41
42namespace web::impl {
43
55template <template <typename> typename Derived, SomeServerHandler HandlerType>
56class WsBase : public ConnectionBase,
57 public std::enable_shared_from_this<WsBase<Derived, HandlerType>> {
58 using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this;
59
60 boost::beast::flat_buffer buffer_;
61 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
62 bool sending_ = false;
63 std::queue<std::shared_ptr<std::string>> messages_;
64 std::shared_ptr<HandlerType> const handler_;
65
66 SubscriptionContextPtr subscriptionContext_;
67 std::uint32_t maxSendingQueueSize_;
68
69protected:
70 util::Logger log_{"WebServer"};
71 util::Logger perfLog_{"Performance"};
72
73 void
74 wsFail(boost::beast::error_code ec, char const* what)
75 {
76 // Don't log if the WebSocket stream was gracefully closed at both endpoints
77 if (ec != boost::beast::websocket::error::closed) {
78 LOG(log_.error()) << tag() << ": " << what << ": " << ec.message() << ": "
79 << ec.value();
80 }
81
82 if (!ec_ && ec != boost::asio::error::operation_aborted) {
83 ec_ = ec;
84 boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
85 }
86 }
87
88public:
89 explicit WsBase(
90 std::string ip,
91 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
92 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
93 std::shared_ptr<HandlerType> const& handler,
94 boost::beast::flat_buffer&& buffer,
95 std::uint32_t maxSendingQueueSize
96 )
97 : ConnectionBase(tagFactory, ip)
98 , buffer_(std::move(buffer))
99 , dosGuard_(dosGuard)
100 , handler_(handler)
101 , maxSendingQueueSize_(maxSendingQueueSize)
102 {
103 upgraded = true; // NOLINT (cppcoreguidelines-pro-type-member-init)
104
105 LOG(perfLog_.debug()) << tag() << "session created";
106 }
107
108 ~WsBase() override
109 {
110 LOG(perfLog_.debug()) << tag() << "session closed";
111 dosGuard_.get().decrement(clientIp_);
112 }
113
114 Derived<HandlerType>&
115 derived()
116 {
117 return static_cast<Derived<HandlerType>&>(*this);
118 }
119
120 void
121 doWrite()
122 {
123 sending_ = true;
124 derived().ws().async_write(
125 boost::asio::buffer(messages_.front()->data(), messages_.front()->size()),
126 boost::beast::bind_front_handler(&WsBase::onWrite, derived().shared_from_this())
127 );
128 }
129
130 void
131 onWrite(boost::system::error_code ec, std::size_t)
132 {
133 messages_.pop();
134 sending_ = false;
135 if (ec) {
136 wsFail(ec, "Failed to write");
137 } else {
138 maybeSendNext();
139 }
140 }
141
142 void
143 maybeSendNext()
144 {
145 if (ec_ || sending_ || messages_.empty())
146 return;
147
148 doWrite();
149 }
150
151 void
152 sendSlowDown(std::string const& request) override
153 {
154 sendError(rpc::RippledError::rpcSLOW_DOWN, request);
155 }
156
163 void
164 send(std::shared_ptr<std::string> msg) override
165 {
166 // Note: post used instead of dispatch to guarantee async behavior of wsFail and
167 // maybeSendNext
168 boost::asio::post(
169 derived().ws().get_executor(),
170 [this, self = derived().shared_from_this(), msg = std::move(msg)]() {
171 if (messages_.size() > maxSendingQueueSize_) {
172 wsFail(boost::asio::error::timed_out, "Client is too slow");
173 return;
174 }
175
176 messages_.push(msg);
177 maybeSendNext();
178 }
179 );
180 }
181
190 {
191 if (subscriptionContext_ == nullptr) {
192 subscriptionContext_ =
193 std::make_shared<SubscriptionContext>(factory, shared_from_this());
194 }
195 return subscriptionContext_;
196 }
197
204 void
205 send(std::string&& msg, http::status) override
206 {
207 if (!dosGuard_.get().add(clientIp_, msg.size())) {
208 auto jsonResponse = boost::json::parse(msg).as_object();
209 jsonResponse["warning"] = "load";
210
211 if (jsonResponse.contains("warnings") && jsonResponse["warnings"].is_array()) {
212 jsonResponse["warnings"].as_array().push_back(
213 rpc::makeWarning(rpc::WarnRpcRateLimit)
214 );
215 } else {
216 jsonResponse["warnings"] =
217 boost::json::array{rpc::makeWarning(rpc::WarnRpcRateLimit)};
218 }
219
220 // Reserialize when we need to include this warning
221 msg = boost::json::serialize(jsonResponse);
222 }
223 auto sharedMsg = std::make_shared<std::string>(std::move(msg));
224 send(std::move(sharedMsg));
225 }
226
230 void
231 run(http::request<http::string_body> req)
232 {
233 using namespace boost::beast;
234
235 derived().ws().set_option(websocket::stream_base::timeout::suggested(role_type::server));
236
237 // Set a decorator to change the Server of the handshake
238 derived().ws().set_option(
239 websocket::stream_base::decorator([](websocket::response_type& res) {
240 res.set(
241 http::field::server,
242 std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async"
243 );
244 })
245 );
246
247 derived().ws().async_accept(
248 req, bind_front_handler(&WsBase::onAccept, this->shared_from_this())
249 );
250 }
251
252 void
253 onAccept(boost::beast::error_code ec)
254 {
255 if (ec)
256 return wsFail(ec, "accept");
257
258 LOG(perfLog_.info()) << tag() << "accepting new connection";
259
260 doRead();
261 }
262
263 void
264 doRead()
265 {
266 if (dead())
267 return;
268
269 // Note: use entirely new buffer so previously used, potentially large, capacity is
270 // deallocated
271 buffer_ = boost::beast::flat_buffer{};
272
273 derived().ws().async_read(
274 buffer_, boost::beast::bind_front_handler(&WsBase::onRead, this->shared_from_this())
275 );
276 }
277
278 void
279 onRead(boost::beast::error_code ec, std::size_t bytesTransferred)
280 {
281 boost::ignore_unused(bytesTransferred);
282
283 if (ec)
284 return wsFail(ec, "read");
285
286 LOG(perfLog_.info()) << tag() << "Received request from ip = " << clientIp_;
287
288 std::string requestStr{static_cast<char const*>(buffer_.data().data()), buffer_.size()};
289
290 try {
291 (*handler_)(requestStr, shared_from_this());
292 } catch (std::exception const&) {
293 sendError(rpc::RippledError::rpcINTERNAL, std::move(requestStr));
294 }
295
296 doRead();
297 }
298
299private:
300 void
301 sendError(rpc::RippledError error, std::string requestStr)
302 {
303 auto e = rpc::makeError(error);
304
305 try {
306 auto request = boost::json::parse(requestStr);
307 if (request.is_object() && request.as_object().contains("id"))
308 e["id"] = request.as_object().at("id");
309 e["request"] = std::move(request);
310 } catch (std::exception const&) {
311 e["request"] = requestStr;
312 }
313
314 this->send(std::make_shared<std::string>(boost::json::serialize(e)));
315 }
316};
317} // namespace web::impl
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:488
A factory for TagDecorator instantiation.
Definition Taggable.hpp:165
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:264
void sendSlowDown(std::string const &request) override
Send a "slow down" error response to the client.
Definition WsBase.hpp:152
SubscriptionContextPtr makeSubscriptionContext(util::TagDecoratorFactory const &factory) override
Get the subscription context for this connection.
Definition WsBase.hpp:189
void run(http::request< http::string_body > req)
Accept the session asynchronously.
Definition WsBase.hpp:231
void send(std::shared_ptr< std::string > msg) override
Send a message to the client.
Definition WsBase.hpp:164
void send(std::string &&msg, http::status) override
Send a message to the client.
Definition WsBase.hpp:205
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:160
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:84
ripple::error_code_i RippledError
Clio uses compatible Rippled error codes for most RPC errors.
Definition Errors.hpp:54
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:64
ConnectionBase(util::TagDecoratorFactory const &tagFactory, std::string ip)
Create a new connection base.
Definition ConnectionBase.hpp:40
bool dead()
Indicates whether the connection had an error and is considered dead.
Definition ConnectionBase.hpp:88