3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/contract.h>
5#include <xrpl/server/detail/PlainHTTPPeer.h>
6#include <xrpl/server/detail/SSLHTTPPeer.h>
7#include <xrpl/server/detail/io_list.h>
9#include <boost/asio/basic_waitable_timer.hpp>
10#include <boost/asio/buffer.hpp>
11#include <boost/asio/io_context.hpp>
12#include <boost/asio/ip/tcp.hpp>
13#include <boost/asio/post.hpp>
14#include <boost/asio/spawn.hpp>
15#include <boost/asio/steady_timer.hpp>
16#include <boost/beast/core/detect_ssl.hpp>
17#include <boost/beast/core/multi_buffer.hpp>
18#include <boost/beast/core/tcp_stream.hpp>
19#include <boost/container/flat_map.hpp>
20#include <boost/predef.h>
23#include <sys/resource.h>
40template <
class Handler>
45 using timer_type = boost::asio::basic_waitable_timer<clock_type>;
60 boost::asio::io_context&
ioc_;
64 boost::asio::strand<boost::asio::io_context::executor_type>
strand_;
71 boost::asio::io_context& ioc,
88 boost::asio::io_context&
ioc_;
90 boost::asio::strand<boost::asio::io_context::executor_type>
strand_;
141 template <
class ConstBufferSequence>
145 ConstBufferSequence
const& buffers,
153template <
class Handler>
157 boost::asio::io_context& ioc,
164 , stream_(
std::move(stream))
165 , socket_(stream_.socket())
166 , remote_address_(remote_address)
167 , strand_(
boost::asio::make_strand(ioc_))
172template <
class Handler>
180template <
class Handler>
187template <
class Handler>
191 boost::beast::multi_buffer buf(16);
193 boost::system::error_code ec;
194 bool const ssl = async_detect_ssl(stream_, buf, do_yield[ec]);
195 stream_.expires_never();
210 if (ec != boost::asio::error::operation_aborted)
212 JLOG(
j_.
trace()) <<
"Error detecting ssl: " << ec.message() <<
" from " << remote_address_;
218template <
class Handler>
230 ss <<
"Can't close acceptor: " <<
port_.
name <<
", " << ec.message();
232 Throw<std::runtime_error>(ss.
str());
238 acceptor_.open(local_address.protocol(), ec);
241 JLOG(
j_.
error()) <<
"Open port '" <<
port_.
name <<
"' failed:" << ec.message();
242 Throw<std::exception>();
245 acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(
true), ec);
248 JLOG(
j_.
error()) <<
"Option for port '" <<
port_.
name <<
"' failed:" << ec.message();
249 Throw<std::exception>();
255 JLOG(
j_.
error()) <<
"Bind port '" <<
port_.
name <<
"' failed:" << ec.message();
256 Throw<std::exception>();
259 acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
262 JLOG(
j_.
error()) <<
"Listen on port '" <<
port_.
name <<
"' failed:" << ec.message();
263 Throw<std::exception>();
269template <
class Handler>
272 boost::asio::io_context& io_context,
286template <
class Handler>
295template <
class Handler>
299 if (!strand_.running_in_this_thread())
300 return boost::asio::post(
302 backoff_timer_.cancel();
309template <
class Handler>
310template <
class ConstBufferSequence>
314 ConstBufferSequence
const& buffers,
321 port_, handler_, ioc_, j_, remote_address, buffers, std::move(stream)))
326 port_, handler_, ioc_, j_, remote_address, buffers, std::move(stream)))
330template <
class Handler>
334 while (acceptor_.is_open())
336 if (should_throttle_for_fds())
338 backoff_timer_.expires_after(accept_delay_);
339 boost::system::error_code tec;
340 backoff_timer_.async_wait(do_yield[tec]);
341 accept_delay_ =
std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
342 JLOG(j_.
warn()) <<
"Throttling do_accept for " << accept_delay_.count() <<
"ms.";
350 acceptor_.async_accept(socket, remote_address, do_yield[ec]);
353 if (ec == boost::asio::error::operation_aborted)
356 if (ec == boost::asio::error::no_descriptors ||
357 ec == boost::asio::error::no_buffer_space)
359 JLOG(j_.
warn()) <<
"accept: Too many open files. Pausing for "
360 << accept_delay_.count() <<
"ms.";
362 backoff_timer_.expires_after(accept_delay_);
363 boost::system::error_code tec;
364 backoff_timer_.async_wait(do_yield[tec]);
366 accept_delay_ =
std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
370 JLOG(j_.
error()) <<
"accept error: " << ec.message();
375 accept_delay_ = INITIAL_ACCEPT_DELAY;
379 if (
auto sp = ios().
template emplace<Detector>(
380 port_, handler_, ioc_, std::move(stream), remote_address, j_))
383 else if (ssl_ || plain_)
385 create(ssl_, boost::asio::null_buffers{}, std::move(stream), remote_address);
390template <
class Handler>
399 if (getrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY)
403 constexpr char const* kFdDir =
"/proc/self/fd";
405 constexpr char const* kFdDir =
"/dev/fd";
407 if (DIR* d = ::opendir(kFdDir))
410 while (::readdir(d) !=
nullptr)
414 s.
used = (cnt >= 3) ? (cnt - 3) : 0;
421template <
class Handler>
428 auto const stats = query_fd_stats();
429 if (!stats || stats->limit == 0)
432 auto const& s = *stats;
433 auto const free = (s.limit > s.used) ? (s.limit - s.used) : 0ull;
434 double const free_ratio =
static_cast<double>(free) /
static_cast<double>(s.limit);
435 if (free_ratio < FREE_FD_THRESHOLD)
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
void do_detect(yield_context yield)
Detector(Port const &port, Handler &handler, boost::asio::io_context &ioc, stream_type &&stream, endpoint_type remote_address, beast::Journal j)
endpoint_type remote_address_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
boost::asio::io_context & ioc_
protocol_type::acceptor acceptor_type
static constexpr std::chrono::milliseconds INITIAL_ACCEPT_DELAY
endpoint_type get_endpoint() const
boost::asio::ip::tcp::socket socket_type
boost::asio::io_context & ioc_
static constexpr std::chrono::milliseconds MAX_ACCEPT_DELAY
Door(Handler &handler, boost::asio::io_context &io_context, Port const &port, beast::Journal j)
boost::asio::strand< boost::asio::io_context::executor_type > strand_
boost::asio::steady_timer backoff_timer_
boost::asio::yield_context yield_context
boost::beast::tcp_stream stream_type
protocol_type::endpoint endpoint_type
static constexpr double FREE_FD_THRESHOLD
boost::system::error_code error_code
void create(bool ssl, ConstBufferSequence const &buffers, stream_type &&stream, endpoint_type remote_address)
std::optional< FDStats > query_fd_stats() const
boost::asio::basic_waitable_timer< clock_type > timer_type
void close() override
Close the Door listening socket and connections.
boost::asio::ip::tcp protocol_type
std::chrono::milliseconds accept_delay_
bool should_throttle_for_fds()
void do_accept(yield_context yield)
io_list & ios()
Return the io_list associated with the work.
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
boost::beast::ssl_stream< socket_type > stream_type
T shared_from_this(T... args)
Configuration information for a Server listening port.
std::set< std::string, boost::beast::iless > protocol
boost::asio::ip::address ip