65 std::shared_ptr<BackendInterface const>
const backend_;
66 std::shared_ptr<RPCEngineType>
const rpcEngine_;
67 std::shared_ptr<etl::ETLServiceInterface const>
const etl_;
70 std::reference_wrapper<web::dosguard::DOSGuardInterface> dosguard_;
87 std::shared_ptr<BackendInterface const>
const& backend,
88 std::shared_ptr<RPCEngineType>
const& rpcEngine,
89 std::shared_ptr<etl::ETLServiceInterface const>
const& etl,
93 , rpcEngine_(rpcEngine)
96 , apiVersionParser_(config.getObject(
"api_version"))
108 operator()(std::string
const& request, std::shared_ptr<web::ConnectionBase>
const& connection)
110 if (not dosguard_.get().isOk(connection->clientIp())) {
111 connection->sendSlowDown(request);
116 auto req = boost::json::parse(request).as_object();
117 LOG(perfLog_.debug()) << connection->tag() <<
"Adding to work queue";
119 if (not connection->upgraded and shouldReplaceParams(req))
120 req[JS(params)] = boost::json::array({boost::json::object{}});
122 if (not dosguard_.get().request(connection->clientIp(), req)) {
123 connection->sendSlowDown(request);
127 if (!rpcEngine_->post(
128 [
this, request = std::move(req), connection](
129 boost::asio::yield_context yield
130 )
mutable { handleRequest(yield, std::move(request), connection); },
131 connection->clientIp()
133 rpcEngine_->notifyTooBusy();
136 }
catch (boost::system::system_error
const& ex) {
138 rpcEngine_->notifyBadSyntax();
140 LOG(log_.warn()) <<
"Error parsing JSON: " << ex.what() <<
". For request: " << request;
141 }
catch (std::invalid_argument
const& ex) {
143 rpcEngine_->notifyBadSyntax();
144 LOG(log_.warn()) <<
"Invalid argument error: " << ex.what()
145 <<
". For request: " << request;
147 }
catch (std::exception
const& ex) {
148 LOG(perfLog_.error()) << connection->tag() <<
"Caught exception: " << ex.what();
149 rpcEngine_->notifyInternalError();
157 boost::asio::yield_context yield,
158 boost::json::object&& request,
159 std::shared_ptr<web::ConnectionBase>
const& connection
162 LOG(log_.
info()) << connection->tag() << (connection->upgraded ?
"ws" :
"http")
164 <<
" ip = " << connection->clientIp();
167 auto const range = backend_->fetchLedgerRange();
170 rpcEngine_->notifyNotReady();
176 auto const context = [&] {
177 if (connection->upgraded) {
181 connection->makeSubscriptionContext(tagFactory_),
182 tagFactory_.
with(connection->tag()),
184 connection->clientIp(),
185 std::cref(apiVersionParser_),
186 connection->isAdmin()
192 tagFactory_.with(connection->tag()),
194 connection->clientIp(),
195 std::cref(apiVersionParser_),
196 connection->isAdmin()
201 auto const err = context.error();
203 << connection->tag() <<
"Could not create Web context: " << err;
204 LOG(log_.warn()) << connection->tag() <<
"Could not create Web context: " << err;
209 rpcEngine_->notifyBadSyntax();
210 web::impl::ErrorHelper(connection, std::move(request)).sendError(err);
215 auto [result, timeDiff] =
216 util::timed([&]() {
return rpcEngine_->buildResponse(*context); });
218 auto const us = std::chrono::duration<int, std::milli>(timeDiff);
221 boost::json::object response;
223 if (!result.response.has_value()) {
225 response = web::impl::ErrorHelper(connection, request)
226 .composeError(result.response.error());
227 auto const responseStr = boost::json::serialize(response);
229 LOG(perfLog_.debug()) << context->tag() <<
"Encountered error: " << responseStr;
230 LOG(log_.debug()) << context->tag() <<
"Encountered error: " << responseStr;
232 auto& json = result.response.value();
233 auto const isForwarded = json.contains(
"forwarded") &&
234 json.at(
"forwarded").is_bool() && json.at(
"forwarded").as_bool();
238 rpcEngine_->notifyComplete(*context, us, isForwarded);
241 json.erase(
"forwarded");
246 if (isForwarded && (json.contains(JS(result)) || connection->upgraded)) {
247 for (
auto const& [k, v] : json)
248 response.insert_or_assign(k, v);
250 response[JS(result)] = json;
254 response[
"forwarded"] =
true;
258 if (connection->upgraded) {
259 auto const appendFieldIfExist = [&](
auto const& field) {
260 if (request.contains(field) and not request.at(field).is_null())
261 response[field] = request.at(field);
264 appendFieldIfExist(JS(
id));
265 appendFieldIfExist(JS(api_version));
267 if (!response.contains(JS(error)))
268 response[JS(status)] = JS(success);
270 response[JS(type)] = JS(response);
272 if (response.contains(JS(result)) &&
273 !response[JS(result)].as_object().contains(JS(error)))
274 response[JS(result)].as_object()[JS(status)] = JS(success);
278 boost::json::array warnings = std::move(result.warnings);
281 if (etl_->lastCloseAgeSeconds() >= 60)
284 response[
"warnings"] = warnings;
285 connection->send(boost::json::serialize(response));
286 }
catch (std::exception
const& ex) {
289 LOG(perfLog_.error()) << connection->tag() <<
"Caught exception: " << ex.what();
290 LOG(log_.error()) << connection->tag() <<
"Caught exception: " << ex.what();
292 rpcEngine_->notifyInternalError();
293 web::impl::ErrorHelper(connection, std::move(request)).sendInternalError();
300 shouldReplaceParams(boost::json::object
const& req)
const
302 auto const hasParams = req.contains(JS(params));
303 auto const paramsIsArray = hasParams and req.at(JS(params)).is_array();
304 auto const paramsIsEmptyString =
305 hasParams and req.at(JS(params)).is_string() and req.at(JS(params)).as_string().empty();
306 auto const paramsIsEmptyObject =
307 hasParams and req.at(JS(params)).is_object() and req.at(JS(params)).as_object().empty();
308 auto const paramsIsNull = hasParams and req.at(JS(params)).is_null();
309 auto const arrayIsEmpty = paramsIsArray and req.at(JS(params)).as_array().empty();
310 auto const arrayIsNotEmpty = paramsIsArray and not req.at(JS(params)).as_array().empty();
311 auto const firstArgIsNull =
312 arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_null();
313 auto const firstArgIsEmptyString = arrayIsNotEmpty and
314 req.at(JS(params)).as_array().at(0).is_string() and
315 req.at(JS(params)).as_array().at(0).as_string().empty();
318 return not hasParams or paramsIsEmptyString or paramsIsNull or paramsIsEmptyObject or
319 arrayIsEmpty or firstArgIsEmptyString or firstArgIsNull;
std::expected< web::Context, Status > makeWsContext(boost::asio::yield_context yc, boost::json::object const &request, web::SubscriptionContextPtr session, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool isAdmin)
A factory function that creates a Websocket context.
Definition Factories.cpp:47
std::expected< web::Context, Status > makeHttpContext(boost::asio::yield_context yc, boost::json::object const &request, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool const isAdmin)
A factory function that creates a HTTP context.
Definition Factories.cpp:81