85 return static_cast<Derived<HandlerType>&
>(*this);
92 explicit SendLambda(HttpBase& self) : self(self)
96 template <
bool IsRequest,
typename Body,
typename Fields>
98 operator()(http::message<IsRequest, Body, Fields>&& msg)
const
105 auto sp = std::make_shared<http::message<IsRequest, Body, Fields>>(std::move(msg));
112 self.derived().stream(),
114 boost::beast::bind_front_handler(
115 &HttpBase::onWrite, self.derived().shared_from_this(), sp->need_eof()
121 std::shared_ptr<void> res_;
123 std::shared_ptr<AdminVerificationStrategy> adminVerification_;
124 std::shared_ptr<ProxyIpResolver> proxyIpResolver_;
125 bool isProxyConnection_ =
false;
128 boost::beast::flat_buffer buffer_;
129 http::request<http::string_body> req_;
130 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
131 std::shared_ptr<HandlerType>
const handler_;
132 std::reference_wrapper<data::LedgerCacheInterface const> cache_;
137 httpFail(boost::beast::error_code ec,
char const* what)
156 if (ec == boost::asio::ssl::error::stream_truncated)
159 if (!ec_ && ec != boost::asio::error::operation_aborted) {
161 LOG(perfLog_.info()) <<
tag() <<
": " << what <<
": " << ec.message();
162 boost::beast::get_lowest_layer(derived().stream()).socket().close(ec);
168 std::string
const& ip,
169 std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
170 std::shared_ptr<AdminVerificationStrategy> adminVerification,
171 std::shared_ptr<ProxyIpResolver> proxyIpResolver,
172 std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
173 std::shared_ptr<HandlerType> handler,
174 std::reference_wrapper<data::LedgerCacheInterface const> cache,
175 boost::beast::flat_buffer buffer
179 , adminVerification_(std::move(adminVerification))
180 , proxyIpResolver_(std::move(proxyIpResolver))
181 , buffer_(std::move(buffer))
182 , dosGuard_(dosGuard)
183 , handler_(std::move(handler))
186 LOG(perfLog_.debug()) <<
tag() <<
"http session created";
187 dosGuard_.get().increment(ip);
192 LOG(perfLog_.debug()) <<
tag() <<
"http session closed";
194 dosGuard_.get().decrement(clientIp_);
207 boost::beast::get_lowest_layer(derived().stream()).expires_after(std::chrono::seconds(30));
213 boost::beast::bind_front_handler(&HttpBase::onRead, derived().shared_from_this())
218 onRead(boost::beast::error_code ec, [[maybe_unused]] std::size_t bytesTransferred)
220 if (ec == http::error::end_of_stream)
221 return derived().doClose();
224 return httpFail(ec,
"read");
226 auto const updateClientIp = [&](std::string newIp) {
227 if (newIp == clientIp_)
229 LOG(log_.info()) <<
tag()
230 <<
"Detected a forwarded request from proxy. Resolved client ip: "
232 dosGuard_.get().decrement(clientIp_);
233 clientIp_ = std::move(newIp);
234 dosGuard_.get().increment(clientIp_);
237 if (isProxyConnection_) {
239 updateClientIp(std::move(*resolvedIp));
241 auto resolvedIp = proxyIpResolver_->resolveClientIp(clientIp_, req_);
242 resolvedIp.has_value()
244 updateClientIp(std::move(*resolvedIp));
245 isProxyConnection_ =
true;
248 if (req_.method() == http::verb::get and req_.target() ==
"/health")
249 return sender_(httpResponse(http::status::ok,
"text/html", kHealthCheckHtml));
251 if (req_.method() == http::verb::get and req_.target() ==
"/cache_state") {
252 if (cache_.get().isFull()) {
253 return sender_(httpResponse(http::status::ok,
"text/html", kCacheCheckLoadedHtml));
256 return sender_(httpResponse(
257 http::status::service_unavailable,
"text/html", kCacheCheckNotLoadedHtml
262 ConnectionBase::isAdmin_ = adminVerification_->isAdmin(req_, clientIp_);
264 if (boost::beast::websocket::is_upgrade(req_)) {
265 if (dosGuard_.get().isOk(clientIp_)) {
267 boost::beast::get_lowest_layer(derived().stream()).expires_never();
270 return derived().upgrade();
274 httpResponse(http::status::too_many_requests,
"text/html",
"Too many requests")
278 if (
auto response = util::prometheus::handlePrometheusRequest(req_,
isAdmin());
279 response.has_value())
280 return sender_(std::move(response.value()));
282 if (req_.method() != http::verb::post) {
284 httpResponse(http::status::bad_request,
"text/html",
"Expected a POST request")
288 LOG(log_.info()) <<
tag() <<
"Received request from ip = " << clientIp_;
291 (*handler_)(req_.body(), derived().shared_from_this());
292 }
catch (std::exception
const&) {
293 return sender_(httpResponse(
294 http::status::internal_server_error,
296 boost::json::serialize(
rpc::makeError(rpc::RippledError::rpcINTERNAL))
304 sender_(httpResponse(
305 http::status::service_unavailable,
307 boost::json::serialize(
rpc::makeError(rpc::RippledError::rpcSLOW_DOWN))
317 send(std::string&& msg, http::status status = http::status::ok)
override
319 if (!dosGuard_.get().add(clientIp_, msg.size())) {
320 auto jsonResponse = boost::json::parse(msg).as_object();
321 jsonResponse[
"warning"] =
"load";
322 if (jsonResponse.contains(
"warnings") && jsonResponse[
"warnings"].is_array()) {
323 jsonResponse[
"warnings"].as_array().push_back(
327 jsonResponse[
"warnings"] =
332 msg = boost::json::serialize(jsonResponse);
334 sender_(httpResponse(status,
"application/json", std::move(msg)));
340 ASSERT(
false,
"SubscriptionContext can't be created for a HTTP connection");
345 onWrite(
bool close, boost::beast::error_code ec, std::size_t bytesTransferred)
347 boost::ignore_unused(bytesTransferred);
350 return httpFail(ec,
"write");
355 return derived().doClose();
362 [[nodiscard]] http::response<http::string_body>
363 httpResponse(http::status status, std::string contentType, std::string message)
const
365 http::response<http::string_body> res{status, req_.version()};
366 res.set(http::field::server,
"clio-server-" + util::build::getClioVersionString());
367 res.set(http::field::content_type, contentType);
368 res.keep_alive(req_.keep_alive());
369 res.body() = std::move(message);
370 res.prepare_payload();