Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
RPCEngine.hpp
1#pragma once
2
3#include "data/BackendInterface.hpp"
5#include "rpc/Errors.hpp"
6#include "rpc/RPCHelpers.hpp"
7#include "rpc/WorkQueue.hpp"
8#include "rpc/common/HandlerProvider.hpp"
9#include "rpc/common/Types.hpp"
10#include "rpc/common/impl/ForwardingProxy.hpp"
11#include "util/ResponseExpirationCache.hpp"
12#include "util/log/Logger.hpp"
13#include "web/Context.hpp"
14#include "web/dosguard/DOSGuardInterface.hpp"
15
16#include <boost/asio/spawn.hpp>
17#include <boost/iterator/transform_iterator.hpp>
18#include <boost/json.hpp>
19#include <boost/json/object.hpp>
20#include <fmt/format.h>
21#include <xrpl/protocol/ErrorCodes.h>
22
23#include <chrono>
24#include <cstdint>
25#include <exception>
26#include <functional>
27#include <memory>
28#include <optional>
29#include <string>
30#include <unordered_set>
31#include <utility>
32
36namespace rpc {
37
/**
 * @brief The RPC engine.
 *
 * @tparam CountersType Type of the counters object that receives the notifications issued by this class
 */
41template <typename CountersType>
42class RPCEngine {
// Loggers for the "Performance" and "RPC" channels.
43 util::Logger perfLog_{"Performance"};
44 util::Logger log_{"RPC"};
45
// Shared handle to the backend; asked via isTooBusy() before a request is executed.
46 std::shared_ptr<BackendInterface> backend_;
// Non-owning references to objects passed into the constructor.
47 std::reference_wrapper<web::dosguard::DOSGuardInterface const> dosGuard_;
48 std::reference_wrapper<WorkQueue> workQueue_;
49 std::reference_wrapper<CountersType> counters_;
50
// Used to look up handlers by method name (getHandler / contains).
51 std::shared_ptr<HandlerProvider const> handlerProvider_;
52
// NOTE(review): listing lines 52-53 are absent from this extract. `forwardingProxy_` is initialised in the
// constructor and used by the methods below, so its declaration was presumably located here.
54
// Engaged only when the constructor reads a positive "rpc.cache_timeout"; empty otherwise.
55 std::optional<util::ResponseExpirationCache> responseCache_;
56
57public:
/**
 * @brief Construct a new RPCEngine object.
 *
 * NOTE(review): this extract is missing the declarator and the `config` parameter (listing lines 69-70),
 * the `dosGuard` parameter (line 73) and the first argument of `responseCache_.emplace` (line 92).
 * Both `config` and `dosGuard` are used in the body below.
 *
 * @param backend The backend; moved into `backend_`
 * @param balancer The load balancer; handed to the forwarding proxy
 * @param workQueue The work queue that `post` schedules onto; stored by reference
 * @param counters The counters object that receives notifications; stored by reference
 * @param handlerProvider The provider used to look up handlers; also handed to the forwarding proxy
 */
71 std::shared_ptr<BackendInterface> backend,
72 std::shared_ptr<etl::LoadBalancerInterface> const& balancer,
74 WorkQueue& workQueue,
75 CountersType& counters,
76 std::shared_ptr<HandlerProvider const> const& handlerProvider
77 )
78 : backend_{std::move(backend)}
79 , dosGuard_{std::cref(dosGuard)}
80 , workQueue_{std::ref(workQueue)}
81 , counters_{std::ref(counters)}
82 , handlerProvider_{handlerProvider}
83 , forwardingProxy_{balancer, counters, handlerProvider}
84 {
85 // Let main thread catch the exception if config type is wrong
86 auto const cacheTimeout = config.get<float>("rpc.cache_timeout");
87
// A timeout that is not strictly positive leaves responseCache_ empty, so no caching takes place.
88 if (cacheTimeout > 0.f) {
89 LOG(log_.info()) << fmt::format("Init RPC Cache, timeout: {} seconds", cacheTimeout);
90
// NOTE(review): the first emplace argument (listing line 92) is absent from this extract.
// The set below is presumably the list of cacheable commands - confirm against ResponseExpirationCache.
91 responseCache_.emplace(
93 std::unordered_set<std::string>{"server_info"}
94 );
95 }
96 }
97
/**
 * @brief Factory function to create a new instance of the RPC engine.
 *
 * Forwards every argument unchanged to the RPCEngine constructor via std::make_shared.
 *
 * NOTE(review): the function name and the `config` parameter (listing lines 111-112) are missing from this
 * extract; `config` is still passed on in the body below.
 *
 * @param backend The backend to use
 * @param balancer The load balancer to use
 * @param dosGuard The DOS guard to use
 * @param workQueue The work queue to use
 * @param counters The counters object to use
 * @param handlerProvider The handler provider to use
 * @return A shared pointer to the newly created engine
 */
110 static std::shared_ptr<RPCEngine>
113 std::shared_ptr<BackendInterface> const& backend,
114 std::shared_ptr<etl::LoadBalancerInterface> const& balancer,
115 web::dosguard::DOSGuardInterface const& dosGuard,
116 WorkQueue& workQueue,
117 CountersType& counters,
118 std::shared_ptr<HandlerProvider const> const& handlerProvider
119 )
120 {
121 return std::make_shared<RPCEngine>(
122 config, backend, balancer, dosGuard, workQueue, counters, handlerProvider
123 );
124 }
125
/**
 * @brief Main request processor routine.
 *
 * Order of checks: forwarding, response cache (non-admin requests only), backend busy check, handler
 * lookup, handler execution. Exceptions thrown while executing the handler are converted into error statuses.
 *
 * NOTE(review): the declarator `buildResponse(web::Context const& ctx)` (listing line 133) and listing
 * lines 156, 183 and 188 are missing from this extract.
 *
 * @param ctx The context of the incoming request
 * @return A Result holding either the response or an error status
 */
