rippled
Loading...
Searching...
No Matches
BaseWSPeer.h
1#ifndef XRPL_SERVER_BASEWSPEER_H_INCLUDED
2#define XRPL_SERVER_BASEWSPEER_H_INCLUDED
3
4#include <xrpl/basics/safe_cast.h>
5#include <xrpl/beast/utility/instrumentation.h>
6#include <xrpl/beast/utility/rngfill.h>
7#include <xrpl/crypto/csprng.h>
8#include <xrpl/protocol/BuildInfo.h>
9#include <xrpl/server/WSSession.h>
10#include <xrpl/server/detail/BasePeer.h>
11#include <xrpl/server/detail/LowestLayer.h>
12
13#include <boost/asio/error.hpp>
14#include <boost/beast/core/multi_buffer.hpp>
15#include <boost/beast/http/message.hpp>
16#include <boost/beast/websocket.hpp>
17#include <boost/logic/tribool.hpp>
18
19#include <functional>
20#include <list>
21
22namespace ripple {
23
// Represents an active WebSocket connection.
//
// Derives from BasePeer<Handler, Impl> and WSSession. Impl is the concrete
// peer type: impl() downcasts `this` to it, and the definitions below reach
// the stream through impl().ws_ and impl().shared_from_this().
//
// NOTE(review): this listing skips source lines (see the gaps in the embedded
// line numbers), so some declarations used by the definitions below
// (e.g. request_, wq_, timer_, control_callback_, ec_) are not visible here.
25template <class Handler, class Impl>
26class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
27{
28protected:
30 using error_code = boost::system::error_code;
31 using endpoint_type = boost::asio::ip::tcp::endpoint;
32 using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
33 using BasePeer<Handler, Impl>::strand_;
34
35private:
36 friend class BasePeer<Handler, Impl>;
37
// Receive buffer: filled by async_read and drained in on_read.
39 boost::beast::multi_buffer rb_;
// NOTE(review): wb_ is not referenced by any code visible in this listing.
40 boost::beast::multi_buffer wb_;
// The socket has been closed, or will close after the next write finishes.
45 bool do_close_ = false;
// Close reason handed to async_close once the in-flight message completes.
46 boost::beast::websocket::close_reason cr_;
// When set together with ping_active_, the next timer expiry is a timeout.
48 bool close_on_timer_ = false;
// Set when on_timer sends a ping; cleared by on_ping.
49 bool ping_active_ = false;
// Payload of the most recent ping; compared against pong payloads.
50 boost::beast::websocket::ping_data payload_;
// Tail of the control_callback_ declaration (its first line is missing here).
53 void(boost::beast::websocket::frame_type, boost::beast::string_view)>
55
56public:
// Constructor parameter list (the line naming the constructor is missing).
57 template <class Body, class Headers>
59 Port const& port,
60 Handler& handler,
61 boost::asio::executor const& executor,
62 waitable_timer timer,
63 endpoint_type remote_address,
64 boost::beast::http::request<Body, Headers>&& request,
65 beast::Journal journal);
66
// Starts the WebSocket accept handshake on the strand.
67 void
68 run() override;
69
70 //
71 // WSSession
72 //
73
// Returns the port configuration held by BasePeer.
74 Port const&
75 port() const override
76 {
77 return this->port_;
78 }
79
// Returns the stored HTTP upgrade request (return-type line missing here).
81 request() const override
82 {
83 return this->request_;
84 }
85
// Returns the remote address held by BasePeer.
86 boost::asio::ip::tcp::endpoint const&
87 remote_endpoint() const override
88 {
89 return this->remote_address_;
90 }
91
// Declaration whose name line is missing; presumably send() — TODO confirm.
92 void
94
// Closes with a default-constructed close_reason.
95 void
96 close() override;
97
// Closes with the given reason, after any in-flight message finishes.
98 void
99 close(boost::beast::websocket::close_reason const& reason) override;
100
// Indicate that the response is complete.
101 void
102 complete() override;
103
104protected:
// Downcast to the concrete peer type (name line missing; this is impl()).
105 Impl&
107 {
108 return *static_cast<Impl*>(this);
109 }
110
// NOTE(review): the declarations below with no name had their name lines
// dropped by the listing; see the out-of-class definitions for the bodies.
111 void
113
114 void
116
117 void
119
120 void
122
123 void
125
126 void
127 on_read(error_code const& ec);
128
129 void
131
132 void
134
135 void
137
138 void
139 on_ping(error_code const& ec);
140
// Parameter list of on_ping_pong (its name line is missing).
141 void
143 boost::beast::websocket::frame_type kind,
144 boost::beast::string_view payload);
145
146 void
148
// Records the error, logs it with `what`, and closes the socket.
149 template <class String>
150 void
151 fail(error_code ec, String const& what);
152};
153
154//------------------------------------------------------------------------------
155
// Constructor (the line naming it is missing from this listing).
// Forwards port, handler, executor, remote address and journal to BasePeer,
// moves the request and the timer into members, and seeds payload_ with an
// 8-character placeholder so its size is already 8 when it is refilled later.
156template <class Handler, class Impl>
157template <class Body, class Headers>
159 Port const& port,
160 Handler& handler,
161 boost::asio::executor const& executor,
162 waitable_timer timer,
163 endpoint_type remote_address,
164 boost::beast::http::request<Body, Headers>&& request,
165 beast::Journal journal)
166 : BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
167 , request_(std::move(request))
168 , timer_(std::move(timer))
169 , payload_("12345678") // ensures size is 8 bytes
170{
171}
172
// run(): configures the stream and starts the WebSocket accept handshake.
// (The signature line is missing from this listing.)
173template <class Handler, class Impl>
174void
176{
// Re-dispatch onto the strand if called from elsewhere.
177 if (!strand_.running_in_this_thread())
178 return post(
179 strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
// Apply the port's permessage-deflate options.
180 impl().ws_.set_option(port().pmd_options);
181 // Must manage the control callback memory outside of the `control_callback`
182 // function
// The bound member function is on a missing line; presumably on_ping_pong.
183 control_callback_ = std::bind(
185 this,
186 std::placeholders::_1,
187 std::placeholders::_2);
188 impl().ws_.control_callback(control_callback_);
// Arm the timer for the handshake; on_ws_handshake clears close_on_timer_.
// NOTE(review): on_timer only reports a timeout when close_on_timer_ AND
// ping_active_ are both set. ping_active_ is still false here, so an expiry
// during the handshake takes the ping branch instead — confirm intended.
189 start_timer();
190 close_on_timer_ = true;
// Set the Server header on the handshake response (value line is missing).
191 impl().ws_.set_option(
192 boost::beast::websocket::stream_base::decorator([](auto& res) {
193 res.set(
194 boost::beast::http::field::server,
196 }));
// Accept using the stored upgrade request; the completion handler is bound
// on a missing line, presumably on_ws_handshake.
197 impl().ws_.async_accept(
198 request_,
199 bind_executor(
200 strand_,
201 std::bind(
203 impl().shared_from_this(),
204 std::placeholders::_1)));
205}
206
// send(w): queues a WebSocket message and starts writing if the queue was idle.
// (The signature line is missing from this listing.)
207template <class Handler, class Impl>
208void
210{
// Re-dispatch onto the strand if called from elsewhere.
211 if (!strand_.running_in_this_thread())
212 return post(
213 strand_,
214 std::bind(
215 &BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
// Messages sent after a close has been requested are dropped.
216 if (do_close_)
217 return;
// Queue over the port's limit: treat the client as too slow and close.
218 if (wq_.size() > port().ws_queue_limit)
219 {
220 cr_.code = safe_cast<decltype(cr_.code)>(
221 boost::beast::websocket::close_code::policy_error);
222 cr_.reason = "Policy error: client is too slow.";
223 JLOG(this->j_.info()) << cr_.reason;
// Keep only the front message (presumably the one being written) so that
// on_write_fin can issue the close when it completes.
224 wq_.erase(std::next(wq_.begin()), wq_.end());
225 close(cr_);
226 return;
227 }
228 wq_.emplace_back(std::move(w));
// Size 1 means the queue was empty, so no write is in progress: start one.
229 if (wq_.size() == 1)
230 on_write({});
231}
232
// close(): closes the connection with a default-constructed close_reason.
// (The signature line is missing from this listing.)
233template <class Handler, class Impl>
234void
236{
237 close(boost::beast::websocket::close_reason{});
238}
239
// close(reason): requests a WebSocket close. Only the first call has any
// effect; later calls return early because do_close_ is already set.
// (The line naming the function is missing from this listing.)
240template <class Handler, class Impl>
241void
243 boost::beast::websocket::close_reason const& reason)
244{
// Re-dispatch onto the strand; `reason` is copied into the closure.
245 if (!strand_.running_in_this_thread())
246 return post(strand_, [self = impl().shared_from_this(), reason] {
247 self->close(reason);
248 });
249 if (do_close_)
250 return;
251 do_close_ = true;
// Nothing queued: send the close frame now.
252 if (wq_.empty())
253 {
254 impl().ws_.async_close(
255 reason,
256 bind_executor(
257 strand_,
258 [self = impl().shared_from_this()](
259 boost::beast::error_code const& ec) {
260 self->on_close(ec);
261 }));
262 }
263 else
264 {
// A message is queued: remember the reason; on_write_fin issues the close
// after the front message finishes.
265 cr_ = reason;
266 }
267}
268
// complete(): indicates that the response is complete and starts the next
// read. on_read itself does not re-arm the read; this call does.
// (The signature line is missing from this listing.)
269template <class Handler, class Impl>
270void
272{
// Re-dispatch onto the strand if called from elsewhere.
273 if (!strand_.running_in_this_thread())
274 return post(
275 strand_,
276 std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
277 do_read();
278}
279
// on_ws_handshake(ec): completion of the accept handshake started in run().
// (The signature line is missing from this listing.)
280template <class Handler, class Impl>
281void
283{
284 if (ec)
285 return fail(ec, "on_ws_handshake");
// Handshake done: clear the flag run() set, then begin reading messages.
286 close_on_timer_ = false;
287 do_read();
288}
289
// do_write(): strand-safe entry point that resumes writing the front message.
// on_write passes it to WSMsg::prepare as the callback argument.
// (The signature line is missing; the name is taken from the bind target.)
290template <class Handler, class Impl>
291void
293{
294 if (!strand_.running_in_this_thread())
295 return post(
296 strand_,
297 std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
298 on_write({});
299}
300
// on_write(ec): writes the next chunk of the message at the front of wq_.
// (The signature line is missing from this listing.)
301template <class Handler, class Impl>
302void
304{
305 if (ec)
306 return fail(ec, "write");
307 auto& w = *wq_.front();
// Ask the message for up to 65536 bytes. result.first is a tribool and
// result.second is the buffer sequence to write.
308 auto const result = w.prepare(
309 65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
// Indeterminate: nothing to write yet; presumably the message invokes the
// do_write callback when data is ready — TODO confirm against WSMsg.
310 if (boost::indeterminate(result.first))
311 return;
312 start_timer();
// result.first is passed as the `fin` flag. The two branches differ only in
// the completion handler, which is on a missing line in both; presumably
// on_write for a partial chunk and on_write_fin for the final one.
313 if (!result.first)
314 impl().ws_.async_write_some(
315 static_cast<bool>(result.first),
316 result.second,
317 bind_executor(
318 strand_,
319 std::bind(
321 impl().shared_from_this(),
322 std::placeholders::_1)));
323 else
324 impl().ws_.async_write_some(
325 static_cast<bool>(result.first),
326 result.second,
327 bind_executor(
328 strand_,
329 std::bind(
331 impl().shared_from_this(),
332 std::placeholders::_1)));
333}
334
// on_write_fin(ec): called after the final chunk of a message is written.
// (The signature line is missing from this listing.)
335template <class Handler, class Impl>
336void
338{
339 if (ec)
340 return fail(ec, "write_fin");
// The finished message leaves the queue.
341 wq_.pop_front();
// A close was requested while this message was in flight: send it now with
// the stored reason. The handler is on a missing line, presumably on_close.
342 if (do_close_)
343 {
344 impl().ws_.async_close(
345 cr_,
346 bind_executor(
347 strand_,
348 std::bind(
350 impl().shared_from_this(),
351 std::placeholders::_1)));
352 }
// Otherwise continue with the next queued message, if any.
353 else if (!wq_.empty())
354 on_write({});
355}
356
// do_read(): starts an asynchronous read of one message into rb_.
// (The signature line is missing; the name is taken from the bind target.)
357template <class Handler, class Impl>
358void
360{
// Re-dispatch onto the strand if called from elsewhere.
361 if (!strand_.running_in_this_thread())
362 return post(
363 strand_,
364 std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
// The completion handler is on a missing line; presumably on_read.
365 impl().ws_.async_read(
366 rb_,
367 bind_executor(
368 strand_,
369 std::bind(
371 impl().shared_from_this(),
372 std::placeholders::_1)));
373}
374
// on_read(ec): completion of async_read; passes the message to the handler.
// It does not start another read; complete() does that.
// (The signature line is missing from this listing.)
375template <class Handler, class Impl>
376void
378{
// A close from the remote side is not an error.
379 if (ec == boost::beast::websocket::error::closed)
380 return on_close({});
381 if (ec)
382 return fail(ec, "read");
// Copy the buffer sequence into `b` (its declaration is on a missing line),
// hand it to the handler, then release the bytes from rb_.
383 auto const& data = rb_.data();
385 b.reserve(std::distance(data.begin(), data.end()));
386 std::copy(data.begin(), data.end(), std::back_inserter(b));
387 this->handler_.onWSMessage(impl().shared_from_this(), b);
388 rb_.consume(rb_.size());
389}
390
// on_close(ec): the close has completed; stop the timer. The error code is
// not inspected. (The signature line is missing from this listing.)
391template <class Handler, class Impl>
392void
394{
395 cancel_timer();
396}
397
// Re-arms timer_ and waits on it: 3 seconds for loopback peers, 30 otherwise.
// (The signature line is missing; presumably start_timer, judging by the
// string passed to fail() below — TODO confirm.)
398template <class Handler, class Impl>
399void
401{
402 // Max seconds without completing a message
403 static constexpr std::chrono::seconds timeout{30};
404 static constexpr std::chrono::seconds timeoutLocal{3};
405
// A system_error thrown while setting the expiry is routed through fail().
406 try
407 {
408 timer_.expires_after(
409 remote_endpoint().address().is_loopback() ? timeoutLocal : timeout);
410 }
411 catch (boost::system::system_error const& e)
412 {
413 return fail(e.code(), "start_timer");
414 }
415
// The wait handler is on a missing line; presumably on_timer.
416 timer_.async_wait(bind_executor(
417 strand_,
418 std::bind(
420 impl().shared_from_this(),
421 std::placeholders::_1)));
422}
423
// Cancels timer_, ignoring any system_error the cancel throws.
// (The signature line is missing; presumably cancel_timer — TODO confirm.)
424// Convenience for discarding the error code
425template <class Handler, class Impl>
426void
428{
429 try
430 {
431 timer_.cancel();
432 }
433 catch (boost::system::system_error const&)
434 {
435 // ignored
436 }
437}
438
// on_ping(ec): completion of the async_ping started by on_timer.
// (The signature line is missing from this listing.)
439template <class Handler, class Impl>
440void
442{
// An aborted operation leaves ping_active_ untouched.
443 if (ec == boost::asio::error::operation_aborted)
444 return;
// The ping is no longer in flight.
445 ping_active_ = false;
446 if (!ec)
447 return;
448 fail(ec, "on_ping");
449}
450
// on_ping_pong(kind, payload): control-frame callback. Only pong frames are
// acted on; a pong whose payload equals the last ping payload clears
// close_on_timer_, so the next timer expiry sends a ping instead of failing.
// (The line naming the function is missing from this listing.)
451template <class Handler, class Impl>
452void
454 boost::beast::websocket::frame_type kind,
455 boost::beast::string_view payload)
456{
457 if (kind == boost::beast::websocket::frame_type::pong)
458 {
// NOTE(review): assuming begin() yields a char pointer, this view takes its
// length from the first NUL byte. on_timer fills payload_ with random bytes,
// which may contain a zero; then `p` is shorter than the payload actually
// sent and a genuine pong will not match. Consider building the view from
// payload_.data() and payload_.size() instead.
459 boost::beast::string_view p(payload_.begin());
460 if (payload == p)
461 {
462 close_on_timer_ = false;
463 JLOG(this->j_.trace()) << "got matching pong";
464 }
465 else
466 {
467 JLOG(this->j_.trace()) << "got pong";
468 }
469 }
470}
471
// on_timer(ec): timer expiry handler. It either sends a keep-alive ping or
// fails the connection with timed_out.
// (The signature line is missing from this listing.)
472template <class Handler, class Impl>
473void
475{
// The timer was cancelled or re-armed; nothing to do.
476 if (ec == boost::asio::error::operation_aborted)
477 return;
478 if (!ec)
479 {
// Time out only when close_on_timer_ and ping_active_ are both still set.
// In every other case, send a new ping.
480 if (!close_on_timer_ || !ping_active_)
481 {
482 start_timer();
483 close_on_timer_ = true;
484 ping_active_ = true;
485 // cryptographic is probably overkill..
486 beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());
// The completion handler is on a missing line; presumably on_ping.
487 impl().ws_.async_ping(
488 payload_,
489 bind_executor(
490 strand_,
491 std::bind(
493 impl().shared_from_this(),
494 std::placeholders::_1)));
495 JLOG(this->j_.trace()) << "sent ping";
496 return;
497 }
498 ec = boost::system::errc::make_error_code(
499 boost::system::errc::timed_out);
500 }
// Reached on a timeout or on a timer error other than operation_aborted.
501 fail(ec, "timer");
502}
503
// fail(ec, what): tears the connection down after an error. Must run on the
// strand. Only the first error that is not operation_aborted is recorded in
// ec_, logged, and followed by closing the socket; later calls only cancel
// the timer. (The signature line is missing from this listing.)
504template <class Handler, class Impl>
505template <class String>
506void
508{
509 XRPL_ASSERT(
510 strand_.running_in_this_thread(),
511 "ripple::BaseWSPeer::fail : strand in this thread");
512
513 cancel_timer();
514 if (!ec_ && ec != boost::asio::error::operation_aborted)
515 {
516 ec_ = ec;
517 JLOG(this->j_.trace()) << what << ": " << ec.message();
// `ec` is a by-value copy, so passing it to close() as the error output
// does not affect the caller or ec_.
518 ripple::get_lowest_layer(impl().ws_).socket().close(ec);
519 }
520}
521
522} // namespace ripple
523
524#endif
T back_inserter(T... args)
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:41
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Port const & port_
Definition BasePeer.h:28
endpoint_type remote_address_
Definition BasePeer.h:30
boost::asio::strand< boost::asio::executor > strand_
Definition BasePeer.h:35
Represents an active WebSocket connection.
Definition BaseWSPeer.h:27
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
Definition BaseWSPeer.h:158
boost::asio::ip::tcp::endpoint endpoint_type
Definition BaseWSPeer.h:31
boost::system::error_code error_code
Definition BaseWSPeer.h:30
http_request_type const & request() const override
Definition BaseWSPeer.h:81
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
Definition BaseWSPeer.h:54
void fail(error_code ec, String const &what)
Definition BaseWSPeer.h:507
bool do_close_
The socket has been closed, or will close after the next write finishes.
Definition BaseWSPeer.h:45
void close(boost::beast::websocket::close_reason const &reason) override
Definition BaseWSPeer.h:242
http_request_type request_
Definition BaseWSPeer.h:38
void on_read(error_code const &ec)
Definition BaseWSPeer.h:377
void on_ws_handshake(error_code const &ec)
Definition BaseWSPeer.h:282
boost::beast::multi_buffer wb_
Definition BaseWSPeer.h:40
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
Definition BaseWSPeer.h:453
boost::asio::basic_waitable_timer< clock_type > waitable_timer
Definition BaseWSPeer.h:32
void on_close(error_code const &ec)
Definition BaseWSPeer.h:393
void on_write_fin(error_code const &ec)
Definition BaseWSPeer.h:337
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
Definition BaseWSPeer.h:87
Port const & port() const override
Definition BaseWSPeer.h:75
void complete() override
Indicate that the response is complete.
Definition BaseWSPeer.h:271
boost::beast::websocket::close_reason cr_
Definition BaseWSPeer.h:46
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
Definition BaseWSPeer.h:209
void close() override
Definition BaseWSPeer.h:235
std::list< std::shared_ptr< WSMsg > > wq_
Definition BaseWSPeer.h:41
void run() override
Definition BaseWSPeer.h:175
void on_timer(error_code ec)
Definition BaseWSPeer.h:474
void on_ping(error_code const &ec)
Definition BaseWSPeer.h:441
boost::beast::multi_buffer rb_
Definition BaseWSPeer.h:39
waitable_timer timer_
Definition BaseWSPeer.h:47
void on_write(error_code const &ec)
Definition BaseWSPeer.h:303
boost::beast::websocket::ping_data payload_
Definition BaseWSPeer.h:50
T copy(T... args)
T distance(T... args)
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:15
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:62
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safe_cast(Src s) noexcept
Definition safe_cast.h:22
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:14
decltype(auto) get_lowest_layer(T &t) noexcept
Definition LowestLayer.h:15
STL namespace.
T next(T... args)
T reserve(T... args)
Configuration information for a Server listening port.
Definition Port.h:31