rippled
Loading...
Searching...
No Matches
Door.h
1#pragma once
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/contract.h>
5#include <xrpl/server/detail/PlainHTTPPeer.h>
6#include <xrpl/server/detail/SSLHTTPPeer.h>
7#include <xrpl/server/detail/io_list.h>
8
9#include <boost/asio/basic_waitable_timer.hpp>
10#include <boost/asio/buffer.hpp>
11#include <boost/asio/io_context.hpp>
12#include <boost/asio/ip/tcp.hpp>
13#include <boost/asio/post.hpp>
14#include <boost/asio/spawn.hpp>
15#include <boost/asio/steady_timer.hpp>
16#include <boost/beast/core/detect_ssl.hpp>
17#include <boost/beast/core/multi_buffer.hpp>
18#include <boost/beast/core/tcp_stream.hpp>
19#include <boost/container/flat_map.hpp>
20#include <boost/predef.h>
21
22#if !BOOST_OS_WINDOWS
23#include <sys/resource.h>
24
25#include <dirent.h>
26#include <unistd.h>
27#endif
28
29#include <algorithm>
30#include <chrono>
31#include <cstdint>
32#include <functional>
33#include <memory>
34#include <optional>
35#include <sstream>
36
37namespace xrpl {
38
40template <class Handler>
41class Door : public io_list::work, public std::enable_shared_from_this<Door<Handler>>
42{
43private:
45 using timer_type = boost::asio::basic_waitable_timer<clock_type>;
46 using error_code = boost::system::error_code;
47 using yield_context = boost::asio::yield_context;
48 using protocol_type = boost::asio::ip::tcp;
49 using acceptor_type = protocol_type::acceptor;
50 using endpoint_type = protocol_type::endpoint;
51 using socket_type = boost::asio::ip::tcp::socket;
52 using stream_type = boost::beast::tcp_stream;
53
54 // Detects SSL on a socket
55 class Detector : public io_list::work, public std::enable_shared_from_this<Detector>
56 {
57 private:
58 Port const& port_;
59 Handler& handler_;
60 boost::asio::io_context& ioc_;
64 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
66
67 public:
69 Port const& port,
70 Handler& handler,
71 boost::asio::io_context& ioc,
72 stream_type&& stream,
73 endpoint_type remote_address,
75 void
76 run();
77 void
78 close() override;
79
80 private:
81 void
83 };
84
86 Port const& port_;
87 Handler& handler_;
88 boost::asio::io_context& ioc_;
90 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
91 bool ssl_{
92 port_.protocol.count("https") > 0 || port_.protocol.count("wss") > 0 ||
93 port_.protocol.count("wss2") > 0 || port_.protocol.count("peer") > 0};
94 bool plain_{
95 port_.protocol.count("http") > 0 || port_.protocol.count("ws") > 0 ||
96 port_.protocol.count("ws2")};
100 boost::asio::steady_timer backoff_timer_;
101 static constexpr double FREE_FD_THRESHOLD = 0.70;
102
108
109 void
110 reOpen();
111
113 query_fd_stats() const;
114
115 bool
117
118public:
119 Door(Handler& handler, boost::asio::io_context& io_context, Port const& port, beast::Journal j);
120
121 // Work-around because we can't call shared_from_this in ctor
122 void
123 run();
124
131 void
132 close() override;
133
136 {
137 return acceptor_.local_endpoint();
138 }
139
140private:
141 template <class ConstBufferSequence>
142 void
143 create(
144 bool ssl,
145 ConstBufferSequence const& buffers,
146 stream_type&& stream,
147 endpoint_type remote_address);
148
149 void
151};
152
153template <class Handler>
155 Port const& port,
156 Handler& handler,
157 boost::asio::io_context& ioc,
158 stream_type&& stream,
159 endpoint_type remote_address,
161 : port_(port)
162 , handler_(handler)
163 , ioc_(ioc)
164 , stream_(std::move(stream))
165 , socket_(stream_.socket())
166 , remote_address_(remote_address)
167 , strand_(boost::asio::make_strand(ioc_))
168 , j_(j)
169{
170}
171
172template <class Handler>
173void
175{
177 strand_, std::bind(&Detector::do_detect, this->shared_from_this(), std::placeholders::_1));
178}
179
180template <class Handler>
181void
183{
184 stream_.close();
185}
186
187template <class Handler>
188void
189Door<Handler>::Detector::do_detect(boost::asio::yield_context do_yield)
190{
191 boost::beast::multi_buffer buf(16);
192 stream_.expires_after(std::chrono::seconds(15));
193 boost::system::error_code ec;
194 bool const ssl = async_detect_ssl(stream_, buf, do_yield[ec]);
195 stream_.expires_never();
196 if (!ec)
197 {
198 if (ssl)
199 {
200 if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
201 port_, handler_, ioc_, j_, remote_address_, buf.data(), std::move(stream_)))
202 sp->run();
203 return;
204 }
205 if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
206 port_, handler_, ioc_, j_, remote_address_, buf.data(), std::move(stream_)))
207 sp->run();
208 return;
209 }
210 if (ec != boost::asio::error::operation_aborted)
211 {
212 JLOG(j_.trace()) << "Error detecting ssl: " << ec.message() << " from " << remote_address_;
213 }
214}
215
216//------------------------------------------------------------------------------
217
218template <class Handler>
219void
221{
222 error_code ec;
223
224 if (acceptor_.is_open())
225 {
226 acceptor_.close(ec);
227 if (ec)
228 {
230 ss << "Can't close acceptor: " << port_.name << ", " << ec.message();
231 JLOG(j_.error()) << ss.str();
232 Throw<std::runtime_error>(ss.str());
233 }
234 }
235
236 endpoint_type const local_address = endpoint_type(port_.ip, port_.port);
237
238 acceptor_.open(local_address.protocol(), ec);
239 if (ec)
240 {
241 JLOG(j_.error()) << "Open port '" << port_.name << "' failed:" << ec.message();
242 Throw<std::exception>();
243 }
244
245 acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
246 if (ec)
247 {
248 JLOG(j_.error()) << "Option for port '" << port_.name << "' failed:" << ec.message();
249 Throw<std::exception>();
250 }
251
252 acceptor_.bind(local_address, ec);
253 if (ec)
254 {
255 JLOG(j_.error()) << "Bind port '" << port_.name << "' failed:" << ec.message();
256 Throw<std::exception>();
257 }
258
259 acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
260 if (ec)
261 {
262 JLOG(j_.error()) << "Listen on port '" << port_.name << "' failed:" << ec.message();
263 Throw<std::exception>();
264 }
265
266 JLOG(j_.info()) << "Opened " << port_;
267}
268
269template <class Handler>
271 Handler& handler,
272 boost::asio::io_context& io_context,
273 Port const& port,
275 : j_(j)
276 , port_(port)
277 , handler_(handler)
278 , ioc_(io_context)
279 , acceptor_(io_context)
280 , strand_(boost::asio::make_strand(io_context))
281 , backoff_timer_(io_context)
282{
283 reOpen();
284}
285
286template <class Handler>
287void
289{
291 strand_,
292 std::bind(&Door<Handler>::do_accept, this->shared_from_this(), std::placeholders::_1));
293}
294
295template <class Handler>
296void
298{
299 if (!strand_.running_in_this_thread())
300 return boost::asio::post(
301 strand_, std::bind(&Door<Handler>::close, this->shared_from_this()));
302 backoff_timer_.cancel();
303 error_code ec;
304 acceptor_.close(ec);
305}
306
307//------------------------------------------------------------------------------
308
309template <class Handler>
310template <class ConstBufferSequence>
311void
313 bool ssl,
314 ConstBufferSequence const& buffers,
315 stream_type&& stream,
316 endpoint_type remote_address)
317{
318 if (ssl)
319 {
320 if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
321 port_, handler_, ioc_, j_, remote_address, buffers, std::move(stream)))
322 sp->run();
323 return;
324 }
325 if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
326 port_, handler_, ioc_, j_, remote_address, buffers, std::move(stream)))
327 sp->run();
328}
329
330template <class Handler>
331void
332Door<Handler>::do_accept(boost::asio::yield_context do_yield)
333{
334 while (acceptor_.is_open())
335 {
336 if (should_throttle_for_fds())
337 {
338 backoff_timer_.expires_after(accept_delay_);
339 boost::system::error_code tec;
340 backoff_timer_.async_wait(do_yield[tec]);
341 accept_delay_ = std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
342 JLOG(j_.warn()) << "Throttling do_accept for " << accept_delay_.count() << "ms.";
343 continue;
344 }
345
346 error_code ec;
347 endpoint_type remote_address;
348 stream_type stream(ioc_);
349 socket_type& socket = stream.socket();
350 acceptor_.async_accept(socket, remote_address, do_yield[ec]);
351 if (ec)
352 {
353 if (ec == boost::asio::error::operation_aborted)
354 break;
355
356 if (ec == boost::asio::error::no_descriptors ||
357 ec == boost::asio::error::no_buffer_space)
358 {
359 JLOG(j_.warn()) << "accept: Too many open files. Pausing for "
360 << accept_delay_.count() << "ms.";
361
362 backoff_timer_.expires_after(accept_delay_);
363 boost::system::error_code tec;
364 backoff_timer_.async_wait(do_yield[tec]);
365
366 accept_delay_ = std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
367 }
368 else
369 {
370 JLOG(j_.error()) << "accept error: " << ec.message();
371 }
372 continue;
373 }
374
375 accept_delay_ = INITIAL_ACCEPT_DELAY;
376
377 if (ssl_ && plain_)
378 {
379 if (auto sp = ios().template emplace<Detector>(
380 port_, handler_, ioc_, std::move(stream), remote_address, j_))
381 sp->run();
382 }
383 else if (ssl_ || plain_)
384 {
385 create(ssl_, boost::asio::null_buffers{}, std::move(stream), remote_address);
386 }
387 }
388}
389
390template <class Handler>
393{
394#if BOOST_OS_WINDOWS
395 return std::nullopt;
396#else
397 FDStats s;
398 struct rlimit rl{};
399 if (getrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY)
400 return std::nullopt;
401 s.limit = static_cast<std::uint64_t>(rl.rlim_cur);
402#if BOOST_OS_LINUX
403 constexpr char const* kFdDir = "/proc/self/fd";
404#else
405 constexpr char const* kFdDir = "/dev/fd";
406#endif
407 if (DIR* d = ::opendir(kFdDir))
408 {
409 std::uint64_t cnt = 0;
410 while (::readdir(d) != nullptr)
411 ++cnt;
412 ::closedir(d);
413 // readdir counts '.', '..', and the DIR* itself shows in the list
414 s.used = (cnt >= 3) ? (cnt - 3) : 0;
415 return s;
416 }
417 return std::nullopt;
418#endif
419}
420
421template <class Handler>
422bool
424{
425#if BOOST_OS_WINDOWS
426 return false;
427#else
428 auto const stats = query_fd_stats();
429 if (!stats || stats->limit == 0)
430 return false;
431
432 auto const& s = *stats;
433 auto const free = (s.limit > s.used) ? (s.limit - s.used) : 0ull;
434 double const free_ratio = static_cast<double>(free) / static_cast<double>(s.limit);
435 if (free_ratio < FREE_FD_THRESHOLD)
436 {
437 return true;
438 }
439 return false;
440#endif
441}
442
443} // namespace xrpl
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
Handler & handler_
Definition Door.h:59
stream_type stream_
Definition Door.h:61
void do_detect(yield_context yield)
Definition Door.h:189
Detector(Port const &port, Handler &handler, boost::asio::io_context &ioc, stream_type &&stream, endpoint_type remote_address, beast::Journal j)
Definition Door.h:154
endpoint_type remote_address_
Definition Door.h:63
Port const & port_
Definition Door.h:58
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition Door.h:64
void close() override
Definition Door.h:182
beast::Journal const j_
Definition Door.h:65
boost::asio::io_context & ioc_
Definition Door.h:60
socket_type & socket_
Definition Door.h:62
A listening socket.
Definition Door.h:42
beast::Journal const j_
Definition Door.h:85
protocol_type::acceptor acceptor_type
Definition Door.h:49
void run()
Definition Door.h:288
static constexpr std::chrono::milliseconds INITIAL_ACCEPT_DELAY
Definition Door.h:97
endpoint_type get_endpoint() const
Definition Door.h:135
boost::asio::ip::tcp::socket socket_type
Definition Door.h:51
boost::asio::io_context & ioc_
Definition Door.h:88
static constexpr std::chrono::milliseconds MAX_ACCEPT_DELAY
Definition Door.h:98
bool ssl_
Definition Door.h:91
Door(Handler &handler, boost::asio::io_context &io_context, Port const &port, beast::Journal j)
Definition Door.h:270
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition Door.h:90
acceptor_type acceptor_
Definition Door.h:89
boost::asio::steady_timer backoff_timer_
Definition Door.h:100
bool plain_
Definition Door.h:94
boost::asio::yield_context yield_context
Definition Door.h:47
boost::beast::tcp_stream stream_type
Definition Door.h:52
protocol_type::endpoint endpoint_type
Definition Door.h:50
static constexpr double FREE_FD_THRESHOLD
Definition Door.h:101
boost::system::error_code error_code
Definition Door.h:46
void create(bool ssl, ConstBufferSequence const &buffers, stream_type &&stream, endpoint_type remote_address)
Definition Door.h:312
std::optional< FDStats > query_fd_stats() const
Definition Door.h:392
boost::asio::basic_waitable_timer< clock_type > timer_type
Definition Door.h:45
void close() override
Close the Door listening socket and connections.
Definition Door.h:297
boost::asio::ip::tcp protocol_type
Definition Door.h:48
std::chrono::milliseconds accept_delay_
Definition Door.h:99
bool should_throttle_for_fds()
Definition Door.h:423
Handler & handler_
Definition Door.h:87
void do_accept(yield_context yield)
Definition Door.h:332
Port const & port_
Definition Door.h:86
void reOpen()
Definition Door.h:220
io_list & ios()
Return the io_list associated with the work.
Definition io_list.h:40
T count(T... args)
T is_same_v
T min(T... args)
STL namespace.
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn
Definition Spawn.h:66
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
boost::beast::ssl_stream< socket_type > stream_type
Definition Handshake.h:22
T str(T... args)
std::uint64_t limit
Definition Door.h:106
std::uint64_t used
Definition Door.h:105
Configuration information for a Server listening port.
Definition Port.h:30
std::set< std::string, boost::beast::iless > protocol
Definition Port.h:36
boost::asio::ip::address ip
Definition Port.h:34
std::string name
Definition Port.h:33
std::uint16_t port
Definition Port.h:35