Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
RPCEngine.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2022, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/BackendInterface.hpp"
24#include "rpc/Errors.hpp"
25#include "rpc/RPCHelpers.hpp"
26#include "rpc/WorkQueue.hpp"
27#include "rpc/common/HandlerProvider.hpp"
28#include "rpc/common/Types.hpp"
29#include "rpc/common/impl/ForwardingProxy.hpp"
30#include "util/OverloadSet.hpp"
31#include "util/ResponseExpirationCache.hpp"
32#include "util/log/Logger.hpp"
33#include "web/Context.hpp"
34#include "web/dosguard/DOSGuardInterface.hpp"
35
36#include <boost/asio/spawn.hpp>
37#include <boost/iterator/transform_iterator.hpp>
38#include <boost/json.hpp>
39#include <boost/json/object.hpp>
40#include <fmt/core.h>
41#include <fmt/format.h>
42#include <xrpl/protocol/ErrorCodes.h>
43
44#include <chrono>
45#include <exception>
46#include <functional>
47#include <memory>
48#include <optional>
49#include <string>
50#include <unordered_set>
51#include <utility>
52
56namespace rpc {
57
61template <typename CountersType>
62class RPCEngine {
// Logger for the "Performance" channel; used to trace the start and finish of each executed RPC.
63 util::Logger perfLog_{"Performance"};
// Logger for the "RPC" channel; used for cache initialisation and error reporting.
64 util::Logger log_{"RPC"};
65
// Backend handle; consulted via isTooBusy() before a request is executed locally.
66 std::shared_ptr<BackendInterface> backend_;
// Non-owning reference to the DOS guard; post() asks it whether a client IP is whitelisted.
67 std::reference_wrapper<web::dosguard::DOSGuardInterface const> dosGuard_;
// Non-owning reference to the work queue that post() schedules work onto.
68 std::reference_wrapper<WorkQueue> workQueue_;
// Non-owning reference to the counters object updated by the notify*() members.
69 std::reference_wrapper<CountersType> counters_;
70
// Source of the locally available handlers (contains() / getHandler()).
71 std::shared_ptr<HandlerProvider const> handlerProvider_;
72
// NOTE(review): `forwardingProxy_` is initialised by the constructor and used by buildResponse() and
// validHandler(), but its declaration is not visible at this point in the listing - confirm against the header.
74
// Response cache; left empty unless the constructor reads a positive `rpc.cache_timeout`.
75 std::optional<util::ResponseExpirationCache> responseCache_;
76
77public:
91 std::shared_ptr<BackendInterface> const& backend,
92 std::shared_ptr<etlng::LoadBalancerInterface> const& balancer,
94 WorkQueue& workQueue,
95 CountersType& counters,
96 std::shared_ptr<HandlerProvider const> const& handlerProvider
97 )
98 : backend_{backend}
99 , dosGuard_{std::cref(dosGuard)}
100 , workQueue_{std::ref(workQueue)}
101 , counters_{std::ref(counters)}
102 , handlerProvider_{handlerProvider}
103 , forwardingProxy_{balancer, counters, handlerProvider}
104 {
105 // Let main thread catch the exception if config type is wrong
106 auto const cacheTimeout = config.get<float>("rpc.cache_timeout");
107
108 if (cacheTimeout > 0.f) {
109 LOG(log_.info()) << fmt::format("Init RPC Cache, timeout: {} seconds", cacheTimeout);
110
111 responseCache_.emplace(
113 std::unordered_set<std::string>{"server_info"}
114 );
115 }
116 }
117
130 static std::shared_ptr<RPCEngine>
133 std::shared_ptr<BackendInterface> const& backend,
134 std::shared_ptr<etlng::LoadBalancerInterface> const& balancer,
135 web::dosguard::DOSGuardInterface const& dosGuard,
136 WorkQueue& workQueue,
137 CountersType& counters,
138 std::shared_ptr<HandlerProvider const> const& handlerProvider
139 )
140 {
141 return std::make_shared<RPCEngine>(config, backend, balancer, dosGuard, workQueue, counters, handlerProvider);
142 }
143
/**
 * @brief Main request processor routine.
 *
 * The request takes the first applicable path:
 *  1. if the forwarding proxy says it should be forwarded, it is forwarded (admin commands are rejected instead);
 *  2. if the caller is not an admin and the method is cacheable, the response cache is consulted and refreshed
 *     through buildResponseImpl() when needed;
 *  3. otherwise the request is executed directly through buildResponseImpl().
 *
 * @param ctx The context of the incoming request
 * @return Either the response or an error status (plus any warnings)
 */
