rippled
Loading...
Searching...
No Matches
Door.h
1#ifndef XRPL_SERVER_DOOR_H_INCLUDED
2#define XRPL_SERVER_DOOR_H_INCLUDED
3
4#include <xrpl/basics/Log.h>
5#include <xrpl/basics/contract.h>
6#include <xrpl/server/detail/PlainHTTPPeer.h>
7#include <xrpl/server/detail/SSLHTTPPeer.h>
8#include <xrpl/server/detail/io_list.h>
9
10#include <boost/asio/basic_waitable_timer.hpp>
11#include <boost/asio/buffer.hpp>
12#include <boost/asio/io_context.hpp>
13#include <boost/asio/ip/tcp.hpp>
14#include <boost/asio/post.hpp>
15#include <boost/asio/spawn.hpp>
16#include <boost/asio/steady_timer.hpp>
17#include <boost/beast/core/detect_ssl.hpp>
18#include <boost/beast/core/multi_buffer.hpp>
19#include <boost/beast/core/tcp_stream.hpp>
20#include <boost/container/flat_map.hpp>
21#include <boost/predef.h>
22
23#if !BOOST_OS_WINDOWS
24#include <sys/resource.h>
25
26#include <dirent.h>
27#include <unistd.h>
28#endif
29
30#include <algorithm>
31#include <chrono>
32#include <cstdint>
33#include <functional>
34#include <memory>
35#include <optional>
36#include <sstream>
37
38namespace ripple {
39
41template <class Handler>
42class Door : public io_list::work,
43 public std::enable_shared_from_this<Door<Handler>>
44{
45private:
47 using timer_type = boost::asio::basic_waitable_timer<clock_type>;
48 using error_code = boost::system::error_code;
49 using yield_context = boost::asio::yield_context;
50 using protocol_type = boost::asio::ip::tcp;
51 using acceptor_type = protocol_type::acceptor;
52 using endpoint_type = protocol_type::endpoint;
53 using socket_type = boost::asio::ip::tcp::socket;
54 using stream_type = boost::beast::tcp_stream;
55
56 // Detects SSL on a socket
57 class Detector : public io_list::work,
58 public std::enable_shared_from_this<Detector>
59 {
60 private:
61 Port const& port_;
62 Handler& handler_;
63 boost::asio::io_context& ioc_;
67 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
69
70 public:
72 Port const& port,
73 Handler& handler,
74 boost::asio::io_context& ioc,
75 stream_type&& stream,
76 endpoint_type remote_address,
78 void
79 run();
80 void
81 close() override;
82
83 private:
84 void
86 };
87
89 Port const& port_;
90 Handler& handler_;
91 boost::asio::io_context& ioc_;
93 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
94 bool ssl_;
95 bool plain_;
99 boost::asio::steady_timer backoff_timer_;
100 static constexpr double FREE_FD_THRESHOLD = 0.70;
101
107
108 void
109 reOpen();
110
112 query_fd_stats() const;
113
114 bool
116
117public:
118 Door(
119 Handler& handler,
120 boost::asio::io_context& io_context,
121 Port const& port,
123
124 // Work-around because we can't call shared_from_this in ctor
125 void
126 run();
127
134 void
135 close() override;
136
139 {
140 return acceptor_.local_endpoint();
141 }
142
143private:
144 template <class ConstBufferSequence>
145 void
146 create(
147 bool ssl,
148 ConstBufferSequence const& buffers,
149 stream_type&& stream,
150 endpoint_type remote_address);
151
152 void
154};
155
156template <class Handler>
158 Port const& port,
159 Handler& handler,
160 boost::asio::io_context& ioc,
161 stream_type&& stream,
162 endpoint_type remote_address,
164 : port_(port)
165 , handler_(handler)
166 , ioc_(ioc)
167 , stream_(std::move(stream))
168 , socket_(stream_.socket())
169 , remote_address_(remote_address)
170 , strand_(boost::asio::make_strand(ioc_))
171 , j_(j)
172{
173}
174
175template <class Handler>
176void
178{
180 strand_,
181 std::bind(
183 this->shared_from_this(),
184 std::placeholders::_1));
185}
186
187template <class Handler>
188void
190{
191 stream_.close();
192}
193
194template <class Handler>
195void
196Door<Handler>::Detector::do_detect(boost::asio::yield_context do_yield)
197{
198 boost::beast::multi_buffer buf(16);
199 stream_.expires_after(std::chrono::seconds(15));
200 boost::system::error_code ec;
201 bool const ssl = async_detect_ssl(stream_, buf, do_yield[ec]);
202 stream_.expires_never();
203 if (!ec)
204 {
205 if (ssl)
206 {
207 if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
208 port_,
209 handler_,
210 ioc_,
211 j_,
212 remote_address_,
213 buf.data(),
214 std::move(stream_)))
215 sp->run();
216 return;
217 }
218 if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
219 port_,
220 handler_,
221 ioc_,
222 j_,
223 remote_address_,
224 buf.data(),
225 std::move(stream_)))
226 sp->run();
227 return;
228 }
229 if (ec != boost::asio::error::operation_aborted)
230 {
231 JLOG(j_.trace()) << "Error detecting ssl: " << ec.message() << " from "
232 << remote_address_;
233 }
234}
235
236//------------------------------------------------------------------------------
237
238template <class Handler>
239void
241{
242 error_code ec;
243
244 if (acceptor_.is_open())
245 {
246 acceptor_.close(ec);
247 if (ec)
248 {
250 ss << "Can't close acceptor: " << port_.name << ", "
251 << ec.message();
252 JLOG(j_.error()) << ss.str();
253 Throw<std::runtime_error>(ss.str());
254 }
255 }
256
257 endpoint_type const local_address = endpoint_type(port_.ip, port_.port);
258
259 acceptor_.open(local_address.protocol(), ec);
260 if (ec)
261 {
262 JLOG(j_.error()) << "Open port '" << port_.name
263 << "' failed:" << ec.message();
264 Throw<std::exception>();
265 }
266
267 acceptor_.set_option(
268 boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
269 if (ec)
270 {
271 JLOG(j_.error()) << "Option for port '" << port_.name
272 << "' failed:" << ec.message();
273 Throw<std::exception>();
274 }
275
276 acceptor_.bind(local_address, ec);
277 if (ec)
278 {
279 JLOG(j_.error()) << "Bind port '" << port_.name
280 << "' failed:" << ec.message();
281 Throw<std::exception>();
282 }
283
284 acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
285 if (ec)
286 {
287 JLOG(j_.error()) << "Listen on port '" << port_.name
288 << "' failed:" << ec.message();
289 Throw<std::exception>();
290 }
291
292 JLOG(j_.info()) << "Opened " << port_;
293}
294
295template <class Handler>
297 Handler& handler,
298 boost::asio::io_context& io_context,
299 Port const& port,
301 : j_(j)
302 , port_(port)
303 , handler_(handler)
304 , ioc_(io_context)
305 , acceptor_(io_context)
306 , strand_(boost::asio::make_strand(io_context))
307 , ssl_(
308 port_.protocol.count("https") > 0 ||
309 port_.protocol.count("wss") > 0 || port_.protocol.count("wss2") > 0 ||
310 port_.protocol.count("peer") > 0)
311 , plain_(
312 port_.protocol.count("http") > 0 || port_.protocol.count("ws") > 0 ||
313 port_.protocol.count("ws2"))
314 , backoff_timer_(io_context)
315{
316 reOpen();
317}
318
319template <class Handler>
320void
322{
324 strand_,
325 std::bind(
327 this->shared_from_this(),
328 std::placeholders::_1));
329}
330
331template <class Handler>
332void
334{
335 if (!strand_.running_in_this_thread())
336 return boost::asio::post(
337 strand_,
338 std::bind(&Door<Handler>::close, this->shared_from_this()));
339 backoff_timer_.cancel();
340 error_code ec;
341 acceptor_.close(ec);
342}
343
344//------------------------------------------------------------------------------
345
346template <class Handler>
347template <class ConstBufferSequence>
348void
350 bool ssl,
351 ConstBufferSequence const& buffers,
352 stream_type&& stream,
353 endpoint_type remote_address)
354{
355 if (ssl)
356 {
357 if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
358 port_,
359 handler_,
360 ioc_,
361 j_,
362 remote_address,
363 buffers,
364 std::move(stream)))
365 sp->run();
366 return;
367 }
368 if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
369 port_,
370 handler_,
371 ioc_,
372 j_,
373 remote_address,
374 buffers,
375 std::move(stream)))
376 sp->run();
377}
378
379template <class Handler>
380void
381Door<Handler>::do_accept(boost::asio::yield_context do_yield)
382{
383 while (acceptor_.is_open())
384 {
385 if (should_throttle_for_fds())
386 {
387 backoff_timer_.expires_after(accept_delay_);
388 boost::system::error_code tec;
389 backoff_timer_.async_wait(do_yield[tec]);
390 accept_delay_ = std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
391 JLOG(j_.warn()) << "Throttling do_accept for "
392 << accept_delay_.count() << "ms.";
393 continue;
394 }
395
396 error_code ec;
397 endpoint_type remote_address;
398 stream_type stream(ioc_);
399 socket_type& socket = stream.socket();
400 acceptor_.async_accept(socket, remote_address, do_yield[ec]);
401 if (ec)
402 {
403 if (ec == boost::asio::error::operation_aborted)
404 break;
405
406 if (ec == boost::asio::error::no_descriptors ||
407 ec == boost::asio::error::no_buffer_space)
408 {
409 JLOG(j_.warn()) << "accept: Too many open files. Pausing for "
410 << accept_delay_.count() << "ms.";
411
412 backoff_timer_.expires_after(accept_delay_);
413 boost::system::error_code tec;
414 backoff_timer_.async_wait(do_yield[tec]);
415
416 accept_delay_ = std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
417 }
418 else
419 {
420 JLOG(j_.error()) << "accept error: " << ec.message();
421 }
422 continue;
423 }
424
425 accept_delay_ = INITIAL_ACCEPT_DELAY;
426
427 if (ssl_ && plain_)
428 {
429 if (auto sp = ios().template emplace<Detector>(
430 port_,
431 handler_,
432 ioc_,
433 std::move(stream),
434 remote_address,
435 j_))
436 sp->run();
437 }
438 else if (ssl_ || plain_)
439 {
440 create(
441 ssl_,
442 boost::asio::null_buffers{},
443 std::move(stream),
444 remote_address);
445 }
446 }
447}
448
449template <class Handler>
452{
453#if BOOST_OS_WINDOWS
454 return std::nullopt;
455#else
456 FDStats s;
457 struct rlimit rl;
458 if (getrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY)
459 return std::nullopt;
460 s.limit = static_cast<std::uint64_t>(rl.rlim_cur);
461#if BOOST_OS_LINUX
462 constexpr char const* kFdDir = "/proc/self/fd";
463#else
464 constexpr char const* kFdDir = "/dev/fd";
465#endif
466 if (DIR* d = ::opendir(kFdDir))
467 {
468 std::uint64_t cnt = 0;
469 while (::readdir(d) != nullptr)
470 ++cnt;
471 ::closedir(d);
472 // readdir counts '.', '..', and the DIR* itself shows in the list
473 s.used = (cnt >= 3) ? (cnt - 3) : 0;
474 return s;
475 }
476 return std::nullopt;
477#endif
478}
479
480template <class Handler>
481bool
483{
484#if BOOST_OS_WINDOWS
485 return false;
486#else
487 auto const stats = query_fd_stats();
488 if (!stats || stats->limit == 0)
489 return false;
490
491 auto const& s = *stats;
492 auto const free = (s.limit > s.used) ? (s.limit - s.used) : 0ull;
493 double const free_ratio =
494 static_cast<double>(free) / static_cast<double>(s.limit);
495 if (free_ratio < FREE_FD_THRESHOLD)
496 {
497 return true;
498 }
499 return false;
500#endif
501}
502
503} // namespace ripple
504
505#endif
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition Door.h:67
Handler & handler_
Definition Door.h:62
beast::Journal const j_
Definition Door.h:68
socket_type & socket_
Definition Door.h:65
void close() override
Definition Door.h:189
endpoint_type remote_address_
Definition Door.h:66
void do_detect(yield_context yield)
Definition Door.h:196
stream_type stream_
Definition Door.h:64
Port const & port_
Definition Door.h:61
boost::asio::io_context & ioc_
Definition Door.h:63
Detector(Port const &port, Handler &handler, boost::asio::io_context &ioc, stream_type &&stream, endpoint_type remote_address, beast::Journal j)
Definition Door.h:157
A listening socket.
Definition Door.h:44
void do_accept(yield_context yield)
Definition Door.h:381
void create(bool ssl, ConstBufferSequence const &buffers, stream_type &&stream, endpoint_type remote_address)
Definition Door.h:349
static constexpr std::chrono::milliseconds INITIAL_ACCEPT_DELAY
Definition Door.h:96
protocol_type::endpoint endpoint_type
Definition Door.h:52
boost::asio::io_context & ioc_
Definition Door.h:91
Door(Handler &handler, boost::asio::io_context &io_context, Port const &port, beast::Journal j)
Definition Door.h:296
boost::beast::tcp_stream stream_type
Definition Door.h:54
boost::asio::basic_waitable_timer< clock_type > timer_type
Definition Door.h:47
static constexpr double FREE_FD_THRESHOLD
Definition Door.h:100
beast::Journal const j_
Definition Door.h:88
bool should_throttle_for_fds()
Definition Door.h:482
std::optional< FDStats > query_fd_stats() const
Definition Door.h:451
protocol_type::acceptor acceptor_type
Definition Door.h:51
boost::system::error_code error_code
Definition Door.h:48
bool ssl_
Definition Door.h:94
boost::asio::ip::tcp protocol_type
Definition Door.h:50
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition Door.h:93
boost::asio::steady_timer backoff_timer_
Definition Door.h:99
std::chrono::milliseconds accept_delay_
Definition Door.h:98
void reOpen()
Definition Door.h:240
boost::asio::yield_context yield_context
Definition Door.h:49
Handler & handler_
Definition Door.h:90
void close() override
Close the Door listening socket and connections.
Definition Door.h:333
endpoint_type get_endpoint() const
Definition Door.h:138
Port const & port_
Definition Door.h:89
acceptor_type acceptor_
Definition Door.h:92
void run()
Definition Door.h:321
bool plain_
Definition Door.h:95
boost::asio::ip::tcp::socket socket_type
Definition Door.h:53
static constexpr std::chrono::milliseconds MAX_ACCEPT_DELAY
Definition Door.h:97
io_list & ios()
Return the io_list associated with the work.
Definition io_list.h:41
T is_same_v
T min(T... args)
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn
Definition Spawn.h:68
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
boost::beast::ssl_stream< socket_type > stream_type
Definition Handshake.h:23
STL namespace.
T str(T... args)
std::uint64_t used
Definition Door.h:104
std::uint64_t limit
Definition Door.h:105
Configuration information for a Server listening port.
Definition Port.h:31
std::uint16_t port
Definition Port.h:36
boost::asio::ip::address ip
Definition Port.h:35
std::string name
Definition Port.h:34