1#include <xrpld/app/main/GRPCServer.h>
3#include <xrpld/app/main/Application.h>
4#include <xrpld/rpc/Context.h>
5#include <xrpld/rpc/GRPCHandlers.h>
6#include <xrpld/rpc/Role.h>
7#include <xrpld/rpc/detail/Handler.h>
9#include <xrpl/basics/FileUtilities.h>
10#include <xrpl/basics/Log.h>
11#include <xrpl/basics/contract.h>
12#include <xrpl/beast/core/CurrentThreadName.h>
13#include <xrpl/beast/net/IPAddressConversion.h>
14#include <xrpl/beast/net/IPEndpoint.h>
15#include <xrpl/beast/utility/instrumentation.h>
16#include <xrpl/config/BasicConfig.h>
17#include <xrpl/config/Constants.h>
18#include <xrpl/core/Job.h>
19#include <xrpl/core/JobQueue.h>
20#include <xrpl/protocol/ErrorCodes.h>
21#include <xrpl/resource/Charge.h>
22#include <xrpl/resource/Consumer.h>
23#include <xrpl/resource/Fees.h>
24#include <xrpl/server/InfoSub.h>
26#include <boost/algorithm/string/trim.hpp>
27#include <boost/asio/ip/address.hpp>
28#include <boost/asio/ip/tcp.hpp>
29#include <boost/icl/interval_set.hpp>
31#include <grpc/grpc_security_constants.h>
32#include <grpcpp/completion_queue.h>
33#include <grpcpp/security/server_credentials.h>
34#include <grpcpp/server_builder.h>
35#include <grpcpp/support/status.h>
36#include <org/xrpl/rpc/v1/get_ledger.pb.h>
37#include <org/xrpl/rpc/v1/get_ledger_data.pb.h>
38#include <org/xrpl/rpc/v1/get_ledger_diff.pb.h>
39#include <org/xrpl/rpc/v1/get_ledger_entry.pb.h>
40#include <org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
60std::optional<boost::asio::ip::tcp::endpoint>
61getEndpoint(std::string
const& peer)
67 std::string peerClean(peer);
70 peerClean = peer.
substr(first + 1);
73 std::optional<beast::IP::Endpoint> endpoint =
78 catch (std::exception
const&)
86template <
class Request,
class Response>
88 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
89 grpc::ServerCompletionQueue& cq,
114template <
class Request,
class Response>
130template <
class Request,
class Response>
148 auto coro =
app_.getJobQueue().postCoro(
150 thisShared->process(coro);
156 grpc::Status
const status{grpc::StatusCode::INTERNAL,
"Job Queue is already stopped"};
161template <
class Request,
class Response>
171 grpc::Status
const status{
172 grpc::StatusCode::RESOURCE_EXHAUSTED,
"usage balance exceeds threshold"};
178 usage.charge(loadType);
183 toLog <<
"role = " << (int)role;
185 toLog <<
" address = ";
187 toLog << clientIp.value();
191 toLog << user.value();
194 JLOG(
app_.getJournal(
"GRPCServer::Calldata").debug()) << toLog.
str();
198 {
app_.getJournal(
"gRPCServer"),
202 app_.getLedgerMaster(),
216 grpc::Status
const status{
217 grpc::StatusCode::FAILED_PRECONDITION, errorInfo.
message.
cStr()};
230 grpc::Status
const status{grpc::StatusCode::INTERNAL, ex.
what()};
235template <
class Request,
class Response>
242template <
class Request,
class Response>
249template <
class Request,
class Response>
261template <
class Request,
class Response>
265 if (
auto descriptor = Request::GetDescriptor()->FindFieldByName(
"user"))
276template <
class Request,
class Response>
282 return endpoint->address();
286template <
class Request,
class Response>
290 return xrpl::getEndpoint(
ctx_.peer());
293template <
class Request,
class Response>
311template <
class Request,
class Response>
317 if (
auto descriptor = Response::GetDescriptor()->FindFieldByName(
"is_unlimited"))
319 Response::GetReflection()->SetBool(&response, descriptor,
true);
324template <
class Request,
class Response>
340 Section const& section = app_.config().section(Sections::kPortGrpc);
342 auto const optIp = section.get(Keys::kIp);
346 auto const optPort = section.get(Keys::kPort);
351 boost::asio::ip::tcp::endpoint const endpoint(
352 boost::asio::ip::make_address(*optIp), std::stoi(*optPort));
354 std::stringstream ss;
356 serverAddress_ = ss.str();
360 JLOG(
journal_.error()) <<
"Error setting grpc server address";
365 if (optSecureGateway)
373 boost::algorithm::trim(ip);
374 auto const addr = boost::asio::ip::make_address(ip);
376 if (addr.is_unspecified())
378 JLOG(
journal_.error()) <<
"Can't pass unspecified IP in "
379 <<
"secure_gateway section of port_grpc";
388 JLOG(
journal_.error()) <<
"Error parsing secure gateway IPs for grpc server";
402 if (!sslCertPath_.has_value() || !sslKeyPath_.has_value())
404 JLOG(journal_.error())
405 <<
"Both ssl_cert and ssl_key must be specified for gRPC TLS";
406 Throw<std::runtime_error>(
"Incomplete TLS configuration for gRPC");
413 if (sslCertChainPath_.has_value() &&
414 (!sslCertPath_.has_value() || !sslKeyPath_.has_value()))
416 JLOG(journal_.error())
417 <<
"ssl_cert_chain specified for gRPC without both ssl_cert and ssl_key; "
418 <<
"this is an invalid TLS configuration";
419 Throw<std::runtime_error>(
420 "Invalid gRPC TLS configuration: ssl_cert_chain requires both ssl_cert and "
426 if (sslClientCAPath_.has_value() && (!sslCertPath_.has_value() || !sslKeyPath_.has_value()))
428 JLOG(journal_.error())
429 <<
"ssl_client_ca specified for gRPC without both ssl_cert and ssl_key; "
430 <<
"this is an invalid TLS configuration";
431 Throw<std::runtime_error>(
432 "Invalid gRPC TLS configuration: ssl_client_ca requires both ssl_cert and ssl_key");
440 JLOG(
journal_.debug()) <<
"Shutting down";
450 JLOG(
journal_.debug()) <<
"Server has been shutdown";
456 JLOG(
journal_.debug()) <<
"Completion Queue has been shutdown";
470 BOOST_ASSERT(it != requests.
end());
471 it->swap(requests.
back());
491 while (
cq_->Next(&tag, &ok))
494 JLOG(
journal_.trace()) <<
"Processing CallData object."
495 <<
" ptr = " << ptr <<
" ok = " << ok;
499 JLOG(
journal_.debug()) <<
"Request listener cancelled. "
500 <<
"Destroying object";
505 if (!ptr->isFinished())
507 JLOG(
journal_.debug()) <<
"Received new request. Processing";
510 auto cloned = ptr->clone();
517 JLOG(
journal_.debug()) <<
"Sent response. Destroying object";
522 JLOG(
journal_.debug()) <<
"Completion Queue drained";
532 auto addToRequests = [&requests](
auto callData) { requests.
push_back(std::move(callData)); };
543 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedger,
545 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedger,
546 Condition::NoCondition,
552 org::xrpl::rpc::v1::GetLedgerDataRequest,
553 org::xrpl::rpc::v1::GetLedgerDataResponse>;
560 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerData,
562 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerData,
563 Condition::NoCondition,
569 org::xrpl::rpc::v1::GetLedgerDiffRequest,
570 org::xrpl::rpc::v1::GetLedgerDiffResponse>;
577 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerDiff,
579 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerDiff,
580 Condition::NoCondition,
586 org::xrpl::rpc::v1::GetLedgerEntryRequest,
587 org::xrpl::rpc::v1::GetLedgerEntryResponse>;
594 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerEntry,
596 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerEntry,
597 Condition::NoCondition,
609 JLOG(
journal_.info()) <<
"Configuring gRPC server without TLS";
610 return grpc::InsecureServerCredentials();
613 JLOG(
journal_.info()) <<
"Configuring gRPC server with TLS";
617 boost::system::error_code ec;
618 grpc::SslServerCredentialsOptions sslOpts;
619 grpc::SslServerCredentialsOptions::PemKeyCertPair keyCertPair;
625 <<
" - " << ec.message();
637 keyCertPair.private_key = keyContents;
665 if (clientCAContents.empty())
669 <<
" - failed to configure mutual TLS";
673 sslOpts.pem_root_certs = clientCAContents;
674 sslOpts.client_certificate_request =
675 GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY;
676 JLOG(
journal_.info()) <<
"gRPC mutual TLS enabled - client certificates will be "
677 "required and verified";
681 keyCertPair.cert_chain = certContents;
682 if (!certChainContents.
empty())
684 keyCertPair.cert_chain +=
'\n' + certChainContents;
685 JLOG(
journal_.info()) <<
"gRPC server certificate chain configured with "
686 "intermediate CA certificates";
689 sslOpts.pem_key_cert_pairs.
push_back(keyCertPair);
691 JLOG(
journal_.info()) <<
"gRPC TLS credentials configured successfully";
692 return grpc::SslServerCredentials(sslOpts);
696 JLOG(
journal_.error()) <<
"Exception while configuring gRPC TLS: "
716 tlsMode =
"with mutual TLS (mTLS)";
720 tlsMode =
"with TLS";
726 grpc::ServerBuilder builder;
734 <<
" (TLS mode: " << tlsMode
735 <<
") - server will not start";
748 cq_ = builder.AddCompletionQueue();
751 server_ = builder.BuildAndStart();
761 <<
"Failed to start gRPC server at " <<
serverAddress_ <<
" (TLS mode: " << tlsMode
762 <<
"); Possible causes: address already in use, invalid address format, or permission "
769boost::asio::ip::tcp::endpoint
773 return boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(addr),
serverPort_);
785 this->
impl_.handleRpcs();
804 XRPL_ASSERT(!
running_,
"xrpl::GRPCServer::~GRPCServer : is not running");
807boost::asio::ip::tcp::endpoint
810 return impl_.getEndpoint();
static std::optional< Endpoint > fromStringChecked(std::string const &s)
Create an Endpoint from a string.
constexpr char const * cStr() const
Forward< Request, Response > forward_
Resource::Consumer getUsage()
std::vector< boost::asio::ip::address > const & secureGatewayIPs_
std::optional< boost::asio::ip::address > getClientIpAddress()
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
bool isFinished() override
Handler< Request, Response > handler_
RPC::Condition requiredCondition_
std::optional< std::string > getUser()
CallData(org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &service, grpc::ServerCompletionQueue &cq, Application &app, BindListener< Request, Response > bindListener, Handler< Request, Response > handler, Forward< Request, Response > forward, RPC::Condition requiredCondition, Resource::Charge loadType, std::vector< boost::asio::ip::address > const &secureGatewayIPs)
BindListener< Request, Response > bindListener_
Role getRole(bool isUnlimited)
Resource::Charge getLoadType()
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
std::atomic_bool finished_
grpc::ServerAsyncResponseWriter< Response > responder_
std::shared_ptr< Processor > clone() override
Resource::Charge loadType_
void setIsUnlimited(Response &response, bool isUnlimited)
grpc::ServerCompletionQueue & cq_
std::uint16_t serverPort_
std::optional< std::string > sslKeyPath_
std::optional< std::string > sslCertPath_
std::function< grpc::Status( org::xrpl::rpc::v1::XRPLedgerAPIService::Stub *, grpc::ClientContext *, Request, Response *)> Forward
std::function< std::pair< Response, grpc::Status >(RPC::GRPCContext< Request > &)> Handler
std::vector< boost::asio::ip::address > secureGatewayIPs_
std::optional< std::string > sslCertChainPath_
std::unique_ptr< grpc::ServerCompletionQueue > cq_
std::vector< std::shared_ptr< Processor > > setupListeners()
std::optional< std::string > sslClientCAPath_
GRPCServerImpl(Application &app)
std::unique_ptr< grpc::Server > server_
boost::asio::ip::tcp::endpoint getEndpoint() const
std::string serverAddress_
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
std::function< void( org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &, grpc::ServerContext *, Request *, grpc::ServerAsyncResponseWriter< Response > *, grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *)> BindListener
static constexpr unsigned kApiVersion
std::shared_ptr< grpc::ServerCredentials > createServerCredentials()
boost::asio::ip::tcp::endpoint getEndpoint() const
std::shared_ptr< InfoSub > pointer
An endpoint that consumes resources.
T find_first_of(T... args)
T find_last_of(T... args)
boost::asio::ip::tcp::endpoint toAsioEndpoint(Endpoint const &endpoint)
Convert to asio::ip::tcp::endpoint.
Endpoint fromAsio(boost::asio::ip::address const &address)
Convert to Endpoint.
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
ErrorCodeI conditionMet(Condition conditionRequired, T &context)
ErrorInfo const & getErrorInfo(ErrorCodeI code)
Returns an ErrorInfo that reflects the error code.
Charge const kFeeMediumBurdenRpc
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::string getFileContents(boost::system::error_code &ec, boost::filesystem::path const &sourcePath, std::optional< std::size_t > maxSize=std::nullopt)
std::pair< org::xrpl::rpc::v1::GetLedgerDiffResponse, grpc::Status > doLedgerDiffGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDiffRequest > &context)
std::pair< org::xrpl::rpc::v1::GetLedgerDataResponse, grpc::Status > doLedgerDataGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDataRequest > &context)
Role
Indicates the level of administrative permission to grant.
std::pair< org::xrpl::rpc::v1::GetLedgerResponse, grpc::Status > doLedgerGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerRequest > &context)
std::pair< org::xrpl::rpc::v1::GetLedgerEntryResponse, grpc::Status > doLedgerEntryGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerEntryRequest > &context)
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
T shared_from_this(T... args)
static constexpr auto kSslCertChain
static constexpr auto kSslKey
static constexpr auto kSslClientCa
static constexpr auto kSslCert
static constexpr auto kSecureGateway
Maps an rpc error code to its token, default message, and HTTP status.
json::StaticString message
static constexpr auto kPortGrpc