rippled
Loading...
Searching...
No Matches
GRPCServer.h
1#ifndef XRPL_CORE_GRPCSERVER_H_INCLUDED
2#define XRPL_CORE_GRPCSERVER_H_INCLUDED
3
4#include <xrpld/app/main/Application.h>
5#include <xrpld/core/JobQueue.h>
6#include <xrpld/rpc/Context.h>
7#include <xrpld/rpc/GRPCHandlers.h>
8#include <xrpld/rpc/InfoSub.h>
9#include <xrpld/rpc/Role.h>
10#include <xrpld/rpc/detail/Handler.h>
11
12#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
13#include <xrpl/resource/Charge.h>
14
15#include <grpcpp/grpcpp.h>
16
17namespace ripple {
18
19// Interface that CallData implements
21{
22public:
23 virtual ~Processor() = default;
24
25 Processor() = default;
26
27 Processor(Processor const&) = delete;
28
30 operator=(Processor const&) = delete;
31
32 // process a request that has arrived. Can only be called once per instance
33 virtual void
34 process() = 0;
35
36 // create a new instance of this CallData object, with the same type
37 // (same template parameters) as the original. This is called when a CallData
38 // object starts processing a request. Creating a new instance allows the
39 // server to handle additional requests while the first is being processed
41 clone() = 0;
42
43 // true if this object has finished processing the request. Object will be
44 // deleted once this function returns true
45 virtual bool
47};
48
49class GRPCServerImpl final
50{
51private:
52 // CompletionQueue returns events that have occurred, or events that have
53 // been cancelled
55
57
58 // The gRPC service defined by the .proto files
59 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_;
60
62
64
67
69
71
72 // typedef for function to bind a listener
73 // This is always of the form:
74 // org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::Request[RPC NAME]
75 template <class Request, class Response>
77 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService&,
78 grpc::ServerContext*,
79 Request*,
80 grpc::ServerAsyncResponseWriter<Response>*,
81 grpc::CompletionQueue*,
82 grpc::ServerCompletionQueue*,
83 void*)>;
84
85 // typedef for actual handler (that populates a response)
86 // handlers are defined in rpc/GRPCHandlers.h
87 template <class Request, class Response>
90 // This implementation is currently limited to v1 of the API
91 static unsigned constexpr apiVersion = 1;
92
93 template <class Request, class Response>
94 using Forward = std::function<grpc::Status(
95 org::xrpl::rpc::v1::XRPLedgerAPIService::Stub*,
96 grpc::ClientContext*,
97 Request,
98 Response*)>;
99
100public:
101 explicit GRPCServerImpl(Application& app);
102
104
106 operator=(GRPCServerImpl const&) = delete;
107
108 void
109 shutdown();
110
111 // setup the server and listeners
112 // returns true if server started successfully
113 bool
114 start();
115
116 // the main event loop
117 void
118 handleRpcs();
119
120 // Create a CallData object for each RPC. Return created objects in vector
123
124 // Obtain the endpoint that was actually bound (if port 0 was used for server setup).
125 boost::asio::ip::tcp::endpoint
126 getEndpoint() const;
127
128private:
129 // Class encompassing the state and logic needed to serve a request.
130 template <class Request, class Response>
132 : public Processor,
133 public std::enable_shared_from_this<CallData<Request, Response>>
134 {
135 private:
136 // The means of communication with the gRPC runtime for an asynchronous
137 // server.
138 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service_;
139
140 // The producer-consumer queue for asynchronous server notifications.
141 grpc::ServerCompletionQueue& cq_;
142
143 // Context for the rpc, allowing us to tweak aspects of it such as the use
144 // of compression, authentication, as well as to send metadata back to
145 // the client.
146 grpc::ServerContext ctx_;
147
148 // true if finished processing request
149 // Note, this variable does not need to be atomic, since it is
150 // currently only accessed from one thread. However, isFinished(),
151 // which returns the value of this variable, is public facing. In the
152 // interest of avoiding future concurrency bugs, we make it atomic.
154
156
157 // What we get from the client.
158 Request request_;
159
160 // The means to get back to the client.
161 grpc::ServerAsyncResponseWriter<Response> responder_;
162
163 // Function that creates a listener for specific request type
165
166 // Function that processes a request
168
169 // Function to call to forward to another server
171
172 // Condition required for this RPC
174
175 // Load type for this RPC
177
179
180 public:
181 virtual ~CallData() = default;
182
183 // Take in the "service" instance (in this case representing an
184 // asynchronous server) and the completion queue "cq" used for
185 // asynchronous communication with the gRPC runtime.
186 explicit CallData(
187 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
188 grpc::ServerCompletionQueue& cq,
189 Application& app,
193 RPC::Condition requiredCondition,
194 Resource::Charge loadType,
195 std::vector<boost::asio::ip::address> const& secureGatewayIPs);
196
197 CallData(CallData const&) = delete;
198
199 CallData&
200 operator=(CallData const&) = delete;
201
202 virtual void
203 process() override;
204
205 virtual bool
206 isFinished() override;
207
209 clone() override;
210
211 private:
212 // process the request. Called inside the coroutine passed to JobQueue
213 void
215
216 // return load type of this RPC
218 getLoadType();
219
220 // return the Role used for this RPC
221 Role
222 getRole(bool isUnlimited);
223
224 // register endpoint with ResourceManager and return usage
226 getUsage();
227
228 // Returns the ip of the client
229 // Empty optional if there was an error decoding the client ip
232
233 // Returns the endpoint of the client.
234 // Empty optional if there was an error decoding the client
235 // endpoint
238
239 // If the request was proxied through
240 // another rippled node, returns the ip of the originating client.
241 // Empty optional if request was not proxied or there was an error
242 // decoding the client ip
245
246 // If the request was proxied through
247 // another rippled node, returns the endpoint of the originating client.
248 // Empty optional if request was not proxied or there was an error
249 // decoding the client endpoint
252
253 // Returns the user specified in the request. Empty optional if no user
254 // was specified
256 getUser();
257
258 // Sets is_unlimited in the response to the value of clientIsUnlimited
259 // Does nothing if is_unlimited is not a field of the response
260 void
261 setIsUnlimited(Response& response, bool isUnlimited);
262
263 // True if the client is exempt from resource controls
264 bool
266
267 // True if the request was proxied through another rippled node prior
268 // to arriving here
269 bool
271
272 // forward request to a p2p node
273 void
275
276 }; // CallData
277
278}; // GRPCServerImpl
279
281{
282public:
283 explicit GRPCServer(Application& app) : impl_(app)
284 {
285 }
286
287 GRPCServer(GRPCServer const&) = delete;
288
290 operator=(GRPCServer const&) = delete;
291
292 bool
293 start();
294
295 void
296 stop();
297
298 ~GRPCServer();
299
300 boost::asio::ip::tcp::endpoint
301 getEndpoint() const;
302
303private:
306 bool running_ = false;
307};
308} // namespace ripple
309#endif
A generic endpoint for log messages.
Definition Journal.h:41
CallData(CallData const &)=delete
Forward< Request, Response > forward_
Definition GRPCServer.h:170
Resource::Consumer getUsage()
std::optional< std::string > getUser()
grpc::ServerCompletionQueue & cq_
Definition GRPCServer.h:141
Handler< Request, Response > handler_
Definition GRPCServer.h:167
CallData & operator=(CallData const &)=delete
void setIsUnlimited(Response &response, bool isUnlimited)
std::optional< boost::asio::ip::address > getClientIpAddress()
std::optional< boost::asio::ip::tcp::endpoint > getProxiedClientEndpoint()
BindListener< Request, Response > bindListener_
Definition GRPCServer.h:164
grpc::ServerAsyncResponseWriter< Response > responder_
Definition GRPCServer.h:161
Role getRole(bool isUnlimited)
std::vector< boost::asio::ip::address > const & secureGatewayIPs_
Definition GRPCServer.h:178
std::shared_ptr< Processor > clone() override
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
std::optional< boost::asio::ip::address > getProxiedClientIpAddress()
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition GRPCServer.h:138
virtual void process() override
virtual bool isFinished() override
void forwardToP2p(RPC::GRPCContext< Request > &context)
std::string serverAddress_
Definition GRPCServer.h:65
Application & app_
Definition GRPCServer.h:63
std::vector< std::shared_ptr< Processor > > setupListeners()
std::unique_ptr< grpc::Server > server_
Definition GRPCServer.h:61
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition GRPCServer.h:54
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition GRPCServer.h:59
std::vector< std::shared_ptr< Processor > > requests_
Definition GRPCServer.h:56
GRPCServerImpl(GRPCServerImpl const &)=delete
GRPCServerImpl & operator=(GRPCServerImpl const &)=delete
static unsigned constexpr apiVersion
Definition GRPCServer.h:91
boost::asio::ip::tcp::endpoint getEndpoint() const
std::vector< boost::asio::ip::address > secureGatewayIPs_
Definition GRPCServer.h:68
std::uint16_t serverPort_
Definition GRPCServer.h:66
beast::Journal journal_
Definition GRPCServer.h:70
GRPCServer(Application &app)
Definition GRPCServer.h:283
GRPCServer & operator=(GRPCServer const &)=delete
std::thread thread_
Definition GRPCServer.h:305
boost::asio::ip::tcp::endpoint getEndpoint() const
GRPCServerImpl impl_
Definition GRPCServer.h:304
GRPCServer(GRPCServer const &)=delete
virtual void process()=0
virtual bool isFinished()=0
Processor()=default
virtual std::shared_ptr< Processor > clone()=0
Processor(Processor const &)=delete
Processor & operator=(Processor const &)=delete
virtual ~Processor()=default
A consumption charge.
Definition Charge.h:11
An endpoint that consumes resources.
Definition Consumer.h:17
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition Role.cpp:106
Role
Indicates the level of administrative permission to grant.
Definition Role.h:25