Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
RPCServerHandler.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etlng/ETLServiceInterface.hpp"
24#include "rpc/Errors.hpp"
25#include "rpc/Factories.hpp"
26#include "rpc/JS.hpp"
27#include "rpc/RPCHelpers.hpp"
28#include "rpc/common/impl/APIVersionParser.hpp"
29#include "util/Assert.hpp"
30#include "util/CoroutineGroup.hpp"
31#include "util/JsonUtils.hpp"
32#include "util/Profiler.hpp"
33#include "util/Taggable.hpp"
34#include "util/log/Logger.hpp"
35#include "web/SubscriptionContextInterface.hpp"
36#include "web/dosguard/DOSGuardInterface.hpp"
37#include "web/ng/Connection.hpp"
38#include "web/ng/Request.hpp"
39#include "web/ng/Response.hpp"
40#include "web/ng/impl/ErrorHandling.hpp"
41
42#include <boost/asio/spawn.hpp>
43#include <boost/asio/steady_timer.hpp>
44#include <boost/beast/core/error.hpp>
45#include <boost/beast/http/status.hpp>
46#include <boost/json/array.hpp>
47#include <boost/json/object.hpp>
48#include <boost/json/parse.hpp>
49#include <boost/json/serialize.hpp>
50#include <boost/system/system_error.hpp>
51#include <xrpl/protocol/jss.h>
52
53#include <chrono>
54#include <exception>
55#include <functional>
56#include <memory>
57#include <optional>
58#include <ratio>
59#include <string>
60#include <utility>
61
62namespace web::ng {
63
69template <typename RPCEngineType>
71 std::shared_ptr<BackendInterface const> const backend_;
72 std::shared_ptr<RPCEngineType> const rpcEngine_;
73 std::shared_ptr<etlng::ETLServiceInterface const> const etl_;
74 std::reference_wrapper<dosguard::DOSGuardInterface> dosguard_;
75 util::TagDecoratorFactory const tagFactory_;
76 rpc::impl::ProductionAPIVersionParser apiVersionParser_; // can be injected if needed
77
78 util::Logger log_{"RPC"};
79 util::Logger perfLog_{"Performance"};
80
81public:
93 std::shared_ptr<BackendInterface const> const& backend,
94 std::shared_ptr<RPCEngineType> const& rpcEngine,
95 std::shared_ptr<etlng::ETLServiceInterface const> const& etl,
97 )
98 : backend_(backend)
99 , rpcEngine_(rpcEngine)
100 , etl_(etl)
101 , dosguard_(dosguard)
102 , tagFactory_(config)
103 , apiVersionParser_(config.getObject("api_version"))
104 {
105 }
106
116 [[nodiscard]] Response
118 Request const& request,
119 ConnectionMetadata const& connectionMetadata,
120 SubscriptionContextPtr subscriptionContext,
121 boost::asio::yield_context yield
122 )
123 {
124 if (not dosguard_.get().isOk(connectionMetadata.ip())) {
125 return makeSlowDownResponse(request, std::nullopt);
126 }
127
128 std::optional<Response> response;
129 util::CoroutineGroup coroutineGroup{yield, 1};
130 auto const onTaskComplete = coroutineGroup.registerForeign(yield);
131 ASSERT(onTaskComplete.has_value(), "Coroutine group can't be full");
132
133 bool const postSuccessful = rpcEngine_->post(
134 [this,
135 &request,
136 &response,
137 &onTaskComplete = onTaskComplete.value(),
138 &connectionMetadata,
139 subscriptionContext = std::move(subscriptionContext)](boost::asio::yield_context innerYield) mutable {
140 try {
141 boost::system::error_code ec;
142 auto parsedRequest = boost::json::parse(request.message(), ec);
143 if (ec.failed() or not parsedRequest.is_object()) {
144 rpcEngine_->notifyBadSyntax();
145 response = impl::ErrorHelper{request}.makeJsonParsingError();
146 if (ec.failed()) {
147 LOG(log_.warn())
148 << "Error parsing JSON: " << ec.message() << ". For request: " << request.message();
149 } else {
150 LOG(log_.warn()) << "Received not a JSON object. For request: " << request.message();
151 }
152 } else {
153 auto parsedObject = std::move(parsedRequest).as_object();
154
155 if (not dosguard_.get().request(connectionMetadata.ip(), parsedObject)) {
156 response = makeSlowDownResponse(request, parsedObject);
157 } else {
158 LOG(perfLog_.debug()) << connectionMetadata.tag() << "Adding to work queue";
159
160 if (not connectionMetadata.wasUpgraded() and shouldReplaceParams(parsedObject))
161 parsedObject[JS(params)] = boost::json::array({boost::json::object{}});
162
163 response = handleRequest(
164 innerYield,
165 request,
166 std::move(parsedObject),
167 connectionMetadata,
168 std::move(subscriptionContext)
169 );
170 }
171 }
172 } catch (std::exception const& ex) {
173 LOG(perfLog_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
174 rpcEngine_->notifyInternalError();
175 response = impl::ErrorHelper{request}.makeInternalError();
176 }
177
178 // notify the coroutine group that the foreign task is done
179 onTaskComplete();
180 },
181 connectionMetadata.ip()
182 );
183
184 if (not postSuccessful) {
185 // onTaskComplete must be called to notify coroutineGroup that the foreign task is done
186 onTaskComplete->operator()();
187 rpcEngine_->notifyTooBusy();
188 return impl::ErrorHelper{request}.makeTooBusyError();
189 }
190
191 // Put the coroutine to sleep until the foreign task is done
192 coroutineGroup.asyncWait(yield);
193 ASSERT(response.has_value(), "Woke up coroutine without setting response");
194
195 if (not dosguard_.get().add(connectionMetadata.ip(), response->message().size())) {
196 response->setMessage(makeLoadWarning(*response));
197 }
198
199 return std::move(response).value();
200 }
201
202private:
203 Response
204 handleRequest(
205 boost::asio::yield_context yield,
206 Request const& rawRequest,
207 boost::json::object&& request,
208 ConnectionMetadata const& connectionMetadata,
209 SubscriptionContextPtr subscriptionContext
210 )
211 {
212 LOG(log_.info()) << connectionMetadata.tag() << (connectionMetadata.wasUpgraded() ? "ws" : "http")
213 << " received request from work queue: " << util::removeSecret(request)
214 << " ip = " << connectionMetadata.ip();
215
216 try {
217 auto const range = backend_->fetchLedgerRange();
218 if (!range) {
219 // for error that happened before the handler, we don't attach any warnings
220 rpcEngine_->notifyNotReady();
221 return impl::ErrorHelper{rawRequest, std::move(request)}.makeNotReadyError();
222 }
223
224 auto const context = [&] {
225 if (connectionMetadata.wasUpgraded()) {
226 ASSERT(subscriptionContext != nullptr, "Subscription context must exist for a WS connection");
227 return rpc::makeWsContext(
228 yield,
229 request,
230 std::move(subscriptionContext),
231 tagFactory_.with(connectionMetadata.tag()),
232 *range,
233 connectionMetadata.ip(),
234 std::cref(apiVersionParser_),
235 connectionMetadata.isAdmin()
236 );
237 }
239 yield,
240 request,
241 tagFactory_.with(connectionMetadata.tag()),
242 *range,
243 connectionMetadata.ip(),
244 std::cref(apiVersionParser_),
245 connectionMetadata.isAdmin()
246 );
247 }();
248
249 if (!context) {
250 auto const err = context.error();
251 LOG(perfLog_.warn()) << connectionMetadata.tag() << "Could not create Web context: " << err;
252 LOG(log_.warn()) << connectionMetadata.tag() << "Could not create Web context: " << err;
253
254 // we count all those as BadSyntax - as the WS path would.
255 // Although over HTTP these will yield a 400 status with a plain text response (for most).
256 rpcEngine_->notifyBadSyntax();
257 return impl::ErrorHelper(rawRequest, std::move(request)).makeError(err);
258 }
259
260 auto [result, timeDiff] = util::timed([&]() { return rpcEngine_->buildResponse(*context); });
261
262 auto us = std::chrono::duration<int, std::milli>(timeDiff);
263 rpc::logDuration(request, context->tag(), us);
264
265 boost::json::object response;
266
267 if (auto const status = std::get_if<rpc::Status>(&result.response)) {
268 // note: error statuses are counted/notified in buildResponse itself
269 response = impl::ErrorHelper(rawRequest, request).composeError(*status);
270 auto const responseStr = boost::json::serialize(response);
271
272 LOG(perfLog_.debug()) << context->tag() << "Encountered error: " << responseStr;
273 LOG(log_.debug()) << context->tag() << "Encountered error: " << responseStr;
274 } else {
275 // This can still technically be an error. Clio counts forwarded requests as successful.
276 rpcEngine_->notifyComplete(context->method, us);
277
278 auto& json = std::get<boost::json::object>(result.response);
279 auto const isForwarded =
280 json.contains("forwarded") && json.at("forwarded").is_bool() && json.at("forwarded").as_bool();
281
282 if (isForwarded)
283 json.erase("forwarded");
284
285 // if the result is forwarded - just use it as is
286 // if forwarded request has error, for http, error should be in "result"; for ws, error should
287 // be at top
288 if (isForwarded && (json.contains(JS(result)) || connectionMetadata.wasUpgraded())) {
289 for (auto const& [k, v] : json)
290 response.insert_or_assign(k, v);
291 } else {
292 response[JS(result)] = json;
293 }
294
295 if (isForwarded)
296 response["forwarded"] = true;
297
298 // for ws there is an additional field "status" in the response,
299 // otherwise the "status" is in the "result" field
300 if (connectionMetadata.wasUpgraded()) {
301 auto const appendFieldIfExist = [&](auto const& field) {
302 if (request.contains(field) and not request.at(field).is_null())
303 response[field] = request.at(field);
304 };
305
306 appendFieldIfExist(JS(id));
307 appendFieldIfExist(JS(api_version));
308
309 if (!response.contains(JS(error)))
310 response[JS(status)] = JS(success);
311
312 response[JS(type)] = JS(response);
313 } else {
314 if (response.contains(JS(result)) && !response[JS(result)].as_object().contains(JS(error)))
315 response[JS(result)].as_object()[JS(status)] = JS(success);
316 }
317 }
318
319 boost::json::array warnings = std::move(result.warnings);
320 warnings.emplace_back(rpc::makeWarning(rpc::WarnRpcClio));
321
322 if (etl_->lastCloseAgeSeconds() >= 60)
323 warnings.emplace_back(rpc::makeWarning(rpc::WarnRpcOutdated));
324
325 response["warnings"] = warnings;
326 return Response{boost::beast::http::status::ok, response, rawRequest};
327 } catch (std::exception const& ex) {
328 // note: while we are catching this in buildResponse too, this is here to make sure
329 // that any other code that may throw is outside of buildResponse is also worked around.
330 LOG(perfLog_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
331 LOG(log_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
332
333 rpcEngine_->notifyInternalError();
334 return impl::ErrorHelper(rawRequest, std::move(request)).makeInternalError();
335 }
336 }
337
338 static Response
339 makeSlowDownResponse(Request const& request, std::optional<boost::json::value> requestJson)
340 {
341 auto error = rpc::makeError(rpc::RippledError::rpcSLOW_DOWN);
342
343 if (not request.isHttp()) {
344 try {
345 if (not requestJson.has_value()) {
346 requestJson = boost::json::parse(request.message());
347 }
348 if (requestJson->is_object() && requestJson->as_object().contains("id"))
349 error["id"] = requestJson->as_object().at("id");
350 error["request"] = request.message();
351 } catch (std::exception const&) {
352 error["request"] = request.message();
353 }
354 }
355 return web::ng::Response{boost::beast::http::status::service_unavailable, error, request};
356 }
357
358 static boost::json::object
359 makeLoadWarning(Response const& response)
360 {
361 auto jsonResponse = boost::json::parse(response.message()).as_object();
362 jsonResponse["warning"] = "load";
363 if (jsonResponse.contains("warnings") && jsonResponse["warnings"].is_array()) {
364 jsonResponse["warnings"].as_array().push_back(rpc::makeWarning(rpc::WarnRpcRateLimit));
365 } else {
366 jsonResponse["warnings"] = boost::json::array{rpc::makeWarning(rpc::WarnRpcRateLimit)};
367 }
368 return jsonResponse;
369 }
370
371 bool
372 shouldReplaceParams(boost::json::object const& req) const
373 {
374 auto const hasParams = req.contains(JS(params));
375 auto const paramsIsArray = hasParams and req.at(JS(params)).is_array();
376 auto const paramsIsEmptyString =
377 hasParams and req.at(JS(params)).is_string() and req.at(JS(params)).as_string().empty();
378 auto const paramsIsEmptyObject =
379 hasParams and req.at(JS(params)).is_object() and req.at(JS(params)).as_object().empty();
380 auto const paramsIsNull = hasParams and req.at(JS(params)).is_null();
381 auto const arrayIsEmpty = paramsIsArray and req.at(JS(params)).as_array().empty();
382 auto const arrayIsNotEmpty = paramsIsArray and not req.at(JS(params)).as_array().empty();
383 auto const firstArgIsNull = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_null();
384 auto const firstArgIsEmptyString = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_string() and
385 req.at(JS(params)).as_array().at(0).as_string().empty();
386
387 // Note: all this compatibility dance is to match `rippled` as close as possible
388 return not hasParams or paramsIsEmptyString or paramsIsNull or paramsIsEmptyObject or arrayIsEmpty or
389 firstArgIsEmptyString or firstArgIsNull;
390 }
391};
392
393} // namespace web::ng
Definition APIVersionParser.hpp:34
CoroutineGroup is a helper class to manage a group of coroutines. It allows to spawn multiple corouti...
Definition CoroutineGroup.hpp:37
std::optional< std::function< void()> > registerForeign(boost::asio::yield_context yield)
Register a foreign coroutine this group should wait for.
Definition CoroutineGroup.cpp:59
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:229
A factory for TagDecorator instantiation.
Definition Taggable.hpp:182
TagDecoratorFactory with(ParentType parent) const noexcept
Creates a new tag decorator factory with a bound parent tag decorator.
Definition Taggable.cpp:66
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:280
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
The interface of a denial of service guard.
Definition DOSGuardInterface.hpp:46
An interface for a connection metadata class.
Definition Connection.hpp:43
std::string const & ip() const
Get the ip of the client.
Definition Connection.cpp:37
The server handler for RPC requests called by web server.
Definition RPCServerHandler.hpp:70
RPCServerHandler(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface const > const &backend, std::shared_ptr< RPCEngineType > const &rpcEngine, std::shared_ptr< etlng::ETLServiceInterface const > const &etl, dosguard::DOSGuardInterface &dosguard)
Create a new server handler.
Definition RPCServerHandler.hpp:91
Response operator()(Request const &request, ConnectionMetadata const &connectionMetadata, SubscriptionContextPtr subscriptionContext, boost::asio::yield_context yield)
The callback when server receives a request.
Definition RPCServerHandler.hpp:117
Represents an HTTP or WebSocket request.
Definition Request.hpp:37
Represents an HTTP or WebSocket response.
Definition Response.hpp:40
A helper that attempts to match rippled reporting mode HTTP errors as closely as possible.
Definition ErrorHandling.hpp:41
Response makeTooBusyError() const
Make a response for when the server is too busy.
Definition ErrorHandling.cpp:133
Response makeInternalError() const
Make an internal error response.
Definition ErrorHandling.cpp:121
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:39
std::expected< web::Context, Status > makeWsContext(boost::asio::yield_context yc, boost::json::object const &request, web::SubscriptionContextPtr session, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool isAdmin)
A factory function that creates a Websocket context.
Definition Factories.cpp:47
void logDuration(boost::json::object const &request, util::BaseTagDecorator const &tag, DurationType const &dur)
Log the duration of the request processing.
Definition RPCHelpers.hpp:754
boost::json::object makeError(RippledError err, std::optional< std::string_view > customError, std::optional< std::string_view > customMessage)
Generate JSON from a rpc::RippledError.
Definition Errors.cpp:120
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:65
std::expected< web::Context, Status > makeHttpContext(boost::asio::yield_context yc, boost::json::object const &request, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool const isAdmin)
A factory function that creates a HTTP context.
Definition Factories.cpp:77
boost::json::object removeSecret(boost::json::object const &object)
Removes any detected secret information from a response JSON object.
Definition JsonUtils.hpp:67
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:86