132 Result
134 {
// Forwarding is decided first and returns without touching the cache or the backend.
135 if (forwardingProxy_.shouldForward(ctx)) {
136 // Disallow forwarding of the admin api, only user api is allowed for security reasons.
137 if (isAdminCmd(ctx.method, ctx.params))
138 return Result{Status{RippledError::rpcNO_PERMISSION}};
139
140 return forwardingProxy_.forward(ctx);
141 }
142
// The cache is consulted for non-admin requests only and is keyed by the method name.
143 if (not ctx.isAdmin and responseCache_) {
144 if (auto res = responseCache_->get(ctx.method); res.has_value())
145 return Result{std::move(res).value()};
146 }
147
148 if (backend_->isTooBusy()) {
149 LOG(log_.error()) << "Database is too busy. Rejecting request";
150 notifyTooBusy(); // TODO: should we add ctx.method if we have it?
151 return Result{Status{RippledError::rpcTOO_BUSY}};
152 }
153
154 auto const method = handlerProvider_->getHandler(ctx.method);
155 if (!method) {
// NOTE(review): listing line 156 is absent from this extract.
157 return Result{Status{RippledError::rpcUNKNOWN_COMMAND}};
158 }
159
160 try {
161 LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`';
162
// Repackage the fields of the web-level context that the handler needs into an rpc Context.
163 auto const context = Context{
164 .yield = ctx.yield,
165 .session = ctx.session,
166 .isAdmin = ctx.isAdmin,
167 .clientIp = ctx.clientIp,
168 .apiVersion = ctx.apiVersion
169 };
170 auto v = (*method).process(ctx.params, context);
171
172 LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`';
173
// A falsy result is counted as errored; otherwise it is handed to the cache for non-admin requests.
// NOTE(review): as_object() presumes the successful result is a JSON object - confirm.
174 if (not v) {
175 notifyErrored(ctx.method);
176 } else if (not ctx.isAdmin and responseCache_) {
177 responseCache_->put(ctx.method, v.result->as_object());
178 }
179
180 return Result{std::move(v)};
181 } catch (data::DatabaseTimeout const& t) {
182 LOG(log_.error()) << "Database timeout";
// NOTE(review): listing line 183 is absent from this extract.
184
185 return Result{Status{RippledError::rpcTOO_BUSY}};
186 } catch (std::exception const& ex) {
// NOTE(review): unlike the perfLog_ messages above, no space separates the tag from the message text.
187 LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what();
// NOTE(review): listing line 188 is absent from this extract.
189
190 return Result{Status{RippledError::rpcINTERNAL}};
191 }
192 }
193
/**
 * @brief Used to schedule request processing onto the work queue.
 *
 * @tparam FnType The type of the callable
 * @param func The callable to schedule; perfectly forwarded to WorkQueue::postCoro
 * @param ip The client IP; only used to ask the DOS guard whether it is whitelisted
 * @return The value returned by WorkQueue::postCoro
 */
202 template <typename FnType>
203 bool
204 post(FnType&& func, std::string const& ip)
205 {
// The whitelist status of the ip is passed as the second postCoro argument.
// NOTE(review): presumably whitelisted clients bypass the queue limit - confirm against WorkQueue.
206 return workQueue_.get().postCoro(
207 std::forward<FnType>(func), dosGuard_.get().isWhiteListed(ip)
208 );
209 }
210
/**
 * @brief Notify the system that the specified method was executed and record ledger metrics.
 *
 * Does nothing when `context.method` is neither a known handler nor a proxied method.
 *
 * NOTE(review): the declarator `notifyComplete(` (listing line 219) is missing from this extract.
 *
 * @param context The web context of the request; its method, params and range are read
 * @param duration The time it took to execute the method
 * @param isForwarded Whether the request was forwarded; ledger metrics are recorded only if it was not
 */
