rippled
Loading...
Searching...
No Matches
BaseHTTPPeer.h
1#pragma once
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/beast/net/IPAddressConversion.h>
5#include <xrpl/beast/utility/instrumentation.h>
6#include <xrpl/server/Session.h>
7#include <xrpl/server/detail/Spawn.h>
8#include <xrpl/server/detail/io_list.h>
9
10#include <boost/asio/ip/tcp.hpp>
11#include <boost/asio/spawn.hpp>
12#include <boost/asio/ssl/stream.hpp>
13#include <boost/asio/strand.hpp>
14#include <boost/asio/streambuf.hpp>
15#include <boost/beast/core/stream_traits.hpp>
16#include <boost/beast/http/dynamic_body.hpp>
17#include <boost/beast/http/message.hpp>
18#include <boost/beast/http/parser.hpp>
19#include <boost/beast/http/read.hpp>
20
21#include <atomic>
22#include <chrono>
23#include <functional>
24#include <memory>
25#include <mutex>
26#include <vector>
27
28namespace xrpl {
29
31template <class Handler, class Impl>
32class BaseHTTPPeer : public io_list::work, public Session
33{
34protected:
36 using error_code = boost::system::error_code;
37 using endpoint_type = boost::asio::ip::tcp::endpoint;
38 using yield_context = boost::asio::yield_context;
39
40 enum {
41 // Size of our read/write buffer
42 bufferSize = 4 * 1024,
43
44 // Max seconds without completing a message
46 timeoutSecondsLocal = 3 // used for localhost clients
47 };
48
49 struct buffer
50 {
51 buffer(void const* ptr, std::size_t len) : data(new char[len]), bytes(len)
52 {
53 memcpy(data.get(), ptr, len);
54 }
55
59 };
60
61 Port const& port_;
62 Handler& handler_;
63 boost::asio::executor_work_guard<boost::asio::executor> work_;
64 boost::asio::strand<boost::asio::executor> strand_;
67
70
71 boost::asio::streambuf read_buf_;
76 bool graceful_ = false;
77 bool complete_ = false;
78 boost::system::error_code ec_;
79
83
84 //--------------------------------------------------------------------------
85
86public:
87 template <class ConstBufferSequence>
89 Port const& port,
90 Handler& handler,
91 boost::asio::executor const& executor,
93 endpoint_type remote_address,
94 ConstBufferSequence const& buffers);
95
96 virtual ~BaseHTTPPeer();
97
98 Session&
100 {
101 return *this;
102 }
103
104 void
105 close() override;
106
107protected:
108 Impl&
110 {
111 return *static_cast<Impl*>(this);
112 }
113
114 void
115 fail(error_code ec, char const* what);
116
117 void
119
120 void
122
123 void
125
126 void
128
129 void
130 on_write(error_code const& ec, std::size_t bytes_transferred);
131
132 void
133 do_writer(std::shared_ptr<Writer> const& writer, bool keep_alive, yield_context do_yield);
134
135 virtual void
137
138 virtual void
139 do_close() = 0;
140
141 // Session
142
144 journal() override
145 {
146 return journal_;
147 }
148
149 Port const&
150 port() override
151 {
152 return port_;
153 }
154
160
162 request() override
163 {
164 return message_;
165 }
166
167 void
168 write(void const* buffer, std::size_t bytes) override;
169
170 void
171 write(std::shared_ptr<Writer> const& writer, bool keep_alive) override;
172
174 detach() override;
175
176 void
177 complete() override;
178
179 void
180 close(bool graceful) override;
181};
182
183//------------------------------------------------------------------------------
184
185template <class Handler, class Impl>
186template <class ConstBufferSequence>
188 Port const& port,
189 Handler& handler,
190 boost::asio::executor const& executor,
191 beast::Journal journal,
192 endpoint_type remote_address,
193 ConstBufferSequence const& buffers)
194 : port_(port)
195 , handler_(handler)
196 , work_(boost::asio::make_work_guard(executor))
197 , strand_(boost::asio::make_strand(executor))
198 , remote_address_(remote_address)
199 , journal_(journal)
200{
201 read_buf_.commit(
202 boost::asio::buffer_copy(read_buf_.prepare(boost::asio::buffer_size(buffers)), buffers));
203 static std::atomic<int> sid;
204 nid_ = ++sid;
205 id_ = std::string("#") + std::to_string(nid_) + " ";
206 JLOG(journal_.trace()) << id_ << "accept: " << remote_address_.address();
207}
208
209template <class Handler, class Impl>
211{
212 handler_.onClose(session(), ec_);
213 JLOG(journal_.trace()) << id_ << "destroyed: " << request_count_
214 << ((request_count_ == 1) ? " request" : " requests");
215}
216
217template <class Handler, class Impl>
218void
220{
221 if (!strand_.running_in_this_thread())
222 return post(
223 strand_,
224 std::bind(
225 (void (BaseHTTPPeer::*)(void))&BaseHTTPPeer::close, impl().shared_from_this()));
226 boost::beast::get_lowest_layer(impl().stream_).close();
227}
228
229//------------------------------------------------------------------------------
230
231template <class Handler, class Impl>
232void
234{
235 if (!ec_ && ec != boost::asio::error::operation_aborted)
236 {
237 ec_ = ec;
238 JLOG(journal_.trace()) << id_ << std::string(what) << ": " << ec.message();
239 boost::beast::get_lowest_layer(impl().stream_).close();
240 }
241}
242
243template <class Handler, class Impl>
244void
246{
247 boost::beast::get_lowest_layer(impl().stream_)
248 .expires_after(
250 remote_address_.address().is_loopback() ? timeoutSecondsLocal : timeoutSeconds));
251}
252
253// Convenience for discarding the error code
254template <class Handler, class Impl>
255void
257{
258 boost::beast::get_lowest_layer(impl().stream_).expires_never();
259}
260
261// Called when session times out
262template <class Handler, class Impl>
263void
265{
266 auto ec = boost::system::errc::make_error_code(boost::system::errc::timed_out);
267 fail(ec, "timer");
268}
269
270//------------------------------------------------------------------------------
271
272template <class Handler, class Impl>
273void
275{
276 complete_ = false;
277 error_code ec;
278 start_timer();
279 boost::beast::http::async_read(impl().stream_, read_buf_, message_, do_yield[ec]);
280 cancel_timer();
281 if (ec == boost::beast::http::error::end_of_stream)
282 return do_close();
283 if (ec == boost::beast::error::timeout)
284 return on_timer();
285 if (ec)
286 return fail(ec, "http::read");
287 do_request();
288}
289
290// Send everything in the write queue.
291// The write queue must not be empty upon entry.
292template <class Handler, class Impl>
293void
295{
296 cancel_timer();
297 if (ec == boost::beast::error::timeout)
298 return on_timer();
299 if (ec)
300 return fail(ec, "write");
301 bytes_out_ += bytes_transferred;
302 {
303 std::lock_guard const lock(mutex_);
304 wq2_.clear();
305 wq2_.reserve(wq_.size());
306 std::swap(wq2_, wq_);
307 }
308 if (!wq2_.empty())
309 {
311 v.reserve(wq2_.size());
312 for (auto const& b : wq2_)
313 v.emplace_back(b.data.get(), b.bytes);
314 start_timer();
315 return boost::asio::async_write(
316 impl().stream_,
317 v,
318 bind_executor(
319 strand_,
320 std::bind(
322 impl().shared_from_this(),
323 std::placeholders::_1,
324 std::placeholders::_2)));
325 }
326 if (!complete_)
327 return;
328 if (graceful_)
329 return do_close();
331 strand_,
332 std::bind(
334 impl().shared_from_this(),
335 std::placeholders::_1));
336}
337
338template <class Handler, class Impl>
339void
341 std::shared_ptr<Writer> const& writer,
342 bool keep_alive,
343 yield_context do_yield)
344{
345 std::function<void(void)> resume;
346 {
347 auto const p = impl().shared_from_this();
348 resume = std::function<void(void)>([this, p, writer, keep_alive]() {
350 strand_,
351 std::bind(
353 p,
354 writer,
355 keep_alive,
356 std::placeholders::_1));
357 });
358 }
359
360 for (;;)
361 {
362 if (!writer->prepare(bufferSize, resume))
363 return;
364 error_code ec;
365 auto const bytes_transferred = boost::asio::async_write(
366 impl().stream_, writer->data(), boost::asio::transfer_at_least(1), do_yield[ec]);
367 if (ec)
368 return fail(ec, "writer");
369 writer->consume(bytes_transferred);
370 if (writer->complete())
371 break;
372 }
373
374 if (!keep_alive)
375 return do_close();
376
378 strand_,
379 std::bind(
381 impl().shared_from_this(),
382 std::placeholders::_1));
383}
384
385//------------------------------------------------------------------------------
386
387// Send a copy of the data.
388template <class Handler, class Impl>
389void
391{
392 if (bytes == 0)
393 return;
394 if ([&] {
395 std::lock_guard const lock(mutex_);
396 wq_.emplace_back(buf, bytes);
397 return wq_.size() == 1 && wq2_.size() == 0;
398 }())
399 {
400 if (!strand_.running_in_this_thread())
401 return post(
402 strand_,
403 std::bind(&BaseHTTPPeer::on_write, impl().shared_from_this(), error_code{}, 0));
404 else
405 return on_write(error_code{}, 0);
406 }
407}
408
409template <class Handler, class Impl>
410void
412{
414 strand_,
415 std::bind(
417 impl().shared_from_this(),
418 writer,
419 keep_alive,
420 std::placeholders::_1));
421}
422
423// DEPRECATED
424// Make the Session asynchronous
425template <class Handler, class Impl>
428{
429 return impl().shared_from_this();
430}
431
432// DEPRECATED
433// Called to indicate the response has been written(but not sent)
434template <class Handler, class Impl>
435void
437{
438 if (!strand_.running_in_this_thread())
439 return post(
440 strand_, std::bind(&BaseHTTPPeer<Handler, Impl>::complete, impl().shared_from_this()));
441
442 message_ = {};
443 complete_ = true;
444
445 {
446 std::lock_guard const lock(mutex_);
447 if (!wq_.empty() && !wq2_.empty())
448 return;
449 }
450
451 // keep-alive
453 strand_,
454 std::bind(
456 impl().shared_from_this(),
457 std::placeholders::_1));
458}
459
460// DEPRECATED
461// Called from the Handler to close the session.
462template <class Handler, class Impl>
463void
465{
466 if (!strand_.running_in_this_thread())
467 return post(
468 strand_,
469 std::bind(
471 impl().shared_from_this(),
472 graceful));
473
474 complete_ = true;
475 if (graceful)
476 {
477 graceful_ = true;
478 {
479 std::lock_guard const lock(mutex_);
480 if (!wq_.empty() || !wq2_.empty())
481 return;
482 }
483 return do_close();
484 }
485
486 boost::beast::get_lowest_layer(impl().stream_).close();
487}
488
489} // namespace xrpl
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
A generic endpoint for log messages.
Definition Journal.h:40
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Represents an active connection.
void close() override
boost::asio::streambuf read_buf_
http_request_type message_
Session & session()
void write(void const *buffer, std::size_t bytes) override
virtual ~BaseHTTPPeer()
beast::IP::Endpoint remoteAddress() override
Returns the remote address of the connection.
std::vector< buffer > wq_
boost::asio::ip::tcp::endpoint endpoint_type
void do_read(yield_context do_yield)
boost::system::error_code ec_
boost::system::error_code error_code
beast::Journal const journal_
void fail(error_code ec, char const *what)
endpoint_type remote_address_
virtual void do_request()=0
std::size_t bytes_out_
std::vector< buffer > wq2_
std::shared_ptr< Session > detach() override
Detach the session.
Port const & port_
void on_write(error_code const &ec, std::size_t bytes_transferred)
beast::Journal journal() override
Returns the Journal to use for logging.
boost::asio::strand< boost::asio::executor > strand_
BaseHTTPPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, beast::Journal journal, endpoint_type remote_address, ConstBufferSequence const &buffers)
boost::asio::executor_work_guard< boost::asio::executor > work_
void complete() override
Indicate that the response is complete.
void write(std::shared_ptr< Writer > const &writer, bool keep_alive) override
void close(bool graceful) override
Close the session.
Port const & port() override
Returns the Port settings for this connection.
void do_writer(std::shared_ptr< Writer > const &writer, bool keep_alive, yield_context do_yield)
http_request_type & request() override
Returns the current HTTP request.
boost::asio::yield_context yield_context
virtual void do_close()=0
std::size_t bytes_in_
Persistent state information for a connection session.
Definition Session.h:23
T emplace_back(T... args)
T get(T... args)
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn
Definition Spawn.h:66
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
T reserve(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
buffer(void const *ptr, std::size_t len)
std::unique_ptr< char[]> data
Configuration information for a Server listening port.
Definition Port.h:30
T swap(T... args)
T to_string(T... args)