rippled
Loading...
Searching...
No Matches
BaseWSPeer.h
1#pragma once
2
3#include <xrpl/basics/safe_cast.h>
4#include <xrpl/beast/utility/instrumentation.h>
5#include <xrpl/beast/utility/rngfill.h>
6#include <xrpl/crypto/csprng.h>
7#include <xrpl/protocol/BuildInfo.h>
8#include <xrpl/server/WSSession.h>
9#include <xrpl/server/detail/BasePeer.h>
10#include <xrpl/server/detail/LowestLayer.h>
11
12#include <boost/asio/error.hpp>
13#include <boost/beast/core/multi_buffer.hpp>
14#include <boost/beast/http/message.hpp>
15#include <boost/beast/websocket.hpp>
16#include <boost/logic/tribool.hpp>
17
18#include <functional>
19#include <list>
20
21namespace xrpl {
22
24template <class Handler, class Impl>
25class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
26{
27protected:
29 using error_code = boost::system::error_code;
30 using endpoint_type = boost::asio::ip::tcp::endpoint;
31 using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
32 using BasePeer<Handler, Impl>::strand_;
33
34private:
35 friend class BasePeer<Handler, Impl>;
36
38 boost::beast::multi_buffer rb_;
39 boost::beast::multi_buffer wb_;
44 bool do_close_ = false;
45 boost::beast::websocket::close_reason cr_;
47 bool close_on_timer_ = false;
48 bool ping_active_ = false;
49 boost::beast::websocket::ping_data payload_;
51 std::function<void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_;
52
53public:
54 template <class Body, class Headers>
56 Port const& port,
57 Handler& handler,
58 boost::asio::executor const& executor,
59 waitable_timer timer,
60 endpoint_type remote_address,
61 boost::beast::http::request<Body, Headers>&& request,
62 beast::Journal journal);
63
64 void
65 run() override;
66
67 //
68 // WSSession
69 //
70
71 Port const&
72 port() const override
73 {
74 return this->port_;
75 }
76
78 request() const override
79 {
80 return this->request_;
81 }
82
83 boost::asio::ip::tcp::endpoint const&
84 remote_endpoint() const override
85 {
86 return this->remote_address_;
87 }
88
89 void
91
92 void
93 close() override;
94
95 void
96 close(boost::beast::websocket::close_reason const& reason) override;
97
98 void
99 complete() override;
100
101protected:
102 Impl&
104 {
105 return *static_cast<Impl*>(this);
106 }
107
108 void
110
111 void
113
114 void
116
117 void
119
120 void
122
123 void
124 on_read(error_code const& ec);
125
126 void
128
129 void
131
132 void
134
135 void
136 on_ping(error_code const& ec);
137
138 void
139 on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload);
140
141 void
143
144 template <class String>
145 void
146 fail(error_code ec, String const& what);
147};
148
149//------------------------------------------------------------------------------
150
151template <class Handler, class Impl>
152template <class Body, class Headers>
154 Port const& port,
155 Handler& handler,
156 boost::asio::executor const& executor,
157 waitable_timer timer,
158 endpoint_type remote_address,
159 boost::beast::http::request<Body, Headers>&& request,
160 beast::Journal journal)
161 : BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
162 , request_(std::move(request))
163 , timer_(std::move(timer))
164 , payload_("12345678") // ensures size is 8 bytes
165{
166}
167
168template <class Handler, class Impl>
169void
171{
172 if (!strand_.running_in_this_thread())
173 return post(strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
174 impl().ws_.set_option(port().pmd_options);
175 // Must manage the control callback memory outside of the `control_callback`
176 // function
177 control_callback_ = std::bind(&BaseWSPeer::on_ping_pong, this, std::placeholders::_1, std::placeholders::_2);
178 impl().ws_.control_callback(control_callback_);
179 start_timer();
180 close_on_timer_ = true;
181 impl().ws_.set_option(boost::beast::websocket::stream_base::decorator(
182 [](auto& res) { res.set(boost::beast::http::field::server, BuildInfo::getFullVersionString()); }));
183 impl().ws_.async_accept(
184 request_,
185 bind_executor(
186 strand_, std::bind(&BaseWSPeer::on_ws_handshake, impl().shared_from_this(), std::placeholders::_1)));
187}
188
189template <class Handler, class Impl>
190void
192{
193 if (!strand_.running_in_this_thread())
194 return post(strand_, std::bind(&BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
195 if (do_close_)
196 return;
197 if (wq_.size() > port().ws_queue_limit)
198 {
199 cr_.code = safe_cast<decltype(cr_.code)>(boost::beast::websocket::close_code::policy_error);
200 cr_.reason = "Policy error: client is too slow.";
201 JLOG(this->j_.info()) << cr_.reason;
202 wq_.erase(std::next(wq_.begin()), wq_.end());
203 close(cr_);
204 return;
205 }
206 wq_.emplace_back(std::move(w));
207 if (wq_.size() == 1)
208 on_write({});
209}
210
211template <class Handler, class Impl>
212void
214{
215 close(boost::beast::websocket::close_reason{});
216}
217
218template <class Handler, class Impl>
219void
220BaseWSPeer<Handler, Impl>::close(boost::beast::websocket::close_reason const& reason)
221{
222 if (!strand_.running_in_this_thread())
223 return post(strand_, [self = impl().shared_from_this(), reason] { self->close(reason); });
224 if (do_close_)
225 return;
226 do_close_ = true;
227 if (wq_.empty())
228 {
229 impl().ws_.async_close(
230 reason, bind_executor(strand_, [self = impl().shared_from_this()](boost::beast::error_code const& ec) {
231 self->on_close(ec);
232 }));
233 }
234 else
235 {
236 cr_ = reason;
237 }
238}
239
240template <class Handler, class Impl>
241void
243{
244 if (!strand_.running_in_this_thread())
245 return post(strand_, std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
246 do_read();
247}
248
249template <class Handler, class Impl>
250void
252{
253 if (ec)
254 return fail(ec, "on_ws_handshake");
255 close_on_timer_ = false;
256 do_read();
257}
258
259template <class Handler, class Impl>
260void
262{
263 if (!strand_.running_in_this_thread())
264 return post(strand_, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
265 on_write({});
266}
267
268template <class Handler, class Impl>
269void
271{
272 if (ec)
273 return fail(ec, "write");
274 auto& w = *wq_.front();
275 auto const result = w.prepare(65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
276 if (boost::indeterminate(result.first))
277 return;
278 start_timer();
279 if (!result.first)
280 impl().ws_.async_write_some(
281 static_cast<bool>(result.first),
282 result.second,
283 bind_executor(strand_, std::bind(&BaseWSPeer::on_write, impl().shared_from_this(), std::placeholders::_1)));
284 else
285 impl().ws_.async_write_some(
286 static_cast<bool>(result.first),
287 result.second,
288 bind_executor(
289 strand_, std::bind(&BaseWSPeer::on_write_fin, impl().shared_from_this(), std::placeholders::_1)));
290}
291
292template <class Handler, class Impl>
293void
295{
296 if (ec)
297 return fail(ec, "write_fin");
298 wq_.pop_front();
299 if (do_close_)
300 {
301 impl().ws_.async_close(
302 cr_,
303 bind_executor(strand_, std::bind(&BaseWSPeer::on_close, impl().shared_from_this(), std::placeholders::_1)));
304 }
305 else if (!wq_.empty())
306 on_write({});
307}
308
309template <class Handler, class Impl>
310void
312{
313 if (!strand_.running_in_this_thread())
314 return post(strand_, std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
315 impl().ws_.async_read(
316 rb_, bind_executor(strand_, std::bind(&BaseWSPeer::on_read, impl().shared_from_this(), std::placeholders::_1)));
317}
318
319template <class Handler, class Impl>
320void
322{
323 if (ec == boost::beast::websocket::error::closed)
324 return on_close({});
325 if (ec)
326 return fail(ec, "read");
327 auto const& data = rb_.data();
329 b.reserve(std::distance(data.begin(), data.end()));
330 std::copy(data.begin(), data.end(), std::back_inserter(b));
331 this->handler_.onWSMessage(impl().shared_from_this(), b);
332 rb_.consume(rb_.size());
333}
334
335template <class Handler, class Impl>
336void
338{
339 cancel_timer();
340}
341
342template <class Handler, class Impl>
343void
345{
346 // Max seconds without completing a message
347 static constexpr std::chrono::seconds timeout{30};
348 static constexpr std::chrono::seconds timeoutLocal{3};
349
350 try
351 {
352 timer_.expires_after(remote_endpoint().address().is_loopback() ? timeoutLocal : timeout);
353 }
354 catch (boost::system::system_error const& e)
355 {
356 return fail(e.code(), "start_timer");
357 }
358
359 timer_.async_wait(bind_executor(
360 strand_, std::bind(&BaseWSPeer<Handler, Impl>::on_timer, impl().shared_from_this(), std::placeholders::_1)));
361}
362
363// Convenience for discarding the error code
364template <class Handler, class Impl>
365void
367{
368 try
369 {
370 timer_.cancel();
371 }
372 catch (boost::system::system_error const&)
373 {
374 // ignored
375 }
376}
377
378template <class Handler, class Impl>
379void
381{
382 if (ec == boost::asio::error::operation_aborted)
383 return;
384 ping_active_ = false;
385 if (!ec)
386 return;
387 fail(ec, "on_ping");
388}
389
390template <class Handler, class Impl>
391void
392BaseWSPeer<Handler, Impl>::on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
393{
394 if (kind == boost::beast::websocket::frame_type::pong)
395 {
396 boost::beast::string_view p(payload_.begin());
397 if (payload == p)
398 {
399 close_on_timer_ = false;
400 JLOG(this->j_.trace()) << "got matching pong";
401 }
402 else
403 {
404 JLOG(this->j_.trace()) << "got pong";
405 }
406 }
407}
408
409template <class Handler, class Impl>
410void
412{
413 if (ec == boost::asio::error::operation_aborted)
414 return;
415 if (!ec)
416 {
417 if (!close_on_timer_ || !ping_active_)
418 {
419 start_timer();
420 close_on_timer_ = true;
421 ping_active_ = true;
422 // cryptographic is probably overkill..
423 beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());
424 impl().ws_.async_ping(
425 payload_,
426 bind_executor(
427 strand_, std::bind(&BaseWSPeer::on_ping, impl().shared_from_this(), std::placeholders::_1)));
428 JLOG(this->j_.trace()) << "sent ping";
429 return;
430 }
431 ec = boost::system::errc::make_error_code(boost::system::errc::timed_out);
432 }
433 fail(ec, "timer");
434}
435
436template <class Handler, class Impl>
437template <class String>
438void
440{
441 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::BaseWSPeer::fail : strand in this thread");
442
443 cancel_timer();
444 if (!ec_ && ec != boost::asio::error::operation_aborted)
445 {
446 ec_ = ec;
447 JLOG(this->j_.trace()) << what << ": " << ec.message();
448 xrpl::get_lowest_layer(impl().ws_).socket().close(ec);
449 }
450}
451
452} // namespace xrpl
T back_inserter(T... args)
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:40
Stream info() const
Definition Journal.h:306
Stream trace() const
Severity stream access functions.
Definition Journal.h:294
boost::asio::strand< boost::asio::executor > strand_
Definition BasePeer.h:34
Port const & port_
Definition BasePeer.h:27
endpoint_type remote_address_
Definition BasePeer.h:29
Represents an active WebSocket connection.
Definition BaseWSPeer.h:26
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
Definition BaseWSPeer.h:392
boost::beast::websocket::close_reason cr_
Definition BaseWSPeer.h:45
void on_write_fin(error_code const &ec)
Definition BaseWSPeer.h:294
boost::beast::multi_buffer rb_
Definition BaseWSPeer.h:38
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
Definition BaseWSPeer.h:84
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
Definition BaseWSPeer.h:153
void on_ping(error_code const &ec)
Definition BaseWSPeer.h:380
error_code ec_
Definition BaseWSPeer.h:50
boost::asio::ip::tcp::endpoint endpoint_type
Definition BaseWSPeer.h:30
boost::beast::multi_buffer wb_
Definition BaseWSPeer.h:39
void fail(error_code ec, String const &what)
Definition BaseWSPeer.h:439
void on_close(error_code const &ec)
Definition BaseWSPeer.h:337
void close(boost::beast::websocket::close_reason const &reason) override
Definition BaseWSPeer.h:220
Port const & port() const override
Definition BaseWSPeer.h:72
bool do_close_
The socket has been closed, or will close after the next write finishes.
Definition BaseWSPeer.h:44
void on_read(error_code const &ec)
Definition BaseWSPeer.h:321
boost::system::error_code error_code
Definition BaseWSPeer.h:29
boost::asio::basic_waitable_timer< clock_type > waitable_timer
Definition BaseWSPeer.h:31
boost::beast::websocket::ping_data payload_
Definition BaseWSPeer.h:49
void on_ws_handshake(error_code const &ec)
Definition BaseWSPeer.h:251
void run() override
Definition BaseWSPeer.h:170
void close() override
Definition BaseWSPeer.h:213
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
Definition BaseWSPeer.h:191
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
Definition BaseWSPeer.h:51
http_request_type request_
Definition BaseWSPeer.h:37
waitable_timer timer_
Definition BaseWSPeer.h:46
void on_write(error_code const &ec)
Definition BaseWSPeer.h:270
std::list< std::shared_ptr< WSMsg > > wq_
Definition BaseWSPeer.h:40
void complete() override
Indicate that the response is complete.
Definition BaseWSPeer.h:242
http_request_type const & request() const override
Definition BaseWSPeer.h:78
void on_timer(error_code ec)
Definition BaseWSPeer.h:411
T copy(T... args)
T distance(T... args)
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:14
STL namespace.
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:64
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
decltype(auto) get_lowest_layer(T &t) noexcept
Definition LowestLayer.h:14
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safe_cast(Src s) noexcept
Definition safe_cast.h:19
T next(T... args)
T reserve(T... args)
Configuration information for a Server listening port.
Definition Port.h:30