rippled
Loading...
Searching...
No Matches
GRPCServer.h
1#pragma once
2
3#include <xrpld/app/main/Application.h>
4#include <xrpld/rpc/Context.h>
5#include <xrpld/rpc/GRPCHandlers.h>
6#include <xrpld/rpc/Role.h>
7#include <xrpld/rpc/detail/Handler.h>
8
9#include <xrpl/core/JobQueue.h>
10#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
11#include <xrpl/resource/Charge.h>
12#include <xrpl/server/InfoSub.h>
13
14#include <grpcpp/grpcpp.h>
15
16namespace xrpl {
17
18// Interface that CallData implements
20{
21public:
22 virtual ~Processor() = default;
23
24 Processor() = default;
25
26 Processor(Processor const&) = delete;
27
29 operator=(Processor const&) = delete;
30
31 // process a request that has arrived. Can only be called once per instance
32 virtual void
33 process() = 0;
34
35 // create a new instance of this CallData object, with the same type
36 //(same template parameters) as original. This is called when a CallData
37 // object starts processing a request. Creating a new instance allows the
38 // server to handle additional requests while the first is being processed
40 clone() = 0;
41
42 // true if this object has finished processing the request. Object will be
43 // deleted once this function returns true
44 virtual bool
46};
47
48class GRPCServerImpl final
49{
50private:
51 // CompletionQueue returns events that have occurred, or events that have
52 // been cancelled
54
56
57 // The gRPC service defined by the .proto files
58 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_;
59
61
63
66
68
70
71 // typedef for function to bind a listener
72 // This is always of the form:
73 // org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::Request[RPC NAME]
74 template <class Request, class Response>
76 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService&,
77 grpc::ServerContext*,
78 Request*,
79 grpc::ServerAsyncResponseWriter<Response>*,
80 grpc::CompletionQueue*,
81 grpc::ServerCompletionQueue*,
82 void*)>;
83
84 // typedef for actual handler (that populates a response)
85 // handlers are defined in rpc/GRPCHandlers.h
86 template <class Request, class Response>
88 // This implementation is currently limited to v1 of the API
89 static unsigned constexpr apiVersion = 1;
90
91 template <class Request, class Response>
93 grpc::Status(org::xrpl::rpc::v1::XRPLedgerAPIService::Stub*, grpc::ClientContext*, Request, Response*)>;
94
95public:
96 explicit GRPCServerImpl(Application& app);
97
99
101 operator=(GRPCServerImpl const&) = delete;
102
103 void
104 shutdown();
105
106 // setup the server and listeners
107 // returns true if server started successfully
108 bool
109 start();
110
111 // the main event loop
112 void
113 handleRpcs();
114
115 // Create a CallData object for each RPC. Return created objects in vector
118
119 // Obtaining actually binded endpoint (if port 0 was used for server setup).
120 boost::asio::ip::tcp::endpoint
121 getEndpoint() const;
122
123private:
124 // Class encompassing the state and logic needed to serve a request.
125 template <class Request, class Response>
126 class CallData : public Processor, public std::enable_shared_from_this<CallData<Request, Response>>
127 {
128 private:
129 // The means of communication with the gRPC runtime for an asynchronous
130 // server.
131 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service_;
132
133 // The producer-consumer queue for asynchronous server notifications.
134 grpc::ServerCompletionQueue& cq_;
135
136 // Context for the rpc, allowing to tweak aspects of it such as the use
137 // of compression, authentication, as well as to send metadata back to
138 // the client.
139 grpc::ServerContext ctx_;
140
141 // true if finished processing request
142 // Note, this variable does not need to be atomic, since it is
143 // currently only accessed from one thread. However, isFinished(),
144 // which returns the value of this variable, is public facing. In the
145 // interest of avoiding future concurrency bugs, we make it atomic.
147
149
150 // What we get from the client.
151 Request request_;
152
153 // The means to get back to the client.
154 grpc::ServerAsyncResponseWriter<Response> responder_;
155
156 // Function that creates a listener for specific request type
158
159 // Function that processes a request
161
162 // Function to call to forward to another server
164
165 // Condition required for this RPC
167
168 // Load type for this RPC
170
172
173 public:
174 virtual ~CallData() = default;
175
176 // Take in the "service" instance (in this case representing an
177 // asynchronous server) and the completion queue "cq" used for
178 // asynchronous communication with the gRPC runtime.
179 explicit CallData(
180 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
181 grpc::ServerCompletionQueue& cq,
182 Application& app,
186 RPC::Condition requiredCondition,
187 Resource::Charge loadType,
188 std::vector<boost::asio::ip::address> const& secureGatewayIPs);
189
190 CallData(CallData const&) = delete;
191
192 CallData&
193 operator=(CallData const&) = delete;
194
195 virtual void
196 process() override;
197
198 virtual bool
199 isFinished() override;
200
202 clone() override;
203
204 private:
205 // process the request. Called inside the coroutine passed to JobQueue
206 void
208
209 // return load type of this RPC
211 getLoadType();
212
213 // return the Role used for this RPC
214 Role
215 getRole(bool isUnlimited);
216
217 // register endpoint with ResourceManager and return usage
219 getUsage();
220
221 // Returns the ip of the client
222 // Empty optional if there was an error decoding the client ip
225
226 // Returns the endpoint of the client.
227 // Empty optional if there was an error decoding the client
228 // endpoint
231
232 // If the request was proxied through
233 // another rippled node, returns the ip of the originating client.
234 // Empty optional if request was not proxied or there was an error
235 // decoding the client ip
238
239 // If the request was proxied through
240 // another rippled node, returns the endpoint of the originating client.
241 // Empty optional if request was not proxied or there was an error
242 // decoding the client endpoint
245
246 // Returns the user specified in the request. Empty optional if no user
247 // was specified
249 getUser();
250
251 // Sets is_unlimited in response to value of clientIsUnlimited
252 // Does nothing if is_unlimited is not a field of the response
253 void
254 setIsUnlimited(Response& response, bool isUnlimited);
255
256 // True if the client is exempt from resource controls
257 bool
259
260 // True if the request was proxied through another rippled node prior
261 // to arriving here
262 bool
264
265 // forward request to a p2p node
266 void
268
269 }; // CallData
270
271}; // GRPCServerImpl
272
274{
275public:
276 explicit GRPCServer(Application& app) : impl_(app)
277 {
278 }
279
280 GRPCServer(GRPCServer const&) = delete;
281
283 operator=(GRPCServer const&) = delete;
284
285 bool
286 start();
287
288 void
289 stop();
290
291 ~GRPCServer();
292
293 boost::asio::ip::tcp::endpoint
294 getEndpoint() const;
295
296private:
299 bool running_ = false;
300};
301} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:40
Forward< Request, Response > forward_
Definition GRPCServer.h:163
Resource::Consumer getUsage()
CallData(CallData const &)=delete
std::vector< boost::asio::ip::address > const & secureGatewayIPs_
Definition GRPCServer.h:171
std::optional< boost::asio::ip::address > getClientIpAddress()
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition GRPCServer.h:131
virtual bool isFinished() override
Handler< Request, Response > handler_
Definition GRPCServer.h:160
RPC::Condition requiredCondition_
Definition GRPCServer.h:166
std::optional< std::string > getUser()
BindListener< Request, Response > bindListener_
Definition GRPCServer.h:157
Role getRole(bool isUnlimited)
CallData & operator=(CallData const &)=delete
Resource::Charge getLoadType()
std::optional< boost::asio::ip::address > getProxiedClientIpAddress()
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
virtual void process() override
grpc::ServerAsyncResponseWriter< Response > responder_
Definition GRPCServer.h:154
std::shared_ptr< Processor > clone() override
void setIsUnlimited(Response &response, bool isUnlimited)
grpc::ServerCompletionQueue & cq_
Definition GRPCServer.h:134
void forwardToP2p(RPC::GRPCContext< Request > &context)
std::optional< boost::asio::ip::tcp::endpoint > getProxiedClientEndpoint()
grpc::ServerContext ctx_
Definition GRPCServer.h:139
static unsigned constexpr apiVersion
Definition GRPCServer.h:89
std::uint16_t serverPort_
Definition GRPCServer.h:65
GRPCServerImpl & operator=(GRPCServerImpl const &)=delete
Application & app_
Definition GRPCServer.h:62
GRPCServerImpl(GRPCServerImpl const &)=delete
std::vector< boost::asio::ip::address > secureGatewayIPs_
Definition GRPCServer.h:67
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition GRPCServer.h:53
std::vector< std::shared_ptr< Processor > > setupListeners()
beast::Journal journal_
Definition GRPCServer.h:69
std::unique_ptr< grpc::Server > server_
Definition GRPCServer.h:60
boost::asio::ip::tcp::endpoint getEndpoint() const
std::vector< std::shared_ptr< Processor > > requests_
Definition GRPCServer.h:55
std::string serverAddress_
Definition GRPCServer.h:64
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition GRPCServer.h:58
std::thread thread_
Definition GRPCServer.h:298
boost::asio::ip::tcp::endpoint getEndpoint() const
GRPCServerImpl impl_
Definition GRPCServer.h:297
GRPCServer & operator=(GRPCServer const &)=delete
GRPCServer(Application &app)
Definition GRPCServer.h:276
GRPCServer(GRPCServer const &)=delete
Processor(Processor const &)=delete
Processor()=default
virtual ~Processor()=default
virtual bool isFinished()=0
Processor & operator=(Processor const &)=delete
virtual void process()=0
virtual std::shared_ptr< Processor > clone()=0
A consumption charge.
Definition Charge.h:10
An endpoint that consumes resources.
Definition Consumer.h:16
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
Role
Indicates the level of administrative permission to grant.
Definition Role.h:24
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition Role.cpp:96