xrpld
Loading...
Searching...
No Matches
short_read_test.cpp
1#include <test/jtx/envconfig.h>
2
3#include <xrpl/basics/make_SSLContext.h>
4#include <xrpl/beast/core/CurrentThreadName.h>
5#include <xrpl/beast/unit_test/suite.h>
6
7#include <boost/asio/basic_waitable_timer.hpp>
8#include <boost/asio/bind_executor.hpp>
9#include <boost/asio/buffer.hpp>
10#include <boost/asio/error.hpp>
11#include <boost/asio/executor_work_guard.hpp>
12#include <boost/asio/io_context.hpp>
13#include <boost/asio/ip/address.hpp>
14#include <boost/asio/ip/tcp.hpp>
15#include <boost/asio/post.hpp>
16#include <boost/asio/read_until.hpp>
17#include <boost/asio/ssl/context.hpp>
18#include <boost/asio/ssl/stream.hpp>
19#include <boost/asio/strand.hpp>
20#include <boost/asio/streambuf.hpp>
21#include <boost/asio/write.hpp>
22#include <boost/optional/optional.hpp> // IWYU pragma: keep
23#include <boost/system/detail/error_code.hpp>
24
25#include <cassert>
26#include <chrono>
27#include <condition_variable>
28#include <cstddef>
29#include <functional>
30#include <map>
31#include <memory>
32#include <mutex>
33#include <ostream>
34#include <string>
35#include <thread>
36#include <utility>
37#include <vector>
38
39namespace xrpl {
40/*
41
42Findings from the test:
43
44If the remote host calls async_shutdown then the local host's
45async_read will complete with eof.
46
47If both hosts call async_shutdown then the calls to async_shutdown
48will complete with eof.
49
50*/
51
53{
54private:
55 using io_context_type = boost::asio::io_context;
56 using strand_type = boost::asio::strand<io_context_type::executor_type>;
57 using timer_type = boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
58 using acceptor_type = boost::asio::ip::tcp::acceptor;
59 using socket_type = boost::asio::ip::tcp::socket;
60 using stream_type = boost::asio::ssl::stream<socket_type&>;
61 using error_code = boost::system::error_code;
62 using endpoint_type = boost::asio::ip::tcp::endpoint;
63 using address_type = boost::asio::ip::address;
64
66 boost::optional<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_;
69
70 template <class Streambuf>
71 static void
72 write(Streambuf& sb, std::string const& s)
73 {
74 using boost::asio::buffer;
75 using boost::asio::buffer_copy;
76 using boost::asio::buffer_size;
77 boost::asio::const_buffer const buf(s.data(), s.size());
78 sb.commit(buffer_copy(sb.prepare(buffer_size(buf)), buf));
79 }
80
81 //--------------------------------------------------------------------------
82
83 class Base
84 {
85 protected:
86 class Child
87 {
88 private:
90
91 public:
92 explicit Child(Base& base) : base_(base)
93 {
94 }
95
96 virtual ~Child()
97 {
98 base_.remove(this);
99 }
100
101 virtual void
102 close() = 0;
103 };
104
105 private:
109 bool closed_ = false;
110
111 public:
113 {
114 // Derived class must call wait() in the destructor
115 assert(list_.empty());
116 }
117
118 void
120 {
121 std::scoped_lock const lock(mutex_);
122 list_.emplace(child.get(), child);
123 }
124
125 void
126 remove(Child* child)
127 {
128 std::scoped_lock const lock(mutex_);
129 list_.erase(child);
130 if (list_.empty())
131 cond_.notify_one();
132 }
133
134 void
136 {
138 {
139 std::scoped_lock const lock(mutex_);
140 v.reserve(list_.size());
141 if (closed_)
142 return;
143 closed_ = true;
144 for (auto const& c : list_)
145 {
146 if (auto p = c.second.lock())
147 {
148 p->close();
149 // Must destroy shared_ptr outside the
150 // lock otherwise deadlock from the
151 // managed object's destructor.
152 v.emplace_back(std::move(p));
153 }
154 }
155 }
156 }
157
158 void
160 {
162 while (!list_.empty())
163 cond_.wait(lock);
164 }
165 };
166
167 //--------------------------------------------------------------------------
168
169 class Server : public Base
170 {
171 private:
174
176 {
182
184 : Child(server)
185 , server(server)
186 , test(server.test_)
187 , acceptor(
189 endpoint_type(boost::asio::ip::make_address(test::getEnvLocalhostAddr()), 0))
191 , strand(boost::asio::make_strand(test.ioContext_))
192 {
193 acceptor.listen();
194 server.endpoint_ = acceptor.local_endpoint();
195 }
196
197 void
198 close() override
199 {
200 if (!strand.running_in_this_thread())
201 {
203 return;
204 }
205 acceptor.close();
206 }
207
208 void
210 {
211 acceptor.async_accept(
212 socket,
213 bind_executor(
214 strand,
215 std::bind(&Acceptor::onAccept, shared_from_this(), std::placeholders::_1)));
216 }
217
218 void
219 fail(std::string const& what, error_code ec)
220 {
221 if (acceptor.is_open())
222 {
223 if (ec != boost::asio::error::operation_aborted)
224 test.log << what << ": " << ec.message() << std::endl;
225 acceptor.close();
226 }
227 }
228
229 void
231 {
232 if (ec)
233 {
234 fail("accept", ec);
235 return;
236 }
237 auto const p = std::make_shared<Connection>(server, std::move(socket));
238 server.add(p);
239 p->run();
240 acceptor.async_accept(
241 socket,
242 bind_executor(
243 strand,
244 std::bind(&Acceptor::onAccept, shared_from_this(), std::placeholders::_1)));
245 }
246 };
247
249 {
256 boost::asio::streambuf buf;
257
258 Connection(Server& inServer, socket_type&& inSocket)
259 : Child(inServer)
260 , server(inServer)
261 , test(server.test_)
262 , socket(std::move(inSocket))
264 , strand(boost::asio::make_strand(test.ioContext_))
266 {
267 }
268
269 void
270 close() override
271 {
272 if (!strand.running_in_this_thread())
273 {
275 return;
276 }
277 if (socket.is_open())
278 {
279 socket.close();
280 timer.cancel();
281 }
282 }
283
284 void
286 {
287 timer.expires_after(std::chrono::seconds(3));
288 timer.async_wait(bind_executor(
289 strand,
290 std::bind(&Connection::onTimer, shared_from_this(), std::placeholders::_1)));
291 stream.async_handshake(
292 stream_type::server,
293 bind_executor(
294 strand,
295 std::bind(
296 &Connection::onHandshake, shared_from_this(), std::placeholders::_1)));
297 }
298
299 void
300 fail(std::string const& what, error_code ec)
301 {
302 if (socket.is_open())
303 {
304 if (ec != boost::asio::error::operation_aborted)
305 test.log << "[server] " << what << ": " << ec.message() << std::endl;
306 socket.close();
307 timer.cancel();
308 }
309 }
310
311 void
313 {
314 if (ec == boost::asio::error::operation_aborted)
315 return;
316 if (ec)
317 {
318 fail("timer", ec);
319 return;
320 }
321 test.log << "[server] timeout" << std::endl;
322 socket.close();
323 }
324
325 void
327 {
328 if (ec)
329 {
330 fail("handshake", ec);
331 return;
332 }
333#if 1
334 boost::asio::async_read_until(
335 stream,
336 buf,
337 "\n",
338 bind_executor(
339 strand,
340 std::bind(
343 std::placeholders::_1,
344 std::placeholders::_2)));
345#else
346 close();
347#endif
348 }
349
350 void
351 onRead(error_code ec, std::size_t bytesTransferred)
352 {
353 if (ec == boost::asio::error::eof)
354 {
355 server.test_.log << "[server] read: EOF" << std::endl;
356 stream.async_shutdown(bind_executor(
357 strand,
358 std::bind(
359 &Connection::onShutdown, shared_from_this(), std::placeholders::_1)));
360 return;
361 }
362 if (ec)
363 {
364 fail("read", ec);
365 return;
366 }
367
368 buf.commit(bytesTransferred);
369 buf.consume(bytesTransferred);
370 write(buf, "BYE\n");
371 boost::asio::async_write(
372 stream,
373 buf.data(),
374 bind_executor(
375 strand,
376 std::bind(
379 std::placeholders::_1,
380 std::placeholders::_2)));
381 }
382
383 void
384 onWrite(error_code ec, std::size_t bytesTransferred)
385 {
386 buf.consume(bytesTransferred);
387 if (ec)
388 {
389 fail("write", ec);
390 return;
391 }
392 stream.async_shutdown(bind_executor(
393 strand,
394 std::bind(&Connection::onShutdown, shared_from_this(), std::placeholders::_1)));
395 }
396
397 void
399 {
400 if (ec)
401 {
402 fail("shutdown", ec);
403 return;
404 }
405 socket.close();
406 timer.cancel();
407 }
408 };
409
410 public:
412 {
413 auto const p = std::make_shared<Acceptor>(*this);
414 add(p);
415 p->run();
416 }
417
419 {
420 close();
421 wait();
422 }
423
424 [[nodiscard]] endpoint_type const&
425 endpoint() const
426 {
427 return endpoint_;
428 }
429 };
430
431 //--------------------------------------------------------------------------
432 class Client : public Base
433
434 {
435 private:
437
439 {
446 boost::asio::streambuf buf;
448
450 : Child(client)
451 , client(client)
452 , test(client.test_)
455 , strand(boost::asio::make_strand(test.ioContext_))
457 , ep(ep)
458 {
459 }
460
461 void
462 close() override
463 {
464 if (!strand.running_in_this_thread())
465 {
467 return;
468 }
469 if (socket.is_open())
470 {
471 socket.close();
472 timer.cancel();
473 }
474 }
475
476 void
478 {
479 timer.expires_after(std::chrono::seconds(3));
480 timer.async_wait(bind_executor(
481 strand,
482 std::bind(&Connection::onTimer, shared_from_this(), std::placeholders::_1)));
483 socket.async_connect(
484 ep,
485 bind_executor(
486 strand,
487 std::bind(
488 &Connection::onConnect, shared_from_this(), std::placeholders::_1)));
489 }
490
491 void
492 fail(std::string const& what, error_code ec)
493 {
494 if (socket.is_open())
495 {
496 if (ec != boost::asio::error::operation_aborted)
497 test.log << "[client] " << what << ": " << ec.message() << std::endl;
498 socket.close();
499 timer.cancel();
500 }
501 }
502
503 void
505 {
506 if (ec == boost::asio::error::operation_aborted)
507 return;
508 if (ec)
509 {
510 fail("timer", ec);
511 return;
512 }
513 test.log << "[client] timeout";
514 socket.close();
515 }
516
517 void
519 {
520 if (ec)
521 {
522 fail("connect", ec);
523 return;
524 }
525 stream.async_handshake(
526 stream_type::client,
527 bind_executor(
528 strand,
529 std::bind(
530 &Connection::onHandshake, shared_from_this(), std::placeholders::_1)));
531 }
532
533 void
535 {
536 if (ec)
537 {
538 fail("handshake", ec);
539 return;
540 }
541 write(buf, "HELLO\n");
542
543#if 1
544 boost::asio::async_write(
545 stream,
546 buf.data(),
547 bind_executor(
548 strand,
549 std::bind(
552 std::placeholders::_1,
553 std::placeholders::_2)));
554#else
555 stream_.async_shutdown(bind_executor(
556 strand_,
557 std::bind(
558 &Connection::on_shutdown, shared_from_this(), std::placeholders::_1)));
559#endif
560 }
561
562 void
563 onWrite(error_code ec, std::size_t bytesTransferred)
564 {
565 buf.consume(bytesTransferred);
566 if (ec)
567 {
568 fail("write", ec);
569 return;
570 }
571#if 1
572 boost::asio::async_read_until(
573 stream,
574 buf,
575 "\n",
576 bind_executor(
577 strand,
578 std::bind(
581 std::placeholders::_1,
582 std::placeholders::_2)));
583#else
584 stream_.async_shutdown(bind_executor(
585 strand_,
586 std::bind(
587 &Connection::on_shutdown, shared_from_this(), std::placeholders::_1)));
588#endif
589 }
590
591 void
592 onRead(error_code ec, std::size_t bytesTransferred)
593 {
594 if (ec)
595 {
596 fail("read", ec);
597 return;
598 }
599 buf.commit(bytesTransferred);
600 stream.async_shutdown(bind_executor(
601 strand,
602 std::bind(&Connection::onShutdown, shared_from_this(), std::placeholders::_1)));
603 }
604
605 void
607 {
608 if (ec)
609 {
610 fail("shutdown", ec);
611 return;
612 }
613 socket.close();
614 timer.cancel();
615 }
616 };
617
618 public:
620 {
621 auto const p = std::make_shared<Connection>(*this, ep);
622 add(p);
623 p->run(ep);
624 }
625
627 {
628 close();
629 wait();
630 }
631 };
632
633public:
635 : work_(ioContext_.get_executor())
636 , thread_(std::thread([this]() {
637 beast::setCurrentThreadName("io_context");
638 this->ioContext_.run();
639 }))
641 {
642 }
643
645 {
646 work_.reset();
647 thread_.join();
648 }
649
650 void
651 run() override
652 {
653 Server const s(*this);
654 Client c(*this, s.endpoint());
655 c.wait();
656 pass();
657 }
658};
659
660BEAST_DEFINE_TESTSUITE(short_read, overlay, xrpl);
661
662} // namespace xrpl
T bind(T... args)
A testsuite class.
Definition suite.h:50
void pass()
Record a successful test condition.
Definition suite.h:500
std::map< Child *, std::weak_ptr< Child > > list_
std::condition_variable cond_
void add(std::shared_ptr< Child > const &child)
Client(short_read_test &test, endpoint_type const &ep)
Server(short_read_test &test)
endpoint_type const & endpoint() const
io_context_type ioContext_
boost::asio::ip::tcp::endpoint endpoint_type
boost::asio::io_context io_context_type
boost::asio::ip::address address_type
boost::asio::strand< io_context_type::executor_type > strand_type
void run() override
Runs the suite.
boost::asio::ip::tcp::socket socket_type
boost::system::error_code error_code
std::shared_ptr< boost::asio::ssl::context > context_
boost::asio::ip::tcp::acceptor acceptor_type
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_type
boost::asio::ssl::stream< socket_type & > stream_type
static void write(Streambuf &sb, std::string const &s)
boost::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
T data(T... args)
T emplace_back(T... args)
T endl(T... args)
T make_shared(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::shared_ptr< boost::asio::ssl::context > makeSslContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
BEAST_DEFINE_TESTSUITE(AccountTxPaging, app, xrpl)
T reserve(T... args)
T size(T... args)
void onRead(error_code ec, std::size_t bytesTransferred)
Connection(Client &client, endpoint_type const &ep)
void onWrite(error_code ec, std::size_t bytesTransferred)
void fail(std::string const &what, error_code ec)
void fail(std::string const &what, error_code ec)
Connection(Server &inServer, socket_type &&inSocket)
void onRead(error_code ec, std::size_t bytesTransferred)
void fail(std::string const &what, error_code ec)
void onWrite(error_code ec, std::size_t bytesTransferred)