rippled
Loading...
Searching...
No Matches
short_read_test.cpp
1#include <test/jtx/envconfig.h>
2
3#include <xrpl/basics/make_SSLContext.h>
4#include <xrpl/beast/core/CurrentThreadName.h>
5#include <xrpl/beast/unit_test.h>
6
7#include <boost/asio/bind_executor.hpp>
8#include <boost/asio/buffer.hpp>
9#include <boost/asio/ip/tcp.hpp>
10#include <boost/asio/read_until.hpp>
11#include <boost/asio/ssl.hpp>
12#include <boost/asio/strand.hpp>
13#include <boost/asio/streambuf.hpp>
14#include <boost/utility/in_place_factory.hpp>
15
16#include <condition_variable>
17#include <functional>
18#include <thread>
19#include <utility>
20
21namespace ripple {
22/*
23
24Findings from the test:
25
26If the remote host calls async_shutdown then the local host's
27async_read will complete with eof.
28
29If both hosts call async_shutdown then the calls to async_shutdown
30will complete with eof.
31
32*/
33
35{
36private:
37 using io_context_type = boost::asio::io_context;
38 using strand_type = boost::asio::strand<io_context_type::executor_type>;
39 using timer_type =
40 boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
41 using acceptor_type = boost::asio::ip::tcp::acceptor;
42 using socket_type = boost::asio::ip::tcp::socket;
43 using stream_type = boost::asio::ssl::stream<socket_type&>;
44 using error_code = boost::system::error_code;
45 using endpoint_type = boost::asio::ip::tcp::endpoint;
46 using address_type = boost::asio::ip::address;
47
49 boost::optional<boost::asio::executor_work_guard<
50 boost::asio::io_context::executor_type>>
54
55 template <class Streambuf>
56 static void
57 write(Streambuf& sb, std::string const& s)
58 {
59 using boost::asio::buffer;
60 using boost::asio::buffer_copy;
61 using boost::asio::buffer_size;
62 boost::asio::const_buffer buf(s.data(), s.size());
63 sb.commit(buffer_copy(sb.prepare(buffer_size(buf)), buf));
64 }
65
66 //--------------------------------------------------------------------------
67
68 class Base
69 {
70 protected:
71 class Child
72 {
73 private:
75
76 public:
77 explicit Child(Base& base) : base_(base)
78 {
79 }
80
81 virtual ~Child()
82 {
83 base_.remove(this);
84 }
85
86 virtual void
87 close() = 0;
88 };
89
90 private:
94 bool closed_ = false;
95
96 public:
98 {
99 // Derived class must call wait() in the destructor
100 assert(list_.empty());
101 }
102
103 void
105 {
107 list_.emplace(child.get(), child);
108 }
109
110 void
111 remove(Child* child)
112 {
114 list_.erase(child);
115 if (list_.empty())
117 }
118
119 void
121 {
123 {
125 v.reserve(list_.size());
126 if (closed_)
127 return;
128 closed_ = true;
129 for (auto const& c : list_)
130 {
131 if (auto p = c.second.lock())
132 {
133 p->close();
134 // Must destroy shared_ptr outside the
135 // lock otherwise deadlock from the
136 // managed object's destructor.
137 v.emplace_back(std::move(p));
138 }
139 }
140 }
141 }
142
143 void
145 {
147 while (!list_.empty())
148 cond_.wait(lock);
149 }
150 };
151
152 //--------------------------------------------------------------------------
153
154 class Server : public Base
155 {
156 private:
159
161 {
167
168 explicit Acceptor(Server& server)
169 : Child(server)
170 , server_(server)
172 , acceptor_(
175 boost::asio::ip::make_address(
176 test::getEnvLocalhostAddr()),
177 0))
179 , strand_(boost::asio::make_strand(test_.io_context_))
180 {
181 acceptor_.listen();
182 server_.endpoint_ = acceptor_.local_endpoint();
183 }
184
185 void
186 close() override
187 {
188 if (!strand_.running_in_this_thread())
189 return post(
190 strand_,
192 acceptor_.close();
193 }
194
195 void
197 {
198 acceptor_.async_accept(
199 socket_,
200 bind_executor(
201 strand_,
202 std::bind(
205 std::placeholders::_1)));
206 }
207
208 void
209 fail(std::string const& what, error_code ec)
210 {
211 if (acceptor_.is_open())
212 {
213 if (ec != boost::asio::error::operation_aborted)
214 test_.log << what << ": " << ec.message() << std::endl;
215 acceptor_.close();
216 }
217 }
218
219 void
221 {
222 if (ec)
223 return fail("accept", ec);
224 auto const p =
226 server_.add(p);
227 p->run();
228 acceptor_.async_accept(
229 socket_,
230 bind_executor(
231 strand_,
232 std::bind(
235 std::placeholders::_1)));
236 }
237 };
238
240 {
247 boost::asio::streambuf buf_;
248
249 Connection(Server& server, socket_type&& socket)
250 : Child(server)
251 , server_(server)
253 , socket_(std::move(socket))
255 , strand_(boost::asio::make_strand(test_.io_context_))
257 {
258 }
259
260 void
261 close() override
262 {
263 if (!strand_.running_in_this_thread())
264 return post(
265 strand_,
267 if (socket_.is_open())
268 {
269 socket_.close();
270 timer_.cancel();
271 }
272 }
273
274 void
276 {
277 timer_.expires_after(std::chrono::seconds(3));
278 timer_.async_wait(bind_executor(
279 strand_,
280 std::bind(
283 std::placeholders::_1)));
284 stream_.async_handshake(
285 stream_type::server,
286 bind_executor(
287 strand_,
288 std::bind(
291 std::placeholders::_1)));
292 }
293
294 void
295 fail(std::string const& what, error_code ec)
296 {
297 if (socket_.is_open())
298 {
299 if (ec != boost::asio::error::operation_aborted)
300 test_.log << "[server] " << what << ": " << ec.message()
301 << std::endl;
302 socket_.close();
303 timer_.cancel();
304 }
305 }
306
307 void
309 {
310 if (ec == boost::asio::error::operation_aborted)
311 return;
312 if (ec)
313 return fail("timer", ec);
314 test_.log << "[server] timeout" << std::endl;
315 socket_.close();
316 }
317
318 void
320 {
321 if (ec)
322 return fail("handshake", ec);
323#if 1
324 boost::asio::async_read_until(
325 stream_,
326 buf_,
327 "\n",
328 bind_executor(
329 strand_,
330 std::bind(
333 std::placeholders::_1,
334 std::placeholders::_2)));
335#else
336 close();
337#endif
338 }
339
340 void
341 on_read(error_code ec, std::size_t bytes_transferred)
342 {
343 if (ec == boost::asio::error::eof)
344 {
345 server_.test_.log << "[server] read: EOF" << std::endl;
346 return stream_.async_shutdown(bind_executor(
347 strand_,
348 std::bind(
351 std::placeholders::_1)));
352 }
353 if (ec)
354 return fail("read", ec);
355
356 buf_.commit(bytes_transferred);
357 buf_.consume(bytes_transferred);
358 write(buf_, "BYE\n");
359 boost::asio::async_write(
360 stream_,
361 buf_.data(),
362 bind_executor(
363 strand_,
364 std::bind(
367 std::placeholders::_1,
368 std::placeholders::_2)));
369 }
370
371 void
372 on_write(error_code ec, std::size_t bytes_transferred)
373 {
374 buf_.consume(bytes_transferred);
375 if (ec)
376 return fail("write", ec);
377 stream_.async_shutdown(bind_executor(
378 strand_,
379 std::bind(
382 std::placeholders::_1)));
383 }
384
385 void
387 {
388 if (ec)
389 return fail("shutdown", ec);
390 socket_.close();
391 timer_.cancel();
392 }
393 };
394
395 public:
396 explicit Server(short_read_test& test) : test_(test)
397 {
398 auto const p = std::make_shared<Acceptor>(*this);
399 add(p);
400 p->run();
401 }
402
404 {
405 close();
406 wait();
407 }
408
409 endpoint_type const&
410 endpoint() const
411 {
412 return endpoint_;
413 }
414 };
415
416 //--------------------------------------------------------------------------
417 class Client : public Base
418
419 {
420 private:
422
424 {
431 boost::asio::streambuf buf_;
433
434 Connection(Client& client, endpoint_type const& ep)
435 : Child(client)
436 , client_(client)
440 , strand_(boost::asio::make_strand(test_.io_context_))
442 , ep_(ep)
443 {
444 }
445
446 void
447 close() override
448 {
449 if (!strand_.running_in_this_thread())
450 return post(
451 strand_,
453 if (socket_.is_open())
454 {
455 socket_.close();
456 timer_.cancel();
457 }
458 }
459
460 void
462 {
463 timer_.expires_after(std::chrono::seconds(3));
464 timer_.async_wait(bind_executor(
465 strand_,
466 std::bind(
469 std::placeholders::_1)));
470 socket_.async_connect(
471 ep,
472 bind_executor(
473 strand_,
474 std::bind(
477 std::placeholders::_1)));
478 }
479
480 void
481 fail(std::string const& what, error_code ec)
482 {
483 if (socket_.is_open())
484 {
485 if (ec != boost::asio::error::operation_aborted)
486 test_.log << "[client] " << what << ": " << ec.message()
487 << std::endl;
488 socket_.close();
489 timer_.cancel();
490 }
491 }
492
493 void
495 {
496 if (ec == boost::asio::error::operation_aborted)
497 return;
498 if (ec)
499 return fail("timer", ec);
500 test_.log << "[client] timeout";
501 socket_.close();
502 }
503
504 void
506 {
507 if (ec)
508 return fail("connect", ec);
509 stream_.async_handshake(
510 stream_type::client,
511 bind_executor(
512 strand_,
513 std::bind(
516 std::placeholders::_1)));
517 }
518
519 void
521 {
522 if (ec)
523 return fail("handshake", ec);
524 write(buf_, "HELLO\n");
525
526#if 1
527 boost::asio::async_write(
528 stream_,
529 buf_.data(),
530 bind_executor(
531 strand_,
532 std::bind(
535 std::placeholders::_1,
536 std::placeholders::_2)));
537#else
538 stream_.async_shutdown(bind_executor(
539 strand_,
540 std::bind(
543 std::placeholders::_1)));
544#endif
545 }
546
547 void
548 on_write(error_code ec, std::size_t bytes_transferred)
549 {
550 buf_.consume(bytes_transferred);
551 if (ec)
552 return fail("write", ec);
553#if 1
554 boost::asio::async_read_until(
555 stream_,
556 buf_,
557 "\n",
558 bind_executor(
559 strand_,
560 std::bind(
563 std::placeholders::_1,
564 std::placeholders::_2)));
565#else
566 stream_.async_shutdown(bind_executor(
567 strand_,
568 std::bind(
571 std::placeholders::_1)));
572#endif
573 }
574
575 void
576 on_read(error_code ec, std::size_t bytes_transferred)
577 {
578 if (ec)
579 return fail("read", ec);
580 buf_.commit(bytes_transferred);
581 stream_.async_shutdown(bind_executor(
582 strand_,
583 std::bind(
586 std::placeholders::_1)));
587 }
588
589 void
591 {
592 if (ec)
593 return fail("shutdown", ec);
594 socket_.close();
595 timer_.cancel();
596 }
597 };
598
599 public:
600 Client(short_read_test& test, endpoint_type const& ep) : test_(test)
601 {
602 auto const p = std::make_shared<Connection>(*this, ep);
603 add(p);
604 p->run(ep);
605 }
606
608 {
609 close();
610 wait();
611 }
612 };
613
614public:
616 : work_(io_context_.get_executor())
617 , thread_(std::thread([this]() {
618 beast::setCurrentThreadName("io_context");
619 this->io_context_.run();
620 }))
622 {
623 }
624
626 {
627 work_.reset();
628 thread_.join();
629 }
630
631 void
632 run() override
633 {
634 Server s(*this);
635 Client c(*this, s.endpoint());
636 c.wait();
637 pass();
638 }
639};
640
641BEAST_DEFINE_TESTSUITE(short_read, overlay, ripple);
642
643} // namespace ripple
T bind(T... args)
A testsuite class.
Definition suite.h:52
log_os< char > log
Logging output stream.
Definition suite.h:149
void pass()
Record a successful test condition.
Definition suite.h:508
friend class thread
Definition suite.h:304
std::map< Child *, std::weak_ptr< Child > > list_
void add(std::shared_ptr< Child > const &child)
std::condition_variable cond_
Client(short_read_test &test, endpoint_type const &ep)
endpoint_type const & endpoint() const
std::shared_ptr< boost::asio::ssl::context > context_
void run() override
Runs the suite.
boost::system::error_code error_code
boost::asio::ssl::stream< socket_type & > stream_type
boost::asio::io_context io_context_type
boost::asio::ip::tcp::endpoint endpoint_type
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_type
boost::asio::ip::tcp::acceptor acceptor_type
static void write(Streambuf &sb, std::string const &s)
boost::asio::ip::address address_type
boost::asio::strand< io_context_type::executor_type > strand_type
boost::asio::ip::tcp::socket socket_type
boost::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
T data(T... args)
T emplace_back(T... args)
T endl(T... args)
T is_same_v
T join(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::shared_ptr< boost::asio::ssl::context > make_SSLContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
STL namespace.
T reserve(T... args)
T size(T... args)
void on_read(error_code ec, std::size_t bytes_transferred)
void on_write(error_code ec, std::size_t bytes_transferred)
void fail(std::string const &what, error_code ec)
Connection(Client &client, endpoint_type const &ep)
void fail(std::string const &what, error_code ec)
void fail(std::string const &what, error_code ec)
void on_read(error_code ec, std::size_t bytes_transferred)
void on_write(error_code ec, std::size_t bytes_transferred)
Connection(Server &server, socket_type &&socket)