xrpld
Loading...
Searching...
No Matches
Door.h
1#pragma once
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/contract.h>
5#include <xrpl/server/detail/PlainHTTPPeer.h>
6#include <xrpl/server/detail/SSLHTTPPeer.h>
7#include <xrpl/server/detail/io_list.h>
8
9#include <boost/asio/basic_waitable_timer.hpp>
10#include <boost/asio/buffer.hpp>
11#include <boost/asio/io_context.hpp>
12#include <boost/asio/ip/tcp.hpp>
13#include <boost/asio/post.hpp>
14#include <boost/asio/spawn.hpp>
15#include <boost/asio/steady_timer.hpp>
16#include <boost/beast/core/detect_ssl.hpp>
17#include <boost/beast/core/multi_buffer.hpp>
18#include <boost/beast/core/tcp_stream.hpp>
19#include <boost/container/flat_map.hpp>
20#include <boost/predef.h>
21
22#if !BOOST_OS_WINDOWS
23#include <sys/resource.h>
24
25#include <dirent.h>
26#endif
27
28#include <algorithm>
29#include <chrono>
30#include <cstdint>
31#include <functional>
32#include <memory>
33#include <optional>
34#include <sstream>
35#include <utility>
36
37namespace xrpl {
38
// Listening "door" for one configured Port: owns the acceptor and, for each
// accepted connection, creates a plain or SSL HTTP peer (see doAccept/create).
// NOTE(review): this listing omits several declarations (their lines began
// with hyperlinked identifiers), so the class text below is not complete.
40template <class Handler>
41class Door : public IOList::Work, public std::enable_shared_from_this<Door<Handler>>
42{
43private:
45 using timer_type = boost::asio::basic_waitable_timer<clock_type>;
46 using error_code = boost::system::error_code;
47 using yield_context = boost::asio::yield_context;
48 using protocol_type = boost::asio::ip::tcp;
49 using acceptor_type = protocol_type::acceptor;
50 using endpoint_type = protocol_type::endpoint;
51 using socket_type = boost::asio::ip::tcp::socket;
52 using stream_type = boost::beast::tcp_stream;
53
54 // Detects SSL on a socket
// Used by doAccept only when the port allows both SSL and plain protocols.
55 class Detector : public IOList::Work, public std::enable_shared_from_this<Detector>
56 {
57 private:
58 Port const& port_;
59 Handler& handler_;
60 boost::asio::io_context& ioc_;
64 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
66
67 public:
69 Port const& port,
70 Handler& handler,
71 boost::asio::io_context& ioc,
72 stream_type&& stream,
73 endpoint_type remoteAddress,
// Starts the detection coroutine (doDetect).
75 void
76 run();
77 void
78 close() override;
79
80 private:
81 void
83 };
84
86 Port const& port_;
87 Handler& handler_;
88 boost::asio::io_context& ioc_;
90 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
// True when the port's protocol set contains any SSL-based protocol
// ("https", "wss", "wss2" or "peer").
91 bool ssl_{
92 port_.protocol.contains("https") || port_.protocol.contains("wss") ||
93 port_.protocol.contains("wss2") || port_.protocol.contains("peer")};
// True when the port's protocol set contains any plain-text protocol
// ("http", "ws" or "ws2"). When both ssl_ and plain_ are set, doAccept
// routes connections through Detector.
94 bool plain_{
95 port_.protocol.contains("http") || port_.protocol.contains("ws") ||
96 (port_.protocol.contains("ws2"))};
// Timer the accept loop waits on while backing off; cancelled by close().
100 boost::asio::steady_timer backoffTimer_;
// Throttle threshold: shouldThrottleForFds() compares used * 100 against
// limit * kMaxUsedFdPercent.
101 static constexpr std::uint64_t kMaxUsedFdPercent = 70;
// Time of the last descriptor-usage sample and the decision cached from it.
103 clock_type::time_point fdSampleAt_;
104 bool cachedThrottle_{false};
105
111
// (Re)opens, binds and starts listening on the acceptor for port_.
112 void
113 reOpen();
114
// Samples the descriptor limit and the number of descriptors in use.
116 queryFdStats() const;
117
118 bool
120
121public:
122 Door(Handler& handler, boost::asio::io_context& ioContext, Port const& port, beast::Journal j);
123
124 // Work-around because we can't call shared_from_this in ctor
125 void
126 run();
127
134 void
135 close() override;
136
// Returns the endpoint the acceptor is actually bound to.
137 [[nodiscard]] endpoint_type
139 {
140 return acceptor_.local_endpoint();
141 }
142
143private:
// Creates and runs an SSL or plain HTTP peer for an accepted stream.
144 template <class ConstBufferSequence>
145 void
146 create(
147 bool ssl,
148 ConstBufferSequence const& buffers,
149 stream_type&& stream,
150 endpoint_type remoteAddress);
151
// Coroutine body of the accept loop.
152 void
153 doAccept(yield_context yield);
154};
155
// Detector constructor: stores the port/handler/io_context references, takes
// ownership of the accepted stream and creates a fresh strand on ioc.
// NOTE(review): the listing omits the line carrying the constructor name and
// the final parameter line (the initializer list shows a journal `j`).
156template <class Handler>
158 Port const& port,
159 Handler& handler,
160 boost::asio::io_context& ioc,
161 stream_type&& stream,
162 endpoint_type remoteAddress,
164 : port_(port)
165 , handler_(handler)
166 , ioc_(ioc)
167 , stream_(std::move(stream))
// Bound to the socket of the stream_ member (already moved into), not to the
// caller's moved-from stream argument.
168 , socket_(stream_.socket())
169 , remoteAddress_(std::move(remoteAddress))
170 , strand_(boost::asio::make_strand(ioc_))
171 , j_(j)
172{
173}
174
// Starts detection by handing doDetect to a call that takes strand_ and a
// bound callable.
// NOTE(review): the listing omits the function-name line and the start of the
// call; presumably this spawns a coroutine on strand_ — confirm against the
// real header.
175template <class Handler>
176void
178{
// The bound callable holds a shared_ptr to this Detector, so the object stays
// alive for as long as the callable does.
180 strand_, std::bind(&Detector::doDetect, this->shared_from_this(), std::placeholders::_1));
181}
182
183template <class Handler>
184void
189
// Coroutine body that inspects the start of an accepted connection to decide
// whether the client is speaking SSL, then hands the stream to the matching
// peer type.
//
// @param doYield  Yield context used to suspend while detection is pending.
190template <class Handler>
191void
192Door<Handler>::Detector::doDetect(boost::asio::yield_context doYield)
193{
// Buffer that async_detect_ssl reads into; its contents are later forwarded
// to the peer via buf.data().
194 boost::beast::multi_buffer buf(16);
// Give the client 15 seconds to send enough data for detection.
195 stream_.expires_after(std::chrono::seconds(15));
196 boost::system::error_code ec;
// doYield[ec] makes the failure land in ec instead of being thrown.
197 bool const ssl = async_detect_ssl(stream_, buf, doYield[ec]);
// Detection is over; clear the timeout before the stream is handed off.
198 stream_.expires_never();
199 if (!ec)
200 {
201 if (ssl)
202 {
// emplace may return a null pointer, in which case no peer is run.
203 if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
204 port_, handler_, ioc_, j_, remoteAddress_, buf.data(), std::move(stream_)))
205 sp->run();
// Returns whether or not a peer was created.
206 return;
207 }
208 if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
209 port_, handler_, ioc_, j_, remoteAddress_, buf.data(), std::move(stream_)))
210 sp->run();
211 return;
212 }
// operation_aborted is not logged; every other failure is logged at trace
// level.
213 if (ec != boost::asio::error::operation_aborted)
214 {
215 JLOG(j_.trace()) << "Error detecting ssl: " << ec.message() << " from " << remoteAddress_;
216 }
217}
218
219//------------------------------------------------------------------------------
220
// Closes the acceptor if it is open, then opens it for the port's configured
// address, enables address reuse, binds and starts listening.
// NOTE(review): the listing omits the function-name line and one line inside
// every error branch (plus the declaration of `ss`). Presumably each branch
// also throws after logging — confirm against the real header.
221template <class Handler>
222void
224{
225 error_code ec;
226
227 if (acceptor_.is_open())
228 {
229 acceptor_.close(ec);
230 if (ec)
231 {
233 ss << "Can't close acceptor: " << port_.name << ", " << ec.message();
234 JLOG(j_.error()) << ss.str();
236 }
237 }
238
// Endpoint built from the port configuration (IP address and port number).
239 endpoint_type const localAddress = endpoint_type(port_.ip, port_.port);
240
// Each step below reports failure through ec rather than by throwing.
241 acceptor_.open(localAddress.protocol(), ec);
242 if (ec)
243 {
244 JLOG(j_.error()) << "Open port '" << port_.name << "' failed:" << ec.message();
246 }
247
248 acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
249 if (ec)
250 {
251 JLOG(j_.error()) << "Option for port '" << port_.name << "' failed:" << ec.message();
253 }
254
255 acceptor_.bind(localAddress, ec);
256 if (ec)
257 {
258 JLOG(j_.error()) << "Bind port '" << port_.name << "' failed:" << ec.message();
260 }
261
// Listen with the largest backlog the socket layer allows.
262 acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
263 if (ec)
264 {
265 JLOG(j_.error()) << "Listen on port '" << port_.name << "' failed:" << ec.message();
267 }
268
269 JLOG(j_.info()) << "Opened " << port_;
270}
271
// Door constructor: stores the handler/port/io_context references, builds the
// acceptor, strand and backoff timer on ioContext, then opens the listening
// socket immediately via reOpen().
// NOTE(review): the listing omits the constructor-name line, the last
// parameter line and one member-initializer line (285).
272template <class Handler>
274 Handler& handler,
275 boost::asio::io_context& ioContext,
276 Port const& port,
278 : j_(j)
279 , port_(port)
280 , handler_(handler)
281 , ioc_(ioContext)
282 , acceptor_(ioContext)
283 , strand_(boost::asio::make_strand(ioContext))
284 , backoffTimer_(ioContext)
286{
// The acceptor is opened, bound and listening by the time construction ends.
287 reOpen();
288}
289
// Starts the accept loop by handing doAccept to a call that takes strand_ and
// a bound callable. Kept separate from the constructor because
// shared_from_this() cannot be used there.
// NOTE(review): the listing omits the function-name line and the start of the
// call; presumably this spawns a coroutine on strand_ — confirm against the
// real header.
290template <class Handler>
291void
293{
295 strand_,
// The bound callable holds a shared_ptr to this Door, keeping it alive.
296 std::bind(&Door<Handler>::doAccept, this->shared_from_this(), std::placeholders::_1));
297}
298
// Stops accepting: cancels any pending backoff wait and closes the acceptor.
// NOTE(review): the listing omits the function-name line and the arguments of
// the post() call (line 306); presumably the call re-posts close() onto
// strand_ — confirm against the real header.
299template <class Handler>
300void
302{
// When called off the strand, defer to a posted call and return.
303 if (!strand_.running_in_this_thread())
304 {
305 return boost::asio::post(
307 }
308 backoffTimer_.cancel();
// Any error from closing the acceptor is captured in ec and not inspected.
309 error_code ec;
310 acceptor_.close(ec);
311}
312
313//------------------------------------------------------------------------------
314
// Creates a peer for an accepted connection and starts it.
//
// @param ssl            true to create an SSLHTTPPeer, false for a
//                       PlainHTTPPeer.
// @param buffers        Buffer sequence passed through to the peer
//                       constructor.
// @param stream         Accepted stream; moved into the peer.
// @param remoteAddress  Endpoint of the remote side.
// NOTE(review): the listing omits the line carrying the function name (318).
315template <class Handler>
316template <class ConstBufferSequence>
317void
319 bool ssl,
320 ConstBufferSequence const& buffers,
321 stream_type&& stream,
322 endpoint_type remoteAddress)
323{
324 if (ssl)
325 {
// emplace may return a null pointer, in which case no peer is run.
326 if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
327 port_, handler_, ioc_, j_, remoteAddress, buffers, std::move(stream)))
328 sp->run();
329 return;
330 }
331 if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
332 port_, handler_, ioc_, j_, remoteAddress, buffers, std::move(stream)))
333 sp->run();
334}
335
// Coroutine body of the accept loop. Runs while the acceptor is open: accepts
// one connection per iteration and hands it to a Detector or directly to
// create(), pausing on backoffTimer_ when throttled or out of resources.
//
// @param doYield  Yield context used to suspend on accept and timer waits.
// NOTE(review): the listing omits lines 342, 348, 375 and 384. Presumably 342
// is the throttle condition and the others adjust acceptDelay_ — confirm
// against the real header.
336template <class Handler>
337void
338Door<Handler>::doAccept(boost::asio::yield_context doYield)
339{
340 while (acceptor_.is_open())
341 {
// Throttle branch (its condition line is omitted by the listing): wait for
// acceptDelay_ and re-check instead of accepting.
343 {
344 JLOG(j_.warn()) << "Throttling do_accept for " << acceptDelay_.count() << "ms.";
345 backoffTimer_.expires_after(acceptDelay_);
// The wait result lands in tec; the visible lines do not inspect it.
346 boost::system::error_code tec;
347 backoffTimer_.async_wait(doYield[tec]);
349 continue;
350 }
351
352 error_code ec;
353 endpoint_type remoteAddress;
// A fresh stream per iteration; the accepted connection lands in its socket.
354 stream_type stream(ioc_);
355 socket_type& socket = stream.socket();
356 acceptor_.async_accept(socket, remoteAddress, doYield[ec]);
357 if (ec)
358 {
// operation_aborted ends the accept loop.
359 if (ec == boost::asio::error::operation_aborted)
360 break;
361
// Resource exhaustion: log the cause and pause before retrying.
362 if (ec == boost::asio::error::no_descriptors ||
363 ec == boost::asio::error::no_buffer_space)
364 {
365 char const* const cause = (ec == boost::asio::error::no_descriptors)
366 ? "too many open files"
367 : "kernel buffer space exhausted";
368 JLOG(j_.warn()) << "accept: " << cause << ". Pausing for " << acceptDelay_.count()
369 << "ms.";
370
371 backoffTimer_.expires_after(acceptDelay_);
372 boost::system::error_code tec;
373 backoffTimer_.async_wait(doYield[tec]);
374
376 }
377 else
378 {
// Any other accept error is logged and the loop retries immediately.
379 JLOG(j_.error()) << "accept error: " << ec.message();
380 }
381 continue;
382 }
383
385
// Port allows both SSL and plain: detect per connection.
386 if (ssl_ && plain_)
387 {
388 if (auto sp = ios().template emplace<Detector>(
389 port_, handler_, ioc_, std::move(stream), remoteAddress, j_))
390 sp->run();
391 }
// Exactly one kind is allowed: create the peer directly, passing an empty
// null_buffers sequence. If neither flag is set, nothing is created and the
// local stream is destroyed at the end of the iteration.
392 else if (ssl_ || plain_)
393 {
394 create(ssl_, boost::asio::null_buffers{}, std::move(stream), remoteAddress);
395 }
396 }
397}
398
// Samples the process's file-descriptor limit and how many descriptors are
// currently open. Returns std::nullopt on Windows, when the limit cannot be
// read or is unlimited, or when the descriptor directory cannot be opened.
// NOTE(review): the listing omits the return-type and function-name lines
// (400-401).
399template <class Handler>
402{
403#if BOOST_OS_WINDOWS
404 return std::nullopt;
405#else
406 FDStats s;
407 struct rlimit rl{};
// No usable limit: either getrlimit failed or the soft limit is unlimited.
408 if (getrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY)
409 return std::nullopt;
// The soft limit (rlim_cur) is the one recorded.
410 s.limit = static_cast<std::uint64_t>(rl.rlim_cur);
// Directory whose entries are counted as the open descriptors.
411#if BOOST_OS_LINUX
412 static constexpr char const* kFdDir = "/proc/self/fd";
413#else
414 static constexpr char const* kFdDir = "/dev/fd";
415#endif
416 if (DIR* d = ::opendir(kFdDir))
417 {
// Count every directory entry; the three extras are subtracted below.
418 std::uint64_t cnt = 0;
419 while (::readdir(d) != nullptr)
420 ++cnt;
421 ::closedir(d);
422 // readdir counts '.', '..', and the DIR* itself shows in the list
// Clamp at zero so a short listing cannot underflow the unsigned count.
423 s.used = (cnt >= 3) ? (cnt - 3) : 0;
424 return s;
425 }
426 return std::nullopt;
427#endif
428}
429
// Returns the throttle decision based on descriptor usage. Always false on
// Windows. Elsewhere the decision is re-evaluated at most once per
// kFdSampleInterval; between samples the cached value is returned.
// NOTE(review): the listing omits the function-name line (432) and line 443.
// Presumably 443 assigns the expression on line 444 to cachedThrottle_ —
// confirm against the real header.
430template <class Handler>
431bool
433{
434#if BOOST_OS_WINDOWS
435 return false;
436#else
437 auto const now = clock_type::now();
// Sampled recently enough: reuse the cached decision.
438 if (now - fdSampleAt_ < kFdSampleInterval)
439 return cachedThrottle_;
440
441 fdSampleAt_ = now;
442 auto const stats = queryFdStats();
// True only when stats are available, the limit is non-zero, and usage is
// strictly above kMaxUsedFdPercent percent of the limit.
444 stats && stats->limit > 0 && stats->used * 100 > stats->limit * kMaxUsedFdPercent;
445 return cachedThrottle_;
446#endif
447}
448
449} // namespace xrpl
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:38
Handler & handler_
Definition Door.h:59
stream_type stream_
Definition Door.h:61
Port const & port_
Definition Door.h:58
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition Door.h:64
void close() override
Definition Door.h:185
beast::Journal const j_
Definition Door.h:65
void doDetect(yield_context yield)
Definition Door.h:192
boost::asio::io_context & ioc_
Definition Door.h:60
socket_type & socket_
Definition Door.h:62
endpoint_type remoteAddress_
Definition Door.h:63
Detector(Port const &port, Handler &handler, boost::asio::io_context &ioc, stream_type &&stream, endpoint_type remoteAddress, beast::Journal j)
Definition Door.h:157
beast::Journal const j_
Definition Door.h:85
protocol_type::acceptor acceptor_type
Definition Door.h:49
void run()
Definition Door.h:292
boost::asio::ip::tcp::socket socket_type
Definition Door.h:51
boost::asio::io_context & ioc_
Definition Door.h:88
bool ssl_
Definition Door.h:91
static constexpr std::chrono::milliseconds kFdSampleInterval
Definition Door.h:102
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition Door.h:90
void create(bool ssl, ConstBufferSequence const &buffers, stream_type &&stream, endpoint_type remoteAddress)
Definition Door.h:318
acceptor_type acceptor_
Definition Door.h:89
bool plain_
Definition Door.h:94
endpoint_type getEndpoint() const
Definition Door.h:138
boost::asio::yield_context yield_context
Definition Door.h:47
boost::beast::tcp_stream stream_type
Definition Door.h:52
static constexpr std::uint64_t kMaxUsedFdPercent
Definition Door.h:101
protocol_type::endpoint endpoint_type
Definition Door.h:50
void doAccept(yield_context yield)
Definition Door.h:338
static constexpr std::chrono::milliseconds kMaxAcceptDelay
Definition Door.h:98
boost::system::error_code error_code
Definition Door.h:46
std::chrono::milliseconds acceptDelay_
Definition Door.h:99
boost::asio::basic_waitable_timer< clock_type > timer_type
Definition Door.h:45
void close() override
Close the Door listening socket and connections.
Definition Door.h:301
clock_type::time_point fdSampleAt_
Definition Door.h:103
Door(Handler &handler, boost::asio::io_context &ioContext, Port const &port, beast::Journal j)
Definition Door.h:273
bool shouldThrottleForFds()
Definition Door.h:432
boost::asio::ip::tcp protocol_type
Definition Door.h:48
std::optional< FDStats > queryFdStats() const
Definition Door.h:401
Handler & handler_
Definition Door.h:87
std::chrono::steady_clock clock_type
Definition Door.h:44
Port const & port_
Definition Door.h:86
bool cachedThrottle_
Definition Door.h:104
void reOpen()
Definition Door.h:223
static constexpr std::chrono::milliseconds kInitialAcceptDelay
Definition Door.h:97
boost::asio::steady_timer backoffTimer_
Definition Door.h:100
IOList & ios()
Return the IOList associated with the work.
Definition io_list.h:40
T min(T... args)
STL namespace.
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn.
Definition Spawn.h:66
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
boost::beast::ssl_stream< socket_type > stream_type
Definition Handshake.h:22
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Definition contract.h:49
T str(T... args)
std::uint64_t limit
Definition Door.h:109
std::uint64_t used
Definition Door.h:108
Configuration information for a Server listening port.
Definition Port.h:28