86 return static_cast<Derived<HandlerType>&
>(*this);
93 explicit SendLambda(
HttpBase& self) : self(self)
97 template <
bool IsRequest,
typename Body,
typename Fields>
99 operator()(http::message<IsRequest, Body, Fields>&& msg)
const
106 auto sp = std::make_shared<http::message<IsRequest, Body, Fields>>(std::move(msg));
113 self.derived().stream(),
115 boost::beast::bind_front_handler(&HttpBase::onWrite, self.derived().shared_from_this(), sp->need_eof())
120 std::shared_ptr<void> res_;
122 std::shared_ptr<AdminVerificationStrategy> adminVerification_;
125 boost::beast::flat_buffer buffer_;
126 http::request<http::string_body> req_;
127 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
128 std::shared_ptr<HandlerType>
const handler_;
133 httpFail(boost::beast::error_code ec,
char const* what)
152 if (ec == boost::asio::ssl::error::stream_truncated)
155 if (!ec_ && ec != boost::asio::error::operation_aborted) {
157 LOG(perfLog_.
info()) <<
tag() <<
": " << what <<
": " << ec.message();
158 boost::beast::get_lowest_layer(derived().stream()).socket().close(ec);
164 std::string
const& ip,
165 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
166 std::shared_ptr<AdminVerificationStrategy> adminVerification,
167 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
168 std::shared_ptr<HandlerType> handler,
169 boost::beast::flat_buffer buffer
173 , adminVerification_(std::move(adminVerification))
174 , buffer_(std::move(buffer))
175 , dosGuard_(dosGuard)
176 , handler_(std::move(handler))
178 LOG(perfLog_.
debug()) <<
tag() <<
"http session created";
179 dosGuard_.get().increment(ip);
184 LOG(perfLog_.
debug()) <<
tag() <<
"http session closed";
186 dosGuard_.get().decrement(this->clientIp);
199 boost::beast::get_lowest_layer(derived().stream()).expires_after(std::chrono::seconds(30));
205 boost::beast::bind_front_handler(&HttpBase::onRead, derived().shared_from_this())
210 onRead(boost::beast::error_code ec, [[maybe_unused]] std::size_t bytesTransferred)
212 if (ec == http::error::end_of_stream)
213 return derived().doClose();
216 return httpFail(ec,
"read");
218 if (req_.method() == http::verb::get and req_.target() ==
"/health")
219 return sender_(httpResponse(http::status::ok,
"text/html", kHEALTH_CHECK_HTML));
222 ConnectionBase::isAdmin_ = adminVerification_->isAdmin(req_, this->clientIp);
224 if (boost::beast::websocket::is_upgrade(req_)) {
225 if (dosGuard_.get().isOk(this->clientIp)) {
227 boost::beast::get_lowest_layer(derived().stream()).expires_never();
230 return derived().upgrade();
233 return sender_(httpResponse(http::status::too_many_requests,
"text/html",
"Too many requests"));
236 if (
auto response = util::prometheus::handlePrometheusRequest(req_,
isAdmin()); response.has_value())
237 return sender_(std::move(response.value()));
239 if (req_.method() != http::verb::post) {
240 return sender_(httpResponse(http::status::bad_request,
"text/html",
"Expected a POST request"));
246 if (!dosGuard_.get().request(clientIp)) {
248 return sender_(httpResponse(
249 http::status::service_unavailable,
251 boost::json::serialize(
rpc::makeError(rpc::RippledError::rpcSLOW_DOWN))
255 LOG(log_.
info()) <<
tag() <<
"Received request from ip = " << clientIp <<
" - posting to WorkQueue";
258 (*handler_)(req_.body(), derived().shared_from_this());
259 }
catch (std::exception
const&) {
260 return sender_(httpResponse(
261 http::status::internal_server_error,
263 boost::json::serialize(
rpc::makeError(rpc::RippledError::rpcINTERNAL))
274 send(std::string&& msg, http::status status = http::status::ok)
override
276 if (!dosGuard_.get().add(clientIp, msg.size())) {
277 auto jsonResponse = boost::json::parse(msg).as_object();
278 jsonResponse[
"warning"] =
"load";
279 if (jsonResponse.contains(
"warnings") && jsonResponse[
"warnings"].is_array()) {
280 jsonResponse[
"warnings"].as_array().push_back(
rpc::makeWarning(rpc::WarnRpcRateLimit));
282 jsonResponse[
"warnings"] = boost::json::array{
rpc::makeWarning(rpc::WarnRpcRateLimit)};
286 msg = boost::json::serialize(jsonResponse);
288 sender_(httpResponse(status,
"application/json", std::move(msg)));
294 ASSERT(
false,
"SubscriptionContext can't be created for a HTTP connection");
299 onWrite(
bool close, boost::beast::error_code ec, std::size_t bytesTransferred)
301 boost::ignore_unused(bytesTransferred);
304 return httpFail(ec,
"write");
309 return derived().doClose();
316 http::response<http::string_body>
317 httpResponse(http::status status, std::string contentType, std::string message)
const
319 http::response<http::string_body> res{status, req_.version()};
320 res.set(http::field::server,
"clio-server-" + util::build::getClioVersionString());
321 res.set(http::field::content_type, contentType);
322 res.keep_alive(req_.keep_alive());
323 res.body() = std::move(message);
324 res.prepare_payload();