xrpld
Loading...
Searching...
No Matches
BaseHTTPPeer.h
1#pragma once
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/beast/net/IPAddressConversion.h>
5#include <xrpl/beast/utility/instrumentation.h>
6#include <xrpl/server/Session.h>
7#include <xrpl/server/detail/Spawn.h>
8#include <xrpl/server/detail/io_list.h>
9
10#include <boost/asio/ip/tcp.hpp>
11#include <boost/asio/spawn.hpp>
12#include <boost/asio/ssl/stream.hpp>
13#include <boost/asio/strand.hpp>
14#include <boost/asio/streambuf.hpp>
15#include <boost/beast/core/stream_traits.hpp>
16#include <boost/beast/http/dynamic_body.hpp>
17#include <boost/beast/http/message.hpp>
18#include <boost/beast/http/parser.hpp>
19#include <boost/beast/http/read.hpp>
20
21#include <atomic>
22#include <chrono>
23#include <functional>
24#include <memory>
25#include <mutex>
26#include <utility>
27#include <vector>
28
29namespace xrpl {
30
// Base class for an HTTP session peer. `Handler` is the object that receives
// session callbacks (the definitions in this file call handler_.onClose);
// `Impl` is the derived class, reached by downcasting `this` (see the
// static_cast accessor further down).
// NOTE(review): this listing has numbering gaps -- several declarations are
// not visible here, including members used by the out-of-class definitions
// (journal_, remoteAddress_, message_, mutex_, wq_, wq2_, id_, nid_, ...).
// Confirm against the real header before relying on this view.
32template <class Handler, class Impl>
33class BaseHTTPPeer : public IOList::Work, public Session
34{
35protected:
37 using error_code = boost::system::error_code;
38 using endpoint_type = boost::asio::ip::tcp::endpoint;
39 using yield_context = boost::asio::yield_context;
40
41 static constexpr auto kBufferSize = 4 * 1024; // size of read/write buffer
42 static constexpr auto kTimeoutSeconds = 30; // max seconds without completing a message
43 static constexpr auto kTimeoutSecondsLocal = 3; // used for localhost clients
44
    // Owned copy of a caller-supplied byte range. The write queues hold
    // these (write() emplaces one per call with the caller's pointer/length).
45 struct Buffer
46 {
    // Allocates `len` chars and copies `len` bytes from `ptr` into them.
47 Buffer(void const* ptr, std::size_t len) : data(new char[len]), bytes(len)
48 {
49 memcpy(data.get(), ptr, len);
50 }
51
    // NOTE(review): the `data` / `bytes` member declarations fall in a gap
    // of this listing.
55 };
56
    // Port configuration and handler are held by reference, not owned.
57 Port const& port_;
58 Handler& handler_;
    // Work guard and strand are both built from the executor passed to the
    // constructor; this class posts/binds its handlers to strand_.
59 boost::asio::executor_work_guard<boost::asio::executor> work_;
60 boost::asio::strand<boost::asio::executor> strand_;
63
66
    // Input buffer used by the HTTP read; pre-filled from the constructor's
    // `buffers` argument.
67 boost::asio::streambuf readBuf_;
    // Set by close(true); once set, onWrite calls doClose() when no queued
    // data remains and complete_ is true.
72 bool graceful_ = false;
    // Set by complete()/close(bool); reset to false at the start of doRead.
73 bool complete_ = false;
    // First error recorded by fail(); handed to handler_.onClose.
74 boost::system::error_code ec_;
75
79
80 //--------------------------------------------------------------------------
81
82public:
    // Constructor declaration.
    // NOTE(review): the name line and two parameter lines are missing here.
83 template <class ConstBufferSequence>
85 Port const& port,
86 Handler& handler,
87 boost::asio::executor const& executor,
90 ConstBufferSequence const& buffers);
91
92 ~BaseHTTPPeer() override;
93
    // Returns this object viewed through its Session base.
94 Session&
96 {
97 return *this;
98 }
99
100 void
101 close() override;
102
103protected:
    // Downcast to the derived implementation type.
104 Impl&
106 {
107 return *static_cast<Impl*>(this);
108 }
109
110 void
111 fail(error_code ec, char const* what);
112
    // NOTE(review): the names of the next four `void` declarations and of
    // the first pure-virtual below are missing from this listing.
113 void
115
116 void
118
119 void
121
122 void
124
125 void
126 onWrite(error_code const& ec, std::size_t bytesTransferred);
127
128 void
129 doWriter(std::shared_ptr<Writer> const& writer, bool keepAlive, yield_context doYield);
130
131 virtual void
133
    // Implemented by the derived class.
134 virtual void
135 doClose() = 0;
136
137 // Session
138
    // NOTE(review): return-type lines for journal(), request() and detach()
    // are missing from this listing.
140 journal() override
141 {
142 return journal_;
143 }
144
145 Port const&
146 port() override
147 {
148 return port_;
149 }
150
156
158 request() override
159 {
160 return message_;
161 }
162
163 void
164 write(void const* buffer, std::size_t bytes) override;
165
166 void
167 write(std::shared_ptr<Writer> const& writer, bool keepAlive) override;
168
170 detach() override;
171
172 void
173 complete() override;
174
175 void
176 close(bool graceful) override;
177};
178
179//------------------------------------------------------------------------------
180
// Constructor definition: stores the port/handler references, creates the
// work guard and strand from `executor`, seeds the read buffer with
// `buffers`, and assigns this peer a sequential id used as a log prefix.
// NOTE(review): the line with the qualified constructor name, two parameter
// lines and two initializer lines are missing from this listing (journal_
// and remoteAddress_ are used below, so they are presumably initialised
// there) -- confirm against the real header.
181template <class Handler, class Impl>
182template <class ConstBufferSequence>
184 Port const& port,
185 Handler& handler,
186 boost::asio::executor const& executor,
189 ConstBufferSequence const& buffers)
190 : port_(port)
191 , handler_(handler)
192 , work_(boost::asio::make_work_guard(executor))
193 , strand_(boost::asio::make_strand(executor))
196{
    // Copy every byte of `buffers` into readBuf_ so the next read sees it.
    // (Presumably data already received before this peer was constructed --
    // verify against the caller.)
197 readBuf_.commit(
198 boost::asio::buffer_copy(readBuf_.prepare(boost::asio::buffer_size(buffers)), buffers));
    // Function-local static atomic counter: each constructed peer takes the
    // next value.
199 static std::atomic<int> kSid;
200 nid_ = ++kSid;
    // Log prefix of the form "#<nid> ".
201 id_ = std::string("#") + std::to_string(nid_) + " ";
202 JLOG(journal_.trace()) << id_ << "accept: " << remoteAddress_.address();
203}
204
// Destructor definition (its signature line is missing from this listing;
// the "destroyed:" log message and the class's `~BaseHTTPPeer() override`
// declaration identify it).
205template <class Handler, class Impl>
207{
    // Report the session and the recorded error code (ec_) to the handler.
208 handler_.onClose(session(), ec_);
    // Trace the number of requests, choosing singular/plural wording.
209 JLOG(journal_.trace()) << id_ << "destroyed: " << requestCount_
210 << ((requestCount_ == 1) ? " request" : " requests");
211}
212
// close(): closes the lowest layer of the derived class's stream.
// NOTE(review): the signature line is missing from this listing; the body
// re-posts the zero-argument BaseHTTPPeer::close overload, which identifies it.
213template <class Handler, class Impl>
214void
216{
    // If called off the strand, re-dispatch this same function onto it. The
    // bound shared_ptr keeps the peer alive until the posted call runs.
217 if (!strand_.running_in_this_thread())
218 {
219 return post(
220 strand_,
221 std::bind(
    // The cast selects the no-argument overload of close.
222 (void (BaseHTTPPeer::*)(void))&BaseHTTPPeer::close, impl().shared_from_this()));
223 }
224 boost::beast::get_lowest_layer(impl().stream_).close();
225}
226
227//------------------------------------------------------------------------------
228
// fail(ec, what): record an error, log it, and close the stream.
// `what` is a short label naming the failing operation (callers in this
// file pass "timer", "http::read", "write", "writer").
// NOTE(review): the signature line is missing from this listing.
229template <class Handler, class Impl>
230void
232{
    // Only the first error is kept (ec_ must still be clear), and
    // operation_aborted is ignored entirely.
233 if (!ec_ && ec != boost::asio::error::operation_aborted)
234 {
235 ec_ = ec;
236 JLOG(journal_.trace()) << id_ << std::string(what) << ": " << ec.message();
237 boost::beast::get_lowest_layer(impl().stream_).close();
238 }
239}
240
// Arms the expiry on the lowest layer of the stream: loopback peers get
// kTimeoutSecondsLocal, all others kTimeoutSeconds.
// NOTE(review): the signature line is missing from this listing (presumably
// startTimer(), which doRead/onWrite call before their async operations),
// as is the line that wraps the value in a duration type (presumably
// std::chrono::seconds) -- confirm against the real header.
241template <class Handler, class Impl>
242void
244{
245 boost::beast::get_lowest_layer(impl().stream_)
246 .expires_after(
248 remoteAddress_.address().is_loopback() ? kTimeoutSecondsLocal : kTimeoutSeconds));
249}
250
251// Disable the expiry timeout on the underlying stream
// Turns off the expiry on the lowest layer of the stream (expires_never).
// NOTE(review): the signature line is missing from this listing (presumably
// cancelTimer(), which doRead/onWrite call after their async operations).
252template <class Handler, class Impl>
253void
255{
256 boost::beast::get_lowest_layer(impl().stream_).expires_never();
257}
258
259// Called when session times out
// Timeout path: builds a generic `timed_out` error code and routes it
// through fail() with the label "timer".
// NOTE(review): the signature line is missing from this listing (presumably
// onTimer(), which doRead/onWrite call when they see beast's timeout error).
260template <class Handler, class Impl>
261void
263{
264 auto ec = boost::system::errc::make_error_code(boost::system::errc::timed_out);
265 fail(ec, "timer");
266}
267
268//------------------------------------------------------------------------------
269
// doRead(doYield): reads one HTTP message into message_ using the supplied
// yield context, then dispatches on the result.
// NOTE(review): the signature line is missing from this listing.
270template <class Handler, class Impl>
271void
273{
    // A new request is starting, so the response is not yet complete.
274 complete_ = false;
275 error_code ec;
    // The timeout is armed only for the duration of the read.
276 startTimer();
    // doYield[ec] captures the error into `ec`.
277 boost::beast::http::async_read(impl().stream_, readBuf_, message_, doYield[ec]);
278 cancelTimer();
    // end_of_stream -> doClose(); timeout -> onTimer(); any other error ->
    // fail(); success -> hand the parsed message to the derived class.
279 if (ec == boost::beast::http::error::end_of_stream)
280 return doClose();
281 if (ec == boost::beast::error::timeout)
282 return onTimer();
283 if (ec)
284 return fail(ec, "http::read");
285 doRequest();
286}
287
288// Send everything in the write queue.
289// The write queue must not be empty upon entry.
// onWrite(ec, bytesTransferred): completion handler for queued writes; also
// called directly by write() with a default error_code and 0 bytes to start
// a flush. Rebinds itself as the completion handler for each batch.
// NOTE(review): this listing is missing the signature line and four
// interior lines (marked below) -- confirm against the real header.
290template <class Handler, class Impl>
291void
293{
294 cancelTimer();
295 if (ec == boost::beast::error::timeout)
296 return onTimer();
297 if (ec)
298 return fail(ec, "write");
299 bytesOut_ += bytesTransferred;
300 {
    // Under mutex_: drop the batch that was just written and size wq2_ for
    // the pending one.
301 std::scoped_lock const lock(mutex_);
302 wq2_.clear();
303 wq2_.reserve(wq_.size());
    // NOTE(review): one line is missing here; presumably it moves the
    // contents of wq_ into wq2_ (e.g. a swap) -- verify.
305 }
306 if (!wq2_.empty())
307 {
    // NOTE(review): the declaration of `v` is missing from this listing.
    // Each element is built from a Buffer's pointer and byte count.
309 v.reserve(wq2_.size());
310 for (auto const& b : wq2_)
311 v.emplace_back(b.data.get(), b.bytes);
312 startTimer();
    // Write the whole batch; the completion handler is bound to strand_ and
    // holds a shared_ptr to this peer.
313 return boost::asio::async_write(
314 impl().stream_,
315 v,
316 bind_executor(
317 strand_,
318 std::bind(
    // NOTE(review): the bound member-function line is missing (presumably
    // &BaseHTTPPeer::onWrite, given the two placeholders).
320 impl().shared_from_this(),
321 std::placeholders::_1,
322 std::placeholders::_2)));
323 }
    // Nothing left to send. If the response is not marked complete, wait for
    // further write()/complete() calls.
324 if (!complete_)
325 return;
326 if (graceful_)
327 return doClose();
    // NOTE(review): the call line and the bound member-function line are
    // missing here; presumably this spawns the next doRead on strand_
    // (the single placeholder would be the yield context) -- verify.
329 strand_,
330 std::bind(
332 impl().shared_from_this(),
333 std::placeholders::_1));
334}
335
// doWriter(writer, keepAlive, doYield): streams a Writer's output to the
// stream in chunks, then either closes or continues.
// NOTE(review): this listing is missing the signature line and several
// interior lines (marked below) -- confirm against the real header.
336template <class Handler, class Impl>
337void
339 std::shared_ptr<Writer> const& writer,
340 bool keepAlive,
341 yield_context doYield)
342{
    // `resume` is handed to writer->prepare(). It captures a shared_ptr to
    // this peer plus the writer and keepAlive flag.
343 std::function<void(void)> resume;
344 {
345 auto const p = impl().shared_from_this();
346 resume = std::function<void(void)>([this, p, writer, keepAlive]() {
    // NOTE(review): the call line and bound member-function line are
    // missing; presumably this re-spawns doWriter on strand_ -- verify.
348 strand_,
349 std::bind(
351 p,
352 writer,
353 keepAlive,
354 std::placeholders::_1));
355 });
356 }
357
358 for (;;)
359 {
    // prepare() returning false ends this invocation without closing.
    // (Presumably the writer invokes `resume` later when data is ready --
    // verify against Writer's contract.)
360 if (!writer->prepare(kBufferSize, resume))
361 return;
362 error_code ec;
    // Completes once at least one byte has been written; the count actually
    // written is then consumed from the writer.
363 auto const bytesTransferred = boost::asio::async_write(
364 impl().stream_, writer->data(), boost::asio::transfer_at_least(1), doYield[ec]);
365 if (ec)
366 return fail(ec, "writer");
367 writer->consume(bytesTransferred);
368 if (writer->complete())
369 break;
370 }
371
372 if (!keepAlive)
373 return doClose();
374
    // NOTE(review): the call line and bound member-function line are
    // missing; presumably this spawns the next doRead on strand_ -- verify.
