rippled
Loading...
Searching...
No Matches
short_read_test.cpp
1#include <test/jtx/envconfig.h>
2
3#include <xrpl/basics/make_SSLContext.h>
4#include <xrpl/beast/core/CurrentThreadName.h>
5#include <xrpl/beast/unit_test.h>
6
7#include <boost/asio/bind_executor.hpp>
8#include <boost/asio/buffer.hpp>
9#include <boost/asio/ip/tcp.hpp>
10#include <boost/asio/read_until.hpp>
11#include <boost/asio/ssl.hpp>
12#include <boost/asio/strand.hpp>
13#include <boost/asio/streambuf.hpp>
14#include <boost/utility/in_place_factory.hpp>
15
16#include <condition_variable>
17#include <functional>
18#include <thread>
19#include <utility>
20
21namespace xrpl {
22/*
23
24Findings from the test:
25
26If the remote host calls async_shutdown then the local host's
27async_read will complete with eof.
28
29If both hosts call async_shutdown then the calls to async_shutdown
30will complete with eof.
31
32*/
33
35{
36private:
// Shorthand aliases for the Boost.Asio / Boost.System types used by this test.
37 using io_context_type = boost::asio::io_context;
38 using strand_type = boost::asio::strand<io_context_type::executor_type>;
39 using timer_type = boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
40 using acceptor_type = boost::asio::ip::tcp::acceptor;
41 using socket_type = boost::asio::ip::tcp::socket;
// The SSL stream is layered over a *reference* to a socket; it does not own one.
42 using stream_type = boost::asio::ssl::stream<socket_type&>;
43 using error_code = boost::system::error_code;
44 using endpoint_type = boost::asio::ip::tcp::endpoint;
45 using address_type = boost::asio::ip::address;
46
// Work guard for the io_context. It is constructed from
// io_context_.get_executor() and reset just before the I/O thread is joined.
// NOTE(review): neighbouring member declarations are absent from this listing
// (the numbering skips), so the full member list is not visible here.
48 boost::optional<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_;
51
52 template <class Streambuf>
53 static void
54 write(Streambuf& sb, std::string const& s)
55 {
56 using boost::asio::buffer;
57 using boost::asio::buffer_copy;
58 using boost::asio::buffer_size;
59 boost::asio::const_buffer const buf(s.data(), s.size());
60 sb.commit(buffer_copy(sb.prepare(buffer_size(buf)), buf));
61 }
62
63 //--------------------------------------------------------------------------
64
65 class Base
66 {
67 protected:
// Interface for objects tracked by a Base.
//
// A Child remembers the Base it was created for and unregisters itself from
// that Base in its destructor. Derived classes supply close().
// NOTE(review): the declaration of `base_` is absent from this listing (the
// private section below is empty here); the constructor shows it is bound to
// the Base& argument.
68 class Child
69 {
70 private:
72
73 public:
// Binds this child to `base`; the child does not register itself here.
74 explicit Child(Base& base) : base_(base)
75 {
76 }
77
// Removes this object from the owning Base's list.
78 virtual ~Child()
79 {
80 base_.remove(this);
81 }
82
// Requests that the child stop its work; implemented by derived classes.
83 virtual void
84 close() = 0;
85 };
86
87 private:
// Latched to true the first time the child list is closed; later attempts
// see the flag and return early.
91 bool closed_ = false;
92
93 public:
// NOTE(review): the declarator line is absent from this listing; judging by
// the comment inside, this is the body of Base's destructor -- confirm.
// Asserts that no children are still registered when it runs.
95 {
96 // Derived class must call wait() in the destructor
97 assert(list_.empty());
98 }
99
// Registers a child with this Base.
// NOTE(review): the declarator line is absent from this listing; the page's
// index gives it as `void add(std::shared_ptr<Child> const& child)`.
100 void
102 {
// list_ is guarded by mutex_.
103 std::lock_guard const lock(mutex_);
// Keyed by raw pointer; the mapped value is built from the shared_ptr.
104 list_.emplace(child.get(), child);
105 }
106
// Unregisters `child` from list_; called from Child's destructor.
107 void
108 remove(Child* child)
109 {
110 std::lock_guard const lock(mutex_);
111 list_.erase(child);
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the numbering skips a line). Presumably it notifies cond_ so that
// a thread blocked waiting for the list to empty wakes up -- confirm against
// the real file.
112 if (list_.empty())
114 }
115
// Calls close() on every child that is still alive; a second call is a no-op.
// NOTE(review): the declarator line and the declaration of `v` are absent from
// this listing. Derived destructors invoke this as `close()`; `v` looks like a
// vector of shared_ptr<Child> declared outside the locked scope -- confirm.
116 void
118 {
120 {
121 std::lock_guard const lock(mutex_);
// NOTE(review): this reserve runs before the closed_ check, so a repeated
// call still allocates; it could be moved below the early return.
122 v.reserve(list_.size());
123 if (closed_)
124 return;
125 closed_ = true;
126 for (auto const& c : list_)
127 {
// Skip children whose last owner has already gone away.
128 if (auto p = c.second.lock())
129 {
130 p->close();
131 // Must destroy shared_ptr outside the
132 // lock otherwise deadlock from the
133 // managed object's destructor.
134 v.emplace_back(std::move(p));
135 }
136 }
137 }
138 }
139
// Blocks until list_ is empty, i.e. until every registered child is gone.
// NOTE(review): the declarator line and the line that creates `lock` are
// absent from this listing; callers invoke this as `wait()`, and `lock` is
// presumably a std::unique_lock on mutex_ -- confirm.
140 void
142 {
// Loop form re-checks the condition after each wakeup.
144 while (!list_.empty())
145 cond_.wait(lock);
146 }
147 };
148
149 //--------------------------------------------------------------------------
150
151 class Server : public Base
152 {
153 private:
156
158 {
164
// Creates the listening acceptor on the test's localhost address with port 0,
// starts listening, and publishes the endpoint actually bound to the Server.
// NOTE(review): several initializer lines are absent from this listing (the
// numbering skips), including the first argument to acceptor_ and whatever
// initializes test_ and socket_.
165 explicit Acceptor(Server& server)
166 : Child(server)
167 , server_(server)
169 , acceptor_(
171 endpoint_type(boost::asio::ip::make_address(test::getEnvLocalhostAddr()), 0))
173 , strand_(boost::asio::make_strand(test_.io_context_))
174 {
175 acceptor_.listen();
// Read back the endpoint so the client can learn which port was assigned.
176 server_.endpoint_ = acceptor_.local_endpoint();
177 }
178
// Closes the acceptor when called on the strand.
179 void
180 close() override
181 {
182 if (!strand_.running_in_this_thread())
183 {
// NOTE(review): the statement that preceded this return is absent from this
// listing; presumably it re-submits close() to strand_ -- confirm.
185 return;
186 }
187 acceptor_.close();
188 }
189
// Starts the first asynchronous accept; completion is delivered to on_accept
// through strand_.
// NOTE(review): the declarator line is absent from this listing; the Server
// constructor calls this as `run()`.
190 void
192 {
193 acceptor_.async_accept(
194 socket_,
195 bind_executor(
196 strand_,
197 std::bind(
// shared_from_this() keeps the acceptor alive while the accept is pending.
198 &Acceptor::on_accept, shared_from_this(), std::placeholders::_1)));
199 }
200
201 void
202 fail(std::string const& what, error_code ec)
203 {
204 if (acceptor_.is_open())
205 {
206 if (ec != boost::asio::error::operation_aborted)
207 test_.log << what << ": " << ec.message() << std::endl;
208 acceptor_.close();
209 }
210 }
211
// Completion handler for async_accept.
// NOTE(review): the declarator line is absent from this listing; the bind
// expressions name it Acceptor::on_accept and pass a single error_code.
212 void
214 {
215 if (ec)
216 {
217 fail("accept", ec);
218 return;
219 }
// Hand the accepted socket to a new Connection, register it with the
// server, and start it.
220 auto const p = std::make_shared<Connection>(server_, std::move(socket_));
221 server_.add(p);
222 p->run();
// Re-arm the accept so further connections can be taken.
223 acceptor_.async_accept(
224 socket_,
225 bind_executor(
226 strand_,
227 std::bind(
228 &Acceptor::on_accept, shared_from_this(), std::placeholders::_1)));
229 }
230 };
231
233 {
// Buffer that receives the client's request line and is then reused to hold
// the reply that is written back.
240 boost::asio::streambuf buf_;
241
// Takes ownership of an accepted socket and registers under `server`.
// NOTE(review): several initializer lines are absent from this listing (the
// numbering skips); the members test_, stream_ and timer_ used elsewhere are
// presumably initialized on those lines.
242 Connection(Server& server, socket_type&& socket)
243 : Child(server)
244 , server_(server)
246 , socket_(std::move(socket))
248 , strand_(boost::asio::make_strand(test_.io_context_))
250 {
251 }
252
// Closes the socket and cancels the timer when called on the strand.
253 void
254 close() override
255 {
256 if (!strand_.running_in_this_thread())
257 {
// NOTE(review): the statement that preceded this return is absent from this
// listing; presumably it re-submits close() to strand_ -- confirm.
259 return;
260 }
// Guard makes a repeated close a no-op.
261 if (socket_.is_open())
262 {
263 socket_.close();
264 timer_.cancel();
265 }
266 }
267
// Arms a 3-second timer and starts the server side of the SSL handshake.
// Both completions are delivered through strand_.
// NOTE(review): the declarator line is absent from this listing; the acceptor
// calls this as `run()` on a freshly created connection.
268 void
270 {
271 timer_.expires_after(std::chrono::seconds(3));
272 timer_.async_wait(bind_executor(
273 strand_,
274 std::bind(&Connection::on_timer, shared_from_this(), std::placeholders::_1)));
275 stream_.async_handshake(
276 stream_type::server,
277 bind_executor(
278 strand_,
279 std::bind(
280 &Connection::on_handshake, shared_from_this(), std::placeholders::_1)));
281 }
282
283 void
284 fail(std::string const& what, error_code ec)
285 {
286 if (socket_.is_open())
287 {
288 if (ec != boost::asio::error::operation_aborted)
289 test_.log << "[server] " << what << ": " << ec.message() << std::endl;
290 socket_.close();
291 timer_.cancel();
292 }
293 }
294
// Completion handler for the connection's timer wait.
// NOTE(review): the declarator line is absent from this listing; the bind in
// the start-up code names it Connection::on_timer with one error_code argument.
295 void
297 {
// An aborted wait is ignored (other paths in this class call timer_.cancel()).
298 if (ec == boost::asio::error::operation_aborted)
299 return;
300 if (ec)
301 {
302 fail("timer", ec);
303 return;
304 }
// The timer expired: log it and close the socket.
305 test_.log << "[server] timeout" << std::endl;
306 socket_.close();
307 }
308
// Completion handler for the server-side SSL handshake.
// NOTE(review): the declarator line is absent from this listing; the bind in
// the start-up code names it Connection::on_handshake.
309 void
311 {
312 if (ec)
313 {
314 fail("handshake", ec);
315 return;
316 }
// Enabled branch: read one newline-terminated line from the client.
// The #else branch closes the connection right after the handshake instead.
317#if 1
318 boost::asio::async_read_until(
319 stream_,
320 buf_,
321 "\n",
322 bind_executor(
323 strand_,
324 std::bind(
// NOTE(review): the lines naming the bound member function are absent from
// this listing; given the two placeholders it is presumably
// Connection::on_read with shared_from_this() -- confirm.
327 std::placeholders::_1,
328 std::placeholders::_2)));
329#else
330 close();
331#endif
332 }
333
// Completion handler for the read of the client's request line.
// On EOF it starts an SSL shutdown; on any other error it fails the
// connection; on success it replies with "BYE\n".
334 void
335 on_read(error_code ec, std::size_t bytes_transferred)
336 {
337 if (ec == boost::asio::error::eof)
338 {
// NOTE(review): this logs through server_.test_ whereas the other handlers
// in this class use test_ directly -- presumably the same object.
339 server_.test_.log << "[server] read: EOF" << std::endl;
340 stream_.async_shutdown(bind_executor(
341 strand_,
342 std::bind(
343 &Connection::on_shutdown, shared_from_this(), std::placeholders::_1)));
344 return;
345 }
346 if (ec)
347 {
348 fail("read", ec);
349 return;
350 }
351
// NOTE(review): if this handler completes an async_read_until on buf_, the
// received bytes are already in the readable sequence, so this commit looks
// redundant and may expose bytes that were never written -- confirm and
// consider removing it.
352 buf_.commit(bytes_transferred);
// Discard the request, then queue the reply in the same buffer.
353 buf_.consume(bytes_transferred);
354 write(buf_, "BYE\n");
355 boost::asio::async_write(
356 stream_,
357 buf_.data(),
358 bind_executor(
359 strand_,
360 std::bind(
// NOTE(review): the lines naming the bound member function are absent from
// this listing; given the two placeholders it is presumably
// Connection::on_write with shared_from_this() -- confirm.
363 std::placeholders::_1,
364 std::placeholders::_2)));
365 }
366
367 void
368 on_write(error_code ec, std::size_t bytes_transferred)
369 {
370 buf_.consume(bytes_transferred);
371 if (ec)
372 {
373 fail("write", ec);
374 return;
375 }
376 stream_.async_shutdown(bind_executor(
377 strand_,
378 std::bind(
379 &Connection::on_shutdown, shared_from_this(), std::placeholders::_1)));
380 }
381
// Completion handler for the SSL shutdown.
// NOTE(review): the declarator line is absent from this listing; the binds in
// the read/write handlers name it Connection::on_shutdown with one error_code.
382 void
384 {
385 if (ec)
386 {
387 fail("shutdown", ec);
388 return;
389 }
// Clean shutdown: close the socket and cancel the timer.
390 socket_.close();
391 timer_.cancel();
392 }
393 };
394
395 public:
396 explicit Server(short_read_test& test) : test_(test)
397 {
398 auto const p = std::make_shared<Acceptor>(*this);
399 add(p);
400 p->run();
401 }
402
// NOTE(review): the declarator line is absent from this listing; presumably
// this is the body of Server's destructor -- confirm.
// Closes the registered children, then blocks until all of them are gone.
404 {
405 close();
406 wait();
407 }
408
409 endpoint_type const&
410 endpoint() const
411 {
412 return endpoint_;
413 }
414 };
415
416 //--------------------------------------------------------------------------
417 class Client : public Base
418
419 {
420 private:
422
424 {
// Buffer that first holds the outgoing "HELLO\n" and is then used to receive
// the server's reply line.
431 boost::asio::streambuf buf_;
433
// Creates a client connection registered under `client`; `ep` is stored in ep_.
// NOTE(review): several initializer lines are absent from this listing (the
// numbering skips); the members test_, socket_, stream_ and timer_ used
// elsewhere are presumably initialized on those lines.
434 Connection(Client& client, endpoint_type const& ep)
435 : Child(client)
436 , client_(client)
440 , strand_(boost::asio::make_strand(test_.io_context_))
442 , ep_(ep)
443 {
444 }
445
// Closes the socket and cancels the timer when called on the strand.
446 void
447 close() override
448 {
449 if (!strand_.running_in_this_thread())
450 {
// NOTE(review): the statement that preceded this return is absent from this
// listing; presumably it re-submits close() to strand_ -- confirm.
452 return;
453 }
// Guard makes a repeated close a no-op.
454 if (socket_.is_open())
455 {
456 socket_.close();
457 timer_.cancel();
458 }
459 }
460
// Arms a 3-second timer and starts an asynchronous connect to `ep`.
// Both completions are delivered through strand_.
// NOTE(review): the declarator line is absent from this listing; the Client
// constructor calls this as `run(ep)`, so `ep` is presumably a parameter.
461 void
463 {
464 timer_.expires_after(std::chrono::seconds(3));
465 timer_.async_wait(bind_executor(
466 strand_,
467 std::bind(&Connection::on_timer, shared_from_this(), std::placeholders::_1)));
468 socket_.async_connect(
469 ep,
470 bind_executor(
471 strand_,
472 std::bind(
473 &Connection::on_connect, shared_from_this(), std::placeholders::_1)));
474 }
475
476 void
477 fail(std::string const& what, error_code ec)
478 {
479 if (socket_.is_open())
480 {
481 if (ec != boost::asio::error::operation_aborted)
482 test_.log << "[client] " << what << ": " << ec.message() << std::endl;
483 socket_.close();
484 timer_.cancel();
485 }
486 }
487
// Completion handler for the connection's timer wait.
// NOTE(review): the declarator line is absent from this listing; the bind in
// the start-up code names it Connection::on_timer with one error_code argument.
488 void
490 {
// An aborted wait is ignored (other paths in this class call timer_.cancel()).
491 if (ec == boost::asio::error::operation_aborted)
492 return;
493 if (ec)
494 {
495 fail("timer", ec);
496 return;
497 }
// NOTE(review): unlike the server's timeout message, this one is not followed
// by std::endl, so it is neither newline-terminated nor flushed here --
// probably an oversight.
498 test_.log << "[client] timeout";
499 socket_.close();
500 }
501
// Completion handler for the asynchronous connect; on success it starts the
// client side of the SSL handshake.
// NOTE(review): the declarator line is absent from this listing; the bind in
// the start-up code names it Connection::on_connect with one error_code.
502 void
504 {
505 if (ec)
506 {
507 fail("connect", ec);
508 return;
509 }
510 stream_.async_handshake(
511 stream_type::client,
512 bind_executor(
513 strand_,
514 std::bind(
515 &Connection::on_handshake, shared_from_this(), std::placeholders::_1)));
516 }
517
// Completion handler for the client-side SSL handshake.
// NOTE(review): the declarator line is absent from this listing; the bind in
// the connect handler names it Connection::on_handshake.
518 void
520 {
521 if (ec)
522 {
523 fail("handshake", ec);
524 return;
525 }
// Queue the greeting in buf_ (done in both preprocessor branches).
526 write(buf_, "HELLO\n");
527
// Enabled branch: send the greeting. The #else branch starts an SSL shutdown
// immediately without sending anything.
528#if 1
529 boost::asio::async_write(
530 stream_,
531 buf_.data(),
532 bind_executor(
533 strand_,
534 std::bind(
// NOTE(review): the lines naming the bound member function are absent from
// this listing; given the two placeholders it is presumably
// Connection::on_write with shared_from_this() -- confirm.
537 std::placeholders::_1,
538 std::placeholders::_2)));
539#else
540 stream_.async_shutdown(bind_executor(
541 strand_,
542 std::bind(
543 &Connection::on_shutdown, shared_from_this(), std::placeholders::_1)));
544#endif
545 }
546
// Completion handler for the write of the client's greeting.
547 void
548 on_write(error_code ec, std::size_t bytes_transferred)
549 {
// Trim the bytes from buf_ before checking the error, so this runs on both paths.
550 buf_.consume(bytes_transferred);
551 if (ec)
552 {
553 fail("write", ec);
554 return;
555 }
// Enabled branch: wait for a newline-terminated reply from the server.
// The #else branch starts an SSL shutdown without reading.
556#if 1
557 boost::asio::async_read_until(
558 stream_,
559 buf_,
560 "\n",
561 bind_executor(
562 strand_,
563 std::bind(
// NOTE(review): the lines naming the bound member function are absent from
// this listing; given the two placeholders it is presumably
// Connection::on_read with shared_from_this() -- confirm.
566 std::placeholders::_1,
567 std::placeholders::_2)));
568#else
569 stream_.async_shutdown(bind_executor(
570 strand_,
571 std::bind(
572 &Connection::on_shutdown, shared_from_this(), std::placeholders::_1)));
573#endif
574 }
575
576 void
577 on_read(error_code ec, std::size_t bytes_transferred)
578 {
579 if (ec)
580 {
581 fail("read", ec);
582 return;
583 }
584 buf_.commit(bytes_transferred);
585 stream_.async_shutdown(bind_executor(
586 strand_,
587 std::bind(
588 &Connection::on_shutdown, shared_from_this(), std::placeholders::_1)));
589 }
590
// Completion handler for the SSL shutdown.
// NOTE(review): the declarator line is absent from this listing; the binds in
// the other handlers name it Connection::on_shutdown with one error_code.
591 void
593 {
594 if (ec)
595 {
596 fail("shutdown", ec);
597 return;
598 }
// Clean shutdown: close the socket and cancel the timer.
599 socket_.close();
600 timer_.cancel();
601 }
602 };
603
604 public:
605 Client(short_read_test& test, endpoint_type const& ep) : test_(test)
606 {
607 auto const p = std::make_shared<Connection>(*this, ep);
608 add(p);
609 p->run(ep);
610 }
611
// NOTE(review): the declarator line is absent from this listing; presumably
// this is the body of Client's destructor -- confirm.
// Closes the registered children, then blocks until all of them are gone.
613 {
614 close();
615 wait();
616 }
617 };
618
619public:
// NOTE(review): the declarator line and at least one initializer are absent
// from this listing (the numbering skips); this appears to be the test suite's
// constructor -- confirm.
// Creates the work guard and starts a thread, named "io_context", that runs
// io_context_.
621 : work_(io_context_.get_executor())
622 , thread_(std::thread([this]() {
623 beast::setCurrentThreadName("io_context");
624 this->io_context_.run();
625 }))
627 {
628 }
629
// NOTE(review): the declarator line is absent from this listing; presumably
// this is the body of the test suite's destructor -- confirm.
// Releases the work guard and then joins the I/O thread.
631 {
632 work_.reset();
633 thread_.join();
634 }
635
636 void
637 run() override
638 {
639 Server const s(*this);
640 Client c(*this, s.endpoint());
641 c.wait();
642 pass();
643 }
644};
645
646BEAST_DEFINE_TESTSUITE(short_read, overlay, xrpl);
647
648} // namespace xrpl
T bind(T... args)
A testsuite class.
Definition suite.h:51
log_os< char > log
Logging output stream.
Definition suite.h:147
void pass()
Record a successful test condition.
Definition suite.h:497
friend class thread
Definition suite.h:298
std::map< Child *, std::weak_ptr< Child > > list_
std::condition_variable cond_
void add(std::shared_ptr< Child > const &child)
Client(short_read_test &test, endpoint_type const &ep)
Server(short_read_test &test)
endpoint_type const & endpoint() const
boost::asio::ip::tcp::endpoint endpoint_type
boost::asio::io_context io_context_type
boost::asio::ip::address address_type
boost::asio::strand< io_context_type::executor_type > strand_type
void run() override
Runs the suite.
io_context_type io_context_
boost::asio::ip::tcp::socket socket_type
boost::system::error_code error_code
std::shared_ptr< boost::asio::ssl::context > context_
boost::asio::ip::tcp::acceptor acceptor_type
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_type
boost::asio::ssl::stream< socket_type & > stream_type
static void write(Streambuf &sb, std::string const &s)
boost::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
T data(T... args)
T emplace_back(T... args)
T endl(T... args)
T is_same_v
T join(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::shared_ptr< boost::asio::ssl::context > make_SSLContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
T reserve(T... args)
T size(T... args)
void on_write(error_code ec, std::size_t bytes_transferred)
Connection(Client &client, endpoint_type const &ep)
void fail(std::string const &what, error_code ec)
void on_read(error_code ec, std::size_t bytes_transferred)
void fail(std::string const &what, error_code ec)
void on_read(error_code ec, std::size_t bytes_transferred)
void on_write(error_code ec, std::size_t bytes_transferred)
void fail(std::string const &what, error_code ec)
Connection(Server &server, socket_type &&socket)