rippled
Loading...
Searching...
No Matches
Door.h
1#pragma once
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/contract.h>
5#include <xrpl/server/detail/PlainHTTPPeer.h>
6#include <xrpl/server/detail/SSLHTTPPeer.h>
7#include <xrpl/server/detail/io_list.h>
8
9#include <boost/asio/basic_waitable_timer.hpp>
10#include <boost/asio/buffer.hpp>
11#include <boost/asio/io_context.hpp>
12#include <boost/asio/ip/tcp.hpp>
13#include <boost/asio/post.hpp>
14#include <boost/asio/spawn.hpp>
15#include <boost/asio/steady_timer.hpp>
16#include <boost/beast/core/detect_ssl.hpp>
17#include <boost/beast/core/multi_buffer.hpp>
18#include <boost/beast/core/tcp_stream.hpp>
19#include <boost/container/flat_map.hpp>
20#include <boost/predef.h>
21
22#if !BOOST_OS_WINDOWS
23#include <sys/resource.h>
24
25#include <dirent.h>
26#include <unistd.h>
27#endif
28
29#include <algorithm>
30#include <chrono>
31#include <cstdint>
32#include <functional>
33#include <memory>
34#include <optional>
35#include <sstream>
36
37namespace xrpl {
38
// A listening socket.
//
// Door owns a TCP acceptor for one configured Port and, for every accepted
// connection, creates either a PlainHTTPPeer or an SSLHTTPPeer (optionally
// after running a Detector when the port allows both plain and SSL traffic).
//
// NOTE(review): this listing is missing several original lines (for example
// the Detector constructor name, the FDStats struct, acceptor_, accept_delay_
// and some member-function declarations). Comments below describe only what
// is visible here.
40template <class Handler>
41class Door : public io_list::work, public std::enable_shared_from_this<Door<Handler>>
42{
43private:
45 using timer_type = boost::asio::basic_waitable_timer<clock_type>;
46 using error_code = boost::system::error_code;
47 using yield_context = boost::asio::yield_context;
48 using protocol_type = boost::asio::ip::tcp;
49 using acceptor_type = protocol_type::acceptor;
50 using endpoint_type = protocol_type::endpoint;
51 using socket_type = boost::asio::ip::tcp::socket;
52 using stream_type = boost::beast::tcp_stream;
53
54 // Detects SSL on a socket
 // A Detector is created per accepted connection when the port accepts both
 // plain and SSL protocols; it then creates the matching peer type.
55 class Detector : public io_list::work, public std::enable_shared_from_this<Detector>
56 {
57 private:
58 Port const& port_;
59 Handler& handler_;
60 boost::asio::io_context& ioc_;
64 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
66
67 public:
 // Constructor parameters (the constructor's name line is not visible in
 // this listing).
69 Port const& port,
70 Handler& handler,
71 boost::asio::io_context& ioc,
72 stream_type&& stream,
73 endpoint_type remote_address,
75 void
76 run();
 // Closes the stream under inspection.
77 void
78 close() override;
79
80 private:
81 void
83 };
84
 // Listening-port configuration, connection handler and io_context; all are
 // held by reference and are not owned by the Door.
86 Port const& port_;
87 Handler& handler_;
88 boost::asio::io_context& ioc_;
 // Strand on which the accept coroutine and close() run.
90 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
 // ssl_: the port lists https, wss, wss2 or peer.
 // plain_: the port lists http, ws or ws2.
91 bool ssl_;
92 bool plain_;
 // Timer used to pause the accept loop while backing off.
96 boost::asio::steady_timer backoff_timer_;
 // should_throttle_for_fds() reports true when the fraction of free file
 // descriptors falls below this value.
97 static constexpr double FREE_FD_THRESHOLD = 0.70;
98
104
 // (Re)opens, binds and starts listening on the acceptor for port_.
105 void
106 reOpen();
107
 // Returns the descriptor limit and current usage, or nullopt if unavailable.
109 query_fd_stats() const;
110
111 bool
113
114public:
 // Stores the references and opens the listening socket via reOpen().
115 Door(Handler& handler, boost::asio::io_context& io_context, Port const& port, beast::Journal j);
116
117 // Work-around because we can't call shared_from_this in ctor
118 void
119 run();
120
 // Close the Door listening socket and connections.
127 void
128 close() override;
129
 // Returns the local endpoint the acceptor is bound to.
132 {
133 return acceptor_.local_endpoint();
134 }
135
136private:
 // Creates and starts an SSL or plain HTTP peer for an accepted stream.
137 template <class ConstBufferSequence>
138 void
139 create(bool ssl, ConstBufferSequence const& buffers, stream_type&& stream, endpoint_type remote_address);
140
141 void
143};
144
// Detector constructor (the line carrying the constructor name and the final
// journal parameter are not visible in this listing).
//
// Stores the port, handler and io_context references, takes ownership of the
// accepted stream by move, records the remote address and journal, and creates
// a fresh strand on the io_context.
145template <class Handler>
147 Port const& port,
148 Handler& handler,
149 boost::asio::io_context& ioc,
150 stream_type&& stream,
151 endpoint_type remote_address,
153 : port_(port)
154 , handler_(handler)
155 , ioc_(ioc)
156 , stream_(std::move(stream))
 // socket_ is a reference to the socket inside the member stream_, so it is
 // bound after stream_ has been initialised from the moved-in argument.
157 , socket_(stream_.socket())
158 , remote_address_(remote_address)
159 , strand_(boost::asio::make_strand(ioc_))
160 , j_(j)
161{
162}
163
164template <class Handler>
165void
170
// Detector::close() override (signature line not visible in this listing).
// Closes the stream that is being inspected.
// NOTE(review): presumably this makes an in-flight SSL detection complete with
// an error so the coroutine can finish - confirm against Boost.Beast docs.
171template <class Handler>
172void
174{
175 stream_.close();
176}
177
178template <class Handler>
179void
180Door<Handler>::Detector::do_detect(boost::asio::yield_context do_yield)
181{
182 boost::beast::multi_buffer buf(16);
183 stream_.expires_after(std::chrono::seconds(15));
184 boost::system::error_code ec;
185 bool const ssl = async_detect_ssl(stream_, buf, do_yield[ec]);
186 stream_.expires_never();
187 if (!ec)
188 {
189 if (ssl)
190 {
191 if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
192 port_, handler_, ioc_, j_, remote_address_, buf.data(), std::move(stream_)))
193 sp->run();
194 return;
195 }
196 if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
197 port_, handler_, ioc_, j_, remote_address_, buf.data(), std::move(stream_)))
198 sp->run();
199 return;
200 }
201 if (ec != boost::asio::error::operation_aborted)
202 {
203 JLOG(j_.trace()) << "Error detecting ssl: " << ec.message() << " from " << remote_address_;
204 }
205}
206
207//------------------------------------------------------------------------------
208
// Door::reOpen() (signature line not visible in this listing).
//
// Closes the acceptor if it is currently open, then opens it again for the
// configured address/port, enables address reuse, binds and starts listening.
// Every failing step is logged at error level and reported by throwing.
209template <class Handler>
210void
212{
213 error_code ec;
214
 // An acceptor left open from a previous call is closed first.
215 if (acceptor_.is_open())
216 {
217 acceptor_.close(ec);
218 if (ec)
219 {
 // NOTE(review): the declaration of `ss` is missing from this listing;
 // it is used as a string stream here.
221 ss << "Can't close acceptor: " << port_.name << ", " << ec.message();
222 JLOG(j_.error()) << ss.str();
223 Throw<std::runtime_error>(ss.str());
224 }
225 }
226
227 endpoint_type const local_address = endpoint_type(port_.ip, port_.port);
228
 // NOTE(review): unlike the close failure above, the failures below throw a
 // bare std::exception, so the detail is only available in the log.
229 acceptor_.open(local_address.protocol(), ec);
230 if (ec)
231 {
232 JLOG(j_.error()) << "Open port '" << port_.name << "' failed:" << ec.message();
233 Throw<std::exception>();
234 }
235
236 acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
237 if (ec)
238 {
239 JLOG(j_.error()) << "Option for port '" << port_.name << "' failed:" << ec.message();
240 Throw<std::exception>();
241 }
242
243 acceptor_.bind(local_address, ec);
244 if (ec)
245 {
246 JLOG(j_.error()) << "Bind port '" << port_.name << "' failed:" << ec.message();
247 Throw<std::exception>();
248 }
249
250 acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
251 if (ec)
252 {
253 JLOG(j_.error()) << "Listen on port '" << port_.name << "' failed:" << ec.message();
254 Throw<std::exception>();
255 }
256
257 JLOG(j_.info()) << "Opened " << port_;
258}
259
260template <class Handler>
261Door<Handler>::Door(Handler& handler, boost::asio::io_context& io_context, Port const& port, beast::Journal j)
262 : j_(j)
263 , port_(port)
264 , handler_(handler)
265 , ioc_(io_context)
266 , acceptor_(io_context)
267 , strand_(boost::asio::make_strand(io_context))
268 , ssl_(
269 port_.protocol.count("https") > 0 || port_.protocol.count("wss") > 0 || port_.protocol.count("wss2") > 0 ||
270 port_.protocol.count("peer") > 0)
271 , plain_(port_.protocol.count("http") > 0 || port_.protocol.count("ws") > 0 || port_.protocol.count("ws2"))
272 , backoff_timer_(io_context)
273{
274 reOpen();
275}
276
// Door::run() (signature line not visible in this listing).
//
// Starts the accept loop by spawning do_accept as a coroutine on strand_.
// The bound callable holds a shared_ptr to this Door, which is why this is a
// separate step from construction (shared_from_this cannot be used in a ctor).
277template <class Handler>
278void
280{
281 util::spawn(strand_, std::bind(&Door<Handler>::do_accept, this->shared_from_this(), std::placeholders::_1));
282}
283
// Door::close() override (signature line not visible in this listing).
//
// Close the Door listening socket. When called from outside strand_, the call
// is re-posted onto strand_ and returns immediately; the actual work then runs
// on the strand.
284template <class Handler>
285void
287{
288 if (!strand_.running_in_this_thread())
289 return boost::asio::post(strand_, std::bind(&Door<Handler>::close, this->shared_from_this()));
 // Cancel a pending backoff wait so the accept loop is not left sleeping.
290 backoff_timer_.cancel();
 // The error code from closing the acceptor is deliberately not inspected.
291 error_code ec;
292 acceptor_.close(ec);
293}
294
295//------------------------------------------------------------------------------
296
297template <class Handler>
298template <class ConstBufferSequence>
299void
300Door<Handler>::create(bool ssl, ConstBufferSequence const& buffers, stream_type&& stream, endpoint_type remote_address)
301{
302 if (ssl)
303 {
304 if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
305 port_, handler_, ioc_, j_, remote_address, buffers, std::move(stream)))
306 sp->run();
307 return;
308 }
309 if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
310 port_, handler_, ioc_, j_, remote_address, buffers, std::move(stream)))
311 sp->run();
312}
313
314template <class Handler>
315void
316Door<Handler>::do_accept(boost::asio::yield_context do_yield)
317{
318 while (acceptor_.is_open())
319 {
320 if (should_throttle_for_fds())
321 {
322 backoff_timer_.expires_after(accept_delay_);
323 boost::system::error_code tec;
324 backoff_timer_.async_wait(do_yield[tec]);
325 accept_delay_ = std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
326 JLOG(j_.warn()) << "Throttling do_accept for " << accept_delay_.count() << "ms.";
327 continue;
328 }
329
330 error_code ec;
331 endpoint_type remote_address;
332 stream_type stream(ioc_);
333 socket_type& socket = stream.socket();
334 acceptor_.async_accept(socket, remote_address, do_yield[ec]);
335 if (ec)
336 {
337 if (ec == boost::asio::error::operation_aborted)
338 break;
339
340 if (ec == boost::asio::error::no_descriptors || ec == boost::asio::error::no_buffer_space)
341 {
342 JLOG(j_.warn()) << "accept: Too many open files. Pausing for " << accept_delay_.count() << "ms.";
343
344 backoff_timer_.expires_after(accept_delay_);
345 boost::system::error_code tec;
346 backoff_timer_.async_wait(do_yield[tec]);
347
348 accept_delay_ = std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
349 }
350 else
351 {
352 JLOG(j_.error()) << "accept error: " << ec.message();
353 }
354 continue;
355 }
356
357 accept_delay_ = INITIAL_ACCEPT_DELAY;
358
359 if (ssl_ && plain_)
360 {
361 if (auto sp =
362 ios().template emplace<Detector>(port_, handler_, ioc_, std::move(stream), remote_address, j_))
363 sp->run();
364 }
365 else if (ssl_ || plain_)
366 {
367 create(ssl_, boost::asio::null_buffers{}, std::move(stream), remote_address);
368 }
369 }
370}
371
// Door::query_fd_stats() const (return type and signature lines are not
// visible in this listing).
//
// Reports the process's file-descriptor soft limit and the number currently in
// use. Returns std::nullopt on Windows, when getrlimit() fails, when the soft
// limit is RLIM_INFINITY, or when the descriptor directory cannot be opened.
372template <class Handler>
375{
376#if BOOST_OS_WINDOWS
377 return std::nullopt;
378#else
379 FDStats s;
380 struct rlimit rl;
381 if (getrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY)
382 return std::nullopt;
383 s.limit = static_cast<std::uint64_t>(rl.rlim_cur);
 // Usage is measured by counting the entries of the per-process descriptor
 // directory: /proc/self/fd on Linux, /dev/fd elsewhere.
384#if BOOST_OS_LINUX
385 constexpr char const* kFdDir = "/proc/self/fd";
386#else
387 constexpr char const* kFdDir = "/dev/fd";
388#endif
389 if (DIR* d = ::opendir(kFdDir))
390 {
391 std::uint64_t cnt = 0;
392 while (::readdir(d) != nullptr)
393 ++cnt;
394 ::closedir(d);
395 // readdir counts '.', '..', and the DIR* itself shows in the list
 // Those three entries are subtracted; the result is clamped at zero.
396 s.used = (cnt >= 3) ? (cnt - 3) : 0;
397 return s;
398 }
399 return std::nullopt;
400#endif
401}
402
// Door::should_throttle_for_fds() (signature line not visible in this listing).
//
// Returns true when the fraction of still-free file descriptors is below
// FREE_FD_THRESHOLD. Returns false on Windows, when descriptor statistics are
// unavailable, or when the reported limit is zero.
403template <class Handler>
404bool
406{
407#if BOOST_OS_WINDOWS
408 return false;
409#else
410 auto const stats = query_fd_stats();
 // The limit == 0 check also protects the division below.
411 if (!stats || stats->limit == 0)
412 return false;
413
414 auto const& s = *stats;
 // Clamp at zero in case the counted usage exceeds the limit.
415 auto const free = (s.limit > s.used) ? (s.limit - s.used) : 0ull;
416 double const free_ratio = static_cast<double>(free) / static_cast<double>(s.limit);
417 if (free_ratio < FREE_FD_THRESHOLD)
418 {
419 return true;
420 }
421 return false;
422#endif
423}
424
425} // namespace xrpl
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:318
Stream info() const
Definition Journal.h:306
Stream trace() const
Severity stream access functions.
Definition Journal.h:294
Stream warn() const
Definition Journal.h:312
Handler & handler_
Definition Door.h:59
stream_type stream_
Definition Door.h:61
void do_detect(yield_context yield)
Definition Door.h:180
Detector(Port const &port, Handler &handler, boost::asio::io_context &ioc, stream_type &&stream, endpoint_type remote_address, beast::Journal j)
Definition Door.h:146
endpoint_type remote_address_
Definition Door.h:63
Port const & port_
Definition Door.h:58
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition Door.h:64
void close() override
Definition Door.h:173
beast::Journal const j_
Definition Door.h:65
boost::asio::io_context & ioc_
Definition Door.h:60
socket_type & socket_
Definition Door.h:62
A listening socket.
Definition Door.h:42
beast::Journal const j_
Definition Door.h:85
protocol_type::acceptor acceptor_type
Definition Door.h:49
void run()
Definition Door.h:279
static constexpr std::chrono::milliseconds INITIAL_ACCEPT_DELAY
Definition Door.h:93
endpoint_type get_endpoint() const
Definition Door.h:131
boost::asio::ip::tcp::socket socket_type
Definition Door.h:51
boost::asio::io_context & ioc_
Definition Door.h:88
static constexpr std::chrono::milliseconds MAX_ACCEPT_DELAY
Definition Door.h:94
bool ssl_
Definition Door.h:91
Door(Handler &handler, boost::asio::io_context &io_context, Port const &port, beast::Journal j)
Definition Door.h:261
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition Door.h:90
acceptor_type acceptor_
Definition Door.h:89
boost::asio::steady_timer backoff_timer_
Definition Door.h:96
bool plain_
Definition Door.h:92
boost::asio::yield_context yield_context
Definition Door.h:47
boost::beast::tcp_stream stream_type
Definition Door.h:52
protocol_type::endpoint endpoint_type
Definition Door.h:50
static constexpr double FREE_FD_THRESHOLD
Definition Door.h:97
boost::system::error_code error_code
Definition Door.h:46
void create(bool ssl, ConstBufferSequence const &buffers, stream_type &&stream, endpoint_type remote_address)
Definition Door.h:300
std::optional< FDStats > query_fd_stats() const
Definition Door.h:374
boost::asio::basic_waitable_timer< clock_type > timer_type
Definition Door.h:45
void close() override
Close the Door listening socket and connections.
Definition Door.h:286
boost::asio::ip::tcp protocol_type
Definition Door.h:48
std::chrono::milliseconds accept_delay_
Definition Door.h:95
bool should_throttle_for_fds()
Definition Door.h:405
Handler & handler_
Definition Door.h:87
void do_accept(yield_context yield)
Definition Door.h:316
Port const & port_
Definition Door.h:86
void reOpen()
Definition Door.h:211
io_list & ios()
Return the io_list associated with the work.
Definition io_list.h:40
T is_same_v
T min(T... args)
STL namespace.
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn
Definition Spawn.h:65
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
boost::beast::ssl_stream< socket_type > stream_type
Definition Handshake.h:22
T str(T... args)
std::uint64_t limit
Definition Door.h:102
std::uint64_t used
Definition Door.h:101
Configuration information for a Server listening port.
Definition Port.h:30
boost::asio::ip::address ip
Definition Port.h:34
std::string name
Definition Port.h:33
std::uint16_t port
Definition Port.h:35