71 std::shared_ptr<BackendInterface const>
const backend_;
72 std::shared_ptr<RPCEngineType>
const rpcEngine_;
73 std::shared_ptr<etlng::ETLServiceInterface const>
const etl_;
74 std::reference_wrapper<dosguard::DOSGuardInterface> dosguard_;
93 std::shared_ptr<BackendInterface const>
const& backend,
94 std::shared_ptr<RPCEngineType>
const& rpcEngine,
95 std::shared_ptr<etlng::ETLServiceInterface const>
const&
etl,
99 , rpcEngine_(rpcEngine)
101 , dosguard_(dosguard)
102 , tagFactory_(config)
103 , apiVersionParser_(config.getObject(
"api_version"))
121 boost::asio::yield_context yield
124 if (not dosguard_.get().isOk(connectionMetadata.
ip())) {
125 return makeSlowDownResponse(request, std::nullopt);
128 std::optional<Response> response;
131 ASSERT(onTaskComplete.has_value(),
"Coroutine group can't be full");
133 bool const postSuccessful = rpcEngine_->post(
137 &onTaskComplete = onTaskComplete.value(),
139 subscriptionContext = std::move(subscriptionContext)](boost::asio::yield_context innerYield)
mutable {
141 boost::system::error_code ec;
142 auto parsedRequest = boost::json::parse(request.message(), ec);
143 if (ec.failed() or not parsedRequest.is_object()) {
144 rpcEngine_->notifyBadSyntax();
145 response = impl::ErrorHelper{request}.makeJsonParsingError();
148 <<
"Error parsing JSON: " << ec.message() <<
". For request: " << request.message();
150 LOG(log_.warn()) <<
"Received not a JSON object. For request: " << request.message();
153 auto parsedObject = std::move(parsedRequest).as_object();
155 if (not dosguard_.get().request(connectionMetadata.ip(), parsedObject)) {
156 response = makeSlowDownResponse(request, parsedObject);
158 LOG(perfLog_.debug()) << connectionMetadata.tag() <<
"Adding to work queue";
160 if (not connectionMetadata.wasUpgraded() and shouldReplaceParams(parsedObject))
161 parsedObject[JS(params)] = boost::json::array({boost::json::object{}});
163 response = handleRequest(
166 std::move(parsedObject),
168 std::move(subscriptionContext)
172 } catch (std::exception
const& ex) {
173 LOG(perfLog_.
error()) << connectionMetadata.
tag() <<
"Caught exception: " << ex.what();
174 rpcEngine_->notifyInternalError();
181 connectionMetadata.ip()
184 if (not postSuccessful) {
186 onTaskComplete->operator()();
187 rpcEngine_->notifyTooBusy();
192 coroutineGroup.asyncWait(yield);
193 ASSERT(response.has_value(),
"Woke up coroutine without setting response");
195 if (not dosguard_.get().add(connectionMetadata.ip(), response->message().size())) {
196 response->setMessage(makeLoadWarning(*response));
199 return std::move(response).value();
205 boost::asio::yield_context yield,
206 Request
const& rawRequest,
207 boost::json::object&& request,
208 ConnectionMetadata
const& connectionMetadata,
212 LOG(log_.info()) << connectionMetadata.tag() << (connectionMetadata.wasUpgraded() ?
"ws" :
"http")
214 <<
" ip = " << connectionMetadata.ip();
217 auto const range = backend_->fetchLedgerRange();
220 rpcEngine_->notifyNotReady();
221 return impl::ErrorHelper{rawRequest, std::move(request)}.makeNotReadyError();
224 auto const context = [&] {
225 if (connectionMetadata.wasUpgraded()) {
226 ASSERT(subscriptionContext !=
nullptr,
"Subscription context must exist for a WS connection");
230 std::move(subscriptionContext),
231 tagFactory_.
with(connectionMetadata.tag()),
233 connectionMetadata.ip(),
234 std::cref(apiVersionParser_),
235 connectionMetadata.isAdmin()
241 tagFactory_.
with(connectionMetadata.tag()),
243 connectionMetadata.ip(),
244 std::cref(apiVersionParser_),
245 connectionMetadata.isAdmin()
250 auto const err = context.error();
251 LOG(perfLog_.warn()) << connectionMetadata.tag() <<
"Could not create Web context: " << err;
252 LOG(log_.warn()) << connectionMetadata.tag() <<
"Could not create Web context: " << err;
256 rpcEngine_->notifyBadSyntax();
257 return impl::ErrorHelper(rawRequest, std::move(request)).makeError(err);
260 auto [result, timeDiff] =
util::timed([&]() {
return rpcEngine_->buildResponse(*context); });
262 auto us = std::chrono::duration<int, std::milli>(timeDiff);
265 boost::json::object response;
267 if (
auto const status = std::get_if<rpc::Status>(&result.response)) {
269 response = impl::ErrorHelper(rawRequest, request).composeError(*status);
270 auto const responseStr = boost::json::serialize(response);
272 LOG(perfLog_.debug()) << context->tag() <<
"Encountered error: " << responseStr;
273 LOG(log_.debug()) << context->tag() <<
"Encountered error: " << responseStr;
276 rpcEngine_->notifyComplete(context->method, us);
278 auto& json = std::get<boost::json::object>(result.response);
279 auto const isForwarded =
280 json.contains(
"forwarded") && json.at(
"forwarded").is_bool() && json.at(
"forwarded").as_bool();
283 json.erase(
"forwarded");
288 if (isForwarded && (json.contains(JS(result)) || connectionMetadata.wasUpgraded())) {
289 for (
auto const& [k, v] : json)
290 response.insert_or_assign(k, v);
292 response[JS(result)] = json;
296 response[
"forwarded"] =
true;
300 if (connectionMetadata.wasUpgraded()) {
301 auto const appendFieldIfExist = [&](
auto const& field) {
302 if (request.contains(field) and not request.at(field).is_null())
303 response[field] = request.at(field);
306 appendFieldIfExist(JS(
id));
307 appendFieldIfExist(JS(api_version));
309 if (!response.contains(JS(error)))
310 response[JS(status)] = JS(success);
312 response[JS(type)] = JS(response);
314 if (response.contains(JS(result)) && !response[JS(result)].as_object().contains(JS(error)))
315 response[JS(result)].as_object()[JS(status)] = JS(success);
319 boost::json::array warnings = std::move(result.warnings);
322 if (etl_->lastCloseAgeSeconds() >= 60)
325 response[
"warnings"] = warnings;
326 return Response{boost::beast::http::status::ok, response, rawRequest};
327 }
catch (std::exception
const& ex) {
330 LOG(perfLog_.error()) << connectionMetadata.tag() <<
"Caught exception: " << ex.what();
331 LOG(log_.error()) << connectionMetadata.tag() <<
"Caught exception: " << ex.what();
333 rpcEngine_->notifyInternalError();
334 return impl::ErrorHelper(rawRequest, std::move(request)).makeInternalError();
339 makeSlowDownResponse(Request
const& request, std::optional<boost::json::value> requestJson)
343 if (not request.isHttp()) {
345 if (not requestJson.has_value()) {
346 requestJson = boost::json::parse(request.message());
348 if (requestJson->is_object() && requestJson->as_object().contains(
"id"))
349 error[
"id"] = requestJson->as_object().at(
"id");
350 error[
"request"] = request.message();
351 }
catch (std::exception
const&) {
352 error[
"request"] = request.message();
355 return web::ng::Response{boost::beast::http::status::service_unavailable, error, request};
358 static boost::json::object
359 makeLoadWarning(Response
const& response)
361 auto jsonResponse = boost::json::parse(response.message()).as_object();
362 jsonResponse[
"warning"] =
"load";
363 if (jsonResponse.contains(
"warnings") && jsonResponse[
"warnings"].is_array()) {
364 jsonResponse[
"warnings"].as_array().push_back(
rpc::makeWarning(rpc::WarnRpcRateLimit));
366 jsonResponse[
"warnings"] = boost::json::array{
rpc::makeWarning(rpc::WarnRpcRateLimit)};
372 shouldReplaceParams(boost::json::object
const& req)
const
374 auto const hasParams = req.contains(JS(params));
375 auto const paramsIsArray = hasParams and req.at(JS(params)).is_array();
376 auto const paramsIsEmptyString =
377 hasParams and req.at(JS(params)).is_string() and req.at(JS(params)).as_string().empty();
378 auto const paramsIsEmptyObject =
379 hasParams and req.at(JS(params)).is_object() and req.at(JS(params)).as_object().empty();
380 auto const paramsIsNull = hasParams and req.at(JS(params)).is_null();
381 auto const arrayIsEmpty = paramsIsArray and req.at(JS(params)).as_array().empty();
382 auto const arrayIsNotEmpty = paramsIsArray and not req.at(JS(params)).as_array().empty();
383 auto const firstArgIsNull = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_null();
384 auto const firstArgIsEmptyString = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_string() and
385 req.at(JS(params)).as_array().at(0).as_string().empty();
388 return not hasParams or paramsIsEmptyString or paramsIsNull or paramsIsEmptyObject or arrayIsEmpty or
389 firstArgIsEmptyString or firstArgIsNull;
std::expected< web::Context, Status > makeWsContext(boost::asio::yield_context yc, boost::json::object const &request, web::SubscriptionContextPtr session, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool isAdmin)
A factory function that creates a Websocket context.
Definition Factories.cpp:47