3#include "data/BackendInterface.hpp"
4#include "etl/ETLServiceInterface.hpp"
6#include "rpc/Factories.hpp"
9#include "rpc/common/impl/APIVersionParser.hpp"
10#include "util/Assert.hpp"
11#include "util/CoroutineGroup.hpp"
12#include "util/JsonUtils.hpp"
13#include "util/Profiler.hpp"
14#include "util/Taggable.hpp"
15#include "util/log/Logger.hpp"
16#include "web/SubscriptionContextInterface.hpp"
17#include "web/dosguard/DOSGuardInterface.hpp"
18#include "web/ng/Connection.hpp"
19#include "web/ng/Request.hpp"
20#include "web/ng/Response.hpp"
21#include "web/ng/impl/ErrorHandling.hpp"
23#include <boost/asio/spawn.hpp>
24#include <boost/asio/steady_timer.hpp>
25#include <boost/beast/core/error.hpp>
26#include <boost/beast/http/status.hpp>
27#include <boost/json/array.hpp>
28#include <boost/json/object.hpp>
29#include <boost/json/parse.hpp>
30#include <boost/json/serialize.hpp>
31#include <boost/system/system_error.hpp>
32#include <xrpl/protocol/jss.h>
// Handler template, parameterized on the RPC engine type (RPCEngineType).
// Data members visible in this listing:
//   backend_   - shared, read-only handle to the backend (used below to fetch the ledger range)
//   rpcEngine_ - shared handle to the RPC engine (work is posted to it and it is notified of outcomes)
//   etl_       - shared, read-only handle to the ETL service (queried for lastCloseAgeSeconds())
//   dosguard_  - reference wrapper around the DOS guard consulted per IP and per request
// NOTE(review): this listing is an extracted fragment; the class header and the other members
// used further down (tagFactory_, apiVersionParser_, log_, perfLog_) are not visible here.
50template <
typename RPCEngineType>
52 std::shared_ptr<BackendInterface const>
const backend_;
53 std::shared_ptr<RPCEngineType>
const rpcEngine_;
54 std::shared_ptr<etl::ETLServiceInterface const>
const etl_;
55 std::reference_wrapper<dosguard::DOSGuardInterface> dosguard_;
// Constructor (fragment): only part of the parameter list and of the member-initializer list
// is visible in this listing. The visible initializers store the supplied rpcEngine and
// construct apiVersionParser_ from the "api_version" object of the configuration.
// The shared pointers are taken by const reference.
74 std::shared_ptr<BackendInterface const>
const& backend,
75 std::shared_ptr<RPCEngineType>
const& rpcEngine,
76 std::shared_ptr<etl::ETLServiceInterface const>
const& etl,
80 , rpcEngine_(rpcEngine)
84 , apiVersionParser_(config.getObject(
"api_version"))
// Request callback (fragment): the listing starts at the last parameter; the beginning of the
// signature and several interior lines are not visible here.
//
// Visible flow:
//   1. If the DOS guard does not accept the client IP, return a slow-down response immediately.
//   2. Post a task to the RPC engine. The task parses the message as JSON, rejects anything that
//      fails to parse or is not a JSON object, runs the per-request DOS guard check, optionally
//      normalizes `params`, and calls handleRequest. Exceptions are logged and reported via
//      notifyInternalError().
//   3. If posting failed, invoke the completion callback manually and notify the engine that
//      it is too busy.
//   4. Wait on the coroutine group, then account the response size with the DOS guard and
//      attach a load warning when add() returns false.
102 boost::asio::yield_context yield
105 if (not dosguard_.get().isOk(connectionMetadata.
ip())) {
106 return makeSlowDownResponse(request, std::nullopt);
109 std::optional<Response> response;
// onTaskComplete is an optional completion callback; its origin is not visible in this
// listing (presumably coroutineGroup.registerForeign(yield) - TODO confirm).
112 ASSERT(onTaskComplete.has_value(),
"Coroutine group can't be full");
// postSuccessful is checked after the call; a false value is handled via notifyTooBusy() below.
114 bool const postSuccessful = rpcEngine_->post(
118 &onTaskComplete = onTaskComplete.value(),
120 subscriptionContext =
121 std::move(subscriptionContext)](boost::asio::yield_context innerYield)
mutable {
// Non-throwing parse: failures are reported through ec instead of an exception.
123 boost::system::error_code ec;
124 auto parsedRequest = boost::json::parse(request.message(), ec);
125 if (ec.failed() or not parsedRequest.is_object()) {
126 rpcEngine_->notifyBadSyntax();
127 response = impl::ErrorHelper{request}.makeJsonParsingError();
129 LOG(log_.warn()) <<
"Error parsing JSON: " << ec.message()
130 <<
". For request: " << request.message();
133 <<
"Received not a JSON object. For request: " << request.message();
136 auto parsedObject = std::move(parsedRequest).as_object();
// Per-request DOS guard check; a rejected request gets a slow-down response.
138 if (not dosguard_.get().request(connectionMetadata.ip(), parsedObject)) {
139 response = makeSlowDownResponse(request, parsedObject);
141 LOG(perfLog_.debug())
142 << connectionMetadata.tag() <<
"Adding to work queue";
// For non-upgraded connections, missing or empty `params` are replaced with [{}].
144 if (not connectionMetadata.wasUpgraded() and
145 shouldReplaceParams(parsedObject)) {
146 parsedObject[JS(params)] =
147 boost::json::array({boost::json::object{}});
150 response = handleRequest(
153 std::move(parsedObject),
155 std::move(subscriptionContext)
159 } catch (std::exception
const& ex) {
160 LOG(perfLog_.error())
161 << connectionMetadata.
tag() <<
"Caught exception: " << ex.what();
162 rpcEngine_->notifyInternalError();
169 connectionMetadata.ip()
172 if (not postSuccessful) {
// The task was not accepted, so the completion callback is invoked here by hand
// (presumably so that asyncWait below does not wait for a task that never runs).
174 onTaskComplete->operator()();
175 rpcEngine_->notifyTooBusy();
180 coroutineGroup.asyncWait(yield);
181 ASSERT(response.has_value(),
"Woke up coroutine without setting response");
// add() is given the response size in bytes; when it returns false the message is rewritten
// to include a load warning.
183 if (not dosguard_.get().add(connectionMetadata.ip(), response->message().size())) {
184 response->setMessage(makeLoadWarning(*response));
187 return std::move(response).value();
// Request-processing routine (fragment). Its name line is not visible in this listing;
// presumably this is handleRequest, which the posted task in the request callback calls.
//
// Visible flow:
//   1. Log the connection kind ("ws" when wasUpgraded(), otherwise "http") and the client IP.
//   2. Fetch the ledger range from the backend; notifyNotReady() is called on a path whose
//      condition is not visible here.
//   3. Build a context: the upgraded (WS) branch requires a non-null subscription context.
//      On failure, log "Could not create Web context" and call notifyBadSyntax().
//   4. Run rpcEngine_->buildResponse(*context) under util::timed.
//   5. Assemble the JSON response (error or result, "forwarded" flag, echoed fields, status,
//      warnings) and return it with HTTP status ok.
//   6. Any std::exception is logged to both loggers and reported via notifyInternalError().
193 boost::asio::yield_context yield,
195 boost::json::object&& request,
200 LOG(log_.
info()) << connectionMetadata.tag()
201 << (connectionMetadata.wasUpgraded() ?
"ws" :
"http")
203 <<
" ip = " << connectionMetadata.ip();
206 auto const range = backend_->fetchLedgerRange();
209 rpcEngine_->notifyNotReady();
// Immediately-invoked lambda that selects the context factory by connection kind.
213 auto const context = [&] {
214 if (connectionMetadata.wasUpgraded()) {
216 subscriptionContext !=
nullptr,
217 "Subscription context must exist for a WS connection"
222 std::move(subscriptionContext),
223 tagFactory_.
with(connectionMetadata.tag()),
225 connectionMetadata.ip(),
226 std::cref(apiVersionParser_),
227 connectionMetadata.isAdmin()
233 tagFactory_.
with(connectionMetadata.tag()),
235 connectionMetadata.ip(),
236 std::cref(apiVersionParser_),
237 connectionMetadata.isAdmin()
// Context creation failed: the error is logged and counted as bad syntax.
242 auto const err = context.error();
244 << connectionMetadata.tag() <<
"Could not create Web context: " << err;
245 LOG(log_.
warn()) << connectionMetadata.tag()
246 <<
"Could not create Web context: " << err;
251 rpcEngine_->notifyBadSyntax();
255 auto [result, timeDiff] =
256 util::timed([&]() {
return rpcEngine_->buildResponse(*context); });
// NOTE(review): the variable is named `us`, but the duration period here is std::milli -
// confirm the intended unit before relying on the name.
258 auto us = std::chrono::duration<int, std::milli>(timeDiff);
261 boost::json::object response;
// No response value means the engine produced an error; the serialized error is logged.
263 if (!result.response.has_value()) {
267 auto const responseStr = boost::json::serialize(response);
269 LOG(perfLog_.
debug()) << context->tag() <<
"Encountered error: " << responseStr;
270 LOG(log_.
debug()) << context->tag() <<
"Encountered error: " << responseStr;
272 auto& json = result.response.value();
// isForwarded is true only when "forwarded" exists, is a bool, and is true.
273 auto const isForwarded = json.contains(
"forwarded") &&
274 json.at(
"forwarded").is_bool() && json.at(
"forwarded").as_bool();
278 rpcEngine_->notifyComplete(*context, us, isForwarded);
// The flag is removed from the engine result here and set again on the response further down.
281 json.erase(
"forwarded");
// When this condition holds, the result's keys are copied to the top level of the response;
// the other visible statement nests the result under the `result` key instead.
287 (json.contains(JS(result)) || connectionMetadata.wasUpgraded())) {
288 for (
auto const& [k, v] : json)
289 response.insert_or_assign(k, v);
291 response[JS(result)] = json;
295 response[
"forwarded"] =
true;
// Upgraded (WS) connections echo `id` and `api_version` from the request when present and
// non-null, set `status` to success unless an error is present, and set `type` to response.
299 if (connectionMetadata.wasUpgraded()) {
300 auto const appendFieldIfExist = [&](
auto const& field) {
301 if (request.contains(field) and not request.at(field).is_null())
302 response[field] = request.at(field);
305 appendFieldIfExist(JS(
id));
306 appendFieldIfExist(JS(api_version));
308 if (!response.contains(JS(error)))
309 response[JS(status)] = JS(success);
311 response[JS(type)] = JS(response);
// Success status is written inside `result` when `result` exists and has no error
// (presumably the non-WS branch; the `else` is not visible in this listing).
313 if (response.contains(JS(result)) &&
314 !response[JS(result)].as_object().contains(JS(error)))
315 response[JS(result)].as_object()[JS(status)] = JS(success);
319 boost::json::array warnings = std::move(result.warnings);
// Triggered when the last close is at least 60 seconds old; the guarded statement is not
// visible here (presumably it appends a warning - TODO confirm).
322 if (etl_->lastCloseAgeSeconds() >= 60)
325 response[
"warnings"] = warnings;
326 return Response{boost::beast::http::status::ok, response, rawRequest};
327 }
catch (std::exception
const& ex) {
330 LOG(perfLog_.
error()) << connectionMetadata.tag() <<
"Caught exception: " << ex.what();
331 LOG(log_.
error()) << connectionMetadata.tag() <<
"Caught exception: " << ex.what();
333 rpcEngine_->notifyInternalError();
// Builds the response returned when the DOS guard rejects a client or request (fragment).
//
// request     - the incoming request; its raw message is echoed back for non-HTTP requests.
// requestJson - the already-parsed request, if available; when empty, the message is parsed here.
// Returns a response with HTTP status service_unavailable carrying the `error` object.
//
// For non-HTTP requests the request's "id" is copied into the error when the parsed JSON is an
// object containing one, and the raw message is stored under "request". If anything in that
// step throws a std::exception, only the raw message is stored.
// NOTE(review): the declaration of `error` and the opening of the try block are not visible in
// this listing.
339 makeSlowDownResponse(
Request const& request, std::optional<boost::json::value> requestJson)
343 if (not request.isHttp()) {
345 if (not requestJson.has_value()) {
// Throwing overload of parse; a failure lands in the catch below.
346 requestJson = boost::json::parse(request.message());
348 if (requestJson->is_object() && requestJson->as_object().contains(
"id"))
349 error[
"id"] = requestJson->as_object().at(
"id");
350 error[
"request"] = request.message();
351 }
catch (std::exception
const&) {
352 error[
"request"] = request.message();
355 return web::ng::Response{boost::beast::http::status::service_unavailable, error, request};
// Produces a copy of the response body with a load warning attached (fragment).
//
// response - the response whose message is parsed as a JSON object.
// Returns a boost::json::object (the return statement itself is not visible in this listing).
//
// The "warning" key is set to "load". If a "warnings" array already exists, a rate-limit warning
// (rpc::WarnRpcRateLimit) is appended to it; the other visible statement assigns a new
// one-element "warnings" array (presumably the else branch - not visible here).
// NOTE(review): parse(...).as_object() throws if the message is not a JSON object - confirm
// every response reaching this function carries a JSON object body.
358 static boost::json::object
359 makeLoadWarning(
Response const& response)
361 auto jsonResponse = boost::json::parse(response.message()).as_object();
362 jsonResponse[
"warning"] =
"load";
363 if (jsonResponse.contains(
"warnings") && jsonResponse[
"warnings"].is_array()) {
364 jsonResponse[
"warnings"].as_array().push_back(
rpc::makeWarning(rpc::WarnRpcRateLimit));
366 jsonResponse[
"warnings"] = boost::json::array{
rpc::makeWarning(rpc::WarnRpcRateLimit)};
372 shouldReplaceParams(boost::json::object
const& req)
const
374 auto const hasParams = req.contains(JS(params));
375 auto const paramsIsArray = hasParams and req.at(JS(params)).is_array();
376 auto const paramsIsEmptyString =
377 hasParams and req.at(JS(params)).is_string() and req.at(JS(params)).as_string().empty();
378 auto const paramsIsEmptyObject =
379 hasParams and req.at(JS(params)).is_object() and req.at(JS(params)).as_object().empty();
380 auto const paramsIsNull = hasParams and req.at(JS(params)).is_null();
381 auto const arrayIsEmpty = paramsIsArray and req.at(JS(params)).as_array().empty();
382 auto const arrayIsNotEmpty = paramsIsArray and not req.at(JS(params)).as_array().empty();
383 auto const firstArgIsNull =
384 arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_null();
385 auto const firstArgIsEmptyString = arrayIsNotEmpty and
386 req.at(JS(params)).as_array().at(0).is_string() and
387 req.at(JS(params)).as_array().at(0).as_string().empty();
390 return not hasParams or paramsIsEmptyString or paramsIsNull or paramsIsEmptyObject or
391 arrayIsEmpty or firstArgIsEmptyString or firstArgIsNull;
Definition APIVersionParser.hpp:15
CoroutineGroup is a helper class to manage a group of coroutines. It allows spawning multiple coroutines...
Definition CoroutineGroup.hpp:18
std::optional< std::function< void()> > registerForeign(boost::asio::yield_context yield)
Register a foreign coroutine this group should wait for.
Definition CoroutineGroup.cpp:48
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
Pump warn(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::WRN severity.
Definition Logger.cpp:493
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:498
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:483
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:488
A factory for TagDecorator instantiation.
Definition Taggable.hpp:165
TagDecoratorFactory with(ParentType parent) const noexcept
Creates a new tag decorator factory with a bound parent tag decorator.
Definition Taggable.cpp:47
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:264
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:31
The interface of a denial of service guard.
Definition DOSGuardInterface.hpp:27
RPCServerHandler(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface const > const &backend, std::shared_ptr< RPCEngineType > const &rpcEngine, std::shared_ptr< etl::ETLServiceInterface const > const &etl, dosguard::DOSGuardInterface &dosguard)
Create a new server handler.
Definition RPCServerHandler.hpp:72
Response operator()(Request const &request, ConnectionMetadata const &connectionMetadata, SubscriptionContextPtr subscriptionContext, boost::asio::yield_context yield)
The callback when server receives a request.
Definition RPCServerHandler.hpp:98
Represents an HTTP or WebSocket request.
Definition Request.hpp:18
Represents an HTTP or WebSocket response.
Definition Response.hpp:21
A helper that attempts to match rippled reporting mode HTTP errors as closely as possible.
Definition ErrorHandling.hpp:22
Response makeTooBusyError() const
Make a response for when the server is too busy.
Definition ErrorHandling.cpp:126
boost::json::object composeError(rpc::Status const &error) const
Compose an error into json object from a status.
Definition ErrorHandling.cpp:158
Response makeError(rpc::Status const &err) const
Make an error response from a status.
Definition ErrorHandling.cpp:62
Response makeNotReadyError() const
Make a response for when the server is not ready.
Definition ErrorHandling.cpp:120
Response makeInternalError() const
Make an internal error response.
Definition ErrorHandling.cpp:110
std::expected< web::Context, Status > makeWsContext(boost::asio::yield_context yc, boost::json::object const &request, web::SubscriptionContextPtr session, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool isAdmin)
A factory function that creates a WebSocket context.
Definition Factories.cpp:28
void logDuration(boost::json::object const &request, util::BaseTagDecorator const &tag, DurationType const &dur)
Log the duration of the request processing.
Definition RPCHelpers.hpp:756
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:160
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:84
std::expected< web::Context, Status > makeHttpContext(boost::asio::yield_context yc, boost::json::object const &request, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool const isAdmin)
A factory function that creates an HTTP context.
Definition Factories.cpp:63
boost::json::object removeSecret(boost::json::object const &object)
Removes any detected secret information from a response JSON object.
Definition JsonUtils.hpp:55
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:21
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:64