376 strand_,
377 std::bind(
379 impl().shared_from_this(),
380 std::placeholders::_1));
381}
382
383//------------------------------------------------------------------------------
384
385// Send a copy of the data.
// write(buf, bytes): queue a copy of the caller's bytes and, if no write is
// already queued or in flight, start flushing via onWrite.
// NOTE(review): the signature line is missing from this listing.
386template <class Handler, class Impl>
387void
389{
    // Zero-length writes are ignored.
390 if (bytes == 0)
391 return;
    // The lambda runs under mutex_: it appends a Buffer (which copies the
    // data) and yields true only when this is the sole entry in wq_ and wq2_
    // is empty. When it yields false this function does nothing further.
392 if ([&] {
393 std::scoped_lock const lock(mutex_);
394 wq_.emplace_back(buf, bytes);
395 return wq_.size() == 1 && wq2_.size() == 0;
396 }())
397 {
    // Run onWrite on the strand: post it if we are on another thread,
    // otherwise call it directly. It is invoked with no error and 0 bytes.
398 if (!strand_.running_in_this_thread())
399 {
400 return post(
401 strand_,
402 std::bind(&BaseHTTPPeer::onWrite, impl().shared_from_this(), error_code{}, 0));
403 }
404 return onWrite(error_code{}, 0);
405 }
406}
407
// write(writer, keepAlive) overload.
// NOTE(review): this listing is missing the signature line, the call line
// and the bound member-function line. What remains shows a bind on strand_
// of this peer's shared_ptr, `writer`, `keepAlive` and one placeholder --
// presumably spawning doWriter as a coroutine; verify against the real header.
408template <class Handler, class Impl>
409void
411{
413 strand_,
414 std::bind(
416 impl().shared_from_this(),
417 writer,
418 keepAlive,
419 std::placeholders::_1));
420}
421
422// DEPRECATED
423// Make the Session asynchronous
// detach(): returns a shared_ptr to this peer, obtained from the derived
// class's shared_from_this().
// NOTE(review): the return-type and signature lines are missing from this
// listing.
424template <class Handler, class Impl>
427{
428 return impl().shared_from_this();
429}
430
431// DEPRECATED
432// Called to indicate the response has been written (but not sent)
// complete(): marks the current response as finished and clears message_.
// NOTE(review): this listing is missing the signature line and several
// interior lines (marked below) -- confirm against the real header.
433template <class Handler, class Impl>
434void
436{
    // Off-strand callers are re-dispatched onto strand_.
    // NOTE(review): the arguments of this post() call are missing from the
    // listing.
437 if (!strand_.running_in_this_thread())
438 {
439 return post(
441 }
442
443 message_ = {};
444 complete_ = true;
445
446 {
447 std::scoped_lock const lock(mutex_);
    // NOTE(review): this returns only when BOTH queues are non-empty (&&),
    // whereas close(bool) in this file returns when EITHER is non-empty
    // (||). Verify which condition is intended.
448 if (!wq_.empty() && !wq2_.empty())
449 return;
450 }
451
452 // keep-alive
    // NOTE(review): the call line and bound member-function line are
    // missing; presumably this spawns the next doRead on strand_ -- verify.
454 strand_,
455 std::bind(
457 impl().shared_from_this(),
458 std::placeholders::_1));
459}
460
461// DEPRECATED
462// Called from the Handler to close the session.
// close(graceful): marks the response complete, then closes either
// gracefully (after queued data has been written) or immediately.
// NOTE(review): the signature line and the bound member-function line in
// the post() call are missing from this listing.
463template <class Handler, class Impl>
464void
466{
    // Off-strand callers are re-dispatched onto strand_ with the same
    // `graceful` argument.
467 if (!strand_.running_in_this_thread())
468 {
469 return post(
470 strand_,
471 std::bind(
473 impl().shared_from_this(),
474 graceful));
475 }
476
477 complete_ = true;
478 if (graceful)
479 {
480 graceful_ = true;
481 {
    // If anything is still queued in either queue, return now; onWrite calls
    // doClose() once the queue is empty because graceful_ is set.
482 std::scoped_lock const lock(mutex_);
483 if (!wq_.empty() || !wq2_.empty())
484 return;
485 }
486 return doClose();
487 }
488
    // Non-graceful: close the lowest layer of the stream immediately.
489 boost::beast::get_lowest_layer(impl().stream_).close();
490}
491
492} // namespace xrpl
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:17
A generic endpoint for log messages.
Definition Journal.h:38
void close() override
std::size_t bytesIn_
BaseHTTPPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, beast::Journal journal, endpoint_type remoteAddress, ConstBufferSequence const &buffers)
endpoint_type remoteAddress_
http_request_type message_
Session & session()
void write(void const *buffer, std::size_t bytes) override
boost::asio::ip::tcp::endpoint endpoint_type
void onWrite(error_code const &ec, std::size_t bytesTransferred)
boost::system::error_code ec_
std::size_t bytesOut_
boost::system::error_code error_code
beast::Journal const journal_
void fail(error_code ec, char const *what)
virtual void doClose()=0
void doWriter(std::shared_ptr< Writer > const &writer, bool keepAlive, yield_context doYield)
boost::asio::streambuf readBuf_
std::shared_ptr< Session > detach() override
Detach the session.
Port const & port_
boost::asio::strand< boost::asio::executor > strand_
static constexpr auto kTimeoutSecondsLocal
boost::asio::executor_work_guard< boost::asio::executor > work_
virtual void doRequest()=0
void complete() override
Indicate that the response is complete.
std::vector< Buffer > wq2_
~BaseHTTPPeer() override
std::chrono::system_clock clock_type
static constexpr auto kBufferSize
std::vector< Buffer > wq_
void doRead(yield_context doYield)
static constexpr auto kTimeoutSeconds
http_request_type & request() override
Returns the current HTTP request.
boost::asio::yield_context yield_context
Session()=default
T emplace_back(T... args)
STL namespace.
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn.
Definition Spawn.h:66
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
T reserve(T... args)
static IP::Endpoint fromAsio(boost::asio::ip::address const &address)
Buffer(void const *ptr, std::size_t len)
std::unique_ptr< char[]> data
Configuration information for a Server listening port.
Definition Port.h:28
T swap(T... args)
T to_string(T... args)