rippled
Loading...
Searching...
No Matches
BaseWSPeer.h
1#pragma once
2
3#include <xrpl/basics/safe_cast.h>
4#include <xrpl/beast/utility/instrumentation.h>
5#include <xrpl/beast/utility/rngfill.h>
6#include <xrpl/crypto/csprng.h>
7#include <xrpl/protocol/BuildInfo.h>
8#include <xrpl/server/WSSession.h>
9#include <xrpl/server/detail/BasePeer.h>
10#include <xrpl/server/detail/LowestLayer.h>
11
12#include <boost/asio/error.hpp>
13#include <boost/beast/core/multi_buffer.hpp>
14#include <boost/beast/http/message.hpp>
15#include <boost/beast/websocket.hpp>
16#include <boost/logic/tribool.hpp>
17
18#include <functional>
19#include <list>
20
21namespace xrpl {
22
// Represents an active WebSocket connection.
//
// Handler receives the session's events (on_read() forwards each message to
// handler_.onWSMessage()). Impl is the derived peer type: impl() downcasts
// `this` to Impl&, and the definitions below use impl().ws_ and
// impl().shared_from_this().
//
// NOTE(review): this listing is missing several original lines — the member
// declarations request_, wq_, timer_ and ec_, and the declarator lines of a
// number of member functions. Confirm against the real header before relying
// on the declarations as shown here.
24template <class Handler, class Impl>
25class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
26{
27protected:
29 using error_code = boost::system::error_code;
30 using endpoint_type = boost::asio::ip::tcp::endpoint;
31 using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
32 using BasePeer<Handler, Impl>::strand_;
33
34private:
35 friend class BasePeer<Handler, Impl>;
36
    // Incoming message buffer: do_read() reads into it, on_read() drains it.
38 boost::beast::multi_buffer rb_;
    // NOTE(review): wb_ is not referenced by any definition visible in this file.
39 boost::beast::multi_buffer wb_;
    // The socket has been closed, or will close after the next write finishes.
44 bool do_close_ = false;
    // Close reason that on_write_fin() passes to async_close() when a close
    // was requested while writes were still queued.
45 boost::beast::websocket::close_reason cr_;
    // When this and ping_active_ are both set, the next timer expiry fails the
    // connection with timed_out instead of sending another ping.
47 bool close_on_timer_ = false;
    // Set when on_timer() issues a ping; cleared by on_ping() when that ping
    // operation completes.
48 bool ping_active_ = false;
    // Ping payload: re-randomised by on_timer() before each ping and compared
    // with incoming pong payloads in on_ping_pong().
49 boost::beast::websocket::ping_data payload_;
    // Callback object handed to ws_.control_callback() in run().
    // NOTE(review): the declarator line (control_callback_) is missing from
    // this listing.
51 std::function<void(boost::beast::websocket::frame_type, boost::beast::string_view)>
53
54public:
    // Constructor parameter list.
    // NOTE(review): the declarator line is missing from this listing.
55 template <class Body, class Headers>
57 Port const& port,
58 Handler& handler,
59 boost::asio::executor const& executor,
60 waitable_timer timer,
61 endpoint_type remote_address,
62 boost::beast::http::request<Body, Headers>&& request,
63 beast::Journal journal);
64
    // Configures the stream and starts the asynchronous WebSocket accept.
65 void
66 run() override;
67
68 //
69 // WSSession
70 //
71
    // Port configuration this connection was accepted on.
72 Port const&
73 port() const override
74 {
75 return this->port_;
76 }
77
    // The HTTP request that run() passes to async_accept().
79 request() const override
80 {
81 return this->request_;
82 }
83
    // Address of the remote peer.
84 boost::asio::ip::tcp::endpoint const&
85 remote_endpoint() const override
86 {
87 return this->remote_address_;
88 }
89
90 void
92
    // Close with a default-constructed close_reason.
93 void
94 close() override;
95
    // Close with the given reason; the close is deferred until queued writes
    // have finished.
96 void
97 close(boost::beast::websocket::close_reason const& reason) override;
98
    // Starts the next read (see do_read()).
99 void
100 complete() override;
101
102protected:
    // Downcast to the derived peer type.
103 Impl&
105 {
106 return *static_cast<Impl*>(this);
107 }
108
109 void
111
112 void
114
115 void
117
118 void
120
121 void
123
    // Completion handler for the read started by do_read().
124 void
125 on_read(error_code const& ec);
126
127 void
129
130 void
132
133 void
135
    // Completion handler for the ping sent by on_timer().
136 void
137 on_ping(error_code const& ec);
138
    // Control-frame callback; only pong frames are inspected.
139 void
140 on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload);
141
142 void
144
    // Records the first error, logs it with `what` as prefix, and closes the
    // lowest-layer socket.
