Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
RPCEngine.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
24#include "rpc/Errors.hpp"
25#include "rpc/RPCHelpers.hpp"
26#include "rpc/WorkQueue.hpp"
27#include "rpc/common/HandlerProvider.hpp"
28#include "rpc/common/Types.hpp"
29#include "rpc/common/impl/ForwardingProxy.hpp"
30#include "util/OverloadSet.hpp"
31#include "util/ResponseExpirationCache.hpp"
32#include "util/log/Logger.hpp"
33#include "web/Context.hpp"
34#include "web/dosguard/DOSGuardInterface.hpp"
35
36#include <boost/asio/spawn.hpp>
37#include <boost/iterator/transform_iterator.hpp>
38#include <boost/json.hpp>
39#include <boost/json/object.hpp>
40#include <fmt/core.h>
41#include <fmt/format.h>
42#include <xrpl/protocol/ErrorCodes.h>
43
44#include <chrono>
45#include <exception>
46#include <functional>
47#include <memory>
48#include <optional>
49#include <string>
50#include <unordered_set>
51#include <utility>
52
56namespace rpc {
57
61template <typename CountersType>
62class RPCEngine {
63 util::Logger perfLog_{"Performance"};
64 util::Logger log_{"RPC"};
65
66 std::shared_ptr<BackendInterface> backend_;
67 std::reference_wrapper<web::dosguard::DOSGuardInterface const> dosGuard_;
68 std::reference_wrapper<WorkQueue> workQueue_;
69 std::reference_wrapper<CountersType> counters_;
70
71 std::shared_ptr<HandlerProvider const> handlerProvider_;
72
74
75 std::optional<util::ResponseExpirationCache> responseCache_;
76
77public:
91 std::shared_ptr<BackendInterface> const& backend,
92 std::shared_ptr<etlng::LoadBalancerInterface> const& balancer,
94 WorkQueue& workQueue,
95 CountersType& counters,
96 std::shared_ptr<HandlerProvider const> const& handlerProvider
97 )
98 : backend_{backend}
99 , dosGuard_{std::cref(dosGuard)}
100 , workQueue_{std::ref(workQueue)}
101 , counters_{std::ref(counters)}
102 , handlerProvider_{handlerProvider}
103 , forwardingProxy_{balancer, counters, handlerProvider}
104 {
105 // Let main thread catch the exception if config type is wrong
106 auto const cacheTimeout = config.get<float>("rpc.cache_timeout");
107
108 if (cacheTimeout > 0.f) {
109 LOG(log_.info()) << fmt::format("Init RPC Cache, timeout: {} seconds", cacheTimeout);
110
111 responseCache_.emplace(
113 std::unordered_set<std::string>{"server_info"}
114 );
115 }
116 }
117
130 static std::shared_ptr<RPCEngine>
133 std::shared_ptr<BackendInterface> const& backend,
134 std::shared_ptr<etlng::LoadBalancerInterface> const& balancer,
135 web::dosguard::DOSGuardInterface const& dosGuard,
136 WorkQueue& workQueue,
137 CountersType& counters,
138 std::shared_ptr<HandlerProvider const> const& handlerProvider
139 )
140 {
141 return std::make_shared<RPCEngine>(config, backend, balancer, dosGuard, workQueue, counters, handlerProvider);
142 }
143
150 Result
152 {
153 if (forwardingProxy_.shouldForward(ctx)) {
154 // Disallow forwarding of the admin api, only user api is allowed for security reasons.
155 if (isAdminCmd(ctx.method, ctx.params))
156 return Result{Status{RippledError::rpcNO_PERMISSION}};
157
158 return forwardingProxy_.forward(ctx);
159 }
160
161 if (not ctx.isAdmin and responseCache_ and responseCache_->shouldCache(ctx.method)) {
162 auto updater =
163 [this, &ctx](boost::asio::yield_context
164 ) -> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
165 auto result = buildResponseImpl(ctx);
166 auto extracted = std::visit(
168 [&result](Status status
169 ) -> std::expected<boost::json::object, util::ResponseExpirationCache::Error> {
170 return std::unexpected{util::ResponseExpirationCache::Error{
171 .status = std::move(status), .warnings = std::move(result.warnings)
172 }};
173 },
174 [](boost::json::object obj
175 ) -> std::expected<boost::json::object, util::ResponseExpirationCache::Error> { return obj; }
176 },
177 std::move(result.response)
178 );
179 if (extracted.has_value()) {
181 .lastUpdated = std::chrono::steady_clock::now(), .response = std::move(extracted).value()
182 };
183 }
184 return std::unexpected{std::move(extracted).error()};
185 };
186
187 auto result = responseCache_->getOrUpdate(
188 ctx.yield,
189 ctx.method,
190 std::move(updater),
191 [&ctx](util::ResponseExpirationCache::EntryData const& entry) {
192 return not ctx.isAdmin and not entry.response.contains("error");
193 }
194 );
195 if (result.has_value()) {
196 return Result{std::move(result).value()};
197 }
198
199 auto error = std::move(result).error();
200 Result errorResult{std::move(error.status)};
201 errorResult.warnings = std::move(error.warnings);
202 return errorResult;
203 }
204
205 return buildResponseImpl(ctx);
206 }
207
216 template <typename FnType>
217 bool
218 post(FnType&& func, std::string const& ip)
219 {
220 return workQueue_.get().postCoro(std::forward<FnType>(func), dosGuard_.get().isWhiteListed(ip));
221 }
222
229 void
230 notifyComplete(std::string const& method, std::chrono::microseconds const& duration)
231 {
232 if (validHandler(method))
233 counters_.get().rpcComplete(method, duration);
234 }
235
243 void
244 notifyFailed(std::string const& method)
245 {
246 // FIXME: seems like this is not used?
247 if (validHandler(method))
248 counters_.get().rpcFailed(method);
249 }
250
258 void
259 notifyErrored(std::string const& method)
260 {
261 if (validHandler(method))
262 counters_.get().rpcErrored(method);
263 }
264
268 void
270 {
271 counters_.get().onTooBusy();
272 }
273
279 void
281 {
282 counters_.get().onNotReady();
283 }
284
288 void
290 {
291 counters_.get().onBadSyntax();
292 }
293
297 void
299 {
300 counters_.get().onUnknownCommand();
301 }
302
306 void
308 {
309 counters_.get().onInternalError();
310 }
311
312private:
313 bool
314 validHandler(std::string const& method) const
315 {
316 return handlerProvider_->contains(method) || forwardingProxy_.isProxied(method);
317 }
318
319 Result
320 buildResponseImpl(web::Context const& ctx)
321 {
322 if (backend_->isTooBusy()) {
323 LOG(log_.error()) << "Database is too busy. Rejecting request";
324 notifyTooBusy(); // TODO: should we add ctx.method if we have it?
325 return Result{Status{RippledError::rpcTOO_BUSY}};
326 }
327
328 auto const method = handlerProvider_->getHandler(ctx.method);
329 if (!method) {
331 return Result{Status{RippledError::rpcUNKNOWN_COMMAND}};
332 }
333
334 try {
335 LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`';
336
337 auto const context = Context{
338 .yield = ctx.yield,
339 .session = ctx.session,
340 .isAdmin = ctx.isAdmin,
341 .clientIp = ctx.clientIp,
342 .apiVersion = ctx.apiVersion
343 };
344 auto v = (*method).process(ctx.params, context);
345
346 LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`';
347
348 if (not v) {
349 notifyErrored(ctx.method);
350 }
351
352 return Result{std::move(v)};
353 } catch (data::DatabaseTimeout const& t) {
354 LOG(log_.error()) << "Database timeout";
356
357 return Result{Status{RippledError::rpcTOO_BUSY}};
358 } catch (std::exception const& ex) {
359 LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what();
361
362 return Result{Status{RippledError::rpcINTERNAL}};
363 }
364 }
365};
366
367} // namespace rpc
Represents a database timeout error.
Definition BackendInterface.hpp:58
The RPC engine that ties all RPC-related functionality together.
Definition RPCEngine.hpp:62
void notifyNotReady()
Notify the system that the RPC system was not ready to handle an incoming request.
Definition RPCEngine.hpp:280
RPCEngine(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > const &backend, std::shared_ptr< etlng::LoadBalancerInterface > const &balancer, web::dosguard::DOSGuardInterface const &dosGuard, WorkQueue &workQueue, CountersType &counters, std::shared_ptr< HandlerProvider const > const &handlerProvider)
Construct a new RPCEngine object.
Definition RPCEngine.hpp:89
void notifyInternalError()
Notify the system that the incoming request lead to an internal error (unrecoverable).
Definition RPCEngine.hpp:307
void notifyErrored(std::string const &method)
Notify the system that specified method failed due to some unrecoverable error.
Definition RPCEngine.hpp:259
void notifyUnknownCommand()
Notify the system that the incoming request specified an unknown/unsupported method/command.
Definition RPCEngine.hpp:298
void notifyBadSyntax()
Notify the system that the incoming request did not specify the RPC method/command.
Definition RPCEngine.hpp:289
void notifyFailed(std::string const &method)
Notify the system that specified method failed to execute due to a recoverable user error.
Definition RPCEngine.hpp:244
void notifyTooBusy()
Notify the system that the RPC system is too busy to handle an incoming request.
Definition RPCEngine.hpp:269
void notifyComplete(std::string const &method, std::chrono::microseconds const &duration)
Notify the system that specified method was executed.
Definition RPCEngine.hpp:230
Result buildResponse(web::Context const &ctx)
Main request processor routine.
Definition RPCEngine.hpp:151
static std::shared_ptr< RPCEngine > makeRPCEngine(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > const &backend, std::shared_ptr< etlng::LoadBalancerInterface > const &balancer, web::dosguard::DOSGuardInterface const &dosGuard, WorkQueue &workQueue, CountersType &counters, std::shared_ptr< HandlerProvider const > const &handlerProvider)
Factory function to create a new instance of the RPC engine.
Definition RPCEngine.hpp:131
bool post(FnType &&func, std::string const &ip)
Used to schedule request processing onto the work queue.
Definition RPCEngine.hpp:218
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:46
Definition ForwardingProxy.hpp:40
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:229
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:267
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
static std::chrono::milliseconds toMilliseconds(float value)
Method to convert a float seconds value to milliseconds.
Definition ConfigDefinition.cpp:123
T get(std::string_view fullKey) const
Returns the specified value of given string if value exists.
Definition ConfigDefinition.hpp:108
The interface of a denial of service guard.
Definition DOSGuardInterface.hpp:44
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37
bool isAdminCmd(std::string const &method, boost::json::object const &request)
Check whether a request requires administrative privileges on rippled side.
Definition RPCHelpers.cpp:1487
Result type used to return responses or error statuses to the Webserver subsystem.
Definition Types.hpp:129
A status returned from any RPC handler.
Definition Errors.hpp:82
Overload set for lambdas.
Definition OverloadSet.hpp:30
A data structure to store a cache entry with its timestamp.
Definition ResponseExpirationCache.hpp:50
A data structure to represent errors that can occur during an update of the cache.
Definition ResponseExpirationCache.hpp:58
Context that is used by the Webserver to pass around information about an incoming request.
Definition Context.hpp:40