Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
RPCServerHandler.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
23#include "rpc/Errors.hpp"
24#include "rpc/Factories.hpp"
25#include "rpc/JS.hpp"
26#include "rpc/RPCHelpers.hpp"
27#include "rpc/common/impl/APIVersionParser.hpp"
28#include "util/Assert.hpp"
29#include "util/CoroutineGroup.hpp"
30#include "util/JsonUtils.hpp"
31#include "util/Profiler.hpp"
32#include "util/Taggable.hpp"
33#include "util/log/Logger.hpp"
34#include "web/SubscriptionContextInterface.hpp"
35#include "web/ng/Connection.hpp"
36#include "web/ng/Request.hpp"
37#include "web/ng/Response.hpp"
38#include "web/ng/impl/ErrorHandling.hpp"
39
40#include <boost/asio/spawn.hpp>
41#include <boost/asio/steady_timer.hpp>
42#include <boost/beast/core/error.hpp>
43#include <boost/beast/http/status.hpp>
44#include <boost/json/array.hpp>
45#include <boost/json/object.hpp>
46#include <boost/json/parse.hpp>
47#include <boost/json/serialize.hpp>
48#include <boost/system/system_error.hpp>
49#include <xrpl/protocol/jss.h>
50
51#include <chrono>
52#include <exception>
53#include <functional>
54#include <memory>
55#include <optional>
56#include <ratio>
57#include <stdexcept>
58#include <string>
59#include <utility>
60
61namespace web::ng {
62
68template <typename RPCEngineType, typename ETLType>
70 std::shared_ptr<BackendInterface const> const backend_;
71 std::shared_ptr<RPCEngineType> const rpcEngine_;
72 std::shared_ptr<ETLType const> const etl_;
73 util::TagDecoratorFactory const tagFactory_;
74 rpc::impl::ProductionAPIVersionParser apiVersionParser_; // can be injected if needed
75
76 util::Logger log_{"RPC"};
77 util::Logger perfLog_{"Performance"};
78
79public:
90 std::shared_ptr<BackendInterface const> const& backend,
91 std::shared_ptr<RPCEngineType> const& rpcEngine,
92 std::shared_ptr<ETLType const> const& etl
93 )
94 : backend_(backend)
95 , rpcEngine_(rpcEngine)
96 , etl_(etl)
97 , tagFactory_(config)
98 , apiVersionParser_(config.getObject("api_version"))
99 {
100 }
101
111 [[nodiscard]] Response
113 Request const& request,
114 ConnectionMetadata const& connectionMetadata,
115 SubscriptionContextPtr subscriptionContext,
116 boost::asio::yield_context yield
117 )
118 {
119 std::optional<Response> response;
120 util::CoroutineGroup coroutineGroup{yield, 1};
121 auto const onTaskComplete = coroutineGroup.registerForeign();
122 ASSERT(onTaskComplete.has_value(), "Coroutine group can't be full");
123
124 bool const postSuccessful = rpcEngine_->post(
125 [this,
126 &request,
127 &response,
128 &onTaskComplete = onTaskComplete.value(),
129 &connectionMetadata,
130 subscriptionContext = std::move(subscriptionContext)](boost::asio::yield_context yield) mutable {
131 try {
132 auto parsedRequest = boost::json::parse(request.message()).as_object();
133 LOG(perfLog_.debug()) << connectionMetadata.tag() << "Adding to work queue";
134
135 if (not connectionMetadata.wasUpgraded() and shouldReplaceParams(parsedRequest))
136 parsedRequest[JS(params)] = boost::json::array({boost::json::object{}});
137
138 response = handleRequest(
139 yield, request, std::move(parsedRequest), connectionMetadata, std::move(subscriptionContext)
140 );
141 } catch (boost::system::system_error const& ex) {
142 // system_error thrown when json parsing failed
143 rpcEngine_->notifyBadSyntax();
144 response = impl::ErrorHelper{request}.makeJsonParsingError();
145 LOG(log_.warn()) << "Error parsing JSON: " << ex.what() << ". For request: " << request.message();
146 } catch (std::invalid_argument const& ex) {
147 // thrown when json parses something that is not an object at top level
148 rpcEngine_->notifyBadSyntax();
149 LOG(log_.warn()) << "Invalid argument error: " << ex.what()
150 << ". For request: " << request.message();
151 response = impl::ErrorHelper{request}.makeJsonParsingError();
152 } catch (std::exception const& ex) {
153 LOG(perfLog_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
154 rpcEngine_->notifyInternalError();
155 response = impl::ErrorHelper{request}.makeInternalError();
156 }
157
158 // notify the coroutine group that the foreign task is done
159 onTaskComplete();
160 },
161 connectionMetadata.ip()
162 );
163
164 if (not postSuccessful) {
165 // onTaskComplete must be called to notify coroutineGroup that the foreign task is done
166 onTaskComplete->operator()();
167 rpcEngine_->notifyTooBusy();
168 return impl::ErrorHelper{request}.makeTooBusyError();
169 }
170
171 // Put the coroutine to sleep until the foreign task is done
172 coroutineGroup.asyncWait(yield);
173 ASSERT(response.has_value(), "Woke up coroutine without setting response");
174 return std::move(response).value();
175 }
176
177private:
178 Response
179 handleRequest(
180 boost::asio::yield_context yield,
181 Request const& rawRequest,
182 boost::json::object&& request,
183 ConnectionMetadata const& connectionMetadata,
184 SubscriptionContextPtr subscriptionContext
185 )
186 {
187 LOG(log_.info()) << connectionMetadata.tag() << (connectionMetadata.wasUpgraded() ? "ws" : "http")
188 << " received request from work queue: " << util::removeSecret(request)
189 << " ip = " << connectionMetadata.ip();
190
191 try {
192 auto const range = backend_->fetchLedgerRange();
193 if (!range) {
194 // for error that happened before the handler, we don't attach any warnings
195 rpcEngine_->notifyNotReady();
196 return impl::ErrorHelper{rawRequest, std::move(request)}.makeNotReadyError();
197 }
198
199 auto const context = [&] {
200 if (connectionMetadata.wasUpgraded()) {
201 ASSERT(subscriptionContext != nullptr, "Subscription context must exist for a WS connecton");
202 return rpc::makeWsContext(
203 yield,
204 request,
205 std::move(subscriptionContext),
206 tagFactory_.with(connectionMetadata.tag()),
207 *range,
208 connectionMetadata.ip(),
209 std::cref(apiVersionParser_),
210 connectionMetadata.isAdmin()
211 );
212 }
214 yield,
215 request,
216 tagFactory_.with(connectionMetadata.tag()),
217 *range,
218 connectionMetadata.ip(),
219 std::cref(apiVersionParser_),
220 connectionMetadata.isAdmin()
221 );
222 }();
223
224 if (!context) {
225 auto const err = context.error();
226 LOG(perfLog_.warn()) << connectionMetadata.tag() << "Could not create Web context: " << err;
227 LOG(log_.warn()) << connectionMetadata.tag() << "Could not create Web context: " << err;
228
229 // we count all those as BadSyntax - as the WS path would.
230 // Although over HTTP these will yield a 400 status with a plain text response (for most).
231 rpcEngine_->notifyBadSyntax();
232 return impl::ErrorHelper(rawRequest, std::move(request)).makeError(err);
233 }
234
235 auto [result, timeDiff] = util::timed([&]() { return rpcEngine_->buildResponse(*context); });
236
237 auto us = std::chrono::duration<int, std::milli>(timeDiff);
238 rpc::logDuration(*context, us);
239
240 boost::json::object response;
241
242 if (auto const status = std::get_if<rpc::Status>(&result.response)) {
243 // note: error statuses are counted/notified in buildResponse itself
244 response = impl::ErrorHelper(rawRequest, request).composeError(*status);
245 auto const responseStr = boost::json::serialize(response);
246
247 LOG(perfLog_.debug()) << context->tag() << "Encountered error: " << responseStr;
248 LOG(log_.debug()) << context->tag() << "Encountered error: " << responseStr;
249 } else {
250 // This can still technically be an error. Clio counts forwarded requests as successful.
251 rpcEngine_->notifyComplete(context->method, us);
252
253 auto& json = std::get<boost::json::object>(result.response);
254 auto const isForwarded =
255 json.contains("forwarded") && json.at("forwarded").is_bool() && json.at("forwarded").as_bool();
256
257 if (isForwarded)
258 json.erase("forwarded");
259
260 // if the result is forwarded - just use it as is
261 // if forwarded request has error, for http, error should be in "result"; for ws, error should
262 // be at top
263 if (isForwarded && (json.contains(JS(result)) || connectionMetadata.wasUpgraded())) {
264 for (auto const& [k, v] : json)
265 response.insert_or_assign(k, v);
266 } else {
267 response[JS(result)] = json;
268 }
269
270 if (isForwarded)
271 response["forwarded"] = true;
272
273 // for ws there is an additional field "status" in the response,
274 // otherwise the "status" is in the "result" field
275 if (connectionMetadata.wasUpgraded()) {
276 auto const appendFieldIfExist = [&](auto const& field) {
277 if (request.contains(field) and not request.at(field).is_null())
278 response[field] = request.at(field);
279 };
280
281 appendFieldIfExist(JS(id));
282 appendFieldIfExist(JS(api_version));
283
284 if (!response.contains(JS(error)))
285 response[JS(status)] = JS(success);
286
287 response[JS(type)] = JS(response);
288 } else {
289 if (response.contains(JS(result)) && !response[JS(result)].as_object().contains(JS(error)))
290 response[JS(result)].as_object()[JS(status)] = JS(success);
291 }
292 }
293
294 boost::json::array warnings = std::move(result.warnings);
295 warnings.emplace_back(rpc::makeWarning(rpc::WarnRpcClio));
296
297 if (etl_->lastCloseAgeSeconds() >= 60)
298 warnings.emplace_back(rpc::makeWarning(rpc::WarnRpcOutdated));
299
300 response["warnings"] = warnings;
301 return Response{boost::beast::http::status::ok, response, rawRequest};
302 } catch (std::exception const& ex) {
303 // note: while we are catching this in buildResponse too, this is here to make sure
304 // that any other code that may throw is outside of buildResponse is also worked around.
305 LOG(perfLog_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
306 LOG(log_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what();
307
308 rpcEngine_->notifyInternalError();
309 return impl::ErrorHelper(rawRequest, std::move(request)).makeInternalError();
310 }
311 }
312
313 bool
314 shouldReplaceParams(boost::json::object const& req) const
315 {
316 auto const hasParams = req.contains(JS(params));
317 auto const paramsIsArray = hasParams and req.at(JS(params)).is_array();
318 auto const paramsIsEmptyString =
319 hasParams and req.at(JS(params)).is_string() and req.at(JS(params)).as_string().empty();
320 auto const paramsIsEmptyObject =
321 hasParams and req.at(JS(params)).is_object() and req.at(JS(params)).as_object().empty();
322 auto const paramsIsNull = hasParams and req.at(JS(params)).is_null();
323 auto const arrayIsEmpty = paramsIsArray and req.at(JS(params)).as_array().empty();
324 auto const arrayIsNotEmpty = paramsIsArray and not req.at(JS(params)).as_array().empty();
325 auto const firstArgIsNull = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_null();
326 auto const firstArgIsEmptyString = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_string() and
327 req.at(JS(params)).as_array().at(0).as_string().empty();
328
329 // Note: all this compatibility dance is to match `rippled` as close as possible
330 return not hasParams or paramsIsEmptyString or paramsIsNull or paramsIsEmptyObject or arrayIsEmpty or
331 firstArgIsEmptyString or firstArgIsNull;
332 }
333};
334
335} // namespace web::ng
Definition APIVersionParser.hpp:34
CoroutineGroup is a helper class to manage a group of coroutines. It allows to spawn multiple corouti...
Definition CoroutineGroup.hpp:37
std::optional< std::function< void()> > registerForeign()
Register a foreign coroutine this group should wait for.
Definition CoroutineGroup.cpp:59
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:110
Pump warn(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::WRN severity.
Definition Logger.cpp:210
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:215
A factory for TagDecorator instantiation.
Definition Taggable.hpp:169
TagDecoratorFactory with(ParentType parent) const noexcept
Creates a new tag decorator factory with a bound parent tag decorator.
Definition Taggable.cpp:66
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:267
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
An interface for a connection metadata class.
Definition Connection.hpp:43
The server handler for RPC requests called by web server.
Definition RPCServerHandler.hpp:69
Response operator()(Request const &request, ConnectionMetadata const &connectionMetadata, SubscriptionContextPtr subscriptionContext, boost::asio::yield_context yield)
The callback when server receives a request.
Definition RPCServerHandler.hpp:112
RPCServerHandler(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface const > const &backend, std::shared_ptr< RPCEngineType > const &rpcEngine, std::shared_ptr< ETLType const > const &etl)
Create a new server handler.
Definition RPCServerHandler.hpp:88
Represents an HTTP or WebSocket request.
Definition Request.hpp:37
std::string_view message() const
Get the body (in case of an HTTP request) or the message (in case of a WebSocket request).
Definition Request.cpp:93
Represents an HTTP or Websocket response.
Definition Response.hpp:40
A helper that attempts to match rippled reporting mode HTTP errors as close as possible.
Definition ErrorHandling.hpp:41
Response makeTooBusyError() const
Make a response for when the server is too busy.
Definition ErrorHandling.cpp:133
Response makeJsonParsingError() const
Make a response when json parsing fails.
Definition ErrorHandling.cpp:143
Response makeInternalError() const
Make an internal error response.
Definition ErrorHandling.cpp:121
This namespace contains everything to do with the ETL and ETL sources.
Definition CacheLoader.hpp:36
void logDuration(web::Context const &ctx, T const &dur)
Log the duration of the request processing.
Definition RPCHelpers.hpp:658
std::expected< web::Context, Status > makeWsContext(boost::asio::yield_context yc, boost::json::object const &request, web::SubscriptionContextPtr session, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool isAdmin)
A factory function that creates a Websocket context.
Definition Factories.cpp:47
boost::json::object makeWarning(WarningCode code)
Generate JSON from a rpc::WarningCode.
Definition Errors.cpp:65
std::expected< web::Context, Status > makeHttpContext(boost::asio::yield_context yc, boost::json::object const &request, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool const isAdmin)
A factory function that creates a HTTP context.
Definition Factories.cpp:77
boost::json::object removeSecret(boost::json::object const &object)
Removes any detected secret information from a response JSON object.
Definition JsonUtils.hpp:67
auto timed(FnType &&func)
Profiler function to measure the time a function execution consumes.
Definition Profiler.hpp:40
std::shared_ptr< SubscriptionContextInterface > SubscriptionContextPtr
An alias for shared pointer to a SubscriptionContextInterface.
Definition SubscriptionContextInterface.hpp:86