218 void
220 web::Context const& context,
221 std::chrono::microseconds const& duration,
222 bool isForwarded
223 )
224 {
225 if (validHandler(context.method)) {
226 counters_.get().rpcComplete(context.method, duration);
227 if (not isForwarded) {
228 counters_.get().recordLedgerRequest(context.params, context.range.maxSequence);
229 }
230 }
231 }
232
/**
 * @brief Record ledger request metrics.
 *
 * Passes both arguments straight to the counters' recordLedgerRequest; no handler validity check is made here.
 *
 * @param params The request parameters
 * @param currentLedgerSequence The current ledger sequence
 */
239 void
240 recordLedgerMetrics(boost::json::object const& params, std::uint32_t currentLedgerSequence)
241 {
242 counters_.get().recordLedgerRequest(params, currentLedgerSequence);
243 }
244
/**
 * @brief Notify the system that the specified method failed to execute due to a recoverable user error.
 *
 * The counter is only updated when `method` is a known handler or a proxied method.
 *
 * @param method The name of the method that failed
 */
253 void
254 notifyFailed(std::string const& method)
255 {
256 // FIXME: seems like this is not used?
257 if (validHandler(method))
258 counters_.get().rpcFailed(method);
259 }
260
/**
 * @brief Notify the system that the specified method failed due to some unrecoverable error.
 *
 * The counter is only updated when `method` is a known handler or a proxied method.
 *
 * @param method The name of the method that errored
 */
268 void
269 notifyErrored(std::string const& method)
270 {
271 if (validHandler(method))
272 counters_.get().rpcErrored(method);
273 }
274
/**
 * @brief Notify the system that the RPC system is too busy to handle an incoming request.
 *
 * NOTE(review): the declarator `notifyTooBusy()` (listing line 279) is missing from this extract.
 */
278 void
280 {
281 counters_.get().onTooBusy();
282 }
283
/**
 * @brief Notify the system that the RPC system was not ready to handle an incoming request.
 *
 * NOTE(review): the declarator `notifyNotReady()` (listing line 290) is missing from this extract.
 */
289 void
291 {
292 counters_.get().onNotReady();
293 }
294
/**
 * @brief Notify the system that the incoming request did not specify the RPC method/command.
 *
 * NOTE(review): the declarator `notifyBadSyntax()` (listing line 299) is missing from this extract.
 */
298 void
300 {
301 counters_.get().onBadSyntax();
302 }
303
/**
 * @brief Notify the system that the incoming request specified an unknown/unsupported method/command.
 *
 * NOTE(review): the declarator `notifyUnknownCommand()` (listing line 309) is missing from this extract.
 */
308 void
310 {
311 counters_.get().onUnknownCommand();
312 }
313
/**
 * @brief Notify the system that the incoming request led to an internal error (unrecoverable).
 *
 * NOTE(review): the declarator `notifyInternalError()` (listing line 318) is missing from this extract.
 */
317 void
319 {
320 counters_.get().onInternalError();
321 }
322
323private:
/**
 * @brief Check whether a method name should be tracked by the counters.
 *
 * @param method The method name to check
 * @return true if the handler provider contains the method or the forwarding proxy reports it as proxied
 */
324 bool
325 validHandler(std::string const& method) const
326 {
327 return handlerProvider_->contains(method) || forwardingProxy_.isProxied(method);
328 }
329
/**
 * @brief Execute the handler for a request without the forwarding and response-cache steps.
 *
 * Performs the backend busy check, the handler lookup and the handler execution; exceptions thrown while
 * executing the handler are converted into error statuses.
 *
 * NOTE(review): nothing in the visible listing calls this function, and listing lines 341, 366 and 371
 * are missing from this extract.
 *
 * @param ctx The context of the incoming request
 * @return A Result holding either the response or an error status
 */
