22#include "data/BackendInterface.hpp"
24#include "rpc/Factories.hpp"
27#include "rpc/common/impl/APIVersionParser.hpp"
28#include "util/Assert.hpp"
29#include "util/CoroutineGroup.hpp"
30#include "util/JsonUtils.hpp"
31#include "util/Profiler.hpp"
32#include "util/Taggable.hpp"
33#include "util/log/Logger.hpp"
34#include "web/SubscriptionContextInterface.hpp"
35#include "web/ng/Connection.hpp"
36#include "web/ng/Request.hpp"
37#include "web/ng/Response.hpp"
38#include "web/ng/impl/ErrorHandling.hpp"
40#include <boost/asio/spawn.hpp>
41#include <boost/asio/steady_timer.hpp>
42#include <boost/beast/core/error.hpp>
43#include <boost/beast/http/status.hpp>
44#include <boost/json/array.hpp>
45#include <boost/json/object.hpp>
46#include <boost/json/parse.hpp>
47#include <boost/json/serialize.hpp>
48#include <boost/system/system_error.hpp>
49#include <xrpl/protocol/jss.h>
68template <
typename RPCEngineType,
typename ETLType>
70 std::shared_ptr<BackendInterface const>
const backend_;
71 std::shared_ptr<RPCEngineType>
const rpcEngine_;
72 std::shared_ptr<ETLType const>
const etl_;
90 std::shared_ptr<BackendInterface const>
const& backend,
91 std::shared_ptr<RPCEngineType>
const& rpcEngine,
92 std::shared_ptr<ETLType const>
const&
etl
95 , rpcEngine_(rpcEngine)
98 , apiVersionParser_(config.getObject(
"api_version"))
116 boost::asio::yield_context yield
119 std::optional<Response> response;
122 ASSERT(onTaskComplete.has_value(),
"Coroutine group can't be full");
124 bool const postSuccessful = rpcEngine_->post(
128 &onTaskComplete = onTaskComplete.value(),
130 subscriptionContext = std::move(subscriptionContext)](boost::asio::yield_context yield)
mutable {
132 auto parsedRequest = boost::json::parse(request.message()).as_object();
133 LOG(perfLog_.debug()) << connectionMetadata.tag() <<
"Adding to work queue";
135 if (not connectionMetadata.wasUpgraded() and shouldReplaceParams(parsedRequest))
136 parsedRequest[JS(params)] = boost::json::array({boost::json::object{}});
138 response = handleRequest(
139 yield, request, std::move(parsedRequest), connectionMetadata, std::move(subscriptionContext)
141 } catch (boost::system::system_error
const& ex) {
143 rpcEngine_->notifyBadSyntax();
145 LOG(log_.
warn()) <<
"Error parsing JSON: " << ex.what() <<
". For request: " << request.
message();
146 }
catch (std::invalid_argument
const& ex) {
148 rpcEngine_->notifyBadSyntax();
149 LOG(log_.
warn()) <<
"Invalid argument error: " << ex.what()
150 <<
". For request: " << request.
message();
152 }
catch (std::exception
const& ex) {
153 LOG(perfLog_.
error()) << connectionMetadata.
tag() <<
"Caught exception: " << ex.what();
154 rpcEngine_->notifyInternalError();
161 connectionMetadata.ip()
164 if (not postSuccessful) {
166 onTaskComplete->operator()();
167 rpcEngine_->notifyTooBusy();
172 coroutineGroup.asyncWait(yield);
173 ASSERT(response.has_value(),
"Woke up coroutine without setting response");
174 return std::move(response).value();
180 boost::asio::yield_context yield,
181 Request
const& rawRequest,
182 boost::json::object&& request,
183 ConnectionMetadata
const& connectionMetadata,
187 LOG(log_.info()) << connectionMetadata.tag() << (connectionMetadata.wasUpgraded() ?
"ws" :
"http")
189 <<
" ip = " << connectionMetadata.ip();
192 auto const range = backend_->fetchLedgerRange();
195 rpcEngine_->notifyNotReady();
196 return impl::ErrorHelper{rawRequest, std::move(request)}.makeNotReadyError();
199 auto const context = [&] {
200 if (connectionMetadata.wasUpgraded()) {
201 ASSERT(subscriptionContext !=
nullptr,
"Subscription context must exist for a WS connecton");
205 std::move(subscriptionContext),
206 tagFactory_.
with(connectionMetadata.tag()),
208 connectionMetadata.ip(),
209 std::cref(apiVersionParser_),
210 connectionMetadata.isAdmin()
216 tagFactory_.
with(connectionMetadata.tag()),
218 connectionMetadata.ip(),
219 std::cref(apiVersionParser_),
220 connectionMetadata.isAdmin()
225 auto const err = context.error();
226 LOG(perfLog_.warn()) << connectionMetadata.tag() <<
"Could not create Web context: " << err;
227 LOG(log_.warn()) << connectionMetadata.tag() <<
"Could not create Web context: " << err;
231 rpcEngine_->notifyBadSyntax();
232 return impl::ErrorHelper(rawRequest, std::move(request)).makeError(err);
235 auto [result, timeDiff] =
util::timed([&]() {
return rpcEngine_->buildResponse(*context); });
237 auto us = std::chrono::duration<int, std::milli>(timeDiff);
240 boost::json::object response;
242 if (
auto const status = std::get_if<rpc::Status>(&result.response)) {
244 response = impl::ErrorHelper(rawRequest, request).composeError(*status);
245 auto const responseStr = boost::json::serialize(response);
247 LOG(perfLog_.debug()) << context->tag() <<
"Encountered error: " << responseStr;
248 LOG(log_.debug()) << context->tag() <<
"Encountered error: " << responseStr;
251 rpcEngine_->notifyComplete(context->method, us);
253 auto& json = std::get<boost::json::object>(result.response);
254 auto const isForwarded =
255 json.contains(
"forwarded") && json.at(
"forwarded").is_bool() && json.at(
"forwarded").as_bool();
258 json.erase(
"forwarded");
263 if (isForwarded && (json.contains(JS(result)) || connectionMetadata.wasUpgraded())) {
264 for (
auto const& [k, v] : json)
265 response.insert_or_assign(k, v);
267 response[JS(result)] = json;
271 response[
"forwarded"] =
true;
275 if (connectionMetadata.wasUpgraded()) {
276 auto const appendFieldIfExist = [&](
auto const& field) {
277 if (request.contains(field) and not request.at(field).is_null())
278 response[field] = request.at(field);
281 appendFieldIfExist(JS(
id));
282 appendFieldIfExist(JS(api_version));
284 if (!response.contains(JS(error)))
285 response[JS(status)] = JS(success);
287 response[JS(type)] = JS(response);
289 if (response.contains(JS(result)) && !response[JS(result)].as_object().contains(JS(error)))
290 response[JS(result)].as_object()[JS(status)] = JS(success);
294 boost::json::array warnings = std::move(result.warnings);
297 if (etl_->lastCloseAgeSeconds() >= 60)
300 response[
"warnings"] = warnings;
301 return Response{boost::beast::http::status::ok, response, rawRequest};
302 }
catch (std::exception
const& ex) {
305 LOG(perfLog_.error()) << connectionMetadata.tag() <<
"Caught exception: " << ex.what();
306 LOG(log_.error()) << connectionMetadata.tag() <<
"Caught exception: " << ex.what();
308 rpcEngine_->notifyInternalError();
309 return impl::ErrorHelper(rawRequest, std::move(request)).makeInternalError();
314 shouldReplaceParams(boost::json::object
const& req)
const
316 auto const hasParams = req.contains(JS(params));
317 auto const paramsIsArray = hasParams and req.at(JS(params)).is_array();
318 auto const paramsIsEmptyString =
319 hasParams and req.at(JS(params)).is_string() and req.at(JS(params)).as_string().empty();
320 auto const paramsIsEmptyObject =
321 hasParams and req.at(JS(params)).is_object() and req.at(JS(params)).as_object().empty();
322 auto const paramsIsNull = hasParams and req.at(JS(params)).is_null();
323 auto const arrayIsEmpty = paramsIsArray and req.at(JS(params)).as_array().empty();
324 auto const arrayIsNotEmpty = paramsIsArray and not req.at(JS(params)).as_array().empty();
325 auto const firstArgIsNull = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_null();
326 auto const firstArgIsEmptyString = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_string() and
327 req.at(JS(params)).as_array().at(0).as_string().empty();
330 return not hasParams or paramsIsEmptyString or paramsIsNull or paramsIsEmptyObject or arrayIsEmpty or
331 firstArgIsEmptyString or firstArgIsNull;
Definition APIVersionParser.hpp:34
CoroutineGroup is a helper class to manage a group of coroutines. It allows to spawn multiple corouti...
Definition CoroutineGroup.hpp:37
std::optional< std::function< void()> > registerForeign()
Register a foreign coroutine this group should wait for.
Definition CoroutineGroup.cpp:59
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump warn(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::WRN severity.
Definition Logger.cpp:210
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:215
A factory for TagDecorator instantiation.
Definition Taggable.hpp:169
TagDecoratorFactory with(ParentType parent) const noexcept
Creates a new tag decorator factory with a bound parent tag decorator.
Definition Taggable.cpp:66
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:267
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
The server handler for RPC requests called by web server.
Definition RPCServerHandler.hpp:69
Response operator()(Request const &request, ConnectionMetadata const &connectionMetadata, SubscriptionContextPtr subscriptionContext, boost::asio::yield_context yield)
The callback when server receives a request.
Definition RPCServerHandler.hpp:112
RPCServerHandler(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface const > const &backend, std::shared_ptr< RPCEngineType > const &rpcEngine, std::shared_ptr< ETLType const > const &etl)
Create a new server handler.
Definition RPCServerHandler.hpp:88
Represents an HTTP or WebSocket request.
Definition Request.hpp:37
std::string_view message() const
Get the body (in case of an HTTP request) or the message (in case of a WebSocket request).
Definition Request.cpp:93
Represents an HTTP or Websocket response.
Definition Response.hpp:40
A helper that attempts to match rippled reporting mode HTTP errors as close as possible.
Definition ErrorHandling.hpp:41
Response makeTooBusyError() const
Make a response for when the server is too busy.
Definition ErrorHandling.cpp:133
Response makeJsonParsingError() const
Make a response when json parsing fails.
Definition ErrorHandling.cpp:143
Response makeInternalError() const
Make an internal error response.
Definition ErrorHandling.cpp:121
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:36
void logDuration(web::Context const &ctx, T const &dur)
Log the duration of the request processing.
Definition RPCHelpers.hpp:658
std::expected< web::Context, Status > makeWsContext(boost::asio::yield_context yc, boost::json::object const &request, web::SubscriptionContextPtr session, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool isAdmin)
A factory function that creates a Websocket context.
Definition Factories.cpp:47
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:65
std::expected< web::Context, Status > makeHttpContext(boost::asio::yield_context yc, boost::json::object const &request, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool const isAdmin)
A factory function that creates a HTTP context.
Definition Factories.cpp:77
boost::json::object removeSecret(boost::json::object const &object)
Removes any detected secret information from a response JSON object.
Definition JsonUtils.hpp:67
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:86