rippled
Loading...
Searching...
No Matches
BaseHTTPPeer.h
1#ifndef XRPL_SERVER_BASEHTTPPEER_H_INCLUDED
2#define XRPL_SERVER_BASEHTTPPEER_H_INCLUDED
3
4#include <xrpl/basics/Log.h>
5#include <xrpl/beast/net/IPAddressConversion.h>
6#include <xrpl/beast/utility/instrumentation.h>
7#include <xrpl/server/Session.h>
8#include <xrpl/server/detail/Spawn.h>
9#include <xrpl/server/detail/io_list.h>
10
11#include <boost/asio/ip/tcp.hpp>
12#include <boost/asio/spawn.hpp>
13#include <boost/asio/ssl/stream.hpp>
14#include <boost/asio/strand.hpp>
15#include <boost/asio/streambuf.hpp>
16#include <boost/beast/core/stream_traits.hpp>
17#include <boost/beast/http/dynamic_body.hpp>
18#include <boost/beast/http/message.hpp>
19#include <boost/beast/http/parser.hpp>
20#include <boost/beast/http/read.hpp>
21
22#include <atomic>
23#include <chrono>
24#include <functional>
25#include <memory>
26#include <mutex>
27#include <vector>
28
29namespace ripple {
30
32template <class Handler, class Impl>
33class BaseHTTPPeer : public io_list::work, public Session
34{
35protected:
37 using error_code = boost::system::error_code;
38 using endpoint_type = boost::asio::ip::tcp::endpoint;
39 using yield_context = boost::asio::yield_context;
40
41 enum {
42 // Size of our read/write buffer
43 bufferSize = 4 * 1024,
44
45 // Max seconds without completing a message
47 timeoutSecondsLocal = 3 // used for localhost clients
48 };
49
50 struct buffer
51 {
52 buffer(void const* ptr, std::size_t len)
53 : data(new char[len]), bytes(len), used(0)
54 {
55 memcpy(data.get(), ptr, len);
56 }
57
61 };
62
63 Port const& port_;
64 Handler& handler_;
65 boost::asio::executor_work_guard<boost::asio::executor> work_;
66 boost::asio::strand<boost::asio::executor> strand_;
69
72
73 boost::asio::streambuf read_buf_;
78 bool graceful_ = false;
79 bool complete_ = false;
80 boost::system::error_code ec_;
81
85
86 //--------------------------------------------------------------------------
87
88public:
89 template <class ConstBufferSequence>
91 Port const& port,
92 Handler& handler,
93 boost::asio::executor const& executor,
95 endpoint_type remote_address,
96 ConstBufferSequence const& buffers);
97
98 virtual ~BaseHTTPPeer();
99
100 Session&
102 {
103 return *this;
104 }
105
106 void
107 close() override;
108
109protected:
110 Impl&
112 {
113 return *static_cast<Impl*>(this);
114 }
115
116 void
117 fail(error_code ec, char const* what);
118
119 void
121
122 void
124
125 void
127
128 void
130
131 void
132 on_write(error_code const& ec, std::size_t bytes_transferred);
133
134 void
136 std::shared_ptr<Writer> const& writer,
137 bool keep_alive,
138 yield_context do_yield);
139
140 virtual void
142
143 virtual void
144 do_close() = 0;
145
146 // Session
147
149 journal() override
150 {
151 return journal_;
152 }
153
154 Port const&
155 port() override
156 {
157 return port_;
158 }
159
165
167 request() override
168 {
169 return message_;
170 }
171
172 void
173 write(void const* buffer, std::size_t bytes) override;
174
175 void
176 write(std::shared_ptr<Writer> const& writer, bool keep_alive) override;
177
179 detach() override;
180
181 void
182 complete() override;
183
184 void
185 close(bool graceful) override;
186};
187
188//------------------------------------------------------------------------------
189
190template <class Handler, class Impl>
191template <class ConstBufferSequence>
193 Port const& port,
194 Handler& handler,
195 boost::asio::executor const& executor,
196 beast::Journal journal,
197 endpoint_type remote_address,
198 ConstBufferSequence const& buffers)
199 : port_(port)
200 , handler_(handler)
201 , work_(boost::asio::make_work_guard(executor))
202 , strand_(boost::asio::make_strand(executor))
203 , remote_address_(remote_address)
204 , journal_(journal)
205{
206 read_buf_.commit(boost::asio::buffer_copy(
207 read_buf_.prepare(boost::asio::buffer_size(buffers)), buffers));
208 static std::atomic<int> sid;
209 nid_ = ++sid;
210 id_ = std::string("#") + std::to_string(nid_) + " ";
211 JLOG(journal_.trace()) << id_ << "accept: " << remote_address_.address();
212}
213
214template <class Handler, class Impl>
216{
217 handler_.onClose(session(), ec_);
218 JLOG(journal_.trace()) << id_ << "destroyed: " << request_count_
219 << ((request_count_ == 1) ? " request"
220 : " requests");
221}
222
223template <class Handler, class Impl>
224void
226{
227 if (!strand_.running_in_this_thread())
228 return post(
229 strand_,
230 std::bind(
231 (void(BaseHTTPPeer::*)(void)) & BaseHTTPPeer::close,
232 impl().shared_from_this()));
233 boost::beast::get_lowest_layer(impl().stream_).close();
234}
235
236//------------------------------------------------------------------------------
237
238template <class Handler, class Impl>
239void
241{
242 if (!ec_ && ec != boost::asio::error::operation_aborted)
243 {
244 ec_ = ec;
245 JLOG(journal_.trace())
246 << id_ << std::string(what) << ": " << ec.message();
247 boost::beast::get_lowest_layer(impl().stream_).close();
248 }
249}
250
251template <class Handler, class Impl>
252void
254{
255 boost::beast::get_lowest_layer(impl().stream_)
256 .expires_after(std::chrono::seconds(
257 remote_address_.address().is_loopback() ? timeoutSecondsLocal
258 : timeoutSeconds));
259}
260
261// Convenience for discarding the error code
262template <class Handler, class Impl>
263void
265{
266 boost::beast::get_lowest_layer(impl().stream_).expires_never();
267}
268
269// Called when session times out
270template <class Handler, class Impl>
271void
273{
274 auto ec =
275 boost::system::errc::make_error_code(boost::system::errc::timed_out);
276 fail(ec, "timer");
277}
278
279//------------------------------------------------------------------------------
280
281template <class Handler, class Impl>
282void
284{
285 complete_ = false;
286 error_code ec;
287 start_timer();
288 boost::beast::http::async_read(
289 impl().stream_, read_buf_, message_, do_yield[ec]);
290 cancel_timer();
291 if (ec == boost::beast::http::error::end_of_stream)
292 return do_close();
293 if (ec == boost::beast::error::timeout)
294 return on_timer();
295 if (ec)
296 return fail(ec, "http::read");
297 do_request();
298}
299
300// Send everything in the write queue.
301// The write queue must not be empty upon entry.
302template <class Handler, class Impl>
303void
305 error_code const& ec,
306 std::size_t bytes_transferred)
307{
308 cancel_timer();
309 if (ec == boost::beast::error::timeout)
310 return on_timer();
311 if (ec)
312 return fail(ec, "write");
313 bytes_out_ += bytes_transferred;
314 {
315 std::lock_guard lock(mutex_);
316 wq2_.clear();
317 wq2_.reserve(wq_.size());
318 std::swap(wq2_, wq_);
319 }
320 if (!wq2_.empty())
321 {
323 v.reserve(wq2_.size());
324 for (auto const& b : wq2_)
325 v.emplace_back(b.data.get(), b.bytes);
326 start_timer();
327 return boost::asio::async_write(
328 impl().stream_,
329 v,
330 bind_executor(
331 strand_,
332 std::bind(
334 impl().shared_from_this(),
335 std::placeholders::_1,
336 std::placeholders::_2)));
337 }
338 if (!complete_)
339 return;
340 if (graceful_)
341 return do_close();
343 strand_,
344 std::bind(
346 impl().shared_from_this(),
347 std::placeholders::_1));
348}
349
350template <class Handler, class Impl>
351void
353 std::shared_ptr<Writer> const& writer,
354 bool keep_alive,
355 yield_context do_yield)
356{
357 std::function<void(void)> resume;
358 {
359 auto const p = impl().shared_from_this();
360 resume = std::function<void(void)>([this, p, writer, keep_alive]() {
362 strand_,
363 std::bind(
365 p,
366 writer,
367 keep_alive,
368 std::placeholders::_1));
369 });
370 }
371
372 for (;;)
373 {
374 if (!writer->prepare(bufferSize, resume))
375 return;
376 error_code ec;
377 auto const bytes_transferred = boost::asio::async_write(
378 impl().stream_,
379 writer->data(),
380 boost::asio::transfer_at_least(1),
381 do_yield[ec]);
382 if (ec)
383 return fail(ec, "writer");
384 writer->consume(bytes_transferred);
385 if (writer->complete())
386 break;
387 }
388
389 if (!keep_alive)
390 return do_close();
391
393 strand_,
394 std::bind(
396 impl().shared_from_this(),
397 std::placeholders::_1));
398}
399
400//------------------------------------------------------------------------------
401
402// Send a copy of the data.
403template <class Handler, class Impl>
404void
406{
407 if (bytes == 0)
408 return;
409 if ([&] {
410 std::lock_guard lock(mutex_);
411 wq_.emplace_back(buf, bytes);
412 return wq_.size() == 1 && wq2_.size() == 0;
413 }())
414 {
415 if (!strand_.running_in_this_thread())
416 return post(
417 strand_,
418 std::bind(
420 impl().shared_from_this(),
421 error_code{},
422 0));
423 else
424 return on_write(error_code{}, 0);
425 }
426}
427
428template <class Handler, class Impl>
429void
431 std::shared_ptr<Writer> const& writer,
432 bool keep_alive)
433{
435 strand_,
436 std::bind(
438 impl().shared_from_this(),
439 writer,
440 keep_alive,
441 std::placeholders::_1));
442}
443
444// DEPRECATED
445// Make the Session asynchronous
446template <class Handler, class Impl>
449{
450 return impl().shared_from_this();
451}
452
453// DEPRECATED
454// Called to indicate the response has been written(but not sent)
455template <class Handler, class Impl>
456void
458{
459 if (!strand_.running_in_this_thread())
460 return post(
461 strand_,
462 std::bind(
464 impl().shared_from_this()));
465
466 message_ = {};
467 complete_ = true;
468
469 {
470 std::lock_guard lock(mutex_);
471 if (!wq_.empty() && !wq2_.empty())
472 return;
473 }
474
475 // keep-alive
477 strand_,
478 std::bind(
480 impl().shared_from_this(),
481 std::placeholders::_1));
482}
483
484// DEPRECATED
485// Called from the Handler to close the session.
486template <class Handler, class Impl>
487void
489{
490 if (!strand_.running_in_this_thread())
491 return post(
492 strand_,
493 std::bind(
494 (void(BaseHTTPPeer::*)(bool)) &
496 impl().shared_from_this(),
497 graceful));
498
499 complete_ = true;
500 if (graceful)
501 {
502 graceful_ = true;
503 {
504 std::lock_guard lock(mutex_);
505 if (!wq_.empty() || !wq2_.empty())
506 return;
507 }
508 return do_close();
509 }
510
511 boost::beast::get_lowest_layer(impl().stream_).close();
512}
513
514} // namespace ripple
515
516#endif
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
A generic endpoint for log messages.
Definition Journal.h:41
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Represents an active connection.
BaseHTTPPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, beast::Journal journal, endpoint_type remote_address, ConstBufferSequence const &buffers)
boost::asio::yield_context yield_context
void write(void const *buffer, std::size_t bytes) override
boost::system::error_code error_code
Port const & port() override
Returns the Port settings for this connection.
http_request_type message_
std::vector< buffer > wq2_
boost::asio::strand< boost::asio::executor > strand_
void on_write(error_code const &ec, std::size_t bytes_transferred)
void write(std::shared_ptr< Writer > const &writer, bool keep_alive) override
virtual void do_close()=0
beast::Journal const journal_
endpoint_type remote_address_
boost::asio::ip::tcp::endpoint endpoint_type
std::size_t bytes_out_
virtual void do_request()=0
std::size_t bytes_in_
boost::asio::streambuf read_buf_
std::vector< buffer > wq_
boost::system::error_code ec_
beast::IP::Endpoint remoteAddress() override
Returns the remote address of the connection.
void close(bool graceful) override
Close the session.
std::shared_ptr< Session > detach() override
Detach the session.
http_request_type & request() override
Returns the current HTTP request.
void complete() override
Indicate that the response is complete.
void do_writer(std::shared_ptr< Writer > const &writer, bool keep_alive, yield_context do_yield)
void fail(error_code ec, char const *what)
void do_read(yield_context do_yield)
void close() override
boost::asio::executor_work_guard< boost::asio::executor > work_
beast::Journal journal() override
Returns the Journal to use for logging.
Persistent state information for a connection session.
Definition Session.h:24
T emplace_back(T... args)
T get(T... args)
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn
Definition Spawn.h:68
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:14
T reserve(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
buffer(void const *ptr, std::size_t len)
std::unique_ptr< char[]> data
Configuration information for a Server listening port.
Definition Port.h:31
T swap(T... args)
T to_string(T... args)