Result
buildResponse(web::Context const& ctx)
{
    if (forwardingProxy_.shouldForward(ctx)) {
        // Disallow forwarding of the admin api, only user api is allowed for security reasons.
        if (isAdminCmd(ctx.method, ctx.params))
            return Result{Status{RippledError::rpcNO_PERMISSION}};

        return forwardingProxy_.forward(ctx);
    }

    if (not ctx.isAdmin and responseCache_ and responseCache_->shouldCache(ctx.method)) {
        // Produces a fresh cache entry, or the error that prevented one from being built.
        auto updater =
            [this, &ctx](boost::asio::yield_context
            ) -> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
            auto result = buildResponseImpl(ctx);

            // `result` is a non-const local, so the payload below is genuinely moved. Routing it through a
            // const intermediate would make `std::move(...).value()` fall back to a copy of the JSON object.
            if (result.response.has_value()) {
                return util::ResponseExpirationCache::EntryData{
                    .lastUpdated = std::chrono::steady_clock::now(), .response = std::move(result.response).value()
                };
            }

            return std::unexpected{util::ResponseExpirationCache::Error{
                .status = std::move(result.response).error(), .warnings = std::move(result.warnings)
            }};
        };

        // NOTE(review): the last argument is presumably the predicate deciding whether an entry may be stored;
        // confirm against ResponseExpirationCache::getOrUpdate. ctx.isAdmin is already false in this branch.
        auto cached = responseCache_->getOrUpdate(
            ctx.yield,
            ctx.method,
            std::move(updater),
            [&ctx](util::ResponseExpirationCache::EntryData const& entry) {
                return not ctx.isAdmin and not entry.response.contains("error");
            }
        );
        if (cached.has_value()) {
            return Result{std::move(cached).value()};
        }

        // Re-assemble the error status and its warnings into a Result.
        auto error = std::move(cached).error();
        Result errorResult{std::move(error.status)};
        errorResult.warnings = std::move(error.warnings);
        return errorResult;
    }

    return buildResponseImpl(ctx);
}
205
214 template <typename FnType>
215 bool
216 post(FnType&& func, std::string const& ip)
217 {
218 return workQueue_.get().postCoro(std::forward<FnType>(func), dosGuard_.get().isWhiteListed(ip));
219 }
220
227 void
228 notifyComplete(std::string const& method, std::chrono::microseconds const& duration)
229 {
230 if (validHandler(method))
231 counters_.get().rpcComplete(method, duration);
232 }
233
241 void
242 notifyFailed(std::string const& method)
243 {
244 // FIXME: seems like this is not used?
245 if (validHandler(method))
246 counters_.get().rpcFailed(method);
247 }
248
256 void
257 notifyErrored(std::string const& method)
258 {
259 if (validHandler(method))
260 counters_.get().rpcErrored(method);
261 }
262
266 void
268 {
269 counters_.get().onTooBusy();
270 }
271
277 void
279 {
280 counters_.get().onNotReady();
281 }
282
286 void
288 {
289 counters_.get().onBadSyntax();
290 }
291
295 void
297 {
298 counters_.get().onUnknownCommand();
299 }
300
304 void
306 {
307 counters_.get().onInternalError();
308 }
309
310private:
311 bool
312 validHandler(std::string const& method) const
313 {
314 return handlerProvider_->contains(method) || forwardingProxy_.isProxied(method);
315 }
316
317 Result
318 buildResponseImpl(web::Context const& ctx)
319 {
320 if (backend_->isTooBusy()) {
321 LOG(log_.error()) << "Database is too busy. Rejecting request";
322 notifyTooBusy(); // TODO: should we add ctx.method if we have it?
323 return Result{Status{RippledError::rpcTOO_BUSY}};
324 }
325
326 auto const method = handlerProvider_->getHandler(ctx.method);
327 if (!method) {
329 return Result{Status{RippledError::rpcUNKNOWN_COMMAND}};
330 }
331
332 try {
333 LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`';
334
335 auto const context = Context{
336 .yield = ctx.yield,
337 .session = ctx.session,
338 .isAdmin = ctx.isAdmin,
339 .clientIp = ctx.clientIp,
340 .apiVersion = ctx.apiVersion
341 };
342 auto v = (*method).process(ctx.params, context);
343
344 LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`';
345
346 if (not v) {
347 notifyErrored(ctx.method);
348 }
349
350 return Result{std::move(v)};
351 } catch (data::DatabaseTimeout const& t) {
352 LOG(log_.error()) << "Database timeout";
354
355 return Result{Status{RippledError::rpcTOO_BUSY}};
356 } catch (std::exception const& ex) {
357 LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what();
359
360 return Result{Status{RippledError::rpcINTERNAL}};
361 }
362 }
363};
364
365} // namespace rpc
Represents a database timeout error.
Definition BackendInterface.hpp:58
The RPC engine that ties all RPC-related functionality together.
Definition RPCEngine.hpp:62
void notifyNotReady()
Notify the system that the RPC system was not ready to handle an incoming request.
Definition RPCEngine.hpp:278
RPCEngine(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > const &backend, std::shared_ptr< etlng::LoadBalancerInterface > const &balancer, web::dosguard::DOSGuardInterface const &dosGuard, WorkQueue &workQueue, CountersType &counters, std::shared_ptr< HandlerProvider const > const &handlerProvider)
Construct a new RPCEngine object.
Definition RPCEngine.hpp:89
void notifyInternalError()
Notify the system that the incoming request led to an internal error (unrecoverable).
Definition RPCEngine.hpp:305
void notifyErrored(std::string const &method)
Notify the system that the specified method failed due to an unrecoverable error.
Definition RPCEngine.hpp:257
void notifyUnknownCommand()
Notify the system that the incoming request specified an unknown/unsupported method/command.
Definition RPCEngine.hpp:296
void notifyBadSyntax()
Notify the system that the incoming request did not specify the RPC method/command.
Definition RPCEngine.hpp:287
void notifyFailed(std::string const &method)
Notify the system that the specified method failed to execute due to a recoverable user error.
Definition RPCEngine.hpp:242
void notifyTooBusy()
Notify the system that the RPC system is too busy to handle an incoming request.
Definition RPCEngine.hpp:267
void notifyComplete(std::string const &method, std::chrono::microseconds const &duration)
Notify the system that the specified method was executed.
Definition RPCEngine.hpp:228
Result buildResponse(web::Context const &ctx)
Main request processor routine.
Definition RPCEngine.hpp:151
static std::shared_ptr< RPCEngine > makeRPCEngine(util::config::ClioConfigDefinition const &config, std::shared_ptr< BackendInterface > const &backend, std::shared_ptr< etlng::LoadBalancerInterface > const &balancer, web::dosguard::DOSGuardInterface const &dosGuard, WorkQueue &workQueue, CountersType &counters, std::shared_ptr< HandlerProvider const > const &handlerProvider)
Factory function to create a new instance of the RPC engine.
Definition RPCEngine.hpp:131
bool post(FnType &&func, std::string const &ip)
Used to schedule request processing onto the work queue.
Definition RPCEngine.hpp:216
An asynchronous, thread-safe queue for RPC requests.
Definition WorkQueue.hpp:46
Definition ForwardingProxy.hpp:41
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump error(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::ERR severity.
Definition Logger.cpp:229
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:219
BaseTagDecorator const & tag() const
Getter for tag decorator.
Definition Taggable.hpp:280
All the config data will be stored and extracted from this class.
Definition ConfigDefinition.hpp:54
static std::chrono::milliseconds toMilliseconds(float value)
Method to convert a float seconds value to milliseconds.
Definition ConfigDefinition.cpp:123
T get(std::string_view fullKey) const
Returns the specified value of given string if value exists.
Definition ConfigDefinition.hpp:108
The interface of a denial of service guard.
Definition DOSGuardInterface.hpp:46
This namespace contains all the RPC logic and handlers.
Definition AMMHelpers.cpp:37
bool isAdminCmd(std::string const &method, boost::json::object const &request)
Check whether a request requires administrative privileges on rippled side.
Definition RPCHelpers.cpp:1517
Result type used to return responses or error statuses to the Webserver subsystem.
Definition Types.hpp:129
A status returned from any RPC handler.
Definition Errors.hpp:82
A data structure to store a cache entry with its timestamp.
Definition ResponseExpirationCache.hpp:50
A data structure to represent errors that can occur during an update of the cache.
Definition ResponseExpirationCache.hpp:58
Context that is used by the Webserver to pass around information about an incoming request.
Definition Context.hpp:40