rippled
Loading...
Searching...
No Matches
GRPCServer.cpp
#include <xrpld/app/main/GRPCServer.h>
#include <xrpld/core/ConfigSections.h>

#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/resource/Fees.h>
8namespace ripple {
9
10namespace {
11
12// helper function. converts string to endpoint. handles ipv4 and ipv6, with or
13// without port, with or without prepended scheme
15getEndpoint(std::string const& peer)
16{
17 try
18 {
19 std::size_t first = peer.find_first_of(":");
20 std::size_t last = peer.find_last_of(":");
21 std::string peerClean(peer);
22 if (first != last)
23 {
24 peerClean = peer.substr(first + 1);
25 }
26
29 if (endpoint)
30 return beast::IP::to_asio_endpoint(endpoint.value());
31 }
32 catch (std::exception const&)
33 {
34 }
35 return {};
36}
37
38} // namespace
39
40template <class Request, class Response>
42 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
43 grpc::ServerCompletionQueue& cq,
44 Application& app,
48 RPC::Condition requiredCondition,
49 Resource::Charge loadType,
50 std::vector<boost::asio::ip::address> const& secureGatewayIPs)
51 : service_(service)
52 , cq_(cq)
53 , finished_(false)
54 , app_(app)
55 , responder_(&ctx_)
56 , bindListener_(std::move(bindListener))
57 , handler_(std::move(handler))
58 , forward_(std::move(forward))
59 , requiredCondition_(std::move(requiredCondition))
60 , loadType_(std::move(loadType))
61 , secureGatewayIPs_(secureGatewayIPs)
62{
63 // Bind a listener. When a request is received, "this" will be returned
64 // from CompletionQueue::Next
66}
67
68template <class Request, class Response>
71{
74 cq_,
75 app_,
76 bindListener_,
77 handler_,
78 forward_,
79 requiredCondition_,
80 loadType_,
82}
83
84template <class Request, class Response>
85void
87{
88 // sanity check
89 BOOST_ASSERT(!finished_);
90
92 this->shared_from_this();
93
94 // Need to set finished to true before processing the response,
95 // because as soon as the response is posted to the completion
96 // queue (via responder_.Finish(...) or responder_.FinishWithError(...)),
97 // the CallData object is returned as a tag in handleRpcs().
98 // handleRpcs() checks the finished variable, and if true, destroys
99 // the object. Setting finished to true before calling process
100 // ensures that finished is always true when this CallData object
101 // is returned as a tag in handleRpcs(), after sending the response
102 finished_ = true;
103 auto coro = app_.getJobQueue().postCoro(
105 "gRPC-Client",
106 [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
107 thisShared->process(coro);
108 });
109
110 // If coro is null, then the JobQueue has already been shutdown
111 if (!coro)
112 {
113 grpc::Status status{
114 grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
115 responder_.FinishWithError(status, this);
116 }
117}
118
119template <class Request, class Response>
120void
123{
124 try
125 {
126 auto usage = getUsage();
127 bool isUnlimited = clientIsUnlimited();
128 if (!isUnlimited && usage.disconnect(app_.journal("gRPCServer")))
129 {
130 grpc::Status status{
131 grpc::StatusCode::RESOURCE_EXHAUSTED,
132 "usage balance exceeds threshold"};
133 responder_.FinishWithError(status, this);
134 }
135 else
136 {
137 auto loadType = getLoadType();
138 usage.charge(loadType);
139 auto role = getRole(isUnlimited);
140
141 {
142 std::stringstream toLog;
143 toLog << "role = " << (int)role;
144
145 toLog << " address = ";
146 if (auto clientIp = getClientIpAddress())
147 toLog << clientIp.value();
148
149 toLog << " user = ";
150 if (auto user = getUser())
151 toLog << user.value();
152 toLog << " isUnlimited = " << isUnlimited;
153
154 JLOG(app_.journal("GRPCServer::Calldata").debug())
155 << toLog.str();
156 }
157
159 {app_.journal("gRPCServer"),
160 app_,
161 loadType,
162 app_.getOPs(),
164 usage,
165 role,
166 coro,
168 apiVersion},
169 request_};
170
171 // Make sure we can currently handle the rpc
172 error_code_i conditionMetRes =
173 RPC::conditionMet(requiredCondition_, context);
174
175 if (conditionMetRes != rpcSUCCESS)
176 {
177 RPC::ErrorInfo errorInfo = RPC::get_error_info(conditionMetRes);
178 grpc::Status status{
179 grpc::StatusCode::FAILED_PRECONDITION,
180 errorInfo.message.c_str()};
181 responder_.FinishWithError(status, this);
182 }
183 else
184 {
185 std::pair<Response, grpc::Status> result = handler_(context);
186 setIsUnlimited(result.first, isUnlimited);
187 responder_.Finish(result.first, result.second, this);
188 }
189 }
190 }
191 catch (std::exception const& ex)
192 {
193 grpc::Status status{grpc::StatusCode::INTERNAL, ex.what()};
194 responder_.FinishWithError(status, this);
195 }
196}
197
198template <class Request, class Response>
199bool
204
205template <class Request, class Response>
211
212template <class Request, class Response>
213Role
221
222template <class Request, class Response>
225{
226 if (auto descriptor = Request::GetDescriptor()->FindFieldByName("user"))
227 {
228 std::string user =
229 Request::GetReflection()->GetString(request_, descriptor);
230 if (!user.empty())
231 {
232 return user;
233 }
234 }
235 return {};
236}
237
238template <class Request, class Response>
241{
242 auto endpoint = getClientEndpoint();
243 if (endpoint)
244 return endpoint->address();
245 return {};
246}
247
248template <class Request, class Response>
251{
252 return ripple::getEndpoint(ctx_.peer());
253}
254
255template <class Request, class Response>
256bool
258{
259 if (!getUser())
260 return false;
261 auto clientIp = getClientIpAddress();
262 if (clientIp)
263 {
264 for (auto& ip : secureGatewayIPs_)
265 {
266 if (ip == clientIp)
267 return true;
268 }
269 }
270 return false;
271}
272
273template <class Request, class Response>
274void
276 Response& response,
277 bool isUnlimited)
278{
279 if (isUnlimited)
280 {
281 if (auto descriptor =
282 Response::GetDescriptor()->FindFieldByName("is_unlimited"))
283 {
284 Response::GetReflection()->SetBool(&response, descriptor, true);
285 }
286 }
287}
288
289template <class Request, class Response>
292{
293 auto endpoint = getClientEndpoint();
294 if (endpoint)
296 beast::IP::from_asio(endpoint.value()));
297 Throw<std::runtime_error>("Failed to get client endpoint");
298}
299
301 : app_(app), journal_(app_.journal("gRPC Server"))
302{
303 // if present, get endpoint from config
304 if (app_.config().exists(SECTION_PORT_GRPC))
305 {
306 Section const& section = app_.config().section(SECTION_PORT_GRPC);
307
308 auto const optIp = section.get("ip");
309 if (!optIp)
310 return;
311
312 auto const optPort = section.get("port");
313 if (!optPort)
314 return;
315 try
316 {
317 boost::asio::ip::tcp::endpoint endpoint(
318 boost::asio::ip::make_address(*optIp), std::stoi(*optPort));
319
321 ss << endpoint;
322 serverAddress_ = ss.str();
323 }
324 catch (std::exception const&)
325 {
326 JLOG(journal_.error()) << "Error setting grpc server address";
327 Throw<std::runtime_error>("Error setting grpc server address");
328 }
329
330 auto const optSecureGateway = section.get("secure_gateway");
331 if (optSecureGateway)
332 {
333 try
334 {
335 std::stringstream ss{*optSecureGateway};
336 std::string ip;
337 while (std::getline(ss, ip, ','))
338 {
339 boost::algorithm::trim(ip);
340 auto const addr = boost::asio::ip::make_address(ip);
341
342 if (addr.is_unspecified())
343 {
344 JLOG(journal_.error())
345 << "Can't pass unspecified IP in "
346 << "secure_gateway section of port_grpc";
347 Throw<std::runtime_error>(
348 "Unspecified IP in secure_gateway section");
349 }
350
352 }
353 }
354 catch (std::exception const&)
355 {
356 JLOG(journal_.error())
357 << "Error parsing secure gateway IPs for grpc server";
358 Throw<std::runtime_error>(
359 "Error parsing secure_gateway section");
360 }
361 }
362 }
363}
364
365void
367{
368 JLOG(journal_.debug()) << "Shutting down";
369
370 // The below call cancels all "listeners" (CallData objects that are waiting
371 // for a request, as opposed to processing a request), and blocks until all
372 // requests being processed are completed. CallData objects in the midst of
373 // processing requests need to actually send data back to the client, via
374 // responder_.Finish(...) or responder_.FinishWithError(...), for this call
375 // to unblock. Each cancelled listener is returned via cq_.Next(...) with ok
376 // set to false
377 server_->Shutdown();
378 JLOG(journal_.debug()) << "Server has been shutdown";
379
380 // Always shutdown the completion queue after the server. This call allows
381 // cq_.Next() to return false, once all events posted to the completion
382 // queue have been processed. See handleRpcs() for more details.
383 cq_->Shutdown();
384 JLOG(journal_.debug()) << "Completion Queue has been shutdown";
385}
386
387void
389{
390 // This collection should really be an unordered_set. However, to delete
391 // from the unordered_set, we need a shared_ptr, but cq_.Next() (see below
392 // while loop) sets the tag to a raw pointer.
394
395 auto erase = [&requests](Processor* ptr) {
396 auto it = std::find_if(
397 requests.begin(),
398 requests.end(),
399 [ptr](std::shared_ptr<Processor>& sPtr) {
400 return sPtr.get() == ptr;
401 });
402 BOOST_ASSERT(it != requests.end());
403 it->swap(requests.back());
404 requests.pop_back();
405 };
406
407 void* tag; // uniquely identifies a request.
408 bool ok;
409 // Block waiting to read the next event from the completion queue. The
410 // event is uniquely identified by its tag, which in this case is the
411 // memory address of a CallData instance.
412 // The return value of Next should always be checked. This return value
413 // tells us whether there is any kind of event or cq_ is shutting down.
414 // When cq_.Next(...) returns false, all work has been completed and the
415 // loop can exit. When the server is shutdown, each CallData object that is
416 // listening for a request is forceably cancelled, and is returned by
417 // cq_->Next() with ok set to false. Then, each CallData object processing
418 // a request must complete (by sending data to the client), each of which
419 // will be returned from cq_->Next() with ok set to true. After all
420 // cancelled listeners and all CallData objects processing requests are
421 // returned via cq_->Next(), cq_->Next() will return false, causing the
422 // loop to exit.
423 while (cq_->Next(&tag, &ok))
424 {
425 auto ptr = static_cast<Processor*>(tag);
426 JLOG(journal_.trace()) << "Processing CallData object."
427 << " ptr = " << ptr << " ok = " << ok;
428
429 if (!ok)
430 {
431 JLOG(journal_.debug()) << "Request listener cancelled. "
432 << "Destroying object";
433 erase(ptr);
434 }
435 else
436 {
437 if (!ptr->isFinished())
438 {
439 JLOG(journal_.debug()) << "Received new request. Processing";
440 // ptr is now processing a request, so create a new CallData
441 // object to handle additional requests
442 auto cloned = ptr->clone();
443 requests.push_back(cloned);
444 // process the request
445 ptr->process();
446 }
447 else
448 {
449 JLOG(journal_.debug()) << "Sent response. Destroying object";
450 erase(ptr);
451 }
452 }
453 }
454 JLOG(journal_.debug()) << "Completion Queue drained";
455}
456
457// create a CallData instance for each RPC
460{
462
463 auto addToRequests = [&requests](auto callData) {
464 requests.push_back(std::move(callData));
465 };
466
467 {
468 using cd = CallData<
469 org::xrpl::rpc::v1::GetLedgerRequest,
470 org::xrpl::rpc::v1::GetLedgerResponse>;
471
472 addToRequests(std::make_shared<cd>(
473 service_,
474 *cq_,
475 app_,
476 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
477 RequestGetLedger,
479 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedger,
483 }
484 {
485 using cd = CallData<
486 org::xrpl::rpc::v1::GetLedgerDataRequest,
487 org::xrpl::rpc::v1::GetLedgerDataResponse>;
488
489 addToRequests(std::make_shared<cd>(
490 service_,
491 *cq_,
492 app_,
493 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
494 RequestGetLedgerData,
496 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerData,
500 }
501 {
502 using cd = CallData<
503 org::xrpl::rpc::v1::GetLedgerDiffRequest,
504 org::xrpl::rpc::v1::GetLedgerDiffResponse>;
505
506 addToRequests(std::make_shared<cd>(
507 service_,
508 *cq_,
509 app_,
510 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
511 RequestGetLedgerDiff,
513 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerDiff,
517 }
518 {
519 using cd = CallData<
520 org::xrpl::rpc::v1::GetLedgerEntryRequest,
521 org::xrpl::rpc::v1::GetLedgerEntryResponse>;
522
523 addToRequests(std::make_shared<cd>(
524 service_,
525 *cq_,
526 app_,
527 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
528 RequestGetLedgerEntry,
530 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerEntry,
534 }
535 return requests;
536}
537
538bool
540{
541 // if config does not specify a grpc server address, don't start
542 if (serverAddress_.empty())
543 return false;
544
545 JLOG(journal_.info()) << "Starting gRPC server at " << serverAddress_;
546
547 grpc::ServerBuilder builder;
548
549 // Listen on the given address without any authentication mechanism.
550 // Actually binded port will be returned into "port" variable.
551 int port = 0;
552 builder.AddListeningPort(
553 serverAddress_, grpc::InsecureServerCredentials(), &port);
554 // Register "service_" as the instance through which we'll communicate with
555 // clients. In this case it corresponds to an *asynchronous* service.
556 builder.RegisterService(&service_);
557 // Get hold of the completion queue used for the asynchronous communication
558 // with the gRPC runtime.
559 cq_ = builder.AddCompletionQueue();
560 // Finally assemble the server.
561 server_ = builder.BuildAndStart();
562 serverPort_ = static_cast<std::uint16_t>(port);
563
564 return static_cast<bool>(serverPort_);
565}
566
567boost::asio::ip::tcp::endpoint
569{
570 std::string const addr =
572 return boost::asio::ip::tcp::endpoint(
573 boost::asio::ip::make_address(addr), serverPort_);
574}
575
576bool
578{
579 // Start the server and setup listeners
580 if (running_ = impl_.start(); running_)
581 {
582 thread_ = std::thread([this]() {
583 // Start the event loop and begin handling requests
584 beast::setCurrentThreadName("rippled: grpc");
585 this->impl_.handleRpcs();
586 });
587 }
588 return running_;
589}
590
591void
593{
594 if (running_)
595 {
596 impl_.shutdown();
597 thread_.join();
598 running_ = false;
599 }
600}
601
603{
604 XRPL_ASSERT(!running_, "ripple::GRPCServer::~GRPCServer : is not running");
605}
606
607boost::asio::ip::tcp::endpoint
609{
610 return impl_.getEndpoint();
611}
612
613} // namespace ripple
T back(T... args)
T begin(T... args)
constexpr char const * c_str() const
Definition json_value.h:57
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
virtual Config & config()=0
virtual beast::Journal journal(std::string const &name)=0
virtual JobQueue & getJobQueue()=0
virtual Resource::Manager & getResourceManager()=0
virtual NetworkOPs & getOPs()=0
virtual LedgerMaster & getLedgerMaster()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Resource::Consumer getUsage()
std::optional< std::string > getUser()
grpc::ServerCompletionQueue & cq_
Definition GRPCServer.h:141
void setIsUnlimited(Response &response, bool isUnlimited)
std::optional< boost::asio::ip::address > getClientIpAddress()
BindListener< Request, Response > bindListener_
Definition GRPCServer.h:164
grpc::ServerAsyncResponseWriter< Response > responder_
Definition GRPCServer.h:161
Role getRole(bool isUnlimited)
std::shared_ptr< Processor > clone() override
CallData(org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &service, grpc::ServerCompletionQueue &cq, Application &app, BindListener< Request, Response > bindListener, Handler< Request, Response > handler, Forward< Request, Response > forward, RPC::Condition requiredCondition, Resource::Charge loadType, std::vector< boost::asio::ip::address > const &secureGatewayIPs)
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition GRPCServer.h:138
virtual void process() override
virtual bool isFinished() override
std::string serverAddress_
Definition GRPCServer.h:65
Application & app_
Definition GRPCServer.h:63
std::vector< std::shared_ptr< Processor > > setupListeners()
std::unique_ptr< grpc::Server > server_
Definition GRPCServer.h:61
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition GRPCServer.h:54
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition GRPCServer.h:59
GRPCServerImpl(Application &app)
static unsigned constexpr apiVersion
Definition GRPCServer.h:91
boost::asio::ip::tcp::endpoint getEndpoint() const
std::vector< boost::asio::ip::address > secureGatewayIPs_
Definition GRPCServer.h:68
std::uint16_t serverPort_
Definition GRPCServer.h:66
beast::Journal journal_
Definition GRPCServer.h:70
std::thread thread_
Definition GRPCServer.h:305
boost::asio::ip::tcp::endpoint getEndpoint() const
GRPCServerImpl impl_
Definition GRPCServer.h:304
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:35
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition JobQueue.h:394
A consumption charge.
Definition Charge.h:11
An endpoint that consumes resources.
Definition Consumer.h:17
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
Holds a collection of configuration values.
Definition BasicConfig.h:26
std::optional< T > get(std::string const &name) const
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T find_first_of(T... args)
T find_if(T... args)
T find_last_of(T... args)
T getline(T... args)
T is_same_v
T join(T... args)
boost::asio::ip::tcp::endpoint to_asio_endpoint(Endpoint const &endpoint)
Convert to asio::ip::tcp::endpoint.
Endpoint from_asio(boost::asio::ip::address const &address)
Convert to Endpoint.
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
@ NO_CONDITION
Definition Handler.h:21
error_code_i conditionMet(Condition condition_required, T &context)
Definition Handler.h:62
ErrorInfo const & get_error_info(error_code_i code)
Returns an ErrorInfo that reflects the error code.
Charge const feeMediumBurdenRPC
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
@ rpcSUCCESS
Definition ErrorCodes.h:25
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition Role.cpp:106
std::pair< org::xrpl::rpc::v1::GetLedgerResponse, grpc::Status > doLedgerGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerRequest > &context)
std::pair< org::xrpl::rpc::v1::GetLedgerEntryResponse, grpc::Status > doLedgerEntryGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerEntryRequest > &context)
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition STExchange.h:153
Role
Indicates the level of administrative permission to grant.
Definition Role.h:25
@ jtRPC
Definition Job.h:33
std::pair< org::xrpl::rpc::v1::GetLedgerDiffResponse, grpc::Status > doLedgerDiffGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDiffRequest > &context)
Definition LedgerDiff.cpp:6
std::pair< org::xrpl::rpc::v1::GetLedgerDataResponse, grpc::Status > doLedgerDataGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDataRequest > &context)
STL namespace.
T pop_back(T... args)
T push_back(T... args)
T stoi(T... args)
T str(T... args)
Maps an rpc error code to its token, default message, and HTTP status.
Definition ErrorCodes.h:170
Json::StaticString message
Definition ErrorCodes.h:202
T substr(T... args)
T value(T... args)
T what(T... args)