Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
RPCServerHandler.hpp
1#pragma once
2
3#include "data/BackendInterface.hpp"
4#include "etl/ETLServiceInterface.hpp"
5#include "rpc/Errors.hpp"
6#include "rpc/Factories.hpp"
7#include "rpc/JS.hpp"
8#include "rpc/RPCHelpers.hpp"
9#include "rpc/common/impl/APIVersionParser.hpp"
10#include "util/Assert.hpp"
11#include "util/CoroutineGroup.hpp"
12#include "util/JsonUtils.hpp"
13#include "util/Profiler.hpp"
14#include "util/Taggable.hpp"
15#include "util/log/Logger.hpp"
16#include "web/SubscriptionContextInterface.hpp"
17#include "web/dosguard/DOSGuardInterface.hpp"
18#include "web/ng/Connection.hpp"
19#include "web/ng/Request.hpp"
20#include "web/ng/Response.hpp"
21#include "web/ng/impl/ErrorHandling.hpp"
22
23#include <boost/asio/spawn.hpp>
24#include <boost/asio/steady_timer.hpp>
25#include <boost/beast/core/error.hpp>
26#include <boost/beast/http/status.hpp>
27#include <boost/json/array.hpp>
28#include <boost/json/object.hpp>
29#include <boost/json/parse.hpp>
30#include <boost/json/serialize.hpp>
31#include <boost/system/system_error.hpp>
32#include <xrpl/protocol/jss.h>
33
34#include <chrono>
35#include <exception>
36#include <functional>
37#include <memory>
38#include <optional>
39#include <ratio>
40#include <string>
41#include <utility>
42
43namespace web::ng {
44
50template <typename RPCEngineType>
52 std::shared_ptr<BackendInterface const> const backend_;
53 std::shared_ptr<RPCEngineType> const rpcEngine_;
54 std::shared_ptr<etl::ETLServiceInterface const> const etl_;
55 std::reference_wrapper<dosguard::DOSGuardInterface> dosguard_;
56 util::TagDecoratorFactory const tagFactory_;
57 rpc::impl::ProductionAPIVersionParser apiVersionParser_; // can be injected if needed
58
59 util::Logger log_{"RPC"};
60 util::Logger perfLog_{"Performance"};
61
62public:
74 std::shared_ptr<BackendInterface const> const& backend,
75 std::shared_ptr<RPCEngineType> const& rpcEngine,
76 std::shared_ptr<etl::ETLServiceInterface const> const& etl,
78 )
79 : backend_(backend)
80 , rpcEngine_(rpcEngine)
81 , etl_(etl)
82 , dosguard_(dosguard)
83 , tagFactory_(config)
84 , apiVersionParser_(config.getObject("api_version"))
85 {
86 }
87
97 [[nodiscard]] Response
99 Request const& request,
100 ConnectionMetadata const& connectionMetadata,
101 SubscriptionContextPtr subscriptionContext,
102 boost::asio::yield_context yield
103 )
104 {
105 if (not dosguard_.get().isOk(connectionMetadata.ip())) {
106 return makeSlowDownResponse(request, std::nullopt);
107 }
108
109 std::optional<Response> response;
110 util::CoroutineGroup coroutineGroup{yield, 1};
111 auto const onTaskComplete = coroutineGroup.registerForeign(yield);
112 ASSERT(onTaskComplete.has_value(), "Coroutine group can't be full");
113
114 bool const postSuccessful = rpcEngine_->post(
115 [this,
116 &request,
117 &response,
118 &onTaskComplete = *onTaskComplete, // NOLINT(bugprone-unchecked-optional-access)
119 &connectionMetadata,
120 subscriptionContext =
121 std::move(subscriptionContext)](boost::asio::yield_context innerYield) mutable {
122 try {
123 boost::system::error_code ec;
124 auto parsedRequest = boost::json::parse(request.message(), ec);
125 if (ec.failed() or not parsedRequest.is_object()) {
126 rpcEngine_->notifyBadSyntax();
127 response = impl::ErrorHelper{request}.makeJsonParsingError();
128 if (ec.failed()) {
129 LOG(log_.warn()) << "Error parsing JSON: " << ec.message()
130 << ". For request: " << request.message();
131 } else {
132 LOG(log_.warn())
133 << "Received not a JSON object. For request: " << request.message();
134 }
135 } else {
136 auto parsedObject = std::move(parsedRequest).as_object();
137
138 if (not dosguard_.get().request(connectionMetadata.ip(), parsedObject)) {
139 response = makeSlowDownResponse(request, parsedObject);
140 } else {
141 LOG(perfLog_.debug())
142 << connectionMetadata.tag() << "Adding to work queue";
143
144 if (not connectionMetadata.wasUpgraded() and
145 shouldReplaceParams(parsedObject)) {
146 parsedObject[JS(params)] =
147 boost::json::array({boost::json::object{}});
148 }
149
150 response = handleRequest(
151 innerYield,
152 request,
153 std::move(parsedObject),
154 connectionMetadata,
155 std::move(subscriptionContext)
156 );
157 }
158 }
159 } catch (std::exception const& ex) {
160 LOG(perfLog_.error())
161 << connectionMetadata.tag() << "Caught exception: " << ex.what();
162 rpcEngine_->notifyInternalError();
163 response = impl::ErrorHelper{request}.makeInternalError();
164 }
165
166 // notify the coroutine group that the foreign task is done
167 onTaskComplete();
168 },
169 connectionMetadata.ip()
170 );
171
172 if (not postSuccessful) {
173 // onTaskComplete must be called to notify coroutineGroup that the foreign task is done
174 (*onTaskComplete)(); // NOLINT(bugprone-unchecked-optional-access)
175 rpcEngine_->notifyTooBusy();
176 return impl::ErrorHelper{request}.makeTooBusyError();
177 }
178
179 // Put the coroutine to sleep until the foreign task is done
180 coroutineGroup.asyncWait(yield);
181 ASSERT(response.has_value(), "Woke up coroutine without setting response");
182
183 // NOLINTBEGIN(bugprone-unchecked-optional-access)
184 if (not dosguard_.get().add(connectionMetadata.ip(), response->message().size())) {
185 response->setMessage(makeLoadWarning(*response));
186 }
187
188 return *std::move(response);
189 // NOLINTEND(bugprone-unchecked-optional-access)
190 }
191
192private:
194 handleRequest(
195 boost::asio::yield_context yield,
196 Request const& rawRequest,
197 boost::json::object&& request,
198 ConnectionMetadata const& connectionMetadata,
199 SubscriptionContextPtr subscriptionContext
200 )
201 {
202 LOG(log_.info()) << connectionMetadata.tag()
203 << (connectionMetadata.wasUpgraded() ? "ws" : "http")
204 << " received request from work queue: " << util::removeSecret(request)
205 << " ip = " << connectionMetadata.ip();
206
207 try {
208 auto const range = backend_->fetchLedgerRange();
209 if (!range) {
210 // for error that happened before the handler, we don't attach any warnings
211 rpcEngine_->notifyNotReady();
212 return impl::ErrorHelper{rawRequest, std::move(request)}.makeNotReadyError();
213 }
214
215 auto const context = [&] {
216 if (connectionMetadata.wasUpgraded()) {
217 ASSERT(
218 subscriptionContext != nullptr,
219 "Subscription context must exist for a WS connection"
220 );
221 return rpc::makeWsContext(
222 yield,
223 request,
224 std::move(subscriptionContext),
225 tagFactory_.with(connectionMetadata.tag()),
226 *range,
227 connectionMetadata.ip(),
228 std::cref(apiVersionParser_),
229 connectionMetadata.isAdmin()
230 );
231 }
233 yield,
234 request,
235 tagFactory_.with(connectionMetadata.tag()),
236 *range,
237 connectionMetadata.ip(),
238 std::cref(apiVersionParser_),
239 connectionMetadata.isAdmin()
240 );
241 }();
242
243 if (!context) {
244 auto const err = context.error();
245 LOG(perfLog_.warn())
246 << connectionMetadata.tag() << "Could not create Web context: " << err;
247 LOG(log_.warn()) << connectionMetadata.tag()
248 << "Could not create Web context: " << err;
249
250 // we count all those as BadSyntax - as the WS path would.
251 // Although over HTTP these will yield a 400 status with a plain text response (for
252 // most).
253 rpcEngine_->notifyBadSyntax();
254 return impl::ErrorHelper(rawRequest, std::move(request)).makeError(err);
255 }
256
257 auto [result, timeDiff] =
258 util::timed([&]() { return rpcEngine_->buildResponse(*context); });
259
260 auto us = std::chrono::duration<int, std::milli>(timeDiff);
261 rpc::logDuration(request, context->tag(), us);
262
263 boost::json::object response;
264
265 if (!result.response.has_value()) {
266 // note: error statuses are counted/notified in buildResponse itself
267 response =
268 impl::ErrorHelper(rawRequest, request).composeError(result.response.error());
269 auto const responseStr = boost::json::serialize(response);
270
271 LOG(perfLog_.debug()) << context->tag() << "Encountered error: " << responseStr;
272 LOG(log_.debug()) << context->tag() << "Encountered error: " << responseStr;
273 } else {
274 auto& json = result.response.value();
275 auto const isForwarded = json.contains("forwarded") &&
276 json.at("forwarded").is_bool() && json.at("forwarded").as_bool();
277
278 // This can still technically be an error. Clio counts forwarded requests
279 // as successful.
280 rpcEngine_->notifyComplete(*context, us, isForwarded);
281
282 if (isForwarded)
283 json.erase("forwarded");
284
285 // if the result is forwarded - just use it as is
286 // if forwarded request has error, for http, error should be in "result"; for ws,
287 // error should be at top
288 if (isForwarded &&
289 (json.contains(JS(result)) || connectionMetadata.wasUpgraded())) {
290 for (auto const& [k, v] : json)
291 response.insert_or_assign(k, v);
292 } else {
293 response[JS(result)] = json;
294 }
295
296 if (isForwarded)
297 response["forwarded"] = true;
298
299 // for ws there is an additional field "status" in the response,
300 // otherwise the "status" is in the "result" field
301 if (connectionMetadata.wasUpgraded()) {
302 auto const appendFieldIfExist = [&](auto const& field) {
303 if (request.contains(field) and not request.at(field).is_null())
304 response[field] = request.at(field);
305 };
306
307 appendFieldIfExist(JS(id));
308 appendFieldIfExist(JS(api_version));
309
310 if (!response.contains(JS(error)))
311 response[JS(status)] = JS(success);
312
313 response[JS(type)] = JS(response);
314 } else {
315 if (response.contains(JS(result)) &&
316 !response[JS(result)].as_object().contains(JS(error)))
317 response[JS(result)].as_object()[JS(status)] = JS(success);
318 }
319 }
320
321 boost::json::array warnings = std::move(result.warnings);
322 warnings.emplace_back(rpc::makeWarning(rpc::WarningCode::WarnRpcClio));
323
324 if (etl_->lastCloseAgeSeconds() >= 60)
325 warnings.emplace_back(rpc::makeWarning(rpc::WarningCode::WarnRpcOutdated));
326
327 response["warnings"] = warnings;
328 return Response{boost::beast::http::status::ok, response, rawRequest};
329 } catch (std::exception const& ex) {
330 // note: while we are catching this in buildResponse too, this is here to make sure
331 // that any other code that may throw is outside of buildResponse is also worked around.
332 LOG(perfLog_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
333 LOG(log_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
334
335 rpcEngine_->notifyInternalError();
336 return impl::ErrorHelper(rawRequest, std::move(request)).makeInternalError();
337 }
338 }
339
340 static Response
341 makeSlowDownResponse(Request const& request, std::optional<boost::json::value> requestJson)
342 {
343 auto error = rpc::makeError(rpc::RippledError::rpcSLOW_DOWN);
344
345 if (not request.isHttp()) {
346 try {
347 if (not requestJson.has_value()) {
348 requestJson = boost::json::parse(request.message());
349 }
350 if (requestJson->is_object() && requestJson->as_object().contains("id"))
351 error["id"] = requestJson->as_object().at("id");
352 error["request"] = request.message();
353 } catch (std::exception const&) {
354 error["request"] = request.message();
355 }
356 }
357 return web::ng::Response{boost::beast::http::status::service_unavailable, error, request};
358 }
359
360 static boost::json::object
361 makeLoadWarning(Response const& response)
362 {
363 auto jsonResponse = boost::json::parse(response.message()).as_object();
364 jsonResponse["warning"] = "load";
365 if (jsonResponse.contains("warnings") && jsonResponse["warnings"].is_array()) {
366 jsonResponse["warnings"].as_array().push_back(
367 rpc::makeWarning(rpc::WarningCode::WarnRpcRateLimit)
368 );
369 } else {
370 jsonResponse["warnings"] =
371 boost::json::array{rpc::makeWarning(rpc::WarningCode::WarnRpcRateLimit)};
372 }
373 return jsonResponse;
374 }
375
376 [[nodiscard]] bool
377 shouldReplaceParams(boost::json::object const& req) const
378 {
379 auto const hasParams = req.contains(JS(params));
380 auto const paramsIsArray = hasParams and req.at(JS(params)).is_array();
381 auto const paramsIsEmptyString =
382 hasParams and req.at(JS(params)).is_string() and req.at(JS(params)).as_string().empty();
383 auto const paramsIsEmptyObject =
384 hasParams and req.at(JS(params)).is_object() and req.at(JS(params)).as_object().empty();
385 auto const paramsIsNull = hasParams and req.at(JS(params)).is_null();
386 auto const arrayIsEmpty = paramsIsArray and req.at(JS(params)).as_array().empty();
387 auto const arrayIsNotEmpty = paramsIsArray and not req.at(JS(params)).as_array().empty();
388 auto const firstArgIsNull =
389 arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_null();
390 auto const firstArgIsEmptyString = arrayIsNotEmpty and
391 req.at(JS(params)).as_array().at(0).is_string() and
392 req.at(JS(params)).as_array().at(0).as_string().empty();
393
394 // Note: all this compatibility dance is to match `rippled` as close as possible
395 return not hasParams or paramsIsEmptyString or paramsIsNull or paramsIsEmptyObject or
396 arrayIsEmpty or firstArgIsEmptyString or firstArgIsNull;
397 }
398};
399
400} // namespace web::ng
Definition APIVersionParser.hpp:15
CoroutineGroup is a helper class to manage a group of coroutines. It allows to spawn multiple corouti...
Definition CoroutineGroup.hpp:18
std::optional< std::function< void()> > registerForeign(boost::asio::yield_context yield)
Register a foreign coroutine this group should wait for.
Definition CoroutineGroup.cpp:48
void asyncWait(boost::asio::yield_context yield)
Wait for all the coroutines in the group to finish.
Definition CoroutineGroup.cpp:60
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:78
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:502
A factory for TagDecorator instantiation.
Definition Taggable.hpp:165
TagDecoratorFactory with(ParentType parent) const noexcept
Creates a new tag decorator factory with a bound parent tag decorator.
Definition Taggable.cpp:47
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:264
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:31
The interface of a denial of service guard.
Definition DOSGuardInterface.hpp:27
An interface for a connection metadata class.
Definition Connection.hpp:25
std::string const & ip() const
Get the ip of the client.
Definition Connection.cpp:21
virtual bool wasUpgraded() const =0
Whether the connection was upgraded. Upgraded connections are websocket connections.
bool isAdmin() const
Get whether the client is an admin.
Definition Connection.cpp:27
RPCServerHandler(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface const > const &backend, std::shared_ptr< RPCEngineType > const &rpcEngine, std::shared_ptr< etl::ETLServiceInterface const > const &etl, dosguard::DOSGuardInterface &dosguard)
Create a new server handler.
Definition RPCServerHandler.hpp:72
Response operator()(Request const &request, ConnectionMetadata const &connectionMetadata, SubscriptionContextPtr subscriptionContext, boost::asio::yield_context yield)
The callback when server receives a request.
Definition RPCServerHandler.hpp:98
Represents an HTTP or WebSocket request.
Definition Request.hpp:18
std::string_view message() const
Get the body (in case of an HTTP request) or the message (in case of a WebSocket request).
Definition Request.cpp:76
Represents an HTTP or Websocket response.
Definition Response.hpp:21
A helper that attempts to match rippled reporting mode HTTP errors as close as possible.
Definition ErrorHandling.hpp:22
Response makeTooBusyError() const
Make a response for when the server is too busy.
Definition ErrorHandling.cpp:126
Response makeJsonParsingError() const
Make a response when json parsing fails.
Definition ErrorHandling.cpp:144
Response makeNotReadyError() const
Make a response for when the server is not ready.
Definition ErrorHandling.cpp:120
Response makeInternalError() const
Make an internal error response.
Definition ErrorHandling.cpp:110
std::expected< web::Context, Status > makeWsContext(boost::asio::yield_context yc, boost::json::object const &request, web::SubscriptionContextPtr session, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool isAdmin)
A factory function that creates a Websocket context.
Definition Factories.cpp:28
void logDuration(boost::json::object const &request, util::BaseTagDecorator const &tag, DurationType const &dur)
Log the duration of the request processing.
Definition RPCHelpers.hpp:756
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:160
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:84
std::expected< web::Context, Status > makeHttpContext(boost::asio::yield_context yc, boost::json::object const &request, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool const isAdmin)
A factory function that creates a HTTP context.
Definition Factories.cpp:63
boost::json::object removeSecret(boost::json::object const &object)
Removes any detected secret information from a response JSON object.
Definition JsonUtils.hpp:55
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:21
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:64