rippled
Loading...
Searching...
No Matches
GRPCServer.cpp
1#include <xrpld/app/main/GRPCServer.h>
2#include <xrpld/core/ConfigSections.h>
3
4#include <xrpl/beast/core/CurrentThreadName.h>
5#include <xrpl/beast/net/IPAddressConversion.h>
6#include <xrpl/resource/Fees.h>
7
8namespace xrpl {
9
10namespace {
11
12// helper function. converts string to endpoint. handles ipv4 and ipv6, with or
13// without port, with or without prepended scheme
// Returns an empty value (`{}`) when the string cannot be converted or when
// anything inside the try block throws.
// NOTE(review): listing line 14 (presumably the return type) is missing from
// this extract.
15getEndpoint(std::string const& peer)
16{
17 try
18 {
19 std::size_t const first = peer.find_first_of(':');
20 std::size_t const last = peer.find_last_of(':');
21 std::string peerClean(peer);
// When the first and last ':' are at different positions, everything up to
// and including the first ':' is dropped (presumably the scheme prefix
// mentioned in the header comment).
22 if (first != last)
23 {
24 peerClean = peer.substr(first + 1);
25 }
26
// NOTE(review): listing lines 27-28, which must declare `endpoint`, are
// missing from this extract; presumably they parse peerClean - confirm
// against the full file.
29 if (endpoint)
30 return beast::IP::to_asio_endpoint(endpoint.value());
31 }
32 catch (std::exception const&) // NOLINT(bugprone-empty-catch)
33 {
// Intentionally empty: a failure falls through to the empty return below.
34 }
35 return {};
36}
37
38} // namespace
39
// CallData constructor: stores the service, completion queue, application,
// callbacks, required condition, load type and secure gateway IPs in the
// corresponding members, and points responder_ at this object's ctx_.
// A freshly constructed object starts with finished_ == false.
// NOTE(review): listing lines 41, 45-47 and 65 are missing from this extract:
// the constructor name, the three parameters the initializer list moves from
// (bindListener, handler, forward), and the statement that follows the
// "Bind a listener" comment.
40template <class Request, class Response>
42 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
43 grpc::ServerCompletionQueue& cq,
44 Application& app,
48 RPC::Condition requiredCondition,
49 Resource::Charge loadType,
50 std::vector<boost::asio::ip::address> const& secureGatewayIPs)
51 : service_(service)
52 , cq_(cq)
53 , finished_(false)
54 , app_(app)
55 , responder_(&ctx_)
56 , bindListener_(std::move(bindListener))
57 , handler_(std::move(handler))
58 , forward_(std::move(forward))
59 , requiredCondition_(requiredCondition)
60 , loadType_(std::move(loadType))
61 , secureGatewayIPs_(secureGatewayIPs)
62{
63 // Bind a listener. When a request is received, "this" will be returned
64 // from CompletionQueue::Next
66}
67
// NOTE(review): listing lines 69-70, 72-73 and 81 are missing from this
// extract, so the signature and the enclosing expression are not visible.
// The surviving lines pass this object's own members as arguments, presumably
// to build a new CallData with the same configuration - confirm against the
// full file.
68template <class Request, class Response>
71{
74 cq_,
75 app_,
76 bindListener_,
77 handler_,
78 forward_,
79 requiredCondition_,
80 loadType_,
82}
83
// Marks this object as finished and posts a jtRPC coroutine job named
// "gRPC-Client" that runs process(coro). If postCoro returns null, an
// INTERNAL error status is sent to the client right away.
// NOTE(review): listing line 86 (the signature) is missing from this extract.
84template <class Request, class Response>
85void
87{
88 // sanity check
89 BOOST_ASSERT(!finished_);
90
// A shared_ptr to this object is captured by the job's lambda below,
// presumably so the object stays alive until the job has run.
91 std::shared_ptr<CallData<Request, Response>> const thisShared = this->shared_from_this();
92
93 // Need to set finished to true before processing the response,
94 // because as soon as the response is posted to the completion
95 // queue (via responder_.Finish(...) or responder_.FinishWithError(...)),
96 // the CallData object is returned as a tag in handleRpcs().
97 // handleRpcs() checks the finished variable, and if true, destroys
98 // the object. Setting finished to true before calling process
99 // ensures that finished is always true when this CallData object
100 // is returned as a tag in handleRpcs(), after sending the response
101 finished_ = true;
102 auto coro = app_.getJobQueue().postCoro(
103 JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
104 thisShared->process(coro);
105 });
106
107 // If coro is null, then the JobQueue has already been shutdown
108 if (!coro)
109 {
110 grpc::Status const status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
111 responder_.FinishWithError(status, this);
112 }
113}
114
// Handles one request inside the job-queue coroutine. Every path ends by
// posting exactly one response through responder_:
//  - RESOURCE_EXHAUSTED when the client is not unlimited and
//    usage.disconnect(...) returns true;
//  - FAILED_PRECONDITION when RPC::conditionMet does not return rpcSUCCESS;
//  - the handler's response and status otherwise;
//  - INTERNAL, carrying ex.what(), when a std::exception is thrown.
// NOTE(review): listing lines 117 (signature), 151, 156 and 160 (parts of the
// `context` initializer) are missing from this extract.
115template <class Request, class Response>
116void
118{
119 try
120 {
121 auto usage = getUsage();
122 bool const isUnlimited = clientIsUnlimited();
123 if (!isUnlimited && usage.disconnect(app_.getJournal("gRPCServer")))
124 {
125 grpc::Status const status{
126 grpc::StatusCode::RESOURCE_EXHAUSTED, "usage balance exceeds threshold"};
127 responder_.FinishWithError(status, this);
128 }
129 else
130 {
131 auto loadType = getLoadType();
132 usage.charge(loadType);
133 auto role = getRole(isUnlimited);
134
// Build one debug log line with role, client address, user and the
// unlimited flag; address and user are left blank when unavailable.
135 {
136 std::stringstream toLog;
137 toLog << "role = " << (int)role;
138
139 toLog << " address = ";
140 if (auto clientIp = getClientIpAddress())
141 toLog << clientIp.value();
142
143 toLog << " user = ";
144 if (auto user = getUser())
145 toLog << user.value();
146 toLog << " isUnlimited = " << isUnlimited;
147
148 JLOG(app_.getJournal("GRPCServer::Calldata").debug()) << toLog.str();
149 }
150
// NOTE(review): the declaration that opens this initializer (listing line
// 151) is missing; it defines `context`, which is used below.
152 {app_.getJournal("gRPCServer"),
153 app_,
154 loadType,
155 app_.getOPs(),
157 usage,
158 role,
159 coro,
161 apiVersion},
162 request_};
163
164 // Make sure we can currently handle the rpc
165 error_code_i const conditionMetRes = RPC::conditionMet(requiredCondition_, context);
166
167 if (conditionMetRes != rpcSUCCESS)
168 {
169 RPC::ErrorInfo const errorInfo = RPC::get_error_info(conditionMetRes);
170 grpc::Status const status{
171 grpc::StatusCode::FAILED_PRECONDITION, errorInfo.message.c_str()};
172 responder_.FinishWithError(status, this);
173 }
174 else
175 {
// Run the handler, stamp the unlimited flag on the response, then send it.
176 std::pair<Response, grpc::Status> result = handler_(context);
177 setIsUnlimited(result.first, isUnlimited);
178 responder_.Finish(result.first, result.second, this);
179 }
180 }
181 }
182 catch (std::exception const& ex)
183 {
184 grpc::Status const status{grpc::StatusCode::INTERNAL, ex.what()};
185 responder_.FinishWithError(status, this);
186 }
187}
188
189template <class Request, class Response>
190bool
195
196template <class Request, class Response>
202
// Maps the unlimited flag to a role: Role::IDENTIFIED when isUnlimited is
// true, Role::USER otherwise.
// NOTE(review): listing line 205 (the signature) is missing from this extract.
203template <class Request, class Response>
204Role
206{
207 if (isUnlimited)
208 {
209 return Role::IDENTIFIED;
210 }
211
212 return Role::USER;
213}
214
// Reads the request's "user" field through protobuf reflection. Returns its
// value when the field exists on the Request type and the string is not
// empty; returns an empty value (`{}`) otherwise.
// NOTE(review): listing lines 216-217 (return type and signature) are missing
// from this extract.
215template <class Request, class Response>
218{
219 if (auto descriptor = Request::GetDescriptor()->FindFieldByName("user"))
220 {
221 std::string user = Request::GetReflection()->GetString(request_, descriptor);
222 if (!user.empty())
223 {
224 return user;
225 }
226 }
227 return {};
228}
229
// Returns the address part of the client endpoint, or an empty value (`{}`)
// when getClientEndpoint() yields nothing.
// NOTE(review): listing lines 231-232 (return type and signature) are missing
// from this extract.
230template <class Request, class Response>
233{
234 auto endpoint = getClientEndpoint();
235 if (endpoint)
236 return endpoint->address();
237 return {};
238}
239
// Converts the peer string reported by the gRPC server context (ctx_.peer())
// into an endpoint via xrpl::getEndpoint.
// NOTE(review): listing lines 241-242 (return type and signature) are missing
// from this extract.
240template <class Request, class Response>
243{
244 return xrpl::getEndpoint(ctx_.peer());
245}
246
// Returns true only when the request carries a user (getUser() is non-empty)
// AND the client's IP address equals one of the entries in secureGatewayIPs_.
// Returns false in every other case, including when the client address
// cannot be determined.
// NOTE(review): listing line 249 (the signature) is missing from this extract.
247template <class Request, class Response>
248bool
250{
251 if (!getUser())
252 return false;
253 auto clientIp = getClientIpAddress();
254 if (clientIp)
255 {
256 for (auto& ip : secureGatewayIPs_)
257 {
258 if (ip == clientIp)
259 return true;
260 }
261 }
262 return false;
263}
264
// When isUnlimited is true and the Response type has a field named
// "is_unlimited", sets that field to true through protobuf reflection.
// Does nothing otherwise; the field is never explicitly set to false here.
// NOTE(review): listing line 267 (the signature) is missing from this extract.
265template <class Request, class Response>
266void
268{
269 if (isUnlimited)
270 {
271 if (auto descriptor = Response::GetDescriptor()->FindFieldByName("is_unlimited"))
272 {
273 Response::GetReflection()->SetBool(&response, descriptor, true);
274 }
275 }
276}
277
// Looks up the client endpoint and throws std::runtime_error
// ("Failed to get client endpoint") when it is unavailable.
// NOTE(review): listing lines 279-280 (return type and signature) and 284
// (the statement controlled by `if (endpoint)`, presumably the return) are
// missing from this extract. As extracted, the Throw only appears to be
// guarded by the `if`; in the full file it is presumably the fall-through path.
278template <class Request, class Response>
281{
282 auto endpoint = getClientEndpoint();
283 if (endpoint)
285 Throw<std::runtime_error>("Failed to get client endpoint");
286}
287
// GRPCServerImpl constructor: reads the SECTION_PORT_GRPC config section, if
// present. Returns early (leaving serverAddress_ unset) when either "ip" or
// "port" is absent. Otherwise formats the endpoint into serverAddress_ and
// parses the optional comma-separated "secure_gateway" list of IP addresses.
// Throws std::runtime_error when the address/port or any gateway IP is invalid.
// NOTE(review): listing lines 288 (the signature), 308 (presumably the
// declaration of `ss`) and 337 (presumably the statement that stores `addr`)
// are missing from this extract.
289 : app_(app), journal_(app_.getJournal("gRPC Server"))
290{
291 // if present, get endpoint from config
292 if (app_.config().exists(SECTION_PORT_GRPC))
293 {
294 Section const& section = app_.config().section(SECTION_PORT_GRPC);
295
296 auto const optIp = section.get("ip");
297 if (!optIp)
298 return;
299
300 auto const optPort = section.get("port");
301 if (!optPort)
302 return;
303 try
304 {
// NOTE(review): the std::stoi result is used as the port with no range
// check visible here - TODO confirm how out-of-range values behave.
305 boost::asio::ip::tcp::endpoint const endpoint(
306 boost::asio::ip::make_address(*optIp), std::stoi(*optPort));
307
309 ss << endpoint;
310 serverAddress_ = ss.str();
311 }
312 catch (std::exception const&)
313 {
314 JLOG(journal_.error()) << "Error setting grpc server address";
315 Throw<std::runtime_error>("Error setting grpc server address");
316 }
317
318 auto const optSecureGateway = section.get("secure_gateway");
319 if (optSecureGateway)
320 {
321 try
322 {
// Split the value on ',' and trim whitespace around each entry.
323 std::stringstream ss{*optSecureGateway};
324 std::string ip;
325 while (std::getline(ss, ip, ','))
326 {
327 boost::algorithm::trim(ip);
328 auto const addr = boost::asio::ip::make_address(ip);
329
330 if (addr.is_unspecified())
331 {
332 JLOG(journal_.error()) << "Can't pass unspecified IP in "
333 << "secure_gateway section of port_grpc";
// NOTE(review): if Throw<std::runtime_error> throws a std::runtime_error,
// it is caught by the handler below and replaced by the generic
// "Error parsing secure_gateway section" - confirm this is intended.
334 Throw<std::runtime_error>("Unspecified IP in secure_gateway section");
335 }
336
338 }
339 }
340 catch (std::exception const&)
341 {
342 JLOG(journal_.error()) << "Error parsing secure gateway IPs for grpc server";
343 Throw<std::runtime_error>("Error parsing secure_gateway section");
344 }
345 }
346 }
347}
348
// Shuts down the gRPC server first and the completion queue second, logging
// each step at debug level.
// NOTE(review): listing line 350 (the signature) is missing from this extract.
349void
351{
352 JLOG(journal_.debug()) << "Shutting down";
353
354 // The below call cancels all "listeners" (CallData objects that are waiting
355 // for a request, as opposed to processing a request), and blocks until all
356 // requests being processed are completed. CallData objects in the midst of
357 // processing requests need to actually send data back to the client, via
358 // responder_.Finish(...) or responder_.FinishWithError(...), for this call
359 // to unblock. Each cancelled listener is returned via cq_.Next(...) with ok
360 // set to false
361 server_->Shutdown();
362 JLOG(journal_.debug()) << "Server has been shutdown";
363
364 // Always shutdown the completion queue after the server. This call allows
365 // cq_.Next() to return false, once all events posted to the completion
366 // queue have been processed. See handleRpcs() for more details.
367 cq_->Shutdown();
368 JLOG(journal_.debug()) << "Completion Queue has been shutdown";
369}
370
// Event loop: pulls tags from the completion queue until cq_->Next() returns
// false. Each tag is a Processor*. A cancelled listener (ok == false) or a
// finished call is erased from `requests`; a newly received request is cloned
// so a fresh listener exists, then processed.
// NOTE(review): listing lines 372 (the signature) and 377 (the declaration of
// `requests`) are missing from this extract.
371void
373{
374 // This collection should really be an unordered_set. However, to delete
375 // from the unordered_set, we need a shared_ptr, but cq_.Next() (see below
376 // while loop) sets the tag to a raw pointer.
378
// Removes the entry owning `ptr` by swapping it with the last element and
// popping the back, so element order is not preserved.
// NOTE(review): a missing entry is only guarded by BOOST_ASSERT.
379 auto erase = [&requests](Processor* ptr) {
380 auto it =
381 std::find_if(requests.begin(), requests.end(), [ptr](std::shared_ptr<Processor>& sPtr) {
382 return sPtr.get() == ptr;
383 });
384 BOOST_ASSERT(it != requests.end());
385 it->swap(requests.back());
386 requests.pop_back();
387 };
388
389 void* tag = nullptr; // uniquely identifies a request.
390 bool ok = false;
391 // Block waiting to read the next event from the completion queue. The
392 // event is uniquely identified by its tag, which in this case is the
393 // memory address of a CallData instance.
394 // The return value of Next should always be checked. This return value
395 // tells us whether there is any kind of event or cq_ is shutting down.
396 // When cq_.Next(...) returns false, all work has been completed and the
397 // loop can exit. When the server is shutdown, each CallData object that is
398 // listening for a request is forcibly cancelled, and is returned by
399 // cq_->Next() with ok set to false. Then, each CallData object processing
400 // a request must complete (by sending data to the client), each of which
401 // will be returned from cq_->Next() with ok set to true. After all
402 // cancelled listeners and all CallData objects processing requests are
403 // returned via cq_->Next(), cq_->Next() will return false, causing the
404 // loop to exit.
405 while (cq_->Next(&tag, &ok))
406 {
407 auto ptr = static_cast<Processor*>(tag);
408 JLOG(journal_.trace()) << "Processing CallData object."
409 << " ptr = " << ptr << " ok = " << ok;
410
411 if (!ok)
412 {
413 JLOG(journal_.debug()) << "Request listener cancelled. "
414 << "Destroying object";
415 erase(ptr);
416 }
417 else
418 {
419 if (!ptr->isFinished())
420 {
421 JLOG(journal_.debug()) << "Received new request. Processing";
422 // ptr is now processing a request, so create a new CallData
423 // object to handle additional requests
424 auto cloned = ptr->clone();
425 requests.push_back(cloned);
426 // process the request
427 ptr->process();
428 }
429 else
430 {
431 JLOG(journal_.debug()) << "Sent response. Destroying object";
432 erase(ptr);
433 }
434 }
435 }
436 JLOG(journal_.debug()) << "Completion Queue drained";
437}
438
439// create a CallData instance for each RPC
// Four scoped sections follow, one per RPC (GetLedger, GetLedgerData,
// GetLedgerDiff, GetLedgerEntry); each appends one object to `requests`,
// which is returned at the end.
// NOTE(review): many listing lines are missing from this extract (440-441,
// 443, 449, 452, 457, 459-461 and their counterparts in the other three
// sections): the signature, the declaration of `requests`, the first `cd`
// alias, and several arguments of each call. Confirm against the full file.
442{
444
445 auto addToRequests = [&requests](auto callData) { requests.push_back(std::move(callData)); };
446
447 {
448 using cd =
450
451 addToRequests(
453 service_,
454 *cq_,
455 app_,
456 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedger,
458 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedger,
462 }
463 {
464 using cd = CallData<
465 org::xrpl::rpc::v1::GetLedgerDataRequest,
466 org::xrpl::rpc::v1::GetLedgerDataResponse>;
467
468 addToRequests(
470 service_,
471 *cq_,
472 app_,
473 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerData,
475 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerData,
479 }
480 {
481 using cd = CallData<
482 org::xrpl::rpc::v1::GetLedgerDiffRequest,
483 org::xrpl::rpc::v1::GetLedgerDiffResponse>;
484
485 addToRequests(
487 service_,
488 *cq_,
489 app_,
490 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerDiff,
492 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerDiff,
496 }
497 {
498 using cd = CallData<
499 org::xrpl::rpc::v1::GetLedgerEntryRequest,
500 org::xrpl::rpc::v1::GetLedgerEntryResponse>;
501
502 addToRequests(
504 service_,
505 *cq_,
506 app_,
507 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerEntry,
509 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerEntry,
513 }
514 return requests;
515}
516
// Builds and starts the gRPC server on serverAddress_. Returns false without
// doing anything when serverAddress_ is empty; otherwise returns whether the
// port written back by the builder is non-zero.
// NOTE(review): listing line 518 (the signature) is missing from this extract.
517bool
519{
520 // if config does not specify a grpc server address, don't start
521 if (serverAddress_.empty())
522 return false;
523
524 JLOG(journal_.info()) << "Starting gRPC server at " << serverAddress_;
525
526 grpc::ServerBuilder builder;
527
528 // Listen on the given address without any authentication mechanism.
529 // The actually bound port will be returned in the "port" variable.
530 int port = 0;
531 builder.AddListeningPort(serverAddress_, grpc::InsecureServerCredentials(), &port);
532 // Register "service_" as the instance through which we'll communicate with
533 // clients. In this case it corresponds to an *asynchronous* service.
534 builder.RegisterService(&service_);
535 // Get hold of the completion queue used for the asynchronous communication
536 // with the gRPC runtime.
537 cq_ = builder.AddCompletionQueue();
538 // Finally assemble the server.
539 server_ = builder.BuildAndStart();
540 serverPort_ = static_cast<std::uint16_t>(port);
541
// A port of 0 converts to false, i.e. the start is reported as failed.
542 return static_cast<bool>(serverPort_);
543}
544
// Returns a TCP endpoint built from an address string `addr` and serverPort_.
// NOTE(review): listing lines 546 (the signature) and 548 (the definition of
// `addr`, presumably derived from serverAddress_) are missing from this
// extract - confirm against the full file.
545boost::asio::ip::tcp::endpoint
547{
549 return boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(addr), serverPort_);
550}
551
// Starts the implementation; on success, launches a thread named
// "rippled: grpc" that runs impl_.handleRpcs(). Returns whether the server
// is running.
// NOTE(review): listing line 553 (the signature) is missing from this extract.
552bool
554{
555 // Start the server and setup listeners
556 if (running_ = impl_.start(); running_)
557 {
558 thread_ = std::thread([this]() {
559 // Start the event loop and begin handling requests
560 beast::setCurrentThreadName("rippled: grpc");
561 this->impl_.handleRpcs();
562 });
563 }
564 return running_;
565}
566
// If the server is running: shuts down the implementation, joins the event
// loop thread, and clears running_. Does nothing when not running.
// NOTE(review): listing line 568 (the signature) is missing from this extract.
567void
569{
570 if (running_)
571 {
572 impl_.shutdown();
573 thread_.join();
574 running_ = false;
575 }
576}
577
// Destructor body: asserts that the server is no longer running at
// destruction time. It does not stop the server itself.
// NOTE(review): listing line 578 (the signature) is missing from this extract.
579{
580 XRPL_ASSERT(!running_, "xrpl::GRPCServer::~GRPCServer : is not running");
581}
582
// Forwards to impl_.getEndpoint() and returns its result.
// NOTE(review): listing line 584 (the signature) is missing from this extract.
583boost::asio::ip::tcp::endpoint
585{
586 return impl_.getEndpoint();
587}
588
589} // namespace xrpl
T back(T... args)
T begin(T... args)
constexpr char const * c_str() const
Definition json_value.h:57
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
virtual Config & config()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Resource::Consumer getUsage()
std::optional< boost::asio::ip::address > getClientIpAddress()
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition GRPCServer.h:135
virtual bool isFinished() override
std::optional< std::string > getUser()
CallData(org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &service, grpc::ServerCompletionQueue &cq, Application &app, BindListener< Request, Response > bindListener, Handler< Request, Response > handler, Forward< Request, Response > forward, RPC::Condition requiredCondition, Resource::Charge loadType, std::vector< boost::asio::ip::address > const &secureGatewayIPs)
BindListener< Request, Response > bindListener_
Definition GRPCServer.h:161
Role getRole(bool isUnlimited)
Resource::Charge getLoadType()
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
virtual void process() override
grpc::ServerAsyncResponseWriter< Response > responder_
Definition GRPCServer.h:158
std::shared_ptr< Processor > clone() override
void setIsUnlimited(Response &response, bool isUnlimited)
grpc::ServerCompletionQueue & cq_
Definition GRPCServer.h:138
grpc::ServerContext ctx_
Definition GRPCServer.h:143
static unsigned constexpr apiVersion
Definition GRPCServer.h:89
std::uint16_t serverPort_
Definition GRPCServer.h:65
Application & app_
Definition GRPCServer.h:62
std::vector< boost::asio::ip::address > secureGatewayIPs_
Definition GRPCServer.h:67
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition GRPCServer.h:53
std::vector< std::shared_ptr< Processor > > setupListeners()
GRPCServerImpl(Application &app)
beast::Journal journal_
Definition GRPCServer.h:69
std::unique_ptr< grpc::Server > server_
Definition GRPCServer.h:60
boost::asio::ip::tcp::endpoint getEndpoint() const
std::string serverAddress_
Definition GRPCServer.h:64
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition GRPCServer.h:58
std::thread thread_
Definition GRPCServer.h:302
boost::asio::ip::tcp::endpoint getEndpoint() const
GRPCServerImpl impl_
Definition GRPCServer.h:301
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:33
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition JobQueue.h:390
A consumption charge.
Definition Charge.h:10
An endpoint that consumes resources.
Definition Consumer.h:16
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
Holds a collection of configuration values.
Definition BasicConfig.h:24
std::optional< T > get(std::string const &name) const
virtual JobQueue & getJobQueue()=0
virtual beast::Journal getJournal(std::string const &name)=0
virtual NetworkOPs & getOPs()=0
virtual Resource::Manager & getResourceManager()=0
virtual LedgerMaster & getLedgerMaster()=0
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T find_first_of(T... args)
T find_if(T... args)
T find_last_of(T... args)
T getline(T... args)
T is_same_v
T join(T... args)
boost::asio::ip::tcp::endpoint to_asio_endpoint(Endpoint const &endpoint)
Convert to asio::ip::tcp::endpoint.
Endpoint from_asio(boost::asio::ip::address const &address)
Convert to Endpoint.
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
@ NO_CONDITION
Definition Handler.h:20
ErrorInfo const & get_error_info(error_code_i code)
Returns an ErrorInfo that reflects the error code.
error_code_i conditionMet(Condition condition_required, T &context)
Definition Handler.h:59
Charge const feeMediumBurdenRPC
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::pair< org::xrpl::rpc::v1::GetLedgerDiffResponse, grpc::Status > doLedgerDiffGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDiffRequest > &context)
Definition LedgerDiff.cpp:6
std::pair< org::xrpl::rpc::v1::GetLedgerDataResponse, grpc::Status > doLedgerDataGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDataRequest > &context)
Role
Indicates the level of administrative permission to grant.
Definition Role.h:24
std::pair< org::xrpl::rpc::v1::GetLedgerResponse, grpc::Status > doLedgerGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerRequest > &context)
@ jtRPC
Definition Job.h:31
std::pair< org::xrpl::rpc::v1::GetLedgerEntryResponse, grpc::Status > doLedgerEntryGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerEntryRequest > &context)
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition STExchange.h:148
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition Role.cpp:98
error_code_i
Definition ErrorCodes.h:20
@ rpcSUCCESS
Definition ErrorCodes.h:24
T pop_back(T... args)
T push_back(T... args)
T stoi(T... args)
T str(T... args)
Maps an rpc error code to its token, default message, and HTTP status.
Definition ErrorCodes.h:168
Json::StaticString message
Definition ErrorCodes.h:191
T substr(T... args)
T value(T... args)
T what(T... args)