330 Result
331 buildResponseImpl(web::Context const& ctx)
332 {
333 if (backend_->isTooBusy()) {
334 LOG(log_.error()) << "Database is too busy. Rejecting request";
335 notifyTooBusy(); // TODO: should we add ctx.method if we have it?
336 return Result{Status{RippledError::rpcTOO_BUSY}};
337 }
338
339 auto const method = handlerProvider_->getHandler(ctx.method);
340 if (!method) {
// NOTE(review): listing line 341 is absent from this extract.
342 return Result{Status{RippledError::rpcUNKNOWN_COMMAND}};
343 }
344
345 try {
346 LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`';
347
// Repackage the fields of the web-level context that the handler needs into an rpc Context.
348 auto const context = Context{
349 .yield = ctx.yield,
350 .session = ctx.session,
351 .isAdmin = ctx.isAdmin,
352 .clientIp = ctx.clientIp,
353 .apiVersion = ctx.apiVersion
354 };
355 auto v = (*method).process(ctx.params, context);
356
357 LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`';
358
// A falsy result is counted as errored; the result is returned to the caller either way.
359 if (not v) {
360 notifyErrored(ctx.method);
361 }
362
363 return Result{std::move(v)};
364 } catch (data::DatabaseTimeout const& t) {
365 LOG(log_.error()) << "Database timeout";
// NOTE(review): listing line 366 is absent from this extract.
367
368 return Result{Status{RippledError::rpcTOO_BUSY}};
369 } catch (std::exception const& ex) {
// NOTE(review): unlike the perfLog_ messages above, no space separates the tag from the message text.
370 LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what();
// NOTE(review): listing line 371 is absent from this extract.
372
373 return Result{Status{RippledError::rpcINTERNAL}};
374 }
375 }
376};
377
378} // namespace rpc
Represents a database timeout error.
Definition BackendInterface.hpp:40
void notifyNotReady()
Notify the system that the RPC system was not ready to handle an incoming request.
Definition RPCEngine.hpp:290
void notifyInternalError()
Notify the system that the incoming request led to an internal error (unrecoverable).
Definition RPCEngine.hpp:318
void notifyErrored(std::string const &method)
Notify the system that the specified method failed due to some unrecoverable error.
Definition RPCEngine.hpp:269
void notifyUnknownCommand()
Notify the system that the incoming request specified an unknown/unsupported method/command.
Definition RPCEngine.hpp:309
static std::shared_ptr< RPCEngine > makeRPCEngine(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > const &backend, std::shared_ptr< etl::LoadBalancerInterface > const &balancer, web::dosguard::DOSGuardInterface const &dosGuard, WorkQueue &workQueue, CountersType &counters, std::shared_ptr< HandlerProvider const > const &handlerProvider)
Factory function to create a new instance of the RPC engine.
Definition RPCEngine.hpp:111
RPCEngine(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > backend, std::shared_ptr< etl::LoadBalancerInterface > const &balancer, web::dosguard::DOSGuardInterface const &dosGuard, WorkQueue &workQueue, CountersType &counters, std::shared_ptr< HandlerProvider const > const &handlerProvider)
Construct a new RPCEngine object.
Definition RPCEngine.hpp:69
void notifyBadSyntax()
Notify the system that the incoming request did not specify the RPC method/command.
Definition RPCEngine.hpp:299
void recordLedgerMetrics(boost::json::object const &params, std::uint32_t currentLedgerSequence)
Record ledger request metrics.
Definition RPCEngine.hpp:240
void notifyFailed(std::string const &method)
Notify the system that the specified method failed to execute due to a recoverable user error.
Definition RPCEngine.hpp:254
void notifyComplete(web::Context const &context, std::chrono::microseconds const &duration, bool isForwarded)
Notify the system that the specified method was executed and record ledger metrics.
Definition RPCEngine.hpp:219
void notifyTooBusy()
Notify the system that the RPC system is too busy to handle an incoming request.
Definition RPCEngine.hpp:279
Result buildResponse(web::Context const &ctx)
Main request processor routine.
Definition RPCEngine.hpp:133
bool post(FnType &&func, std::string const &ip)
Used to schedule request processing onto the work queue.
Definition RPCEngine.hpp:204
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:43
Definition ForwardingProxy.hpp:22
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:498
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:264
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:31
static std::chrono::milliseconds toMilliseconds(float value)
Method to convert a float seconds value to milliseconds.
Definition ConfigDefinition.cpp:109
T get(std::string_view fullKey) const
Returns the value stored under the given key, if that value exists.
Definition ConfigDefinition.hpp:85
The interface of a denial of service guard.
Definition DOSGuardInterface.hpp:27
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:18
bool isAdminCmd(std::string const &method, boost::json::object const &request)
Check whether a request requires administrative privileges on the rippled side.
Definition RPCHelpers.cpp:1604
Context of an RPC call.
Definition Types.hpp:99
Result type used to return responses or error statuses to the Webserver subsystem.
Definition Types.hpp:110
A status returned from any RPC handler.
Definition Errors.hpp:65
Context that is used by the Webserver to pass around information about an incoming request.
Definition Context.hpp:22