Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
RPCServerHandler.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "etlng/ETLServiceInterface.hpp"
24#include "rpc/Errors.hpp"
25#include "rpc/Factories.hpp"
26#include "rpc/JS.hpp"
27#include "rpc/RPCHelpers.hpp"
28#include "rpc/common/impl/APIVersionParser.hpp"
29#include "util/Assert.hpp"
30#include "util/CoroutineGroup.hpp"
31#include "util/JsonUtils.hpp"
32#include "util/Profiler.hpp"
33#include "util/Taggable.hpp"
34#include "util/log/Logger.hpp"
35#include "web/SubscriptionContextInterface.hpp"
36#include "web/ng/Connection.hpp"
37#include "web/ng/Request.hpp"
38#include "web/ng/Response.hpp"
39#include "web/ng/impl/ErrorHandling.hpp"
40
41#include <boost/asio/spawn.hpp>
42#include <boost/asio/steady_timer.hpp>
43#include <boost/beast/core/error.hpp>
44#include <boost/beast/http/status.hpp>
45#include <boost/json/array.hpp>
46#include <boost/json/object.hpp>
47#include <boost/json/parse.hpp>
48#include <boost/json/serialize.hpp>
49#include <boost/system/system_error.hpp>
50#include <xrpl/protocol/jss.h>
51
52#include <chrono>
53#include <exception>
54#include <functional>
55#include <memory>
56#include <optional>
57#include <ratio>
58#include <string>
59#include <utility>
60
61namespace web::ng {
62
68template <typename RPCEngineType>
70 std::shared_ptr<BackendInterface const> const backend_;
71 std::shared_ptr<RPCEngineType> const rpcEngine_;
72 std::shared_ptr<etlng::ETLServiceInterface const> const etl_;
73 util::TagDecoratorFactory const tagFactory_;
74 rpc::impl::ProductionAPIVersionParser apiVersionParser_; // can be injected if needed
75
76 util::Logger log_{"RPC"};
77 util::Logger perfLog_{"Performance"};
78
79public:
90 std::shared_ptr<BackendInterface const> const& backend,
91 std::shared_ptr<RPCEngineType> const& rpcEngine,
92 std::shared_ptr<etlng::ETLServiceInterface const> const& etl
93 )
94 : backend_(backend)
95 , rpcEngine_(rpcEngine)
96 , etl_(etl)
97 , tagFactory_(config)
98 , apiVersionParser_(config.getObject("api_version"))
99 {
100 }
101
111 [[nodiscard]] Response
113 Request const& request,
114 ConnectionMetadata const& connectionMetadata,
115 SubscriptionContextPtr subscriptionContext,
116 boost::asio::yield_context yield
117 )
118 {
119 std::optional<Response> response;
120 util::CoroutineGroup coroutineGroup{yield, 1};
121 auto const onTaskComplete = coroutineGroup.registerForeign(yield);
122 ASSERT(onTaskComplete.has_value(), "Coroutine group can't be full");
123
124 bool const postSuccessful = rpcEngine_->post(
125 [this,
126 &request,
127 &response,
128 &onTaskComplete = onTaskComplete.value(),
129 &connectionMetadata,
130 subscriptionContext = std::move(subscriptionContext)](boost::asio::yield_context innerYield) mutable {
131 try {
132 boost::system::error_code ec;
133 auto parsedRequest = boost::json::parse(request.message(), ec);
134 if (ec.failed() or not parsedRequest.is_object()) {
135 rpcEngine_->notifyBadSyntax();
136 response = impl::ErrorHelper{request}.makeJsonParsingError();
137 if (ec.failed()) {
138 LOG(log_.warn())
139 << "Error parsing JSON: " << ec.message() << ". For request: " << request.message();
140 } else {
141 LOG(log_.warn()) << "Received not a JSON object. For request: " << request.message();
142 }
143 } else {
144 auto parsedObject = std::move(parsedRequest).as_object();
145 LOG(perfLog_.debug()) << connectionMetadata.tag() << "Adding to work queue";
146
147 if (not connectionMetadata.wasUpgraded() and shouldReplaceParams(parsedObject))
148 parsedObject[JS(params)] = boost::json::array({boost::json::object{}});
149
150 response = handleRequest(
151 innerYield,
152 request,
153 std::move(parsedObject),
154 connectionMetadata,
155 std::move(subscriptionContext)
156 );
157 }
158 } catch (std::exception const& ex) {
159 LOG(perfLog_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
160 rpcEngine_->notifyInternalError();
161 response = impl::ErrorHelper{request}.makeInternalError();
162 }
163
164 // notify the coroutine group that the foreign task is done
165 onTaskComplete();
166 },
167 connectionMetadata.ip()
168 );
169
170 if (not postSuccessful) {
171 // onTaskComplete must be called to notify coroutineGroup that the foreign task is done
172 onTaskComplete->operator()();
173 rpcEngine_->notifyTooBusy();
174 return impl::ErrorHelper{request}.makeTooBusyError();
175 }
176
177 // Put the coroutine to sleep until the foreign task is done
178 coroutineGroup.asyncWait(yield);
179 ASSERT(response.has_value(), "Woke up coroutine without setting response");
180 return std::move(response).value();
181 }
182
183private:
184 Response
185 handleRequest(
186 boost::asio::yield_context yield,
187 Request const& rawRequest,
188 boost::json::object&& request,
189 ConnectionMetadata const& connectionMetadata,
190 SubscriptionContextPtr subscriptionContext
191 )
192 {
193 LOG(log_.info()) << connectionMetadata.tag() << (connectionMetadata.wasUpgraded() ? "ws" : "http")
194 << " received request from work queue: " << util::removeSecret(request)
195 << " ip = " << connectionMetadata.ip();
196
197 try {
198 auto const range = backend_->fetchLedgerRange();
199 if (!range) {
200 // for error that happened before the handler, we don't attach any warnings
201 rpcEngine_->notifyNotReady();
202 return impl::ErrorHelper{rawRequest, std::move(request)}.makeNotReadyError();
203 }
204
205 auto const context = [&] {
206 if (connectionMetadata.wasUpgraded()) {
207 ASSERT(subscriptionContext != nullptr, "Subscription context must exist for a WS connecton");
208 return rpc::makeWsContext(
209 yield,
210 request,
211 std::move(subscriptionContext),
212 tagFactory_.with(connectionMetadata.tag()),
213 *range,
214 connectionMetadata.ip(),
215 std::cref(apiVersionParser_),
216 connectionMetadata.isAdmin()
217 );
218 }
220 yield,
221 request,
222 tagFactory_.with(connectionMetadata.tag()),
223 *range,
224 connectionMetadata.ip(),
225 std::cref(apiVersionParser_),
226 connectionMetadata.isAdmin()
227 );
228 }();
229
230 if (!context) {
231 auto const err = context.error();
232 LOG(perfLog_.warn()) << connectionMetadata.tag() << "Could not create Web context: " << err;
233 LOG(log_.warn()) << connectionMetadata.tag() << "Could not create Web context: " << err;
234
235 // we count all those as BadSyntax - as the WS path would.
236 // Although over HTTP these will yield a 400 status with a plain text response (for most).
237 rpcEngine_->notifyBadSyntax();
238 return impl::ErrorHelper(rawRequest, std::move(request)).makeError(err);
239 }
240
241 auto [result, timeDiff] = util::timed([&]() { return rpcEngine_->buildResponse(*context); });
242
243 auto us = std::chrono::duration<int, std::milli>(timeDiff);
244 rpc::logDuration(request, context->tag(), us);
245
246 boost::json::object response;
247
248 if (auto const status = std::get_if<rpc::Status>(&result.response)) {
249 // note: error statuses are counted/notified in buildResponse itself
250 response = impl::ErrorHelper(rawRequest, request).composeError(*status);
251 auto const responseStr = boost::json::serialize(response);
252
253 LOG(perfLog_.debug()) << context->tag() << "Encountered error: " << responseStr;
254 LOG(log_.debug()) << context->tag() << "Encountered error: " << responseStr;
255 } else {
256 // This can still technically be an error. Clio counts forwarded requests as successful.
257 rpcEngine_->notifyComplete(context->method, us);
258
259 auto& json = std::get<boost::json::object>(result.response);
260 auto const isForwarded =
261 json.contains("forwarded") && json.at("forwarded").is_bool() && json.at("forwarded").as_bool();
262
263 if (isForwarded)
264 json.erase("forwarded");
265
266 // if the result is forwarded - just use it as is
267 // if forwarded request has error, for http, error should be in "result"; for ws, error should
268 // be at top
269 if (isForwarded && (json.contains(JS(result)) || connectionMetadata.wasUpgraded())) {
270 for (auto const& [k, v] : json)
271 response.insert_or_assign(k, v);
272 } else {
273 response[JS(result)] = json;
274 }
275
276 if (isForwarded)
277 response["forwarded"] = true;
278
279 // for ws there is an additional field "status" in the response,
280 // otherwise the "status" is in the "result" field
281 if (connectionMetadata.wasUpgraded()) {
282 auto const appendFieldIfExist = [&](auto const& field) {
283 if (request.contains(field) and not request.at(field).is_null())
284 response[field] = request.at(field);
285 };
286
287 appendFieldIfExist(JS(id));
288 appendFieldIfExist(JS(api_version));
289
290 if (!response.contains(JS(error)))
291 response[JS(status)] = JS(success);
292
293 response[JS(type)] = JS(response);
294 } else {
295 if (response.contains(JS(result)) && !response[JS(result)].as_object().contains(JS(error)))
296 response[JS(result)].as_object()[JS(status)] = JS(success);
297 }
298 }
299
300 boost::json::array warnings = std::move(result.warnings);
301 warnings.emplace_back(rpc::makeWarning(rpc::WarnRpcClio));
302
303 if (etl_->lastCloseAgeSeconds() >= 60)
304 warnings.emplace_back(rpc::makeWarning(rpc::WarnRpcOutdated));
305
306 response["warnings"] = warnings;
307 return Response{boost::beast::http::status::ok, response, rawRequest};
308 } catch (std::exception const& ex) {
309 // note: while we are catching this in buildResponse too, this is here to make sure
310 // that any other code that may throw is outside of buildResponse is also worked around.
311 LOG(perfLog_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
312 LOG(log_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
313
314 rpcEngine_->notifyInternalError();
315 return impl::ErrorHelper(rawRequest, std::move(request)).makeInternalError();
316 }
317 }
318
319 bool
320 shouldReplaceParams(boost::json::object const& req) const
321 {
322 auto const hasParams = req.contains(JS(params));
323 auto const paramsIsArray = hasParams and req.at(JS(params)).is_array();
324 auto const paramsIsEmptyString =
325 hasParams and req.at(JS(params)).is_string() and req.at(JS(params)).as_string().empty();
326 auto const paramsIsEmptyObject =
327 hasParams and req.at(JS(params)).is_object() and req.at(JS(params)).as_object().empty();
328 auto const paramsIsNull = hasParams and req.at(JS(params)).is_null();
329 auto const arrayIsEmpty = paramsIsArray and req.at(JS(params)).as_array().empty();
330 auto const arrayIsNotEmpty = paramsIsArray and not req.at(JS(params)).as_array().empty();
331 auto const firstArgIsNull = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_null();
332 auto const firstArgIsEmptyString = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_string() and
333 req.at(JS(params)).as_array().at(0).as_string().empty();
334
335 // Note: all this compatibility dance is to match `rippled` as close as possible
336 return not hasParams or paramsIsEmptyString or paramsIsNull or paramsIsEmptyObject or arrayIsEmpty or
337 firstArgIsEmptyString or firstArgIsNull;
338 }
339};
340
341} // namespace web::ng
Definition APIVersionParser.hpp:34
CoroutineGroup is a helper class to manage a group of coroutines. It allows to spawn multiple corouti...
Definition CoroutineGroup.hpp:37
std::optional< std::function< void()> > registerForeign(boost::asio::yield_context yield)
Register a foreign coroutine this group should wait for.
Definition CoroutineGroup.cpp:59
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:229
A factory for TagDecorator instantiation.
Definition Taggable.hpp:169
TagDecoratorFactory with(ParentType parent) const noexcept
Creates a new tag decorator factory with a bound parent tag decorator.
Definition Taggable.cpp:66
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:267
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
An interface for a connection metadata class.
Definition Connection.hpp:43
The server handler for RPC requests called by web server.
Definition RPCServerHandler.hpp:69
RPCServerHandler(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface const > const &backend, std::shared_ptr< RPCEngineType > const &rpcEngine, std::shared_ptr< etlng::ETLServiceInterface const > const &etl)
Create a new server handler.
Definition RPCServerHandler.hpp:88
Response operator()(Request const &request, ConnectionMetadata const &connectionMetadata, SubscriptionContextPtr subscriptionContext, boost::asio::yield_context yield)
The callback when server receives a request.
Definition RPCServerHandler.hpp:112
Represents an HTTP or WebSocket request.
Definition Request.hpp:37
Represents an HTTP or WebSocket response.
Definition Response.hpp:40
A helper that attempts to match rippled reporting mode HTTP errors as close as possible.
Definition ErrorHandling.hpp:41
Response makeTooBusyError() const
Make a response for when the server is too busy.
Definition ErrorHandling.cpp:133
Response makeInternalError() const
Make an internal error response.
Definition ErrorHandling.cpp:121
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:37
std::expected< web::Context, Status > makeWsContext(boost::asio::yield_context yc, boost::json::object const &request, web::SubscriptionContextPtr session, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool isAdmin)
A factory function that creates a WebSocket context.
Definition Factories.cpp:47
void logDuration(boost::json::object const &request, util::BaseTagDecorator const &tag, DurationType const &dur)
Log the duration of the request processing.
Definition RPCHelpers.hpp:754
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:65
std::expected< web::Context, Status > makeHttpContext(boost::asio::yield_context yc, boost::json::object const &request, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool const isAdmin)
A factory function that creates an HTTP context.
Definition Factories.cpp:77
boost::json::object removeSecret(boost::json::object const &object)
Removes any detected secret information from a response JSON object.
Definition JsonUtils.hpp:67
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:86