xrpld
Loading...
Searching...
No Matches
BaseWSPeer.h
1#pragma once
2
3#include <xrpl/basics/safe_cast.h>
4#include <xrpl/beast/utility/instrumentation.h>
5#include <xrpl/beast/utility/rngfill.h>
6#include <xrpl/crypto/csprng.h>
7#include <xrpl/protocol/BuildInfo.h>
8#include <xrpl/server/WSSession.h>
9#include <xrpl/server/detail/BasePeer.h>
10#include <xrpl/server/detail/LowestLayer.h>
11
12#include <boost/asio/error.hpp>
13#include <boost/beast/core/multi_buffer.hpp>
14#include <boost/beast/http/message.hpp>
15#include <boost/beast/websocket.hpp>
16#include <boost/logic/tribool.hpp>
17
18#include <algorithm>
19#include <functional>
20#include <list>
21
22namespace xrpl {
23
25template <class Handler, class Impl>
26class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
27{
28protected:
30 using error_code = boost::system::error_code;
31 using endpoint_type = boost::asio::ip::tcp::endpoint;
32 using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
33 using BasePeer<Handler, Impl>::strand_;
34
35private:
36 friend class BasePeer<Handler, Impl>;
37
39 boost::beast::multi_buffer rb_;
40 boost::beast::multi_buffer wb_;
45 bool doClose_ = false;
46 boost::beast::websocket::close_reason cr_;
48 bool closeOnTimer_ = false;
49 bool pingActive_ = false;
50 boost::beast::websocket::ping_data payload_;
52 std::function<void(boost::beast::websocket::frame_type, boost::beast::string_view)>
54
55public:
56 template <class Body, class Headers>
58 Port const& port,
59 Handler& handler,
60 boost::asio::executor const& executor,
61 waitable_timer timer,
62 endpoint_type remoteAddress,
63 boost::beast::http::request<Body, Headers>&& request,
64 beast::Journal journal);
65
66 void
67 run() override;
68
69 //
70 // WSSession
71 //
72
73 [[nodiscard]] Port const&
74 port() const override
75 {
76 return this->port_;
77 }
78
79 [[nodiscard]] http_request_type const&
80 request() const override
81 {
82 return this->request_;
83 }
84
85 [[nodiscard]] boost::asio::ip::tcp::endpoint const&
86 remoteEndpoint() const override
87 {
88 return this->remoteAddress_;
89 }
90
91 void
93
94 void
95 close() override;
96
97 void
98 close(boost::beast::websocket::close_reason const& reason) override;
99
100 void
101 complete() override;
102
103protected:
104 Impl&
106 {
107 return *static_cast<Impl*>(this);
108 }
109
110 void
112
113 void
115
116 void
117 onWrite(error_code const& ec);
118
119 void
121
122 void
124
125 void
126 onRead(error_code const& ec);
127
128 void
129 onClose(error_code const& ec);
130
131 void
133
134 void
136
137 void
138 onPing(error_code const& ec);
139
140 void
141 onPingPong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload);
142
143 void
145
146 template <class String>
147 void
148 fail(error_code ec, String const& what);
149};
150
151//------------------------------------------------------------------------------
152
153template <class Handler, class Impl>
154template <class Body, class Headers>
156 Port const& port,
157 Handler& handler,
158 boost::asio::executor const& executor,
159 waitable_timer timer,
160 endpoint_type remoteAddress,
161 boost::beast::http::request<Body, Headers>&& request,
162 beast::Journal journal)
163 : BasePeer<Handler, Impl>(port, handler, executor, remoteAddress, journal)
164 , request_(std::move(request))
165 , timer_(std::move(timer))
166 , payload_("12345678") // ensures size is 8 bytes
167{
168}
169
170template <class Handler, class Impl>
171void
173{
174 if (!strand_.running_in_this_thread())
175 return post(strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
176 impl().ws_.set_option(port().pmdOptions);
177 // Must manage the control callback memory outside of the `control_callback`
178 // function
180 std::bind(&BaseWSPeer::onPingPong, this, std::placeholders::_1, std::placeholders::_2);
181 impl().ws_.control_callback(controlCallback_);
182 startTimer();
183 closeOnTimer_ = true;
184 impl().ws_.set_option(boost::beast::websocket::stream_base::decorator([](auto& res) {
185 res.set(boost::beast::http::field::server, BuildInfo::getFullVersionString());
186 }));
187 impl().ws_.async_accept(
188 request_,
189 bind_executor(
190 strand_,
191 std::bind(
192 &BaseWSPeer::onWsHandshake, impl().shared_from_this(), std::placeholders::_1)));
193}
194
195template <class Handler, class Impl>
196void
198{
199 if (!strand_.running_in_this_thread())
200 return post(strand_, std::bind(&BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
201 if (doClose_)
202 return;
203 if (wq_.size() > port().wsQueueLimit)
204 {
205 cr_.code = safeCast<decltype(cr_.code)>(boost::beast::websocket::close_code::policy_error);
206 cr_.reason = "Policy error: client is too slow.";
207 JLOG(this->j_.info()) << cr_.reason;
208 wq_.erase(std::next(wq_.begin()), wq_.end());
209 close(cr_);
210 return;
211 }
212 wq_.emplace_back(std::move(w));
213 if (wq_.size() == 1)
214 onWrite({});
215}
216
217template <class Handler, class Impl>
218void
220{
221 close(boost::beast::websocket::close_reason{});
222}
223
224template <class Handler, class Impl>
225void
226BaseWSPeer<Handler, Impl>::close(boost::beast::websocket::close_reason const& reason)
227{
228 if (!strand_.running_in_this_thread())
229 return post(strand_, [self = impl().shared_from_this(), reason] { self->close(reason); });
230 if (doClose_)
231 return;
232 doClose_ = true;
233 if (wq_.empty())
234 {
235 impl().ws_.async_close(
236 reason,
237 bind_executor(
238 strand_, [self = impl().shared_from_this()](boost::beast::error_code const& ec) {
239 self->onClose(ec);
240 }));
241 }
242 else
243 {
244 cr_ = reason;
245 }
246}
247
248template <class Handler, class Impl>
249void
251{
252 if (!strand_.running_in_this_thread())
253 return post(strand_, std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
254 doRead();
255}
256
257template <class Handler, class Impl>
258void
260{
261 if (ec)
262 return fail(ec, "on_ws_handshake");
263 closeOnTimer_ = false;
264 doRead();
265}
266
267template <class Handler, class Impl>
268void
270{
271 if (!strand_.running_in_this_thread())
272 return post(strand_, std::bind(&BaseWSPeer::doWrite, impl().shared_from_this()));
273 onWrite({});
274}
275
276template <class Handler, class Impl>
277void
279{
280 if (ec)
281 return fail(ec, "write");
282 auto& w = *wq_.front();
283 auto const result =
284 w.prepare(65536, std::bind(&BaseWSPeer::doWrite, impl().shared_from_this()));
285 if (boost::indeterminate(result.first))
286 return;
287 startTimer();
288 if (!result.first)
289 {
290 impl().ws_.async_write_some(
291 static_cast<bool>(result.first),
292 result.second,
293 bind_executor(
294 strand_,
295 std::bind(&BaseWSPeer::onWrite, impl().shared_from_this(), std::placeholders::_1)));
296 }
297 else
298 {
299 impl().ws_.async_write_some(
300 static_cast<bool>(result.first),
301 result.second,
302 bind_executor(
303 strand_,
304 std::bind(
305 &BaseWSPeer::onWriteFin, impl().shared_from_this(), std::placeholders::_1)));
306 }
307}
308
309template <class Handler, class Impl>
310void
312{
313 if (ec)
314 return fail(ec, "write_fin");
315 wq_.pop_front();
316 if (doClose_)
317 {
318 impl().ws_.async_close(
319 cr_,
320 bind_executor(
321 strand_,
322 std::bind(&BaseWSPeer::onClose, impl().shared_from_this(), std::placeholders::_1)));
323 }
324 else if (!wq_.empty())
325 {
326 onWrite({});
327 }
328}
329
330template <class Handler, class Impl>
331void
333{
334 if (!strand_.running_in_this_thread())
335 return post(strand_, std::bind(&BaseWSPeer::doRead, impl().shared_from_this()));
336 impl().ws_.async_read(
337 rb_,
338 bind_executor(
339 strand_,
340 std::bind(&BaseWSPeer::onRead, impl().shared_from_this(), std::placeholders::_1)));
341}
342
343template <class Handler, class Impl>
344void
346{
347 if (ec == boost::beast::websocket::error::closed)
348 return onClose({});
349 if (ec)
350 return fail(ec, "read");
351 auto const& data = rb_.data();
353 b.reserve(std::distance(data.begin(), data.end()));
355 this->handler_.onWSMessage(impl().shared_from_this(), b);
356 rb_.consume(rb_.size());
357}
358
359template <class Handler, class Impl>
360void
365
366template <class Handler, class Impl>
367void
369{
370 // Max seconds without completing a message
371 static constexpr std::chrono::seconds kTimeout{30};
372 static constexpr std::chrono::seconds kTimeoutLocal{3};
373
374 try
375 {
376 timer_.expires_after(remoteEndpoint().address().is_loopback() ? kTimeoutLocal : kTimeout);
377 }
378 catch (boost::system::system_error const& e)
379 {
380 return fail(e.code(), "start_timer");
381 }
382
383 timer_.async_wait(bind_executor(
384 strand_,
385 std::bind(
387 impl().shared_from_this(),
388 std::placeholders::_1)));
389}
390
391// Convenience for discarding the error code
392template <class Handler, class Impl>
393void
395{
396 try
397 {
398 timer_.cancel();
399 }
400 catch (boost::system::system_error const&) // NOLINT(bugprone-empty-catch)
401 {
402 // ignored
403 }
404}
405
406template <class Handler, class Impl>
407void
409{
410 if (ec == boost::asio::error::operation_aborted)
411 return;
412 pingActive_ = false;
413 if (!ec)
414 return;
415 fail(ec, "on_ping");
416}
417
418template <class Handler, class Impl>
419void
421 boost::beast::websocket::frame_type kind,
422 boost::beast::string_view payload)
423{
424 if (kind == boost::beast::websocket::frame_type::pong)
425 {
426 boost::beast::string_view const p(payload_.begin());
427 if (payload == p)
428 {
429 closeOnTimer_ = false;
430 JLOG(this->j_.trace()) << "got matching pong";
431 }
432 else
433 {
434 JLOG(this->j_.trace()) << "got pong";
435 }
436 }
437}
438
439template <class Handler, class Impl>
440void
442{
443 if (ec == boost::asio::error::operation_aborted)
444 return;
445 if (!ec)
446 {
447 if (!closeOnTimer_ || !pingActive_)
448 {
449 startTimer();
450 closeOnTimer_ = true;
451 pingActive_ = true;
452 // cryptographic is probably overkill..
453 beast::rngfill(payload_.begin(), payload_.size(), cryptoPrng());
454 impl().ws_.async_ping(
455 payload_,
456 bind_executor(
457 strand_,
458 std::bind(
459 &BaseWSPeer::onPing, impl().shared_from_this(), std::placeholders::_1)));
460 JLOG(this->j_.trace()) << "sent ping";
461 return;
462 }
463 ec = boost::system::errc::make_error_code(boost::system::errc::timed_out);
464 }
465 fail(ec, "timer");
466}
467
468template <class Handler, class Impl>
469template <class String>
470void
472{
473 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::BaseWSPeer::fail : strand in this thread");
474
475 cancelTimer();
476 if (!ec_ && ec != boost::asio::error::operation_aborted)
477 {
478 ec_ = ec;
479 JLOG(this->j_.trace()) << what << ": " << ec.message();
480 xrpl::getLowestLayer(impl().ws_).socket().close(ec);
481 }
482}
483
484} // namespace xrpl
T back_inserter(T... args)
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:38
boost::asio::strand< boost::asio::executor > strand_
Definition BasePeer.h:35
Port const & port_
Definition BasePeer.h:28
endpoint_type remoteAddress_
Definition BasePeer.h:30
BasePeer(Port const &port, Handler &handler, boost::asio::executor const &executor, endpoint_type remoteAddress, beast::Journal journal)
Definition BasePeer.h:60
Handler & handler_
Definition BasePeer.h:29
beast::Journal const j_
Definition BasePeer.h:32
void onPingPong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
Definition BaseWSPeer.h:420
boost::beast::websocket::close_reason cr_
Definition BaseWSPeer.h:46
boost::beast::multi_buffer rb_
Definition BaseWSPeer.h:39
void onTimer(error_code ec)
Definition BaseWSPeer.h:441
error_code ec_
Definition BaseWSPeer.h:51
boost::asio::ip::tcp::endpoint endpoint_type
Definition BaseWSPeer.h:31
boost::beast::multi_buffer wb_
Definition BaseWSPeer.h:40
void fail(error_code ec, String const &what)
Definition BaseWSPeer.h:471
void onWriteFin(error_code const &ec)
Definition BaseWSPeer.h:311
boost::system::error_code error_code
Definition BaseWSPeer.h:30
boost::asio::basic_waitable_timer< clock_type > waitable_timer
Definition BaseWSPeer.h:32
void onPing(error_code const &ec)
Definition BaseWSPeer.h:408
boost::beast::websocket::ping_data payload_
Definition BaseWSPeer.h:50
void run() override
Definition BaseWSPeer.h:172
void onWsHandshake(error_code const &ec)
Definition BaseWSPeer.h:259
void close() override
Definition BaseWSPeer.h:219
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remoteAddress, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
Definition BaseWSPeer.h:155
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> controlCallback_
Definition BaseWSPeer.h:53
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
Definition BaseWSPeer.h:197
void onClose(error_code const &ec)
Definition BaseWSPeer.h:361
http_request_type request_
Definition BaseWSPeer.h:38
std::chrono::system_clock clock_type
Definition BaseWSPeer.h:29
waitable_timer timer_
Definition BaseWSPeer.h:47
void onRead(error_code const &ec)
Definition BaseWSPeer.h:345
bool doClose_
The socket has been closed, or will close after the next write finishes.
Definition BaseWSPeer.h:45
boost::asio::ip::tcp::endpoint const & remoteEndpoint() const override
Definition BaseWSPeer.h:86
std::list< std::shared_ptr< WSMsg > > wq_
Definition BaseWSPeer.h:41
void onWrite(error_code const &ec)
Definition BaseWSPeer.h:278
void complete() override
Indicate that the response is complete.
Definition BaseWSPeer.h:250
http_request_type const & request() const override
Definition BaseWSPeer.h:80
T copy(T... args)
T distance(T... args)
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:14
STL namespace.
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:82
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safeCast(Src s) noexcept
Definition safe_cast.h:21
CsprngEngine & cryptoPrng()
The default cryptographically secure PRNG.
decltype(auto) getLowestLayer(T &t) noexcept
Definition LowestLayer.h:14
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
T next(T... args)
T reserve(T... args)
Configuration information for a Server listening port.
Definition Port.h:28
WSSession()=default