145 template <class String>
146 void
147 fail(error_code ec, String const& what);
148};
149
150//------------------------------------------------------------------------------
151
152template <class Handler, class Impl>
153template <class Body, class Headers>
155 Port const& port,
156 Handler& handler,
157 boost::asio::executor const& executor,
158 waitable_timer timer,
159 endpoint_type remote_address,
160 boost::beast::http::request<Body, Headers>&& request,
161 beast::Journal journal)
162 : BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
163 , request_(std::move(request))
164 , timer_(std::move(timer))
165 , payload_("12345678") // ensures size is 8 bytes
166{
167}
168
169template <class Handler, class Impl>
170void
172{
173 if (!strand_.running_in_this_thread())
174 return post(strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
175 impl().ws_.set_option(port().pmd_options);
176 // Must manage the control callback memory outside of the `control_callback`
177 // function
178 control_callback_ =
179 std::bind(&BaseWSPeer::on_ping_pong, this, std::placeholders::_1, std::placeholders::_2);
180 impl().ws_.control_callback(control_callback_);
181 start_timer();
182 close_on_timer_ = true;
183 impl().ws_.set_option(boost::beast::websocket::stream_base::decorator([](auto& res) {
184 res.set(boost::beast::http::field::server, BuildInfo::getFullVersionString());
185 }));
186 impl().ws_.async_accept(
187 request_,
188 bind_executor(
189 strand_,
190 std::bind(
191 &BaseWSPeer::on_ws_handshake, impl().shared_from_this(), std::placeholders::_1)));
192}
193
194template <class Handler, class Impl>
195void
197{
198 if (!strand_.running_in_this_thread())
199 return post(strand_, std::bind(&BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
200 if (do_close_)
201 return;
202 if (wq_.size() > port().ws_queue_limit)
203 {
204 cr_.code = safe_cast<decltype(cr_.code)>(boost::beast::websocket::close_code::policy_error);
205 cr_.reason = "Policy error: client is too slow.";
206 JLOG(this->j_.info()) << cr_.reason;
207 wq_.erase(std::next(wq_.begin()), wq_.end());
208 close(cr_);
209 return;
210 }
211 wq_.emplace_back(std::move(w));
212 if (wq_.size() == 1)
213 on_write({});
214}
215
216template <class Handler, class Impl>
217void
219{
220 close(boost::beast::websocket::close_reason{});
221}
222
223template <class Handler, class Impl>
224void
225BaseWSPeer<Handler, Impl>::close(boost::beast::websocket::close_reason const& reason)
226{
227 if (!strand_.running_in_this_thread())
228 return post(strand_, [self = impl().shared_from_this(), reason] { self->close(reason); });
229 if (do_close_)
230 return;
231 do_close_ = true;
232 if (wq_.empty())
233 {
234 impl().ws_.async_close(
235 reason,
236 bind_executor(
237 strand_, [self = impl().shared_from_this()](boost::beast::error_code const& ec) {
238 self->on_close(ec);
239 }));
240 }
241 else
242 {
243 cr_ = reason;
244 }
245}
246
247template <class Handler, class Impl>
248void
250{
251 if (!strand_.running_in_this_thread())
252 return post(strand_, std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
253 do_read();
254}
255
256template <class Handler, class Impl>
257void
259{
260 if (ec)
261 return fail(ec, "on_ws_handshake");
262 close_on_timer_ = false;
263 do_read();
264}
265
266template <class Handler, class Impl>
267void
269{
270 if (!strand_.running_in_this_thread())
271 return post(strand_, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
272 on_write({});
273}
274
275template <class Handler, class Impl>
276void
278{
279 if (ec)
280 return fail(ec, "write");
281 auto& w = *wq_.front();
282 auto const result =
283 w.prepare(65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
284 if (boost::indeterminate(result.first))
285 return;
286 start_timer();
287 if (!result.first)
288 impl().ws_.async_write_some(
289 static_cast<bool>(result.first),
290 result.second,
291 bind_executor(
292 strand_,
293 std::bind(
294 &BaseWSPeer::on_write, impl().shared_from_this(), std::placeholders::_1)));
295 else
296 impl().ws_.async_write_some(
297 static_cast<bool>(result.first),
298 result.second,
299 bind_executor(
300 strand_,
301 std::bind(
302 &BaseWSPeer::on_write_fin, impl().shared_from_this(), std::placeholders::_1)));
303}
304
305template <class Handler, class Impl>
306void
308{
309 if (ec)
310 return fail(ec, "write_fin");
311 wq_.pop_front();
312 if (do_close_)
313 {
314 impl().ws_.async_close(
315 cr_,
316 bind_executor(
317 strand_,
318 std::bind(
319 &BaseWSPeer::on_close, impl().shared_from_this(), std::placeholders::_1)));
320 }
321 else if (!wq_.empty())
322 on_write({});
323}
324
325template <class Handler, class Impl>
326void
328{
329 if (!strand_.running_in_this_thread())
330 return post(strand_, std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
331 impl().ws_.async_read(
332 rb_,
333 bind_executor(
334 strand_,
335 std::bind(&BaseWSPeer::on_read, impl().shared_from_this(), std::placeholders::_1)));
336}
337
// Completion handler for the read started by do_read().
//
// A `closed` result is routed to on_close() with a default (success) code;
// any other error fails the connection. On success the buffers held by rb_
// are copied into `b`, handed to the handler, and rb_ is emptied. The next
// read is not started here; complete() calls do_read().
//
// NOTE(review): the declarator line and the declaration of `b` (original
// line 347) are missing from this listing — confirm both against the real
// header.
338template <class Handler, class Impl>
339void
341{
    // The peer closed the WebSocket: treat it as a normal close.
342 if (ec == boost::beast::websocket::error::closed)
343 return on_close({});
344 if (ec)
345 return fail(ec, "read");
346 auto const& data = rb_.data();
    // Copy each buffer of the sequence into b.
348 b.reserve(std::distance(data.begin(), data.end()));
349 std::copy(data.begin(), data.end(), std::back_inserter(b));
350 this->handler_.onWSMessage(impl().shared_from_this(), b);
    // Discard everything in rb_ now that the handler call has returned.
351 rb_.consume(rb_.size());
352}
353
354template <class Handler, class Impl>
355void
357{
358 cancel_timer();
359}
360
361template <class Handler, class Impl>
362void
364{
365 // Max seconds without completing a message
366 static constexpr std::chrono::seconds timeout{30};
367 static constexpr std::chrono::seconds timeoutLocal{3};
368
369 try
370 {
371 timer_.expires_after(remote_endpoint().address().is_loopback() ? timeoutLocal : timeout);
372 }
373 catch (boost::system::system_error const& e)
374 {
375 return fail(e.code(), "start_timer");
376 }
377
378 timer_.async_wait(bind_executor(
379 strand_,
380 std::bind(
382 impl().shared_from_this(),
383 std::placeholders::_1)));
384}
385
386// Convenience for discarding the error code
387template <class Handler, class Impl>
388void
390{
391 try
392 {
393 timer_.cancel();
394 }
395 catch (boost::system::system_error const&)
396 {
397 // ignored
398 }
399}
400
401template <class Handler, class Impl>
402void
404{
405 if (ec == boost::asio::error::operation_aborted)
406 return;
407 ping_active_ = false;
408 if (!ec)
409 return;
410 fail(ec, "on_ping");
411}
412
413template <class Handler, class Impl>
414void
416 boost::beast::websocket::frame_type kind,
417 boost::beast::string_view payload)
418{
419 if (kind == boost::beast::websocket::frame_type::pong)
420 {
421 boost::beast::string_view const p(payload_.begin());
422 if (payload == p)
423 {
424 close_on_timer_ = false;
425 JLOG(this->j_.trace()) << "got matching pong";
426 }
427 else
428 {
429 JLOG(this->j_.trace()) << "got pong";
430 }
431 }
432}
433
434template <class Handler, class Impl>
435void
437{
438 if (ec == boost::asio::error::operation_aborted)
439 return;
440 if (!ec)
441 {
442 if (!close_on_timer_ || !ping_active_)
443 {
444 start_timer();
445 close_on_timer_ = true;
446 ping_active_ = true;
447 // cryptographic is probably overkill..
448 beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());
449 impl().ws_.async_ping(
450 payload_,
451 bind_executor(
452 strand_,
453 std::bind(
454 &BaseWSPeer::on_ping, impl().shared_from_this(), std::placeholders::_1)));
455 JLOG(this->j_.trace()) << "sent ping";
456 return;
457 }
458 ec = boost::system::errc::make_error_code(boost::system::errc::timed_out);
459 }
460 fail(ec, "timer");
461}
462
463template <class Handler, class Impl>
464template <class String>
465void
467{
468 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::BaseWSPeer::fail : strand in this thread");
469
470 cancel_timer();
471 if (!ec_ && ec != boost::asio::error::operation_aborted)
472 {
473 ec_ = ec;
474 JLOG(this->j_.trace()) << what << ": " << ec.message();
475 xrpl::get_lowest_layer(impl().ws_).socket().close(ec);
476 }
477}
478
479} // namespace xrpl
T back_inserter(T... args)
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:40
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
boost::asio::strand< boost::asio::executor > strand_
Definition BasePeer.h:34
Port const & port_
Definition BasePeer.h:27
endpoint_type remote_address_
Definition BasePeer.h:29
Represents an active WebSocket connection.
Definition BaseWSPeer.h:26
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
Definition BaseWSPeer.h:415
boost::beast::websocket::close_reason cr_
Definition BaseWSPeer.h:45
void on_write_fin(error_code const &ec)
Definition BaseWSPeer.h:307
boost::beast::multi_buffer rb_
Definition BaseWSPeer.h:38
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
Definition BaseWSPeer.h:85
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
Definition BaseWSPeer.h:154
void on_ping(error_code const &ec)
Definition BaseWSPeer.h:403
error_code ec_
Definition BaseWSPeer.h:50
boost::asio::ip::tcp::endpoint endpoint_type
Definition BaseWSPeer.h:30
boost::beast::multi_buffer wb_
Definition BaseWSPeer.h:39
void fail(error_code ec, String const &what)
Definition BaseWSPeer.h:466
void on_close(error_code const &ec)
Definition BaseWSPeer.h:356
void close(boost::beast::websocket::close_reason const &reason) override
Definition BaseWSPeer.h:225
Port const & port() const override
Definition BaseWSPeer.h:73
bool do_close_
The socket has been closed, or will close after the next write finishes.
Definition BaseWSPeer.h:44
void on_read(error_code const &ec)
Definition BaseWSPeer.h:340
boost::system::error_code error_code
Definition BaseWSPeer.h:29
boost::asio::basic_waitable_timer< clock_type > waitable_timer
Definition BaseWSPeer.h:31
boost::beast::websocket::ping_data payload_
Definition BaseWSPeer.h:49
void on_ws_handshake(error_code const &ec)
Definition BaseWSPeer.h:258
void run() override
Definition BaseWSPeer.h:171
void close() override
Definition BaseWSPeer.h:218
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
Definition BaseWSPeer.h:196
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
Definition BaseWSPeer.h:52
http_request_type request_
Definition BaseWSPeer.h:37
waitable_timer timer_
Definition BaseWSPeer.h:46
void on_write(error_code const &ec)
Definition BaseWSPeer.h:277
std::list< std::shared_ptr< WSMsg > > wq_
Definition BaseWSPeer.h:40
void complete() override
Indicate that the response is complete.
Definition BaseWSPeer.h:249
http_request_type const & request() const override
Definition BaseWSPeer.h:79
void on_timer(error_code ec)
Definition BaseWSPeer.h:436
T copy(T... args)
T distance(T... args)
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:14
STL namespace.
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:82
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
decltype(auto) get_lowest_layer(T &t) noexcept
Definition LowestLayer.h:14
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safe_cast(Src s) noexcept
Definition safe_cast.h:21
T next(T... args)
T reserve(T... args)
Configuration information for a Server listening port.
Definition Port.h:30