// Shared, read-only backend handle; this listing uses it to fetch the ledger range
// before a request is processed.
63 std::shared_ptr<BackendInterface const>
const backend_;
// Shared RPC engine; requests are posted to it, responses are built by it, and it is
// notified of outcomes (too busy, bad syntax, not ready, internal error, complete).
64 std::shared_ptr<RPCEngineType>
const rpcEngine_;
// Shared, read-only ETL handle; this listing only queries lastCloseAgeSeconds() on it.
65 std::shared_ptr<ETLType const>
const etl_;
// Constructor fragment: only part of the parameter list and of the member initialiser
// list is visible in this listing (the name, other parameters and body are not shown).
// The three shared pointers are taken by const reference.
83 std::shared_ptr<BackendInterface const>
const& backend,
84 std::shared_ptr<RPCEngineType>
const& rpcEngine,
85 std::shared_ptr<ETLType const>
const&
etl
// Stores the RPC engine handle in the rpcEngine_ member.
88 , rpcEngine_(rpcEngine)
// The API version parser is configured from the "api_version" object of `config`
// (presumably another constructor parameter - not visible here).
91 , apiVersionParser_(config.getObject(
"api_version"))
// Entry point for a raw request string received on `connection`.
// Parses the request as a JSON object, optionally normalises `params`, and posts the
// actual handling to the RPC engine as a coroutine.
// NOTE(review): this listing omits some original lines (e.g. the opening `try`, block
// braces); the comments below describe only the statements that are visible.
102 operator()(std::string
const& request, std::shared_ptr<web::ConnectionBase>
const& connection)
// Parse the text and require the top-level value to be a JSON object.
105 auto req = boost::json::parse(request).as_object();
106 LOG(perfLog_.
debug()) << connection->tag() <<
"Adding to work queue";
// For non-upgraded connections only: when shouldReplaceParams() says so, overwrite
// `params` with an array holding one empty object.
108 if (not connection->upgraded and shouldReplaceParams(req))
109 req[JS(params)] = boost::json::array({boost::json::object{}});
// Hand the parsed request over to the RPC engine. The lambda owns the parsed object
// (moved in as `request`, shadowing the string parameter) and a copy of the
// connection pointer, and forwards both to handleRequest().
111 if (!rpcEngine_->post(
112 [
this, request = std::move(req), connection](boost::asio::yield_context yield)
mutable {
113 handleRequest(yield, std::move(request), connection);
// Presumably reached when post() returns false (the lines between are not visible
// in this listing) - the engine is told it was too busy.
117 rpcEngine_->notifyTooBusy();
120 }
catch (boost::system::system_error
const& ex) {
// Counted as a bad-syntax request and logged together with the offending input.
122 rpcEngine_->notifyBadSyntax();
124 LOG(log_.
warn()) <<
"Error parsing JSON: " << ex.what() <<
". For request: " << request;
125 }
catch (std::invalid_argument
const& ex) {
// Also counted as bad syntax; logged with the offending input.
127 rpcEngine_->notifyBadSyntax();
128 LOG(log_.
warn()) <<
"Invalid argument error: " << ex.what() <<
". For request: " << request;
130 }
catch (std::exception
const& ex) {
// Any other exception is logged on the performance log and counted as an
// internal error.
131 LOG(perfLog_.
error()) << connection->tag() <<
"Caught exception: " << ex.what();
132 rpcEngine_->notifyInternalError();
// Parameter list and body fragment of the handler that operator() invokes as
// handleRequest(yield, std::move(request), connection).
// It builds a request context, asks the RPC engine for a response, shapes the JSON
// reply and sends it on `connection`.
// NOTE(review): this listing omits a number of original lines (function name, several
// conditions/else branches, closing braces, the context factory calls). Comments below
// describe only visible statements; anything inferred is marked as such.
140 boost::asio::yield_context yield,
141 boost::json::object&& request,
142 std::shared_ptr<web::ConnectionBase>
const& connection
// Log the connection tag, the transport ("ws" when connection->upgraded, otherwise
// "http") and the client IP.
145 LOG(log_.
info()) << connection->tag() << (connection->upgraded ?
"ws" :
"http")
147 <<
" ip = " << connection->clientIp;
150 auto const range = backend_->fetchLedgerRange();
// Presumably executed when no ledger range is available - the guarding condition is
// not visible in this listing.
153 rpcEngine_->notifyNotReady();
// Build the request context in an immediately-used lambda; the upgraded branch
// additionally passes a subscription context created from the connection. Both
// branches pass a tag decorator, the client IP, the API version parser and the
// admin flag.
// NOTE(review): the factory calls themselves are not visible here - presumably
// makeWsContext / makeHttpContext; confirm against the full source.
159 auto const context = [&] {
160 if (connection->upgraded) {
164 connection->makeSubscriptionContext(tagFactory_),
165 tagFactory_.
with(connection->tag()),
167 connection->clientIp,
168 std::cref(apiVersionParser_),
169 connection->isAdmin()
175 tagFactory_.
with(connection->tag()),
177 connection->clientIp,
178 std::cref(apiVersionParser_),
179 connection->isAdmin()
// Context creation failure: the error is logged on both loggers and counted as a
// bad-syntax request.
184 auto const err = context.error();
185 LOG(perfLog_.
warn()) << connection->tag() <<
"Could not create Web context: " << err;
186 LOG(log_.
warn()) << connection->tag() <<
"Could not create Web context: " << err;
190 rpcEngine_->notifyBadSyntax();
// Build the response and measure how long it took.
196 auto [result, timeDiff] =
util::timed([&]() {
return rpcEngine_->buildResponse(*context); });
// NOTE: despite the variable name `us`, the duration's period is std::milli.
198 auto us = std::chrono::duration<int, std::milli>(timeDiff);
201 boost::json::object response;
// Error path: result.response holds an rpc::Status.
203 if (
auto const status = std::get_if<rpc::Status>(&result.response)) {
206 auto const responseStr = boost::json::serialize(response);
208 LOG(perfLog_.
debug()) << context->tag() <<
"Encountered error: " << responseStr;
209 LOG(log_.
debug()) << context->tag() <<
"Encountered error: " << responseStr;
// Presumably the non-error branch starts here (the `else` line is not visible):
// the engine is told the method completed, with the measured duration.
212 rpcEngine_->notifyComplete(context->method, us);
214 auto& json = std::get<boost::json::object>(result.response);
// True only when "forwarded" exists, is a boolean, and is true.
215 auto const isForwarded =
216 json.contains(
"forwarded") && json.at(
"forwarded").is_bool() && json.at(
"forwarded").as_bool();
// Remove the marker from the payload (any guarding condition is not visible here).
219 json.erase(
"forwarded");
// For a forwarded reply that already contains `result`, or any forwarded reply on
// an upgraded connection, the entries are copied to the top level of the response.
224 if (isForwarded && (json.contains(JS(result)) || connection->upgraded)) {
225 for (
auto const& [k, v] : json)
226 response.insert_or_assign(k, v);
// Presumably the alternative branch: wrap the payload under `result`.
228 response[JS(result)] = json;
// Marks the response as forwarded (presumably only when isForwarded - the condition
// line is not visible).
232 response[
"forwarded"] =
true;
236 if (connection->upgraded) {
// Copies a field from the request into the response when it is present and
// not null.
237 auto const appendFieldIfExist = [&](
auto const& field) {
238 if (request.contains(field) and not request.at(field).is_null())
239 response[field] = request.at(field);
242 appendFieldIfExist(JS(
id));
243 appendFieldIfExist(JS(api_version));
// Top-level status is set to success only when the response carries no error.
245 if (!response.contains(JS(error)))
246 response[JS(status)] = JS(success);
248 response[JS(type)] = JS(response);
// Presumably the non-upgraded branch: the status goes inside `result`, and only
// when `result` carries no error.
250 if (response.contains(JS(result)) && !response[JS(result)].as_object().contains(JS(error)))
251 response[JS(result)].as_object()[JS(status)] = JS(success);
// Take over the warnings produced while building the response.
255 boost::json::array warnings = std::move(result.warnings);
// Last close is at least 60 seconds old; the statement guarded by this condition is
// not visible in this listing.
258 if (etl_->lastCloseAgeSeconds() >= 60)
261 response[
"warnings"] = warnings;
// Serialise the response and send it on the connection.
262 connection->send(boost::json::serialize(response));
263 }
catch (std::exception
const& ex) {
// Any exception is logged on both loggers and counted as an internal error.
266 LOG(perfLog_.
error()) << connection->tag() <<
"Caught exception: " << ex.what();
267 LOG(log_.
error()) << connection->tag() <<
"Caught exception: " << ex.what();
269 rpcEngine_->notifyInternalError();
277 shouldReplaceParams(boost::json::object
const& req)
const
279 auto const hasParams = req.contains(JS(params));
280 auto const paramsIsArray = hasParams and req.at(JS(params)).is_array();
281 auto const paramsIsEmptyString =
282 hasParams and req.at(JS(params)).is_string() and req.at(JS(params)).as_string().empty();
283 auto const paramsIsEmptyObject =
284 hasParams and req.at(JS(params)).is_object() and req.at(JS(params)).as_object().empty();
285 auto const paramsIsNull = hasParams and req.at(JS(params)).is_null();
286 auto const arrayIsEmpty = paramsIsArray and req.at(JS(params)).as_array().empty();
287 auto const arrayIsNotEmpty = paramsIsArray and not req.at(JS(params)).as_array().empty();
288 auto const firstArgIsNull = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_null();
289 auto const firstArgIsEmptyString = arrayIsNotEmpty and req.at(JS(params)).as_array().at(0).is_string() and
290 req.at(JS(params)).as_array().at(0).as_string().empty();
293 return not hasParams or paramsIsEmptyString or paramsIsNull or paramsIsEmptyObject or arrayIsEmpty or
294 firstArgIsEmptyString or firstArgIsNull;
std::expected< web::Context, Status > makeWsContext(boost::asio::yield_context yc, boost::json::object const &request, web::SubscriptionContextPtr session, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool isAdmin)
A factory function that creates a WebSocket context.
Definition Factories.cpp:47
std::expected< web::Context, Status > makeHttpContext(boost::asio::yield_context yc, boost::json::object const &request, util::TagDecoratorFactory const &tagFactory, data::LedgerRange const &range, std::string const &clientIp, std::reference_wrapper< APIVersionParser const > apiVersionParser, bool const isAdmin)
A factory function that creates an HTTP context.
Definition Factories.cpp:77