rippled
Loading...
Searching...
No Matches
GRPCServer.h
1#pragma once
2
3#include <xrpld/app/main/Application.h>
4#include <xrpld/rpc/Context.h>
5#include <xrpld/rpc/GRPCHandlers.h>
6#include <xrpld/rpc/Role.h>
7#include <xrpld/rpc/detail/Handler.h>
8
9#include <xrpl/core/JobQueue.h>
10#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
11#include <xrpl/resource/Charge.h>
12#include <xrpl/server/InfoSub.h>
13
14#include <grpcpp/grpcpp.h>
15
16namespace xrpl {
17
18// Interface that CallData implements
19class Processor
20{
21public:
22 virtual ~Processor() = default;
23
24 Processor() = default;
25
26 Processor(Processor const&) = delete;
27
28 Processor&
29 operator=(Processor const&) = delete;
30
31 // process a request that has arrived. Can only be called once per instance
32 virtual void
33 process() = 0;
34
35 // create a new instance of this CallData object, with the same type
36 // (same template parameters) as the original. This is called when a CallData
37 // object starts processing a request. Creating a new instance allows the
38 // server to handle additional requests while the first is being processed
39 virtual std::shared_ptr<Processor>
40 clone() = 0;
41
42 // true if this object has finished processing the request. Object will be
43 // deleted once this function returns true
44 virtual bool
45 isFinished() = 0;
46};
47
48class GRPCServerImpl final
49{
50private:
51 // CompletionQueue returns events that have occurred, or events that have
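52 // been cancelled
53 std::unique_ptr<grpc::ServerCompletionQueue> cq_;
54
55 std::vector<std::shared_ptr<Processor>> requests_;
56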
57 // The gRPC service defined by the .proto files
58 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_;
59
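60 std::unique_ptr<grpc::Server> server_;
61
62 Application& app_;
63
64 std::string serverAddress_;
65 std::uint16_t serverPort_;
66
67 std::vector<boost::asio::ip::address> secureGatewayIPs_;
68
69 beast::Journal journal_;
70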
71 // typedef for function to bind a listener
72 // This is always of the form:
73 // org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::Request[RPC NAME]
74 template <class Request, class Response>
76 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService&,
77 grpc::ServerContext*,
78 Request*,
79 grpc::ServerAsyncResponseWriter<Response>*,
80 grpc::CompletionQueue*,
81 grpc::ServerCompletionQueue*,
82 void*)>;
83
84 // typedef for actual handler (that populates a response)
85 // handlers are defined in rpc/GRPCHandlers.h
86 template <class Request, class Response>
88 // This implementation is currently limited to v1 of the API
89 static unsigned constexpr apiVersion = 1;
90
91 template <class Request, class Response>
92 using Forward = std::function<grpc::Status(
93 org::xrpl::rpc::v1::XRPLedgerAPIService::Stub*,
94 grpc::ClientContext*,
95 Request,
96 Response*)>;
97
98public:
99 explicit GRPCServerImpl(Application& app);
100
101 GRPCServerImpl(GRPCServerImpl const&) = delete;
102
103 GRPCServerImpl&
104 operator=(GRPCServerImpl const&) = delete;
105
106 void
107 shutdown();
108
109 // setup the server and listeners
110 // returns true if server started successfully
111 bool
112 start();
113
114 // the main event loop
115 void
116 handleRpcs();
117
118 // Create a CallData object for each RPC. Return created objects in vector
119 std::vector<std::shared_ptr<Processor>>
120 setupListeners();
121
122 // Obtain the actually bound endpoint (if port 0 was used for server setup).
123 boost::asio::ip::tcp::endpoint
124 getEndpoint() const;
125
126private:
127 // Class encompassing the state and logic needed to serve a request.
128 template <class Request, class Response>
129 class CallData : public Processor,
130 public std::enable_shared_from_this<CallData<Request, Response>>
131 {
132 private:
133 // The means of communication with the gRPC runtime for an asynchronous
134 // server.
135 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service_;
136
137 // The producer-consumer queue for asynchronous server notifications.
138 grpc::ServerCompletionQueue& cq_;
139
140 // Context for the rpc, allowing to tweak aspects of it such as the use
141 // of compression, authentication, as well as to send metadata back to
142 // the client.
143 grpc::ServerContext ctx_;
144
145 // true if finished processing request
146 // Note, this variable does not need to be atomic, since it is
147 // currently only accessed from one thread. However, isFinished(),
148 // which returns the value of this variable, is public facing. In the
149 // interest of avoiding future concurrency bugs, we make it atomic.
151
153
154 // What we get from the client.
155 Request request_;
156
157 // The means to get back to the client.
158 grpc::ServerAsyncResponseWriter<Response> responder_;
159
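160 // Function that creates a listener for specific request type
161 BindListener<Request, Response> bindListener_;
162
163 // Function that processes a request
164 Handler<Request, Response> handler_;
165
166 // Function to call to forward to another server
167 Forward<Request, Response> forward_;
168
169 // Condition required for this RPC
170 RPC::Condition requiredCondition_;
171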
172 // Load type for this RPC
174
175 std::vector<boost::asio::ip::address> const& secureGatewayIPs_;
176
177 public:
178 virtual ~CallData() = default;
179
180 // Take in the "service" instance (in this case representing an
181 // asynchronous server) and the completion queue "cq" used for
182 // asynchronous communication with the gRPC runtime.
183 explicit CallData(
184 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
185 grpc::ServerCompletionQueue& cq,
186 Application& app,
190 RPC::Condition requiredCondition,
191 Resource::Charge loadType,
192 std::vector<boost::asio::ip::address> const& secureGatewayIPs);
193
194 CallData(CallData const&) = delete;
195
196 CallData&
197 operator=(CallData const&) = delete;
198
199 virtual void
200 process() override;
201
202 virtual bool
203 isFinished() override;
204
205 std::shared_ptr<Processor>
206 clone() override;
207
208 private:
209 // process the request. Called inside the coroutine passed to JobQueue
210 void
212
213 // return load type of this RPC
214 Resource::Charge
215 getLoadType();
216
217 // return the Role used for this RPC
218 Role
219 getRole(bool isUnlimited);
220
221 // register endpoint with ResourceManager and return usage
222 Resource::Consumer
223 getUsage();
224
225 // Returns the ip of the client
226 // Empty optional if there was an error decoding the client ip
227 std::optional<boost::asio::ip::address>
228 getClientIpAddress();
229
230 // Returns the endpoint of the client.
231 // Empty optional if there was an error decoding the client
232 // endpoint
233 std::optional<boost::asio::ip::tcp::endpoint>
234 getClientEndpoint();
235
236 // If the request was proxied through
237 // another rippled node, returns the ip of the originating client.
238 // Empty optional if request was not proxied or there was an error
239 // decoding the client ip
240 std::optional<boost::asio::ip::address>
241 getProxiedClientIpAddress();
242
243 // If the request was proxied through
244 // another rippled node, returns the endpoint of the originating client.
245 // Empty optional if request was not proxied or there was an error
246 // decoding the client endpoint
247 std::optional<boost::asio::ip::tcp::endpoint>
248 getProxiedClientEndpoint();
249
250 // Returns the user specified in the request. Empty optional if no user
251 // was specified
252 std::optional<std::string>
253 getUser();
254
255 // Sets is_unlimited in response to value of clientIsUnlimited
256 // Does nothing if is_unlimited is not a field of the response
257 void
258 setIsUnlimited(Response& response, bool isUnlimited);
259
260 // True if the client is exempt from resource controls
261 bool
263
264 // True if the request was proxied through another rippled node prior
265 // to arriving here
266 bool
268
269 // forward request to a p2p node
270 void
271 forwardToP2p(RPC::GRPCContext<Request>& context);
272
273 }; // CallData
274
275}; // GRPCServerImpl
276
277class GRPCServer
278{
279public:
280 explicit GRPCServer(Application& app) : impl_(app)
281 {
282 }
283
284 GRPCServer(GRPCServer const&) = delete;
285
286 GRPCServer&
287 operator=(GRPCServer const&) = delete;
288
289 bool
290 start();
291
292 void
293 stop();
294
295 ~GRPCServer();
296
297 boost::asio::ip::tcp::endpoint
298 getEndpoint() const;
299
300private:
301 GRPCServerImpl impl_;
302 std::thread thread_;
303 bool running_ = false;
304};
305} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:40
Forward< Request, Response > forward_
Definition GRPCServer.h:167
Resource::Consumer getUsage()
CallData(CallData const &)=delete
std::vector< boost::asio::ip::address > const & secureGatewayIPs_
Definition GRPCServer.h:175
std::optional< boost::asio::ip::address > getClientIpAddress()
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition GRPCServer.h:135
virtual bool isFinished() override
Handler< Request, Response > handler_
Definition GRPCServer.h:164
RPC::Condition requiredCondition_
Definition GRPCServer.h:170
std::optional< std::string > getUser()
BindListener< Request, Response > bindListener_
Definition GRPCServer.h:161
Role getRole(bool isUnlimited)
CallData & operator=(CallData const &)=delete
Resource::Charge getLoadType()
std::optional< boost::asio::ip::address > getProxiedClientIpAddress()
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
virtual void process() override
grpc::ServerAsyncResponseWriter< Response > responder_
Definition GRPCServer.h:158
std::shared_ptr< Processor > clone() override
void setIsUnlimited(Response &response, bool isUnlimited)
grpc::ServerCompletionQueue & cq_
Definition GRPCServer.h:138
void forwardToP2p(RPC::GRPCContext< Request > &context)
std::optional< boost::asio::ip::tcp::endpoint > getProxiedClientEndpoint()
grpc::ServerContext ctx_
Definition GRPCServer.h:143
static unsigned constexpr apiVersion
Definition GRPCServer.h:89
std::uint16_t serverPort_
Definition GRPCServer.h:65
GRPCServerImpl & operator=(GRPCServerImpl const &)=delete
Application & app_
Definition GRPCServer.h:62
GRPCServerImpl(GRPCServerImpl const &)=delete
std::vector< boost::asio::ip::address > secureGatewayIPs_
Definition GRPCServer.h:67
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition GRPCServer.h:53
std::vector< std::shared_ptr< Processor > > setupListeners()
beast::Journal journal_
Definition GRPCServer.h:69
std::unique_ptr< grpc::Server > server_
Definition GRPCServer.h:60
boost::asio::ip::tcp::endpoint getEndpoint() const
std::vector< std::shared_ptr< Processor > > requests_
Definition GRPCServer.h:55
std::string serverAddress_
Definition GRPCServer.h:64
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition GRPCServer.h:58
std::thread thread_
Definition GRPCServer.h:302
boost::asio::ip::tcp::endpoint getEndpoint() const
GRPCServerImpl impl_
Definition GRPCServer.h:301
GRPCServer & operator=(GRPCServer const &)=delete
GRPCServer(Application &app)
Definition GRPCServer.h:280
GRPCServer(GRPCServer const &)=delete
Processor(Processor const &)=delete
Processor()=default
virtual ~Processor()=default
virtual bool isFinished()=0
Processor & operator=(Processor const &)=delete
virtual void process()=0
virtual std::shared_ptr< Processor > clone()=0
A consumption charge.
Definition Charge.h:10
An endpoint that consumes resources.
Definition Consumer.h:16
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
Role
Indicates the level of administrative permission to grant.
Definition Role.h:24
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition Role.cpp:98