22#include "data/BackendInterface.hpp"
26#include "rpc/WorkQueue.hpp"
27#include "rpc/common/HandlerProvider.hpp"
28#include "rpc/common/Types.hpp"
29#include "rpc/common/impl/ForwardingProxy.hpp"
30#include "util/OverloadSet.hpp"
31#include "util/ResponseExpirationCache.hpp"
32#include "util/log/Logger.hpp"
33#include "web/Context.hpp"
34#include "web/dosguard/DOSGuardInterface.hpp"
36#include <boost/asio/spawn.hpp>
37#include <boost/iterator/transform_iterator.hpp>
38#include <boost/json.hpp>
39#include <boost/json/object.hpp>
41#include <fmt/format.h>
42#include <xrpl/protocol/ErrorCodes.h>
50#include <unordered_set>
61template <
typename CountersType>
66 std::shared_ptr<BackendInterface> backend_;
67 std::reference_wrapper<web::dosguard::DOSGuardInterface const> dosGuard_;
68 std::reference_wrapper<WorkQueue> workQueue_;
69 std::reference_wrapper<CountersType> counters_;
71 std::shared_ptr<HandlerProvider const> handlerProvider_;
75 std::optional<util::ResponseExpirationCache> responseCache_;
91 std::shared_ptr<BackendInterface>
const& backend,
92 std::shared_ptr<etlng::LoadBalancerInterface>
const& balancer,
95 CountersType& counters,
96 std::shared_ptr<HandlerProvider const>
const& handlerProvider
99 , dosGuard_{std::cref(dosGuard)}
100 , workQueue_{std::ref(workQueue)}
101 , counters_{std::ref(counters)}
102 , handlerProvider_{handlerProvider}
103 , forwardingProxy_{balancer, counters, handlerProvider}
106 auto const cacheTimeout = config.
get<
float>(
"rpc.cache_timeout");
108 if (cacheTimeout > 0.f) {
109 LOG(log_.
info()) << fmt::format(
"Init RPC Cache, timeout: {} seconds", cacheTimeout);
111 responseCache_.emplace(
113 std::unordered_set<std::string>{
"server_info"}
130 static std::shared_ptr<RPCEngine>
133 std::shared_ptr<BackendInterface>
const& backend,
134 std::shared_ptr<etlng::LoadBalancerInterface>
const& balancer,
137 CountersType& counters,
138 std::shared_ptr<HandlerProvider const>
const& handlerProvider
141 return std::make_shared<RPCEngine>(config, backend, balancer, dosGuard, workQueue, counters, handlerProvider);
153 if (forwardingProxy_.shouldForward(ctx)) {
158 return forwardingProxy_.forward(ctx);
161 if (not ctx.isAdmin and responseCache_ and responseCache_->shouldCache(ctx.method)) {
163 [
this, &ctx](boost::asio::yield_context
164 ) -> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
165 auto result = buildResponseImpl(ctx);
166 auto extracted = std::visit(
169 ) -> std::expected<boost::json::object, util::ResponseExpirationCache::Error> {
171 .status = std::move(status), .warnings = std::move(result.warnings)
174 [](boost::json::object obj
175 ) -> std::expected<boost::json::object, util::ResponseExpirationCache::Error> {
return obj; }
177 std::move(result.response)
179 if (extracted.has_value()) {
181 .lastUpdated = std::chrono::steady_clock::now(), .response = std::move(extracted).value()
184 return std::unexpected{std::move(extracted).error()};
187 auto result = responseCache_->getOrUpdate(
192 return not ctx.isAdmin and not entry.response.contains(
"error");
195 if (result.has_value()) {
196 return Result{std::move(result).value()};
199 auto error = std::move(result).error();
200 Result errorResult{std::move(error.status)};
201 errorResult.warnings = std::move(error.warnings);
205 return buildResponseImpl(ctx);
216 template <
typename FnType>
218 post(FnType&& func, std::string
const& ip)
220 return workQueue_.get().postCoro(std::forward<FnType>(func), dosGuard_.get().isWhiteListed(ip));
230 notifyComplete(std::string
const& method, std::chrono::microseconds
const& duration)
232 if (validHandler(method))
233 counters_.get().rpcComplete(method, duration);
247 if (validHandler(method))
248 counters_.get().rpcFailed(method);
261 if (validHandler(method))
262 counters_.get().rpcErrored(method);
271 counters_.get().onTooBusy();
282 counters_.get().onNotReady();
291 counters_.get().onBadSyntax();
300 counters_.get().onUnknownCommand();
309 counters_.get().onInternalError();
314 validHandler(std::string
const& method)
const
316 return handlerProvider_->contains(method) || forwardingProxy_.isProxied(method);
322 if (backend_->isTooBusy()) {
323 LOG(log_.
error()) <<
"Database is too busy. Rejecting request";
325 return Result{Status{RippledError::rpcTOO_BUSY}};
328 auto const method = handlerProvider_->getHandler(ctx.method);
331 return Result{Status{RippledError::rpcUNKNOWN_COMMAND}};
335 LOG(perfLog_.
debug()) << ctx.
tag() <<
" start executing rpc `" << ctx.method <<
'`';
337 auto const context = Context{
339 .session = ctx.session,
340 .isAdmin = ctx.isAdmin,
341 .clientIp = ctx.clientIp,
342 .apiVersion = ctx.apiVersion
344 auto v = (*method).process(ctx.params, context);
346 LOG(perfLog_.
debug()) << ctx.
tag() <<
" finish executing rpc `" << ctx.method <<
'`';
352 return Result{std::move(v)};
354 LOG(log_.
error()) <<
"Database timeout";
357 return Result{Status{RippledError::rpcTOO_BUSY}};
358 }
catch (std::exception
const& ex) {
359 LOG(log_.
error()) << ctx.
tag() <<
"Caught exception: " << ex.what();
362 return Result{Status{RippledError::rpcINTERNAL}};
Represents a database timeout error.
Definition BackendInterface.hpp:58
The RPC engine that ties all RPC-related functionality together.
Definition RPCEngine.hpp:62
void notifyNotReady()
Notify the system that the RPC system was not ready to handle an incoming request.
Definition RPCEngine.hpp:280
RPCEngine(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > const &backend, std::shared_ptr< etlng::LoadBalancerInterface > const &balancer, web::dosguard::DOSGuardInterface const &dosGuard, WorkQueue &workQueue, CountersType &counters, std::shared_ptr< HandlerProvider const > const &handlerProvider)
Construct a new RPCEngine object.
Definition RPCEngine.hpp:89
void notifyInternalError()
Notify the system that the incoming request lead to an internal error (unrecoverable).
Definition RPCEngine.hpp:307
void notifyErrored(std::string const &method)
Notify the system that specified method failed due to some unrecoverable error.
Definition RPCEngine.hpp:259
void notifyUnknownCommand()
Notify the system that the incoming request specified an unknown/unsupported method/command.
Definition RPCEngine.hpp:298
void notifyBadSyntax()
Notify the system that the incoming request did not specify the RPC method/command.
Definition RPCEngine.hpp:289
void notifyFailed(std::string const &method)
Notify the system that specified method failed to execute due to a recoverable user error.
Definition RPCEngine.hpp:244
void notifyTooBusy()
Notify the system that the RPC system is too busy to handle an incoming request.
Definition RPCEngine.hpp:269
void notifyComplete(std::string const &method, std::chrono::microseconds const &duration)
Notify the system that specified method was executed.
Definition RPCEngine.hpp:230
Result buildResponse(web::Context const &ctx)
Main request processor routine.
Definition RPCEngine.hpp:151
static std::shared_ptr< RPCEngine > makeRPCEngine(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > const &backend, std::shared_ptr< etlng::LoadBalancerInterface > const &balancer, web::dosguard::DOSGuardInterface const &dosGuard, WorkQueue &workQueue, CountersType &counters, std::shared_ptr< HandlerProvider const > const &handlerProvider)
Factory function to create a new instance of the RPC engine.
Definition RPCEngine.hpp:131
bool post(FnType &&func, std::string const &ip)
Used to schedule request processing onto the work queue.
Definition RPCEngine.hpp:218
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:46
Definition ForwardingProxy.hpp:40
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:229
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:267
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
static std::chrono::milliseconds toMilliseconds(float value)
Method to convert a float seconds value to milliseconds.
Definition ConfigDefinition.cpp:123
T get(std::string_view fullKey) const
Returns the specified value of given string if value exists.
Definition ConfigDefinition.hpp:108
The interface of a denial of service guard.
Definition DOSGuardInterface.hpp:44
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37
bool isAdminCmd(std::string const &method, boost::json::object const &request)
Check whether a request requires administrative privileges on rippled side.
Definition RPCHelpers.cpp:1487
Result type used to return responses or error statuses to the Webserver subsystem.
Definition Types.hpp:129
A status returned from any RPC handler.
Definition Errors.hpp:82
Overload set for lambdas.
Definition OverloadSet.hpp:30
A data structure to store a cache entry with its timestamp.
Definition ResponseExpirationCache.hpp:50
A data structure to represent errors that can occur during an update of the cache.
Definition ResponseExpirationCache.hpp:58
Context that is used by the Webserver to pass around information about an incoming request.
Definition Context.hpp:40