// Shared handle to the (const) backend; the handler uses it to fetch the ledger range.
64 std::shared_ptr<BackendInterface const>
const backend_;
// Shared handle to the RPC engine; the handler uses it to post work, build responses and issue notify* calls.
65 std::shared_ptr<RPCEngineType>
const rpcEngine_;
// Shared handle to the (const) ETL service; the handler queries lastCloseAgeSeconds() on it.
66 std::shared_ptr<etlng::ETLServiceInterface const>
const etl_;
// Constructor fragment: part of the parameter list followed by two member-initializer entries.
// NOTE(review): the constructor's name line, the remaining parameters (e.g. `config`) and the other
// initializers are not part of this listing.
84 std::shared_ptr<BackendInterface const>
const& backend,
85 std::shared_ptr<RPCEngineType>
const& rpcEngine,
86 std::shared_ptr<etlng::ETLServiceInterface const>
const&
etl
// Initializes the rpcEngine_ member from the rpcEngine argument.
89 , rpcEngine_(rpcEngine)
// Initializes apiVersionParser_ from the "api_version" object obtained via config.getObject().
92 , apiVersionParser_(config.getObject(
"api_version"))
// Call operator: takes the raw request text and the connection it arrived on, parses the text as JSON
// and posts the work to rpcEngine_, whose callable later runs handleRequest.
// Parameters:
//   request    - the request text to parse
//   connection - the connection the request belongs to; used for logging tags and passed on to handleRequest
// NOTE(review): this listing omits some original lines (for instance the opening of the try block);
// the comments below describe only what is visible.
103 operator()(std::string
const& request, std::shared_ptr<web::ConnectionBase>
const& connection)
// Parse the request text and take the top-level value as a JSON object.
106 auto req = boost::json::parse(request).as_object();
107 LOG(perfLog_.
debug()) << connection->tag() <<
"Adding to work queue";
// For connections that are not upgraded, replace params with an array holding one empty object
// whenever shouldReplaceParams(req) returns true.
109 if (not connection->upgraded and shouldReplaceParams(req))
110 req[JS(params)] = boost::json::array({boost::json::object{}});
// Post a callable to the RPC engine. The callable captures this, takes the parsed request by move
// and holds its own copy of the connection shared_ptr.
112 if (!rpcEngine_->post(
113 [
this, request = std::move(req), connection](boost::asio::yield_context yield)
mutable {
114 handleRequest(yield, std::move(request), connection);
// NOTE(review): presumably executed when post() returned false; the lines in between are not shown here.
118 rpcEngine_->notifyTooBusy();
121 }
// boost::system::system_error: reported to the engine as bad syntax and logged with the request text.
catch (boost::system::system_error
const& ex) {
123 rpcEngine_->notifyBadSyntax();
125 LOG(log_.
warn()) <<
"Error parsing JSON: " << ex.what() <<
". For request: " << request;
126 }
// std::invalid_argument: also reported as bad syntax and logged with the request text.
catch (std::invalid_argument
const& ex) {
128 rpcEngine_->notifyBadSyntax();
129 LOG(log_.
warn()) <<
"Invalid argument error: " << ex.what() <<
". For request: " << request;
131 }
// Any other std::exception: logged through perfLog_ and reported to the engine as an internal error.
catch (std::exception
const& ex) {
132 LOG(perfLog_.
error()) << connection->tag() <<
"Caught exception: " << ex.what();
133 rpcEngine_->notifyInternalError();
// handleRequest (called from the callable posted in operator()): builds a web context for the request,
// asks rpcEngine_ to build the response, assembles the JSON reply and sends it on the connection.
// Parameters:
//   yield      - the yield context supplied by the posted callable
//   request    - the parsed request object (rvalue reference)
//   connection - the connection to log against and to send the serialized response on
// NOTE(review): the function-name line and a number of interior lines (factory calls, error composition,
// else/brace lines) are not part of this listing; comments marked "presumably" depend on those lines.
141 boost::asio::yield_context yield,
142 boost::json::object&& request,
143 std::shared_ptr<web::ConnectionBase>
const& connection
// Log the connection tag, whether the connection is upgraded ("ws") or not ("http"), and the client IP.
146 LOG(log_.
info()) << connection->tag() << (connection->upgraded ?
"ws" :
"http")
148 <<
" ip = " << connection->clientIp;
// Ask the backend for the ledger range.
151 auto const range = backend_->fetchLedgerRange();
// NOTE(review): presumably reached only when no range is available; the guarding condition is not shown.
154 rpcEngine_->notifyNotReady();
// Build the request context in an immediately-defined lambda; the argument lists differ depending on
// whether the connection is upgraded (the upgraded variant additionally passes a subscription context).
// NOTE(review): the factory call lines themselves are not shown in this listing.
160 auto const context = [&] {
161 if (connection->upgraded) {
165 connection->makeSubscriptionContext(tagFactory_),
166 tagFactory_.
with(connection->tag()),
168 connection->clientIp,
169 std::cref(apiVersionParser_),
170 connection->isAdmin()
176 tagFactory_.
with(connection->tag()),
178 connection->clientIp,
179 std::cref(apiVersionParser_),
180 connection->isAdmin()
// Context creation failure path: log the error to both loggers and report bad syntax to the engine.
// NOTE(review): the condition that selects this path is not shown.
185 auto const err = context.error();
186 LOG(perfLog_.
warn()) << connection->tag() <<
"Could not create Web context: " << err;
187 LOG(log_.
warn()) << connection->tag() <<
"Could not create Web context: " << err;
191 rpcEngine_->notifyBadSyntax();
// Run buildResponse through util::timed, which yields the result together with the elapsed time.
197 auto [result, timeDiff] =
util::timed([&]() {
return rpcEngine_->buildResponse(*context); });
// NOTE(review): the variable is named `us`, but the duration uses a std::milli period with an int count.
199 auto const us = std::chrono::duration<int, std::milli>(timeDiff);
202 boost::json::object response;
// Taken when result.response holds an rpc::Status: the serialized response is logged at debug level.
// NOTE(review): the line that fills `response` from the status is not shown.
204 if (
auto const status = std::get_if<rpc::Status>(&result.response)) {
207 auto const responseStr = boost::json::serialize(response);
209 LOG(perfLog_.
debug()) << context->tag() <<
"Encountered error: " << responseStr;
210 LOG(log_.
debug()) << context->tag() <<
"Encountered error: " << responseStr;
// NOTE(review): presumably the non-Status branch starts here (the else line is not shown): report
// completion with the method name and measured duration, then read the JSON object out of the result.
213 rpcEngine_->notifyComplete(context->method, us);
215 auto& json = std::get<boost::json::object>(result.response);
// isForwarded is true only when "forwarded" exists, is a bool, and is true.
216 auto const isForwarded =
217 json.contains(
"forwarded") && json.at(
"forwarded").is_bool() && json.at(
"forwarded").as_bool();
// Remove the "forwarded" key from the engine's JSON.
220 json.erase(
"forwarded");
// For a forwarded result that already contains a result member, or arrives on an upgraded connection,
// copy every key/value of json into the response.
225 if (isForwarded && (json.contains(JS(result)) || connection->upgraded)) {
226 for (
auto const& [k, v] : json)
227 response.insert_or_assign(k, v);
// NOTE(review): presumably the alternative branch — nest the whole json under the result key.
229 response[JS(result)] = json;
// NOTE(review): presumably guarded by isForwarded (guard not shown): mark the response as forwarded.
233 response[
"forwarded"] =
true;
// Upgraded connections: copy id and api_version from the request into the response when present and
// non-null, set status to success when the response has no error member, and set the type member.
237 if (connection->upgraded) {
238 auto const appendFieldIfExist = [&](
auto const& field) {
239 if (request.contains(field) and not request.at(field).is_null())
240 response[field] = request.at(field);
243 appendFieldIfExist(JS(
id));
244 appendFieldIfExist(JS(api_version));
246 if (!response.contains(JS(error)))
247 response[JS(status)] = JS(success);
249 response[JS(type)] = JS(response);
// NOTE(review): presumably the non-upgraded branch (else line not shown): when the response has a
// result member without an error, set the status inside that result object.
251 if (response.contains(JS(result)) && !response[JS(result)].as_object().contains(JS(error)))
252 response[JS(result)].as_object()[JS(status)] = JS(success);
// Take the warnings array out of the result by move.
256 boost::json::array warnings = std::move(result.warnings);
// Checks whether the ETL service reports a last-close age of 60 seconds or more.
// NOTE(review): the statement controlled by this condition is not shown.
259 if (etl_->lastCloseAgeSeconds() >= 60)
// Attach the warnings and send the serialized response on the connection.
262 response[
"warnings"] = warnings;
263 connection->send(boost::json::serialize(response));
264 }
// Any std::exception: logged to both loggers and reported to the engine as an internal error.
catch (std::exception
const& ex) {
267 LOG(perfLog_.
error()) << connection->tag() <<
"Caught exception: " << ex.what();
268 LOG(log_.
error()) << connection->tag() <<
"Caught exception: " << ex.what();
270 rpcEngine_->notifyInternalError();
278 shouldReplaceParams(boost::json::object
const& req)
const
280 auto const hasParams = req.contains(JS(params));
281 auto const paramsIsArray = hasParams and req.at(JS(params)).is_array();
282 auto const paramsIsEmptyString =
283 hasParams and req.at(JS(params)).is_string() and req.at(JS(params)).as_string().empty();
284 auto const paramsIsEmptyObject =
285 hasParams and req.at(JS(params)).is_object() and req.at(JS(params)).as_object().empty();
286 auto const paramsIsNull = hasParams and req.at(JS(params)).is_null();
287 auto const arrayIsEmpty = paramsIsArray and req.at(JS(params)).as_array().empty();
288 auto const arrayIsNotEmpty = paramsIsArray and not req.at(JS(params)).as_array().empty();
289 auto const firstArgIsNull = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_null();
290 auto const firstArgIsEmptyString = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_string() and
291 req.at(JS(params)).as_array().at(0).as_string().empty();
294 return not hasParams or paramsIsEmptyString or paramsIsNull or paramsIsEmptyObject or arrayIsEmpty or
295 firstArgIsEmptyString or firstArgIsNull;
std::expected< web::Context, Status > makeWsContext(boost::asio::yield_context yc, boost::json::object const &request, web::SubscriptionContextPtr session, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool isAdmin)
A factory function that creates a WebSocket context.
Definition Factories.cpp:47
std::expected< web::Context, Status > makeHttpContext(boost::asio::yield_context yc, boost::json::object const &request, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool const isAdmin)
A factory function that creates an HTTP context.
Definition Factories.cpp:77