1#include <xrpld/app/main/GRPCServer.h> 
    2#include <xrpld/core/ConfigSections.h> 
    4#include <xrpl/beast/core/CurrentThreadName.h> 
    5#include <xrpl/beast/net/IPAddressConversion.h> 
    6#include <xrpl/resource/Fees.h> 
   24            peerClean = peer.
substr(first + 1);
 
   40template <
class Request, 
class Response>
 
   42    org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
 
   43    grpc::ServerCompletionQueue& cq,
 
   56    , bindListener_(
std::move(bindListener))
 
   57    , handler_(
std::move(handler))
 
   59    , requiredCondition_(
std::move(requiredCondition))
 
   60    , loadType_(
std::move(loadType))
 
   61    , secureGatewayIPs_(secureGatewayIPs)
 
 
   68template <
class Request, 
class Response>
 
   84template <
class Request, 
class Response>
 
   89    BOOST_ASSERT(!finished_);
 
   92        this->shared_from_this();
 
  107            thisShared->process(coro);
 
  114            grpc::StatusCode::INTERNAL, 
"Job Queue is already stopped"};
 
  115        responder_.FinishWithError(status, 
this);
 
 
  119template <
class Request, 
class Response>
 
  126        auto usage = getUsage();
 
  131                grpc::StatusCode::RESOURCE_EXHAUSTED,
 
  132                "usage balance exceeds threshold"};
 
  133            responder_.FinishWithError(status, 
this);
 
  137            auto loadType = getLoadType();
 
  138            usage.charge(loadType);
 
  143                toLog << 
"role = " << (int)role;
 
  145                toLog << 
" address = ";
 
  146                if (
auto clientIp = getClientIpAddress())
 
  147                    toLog << clientIp.value();
 
  150                if (
auto user = getUser())
 
  151                    toLog << user.value();
 
  179                    grpc::StatusCode::FAILED_PRECONDITION,
 
  181                responder_.FinishWithError(status, 
this);
 
  187                responder_.Finish(result.
first, result.
second, 
this);
 
  193        grpc::Status status{grpc::StatusCode::INTERNAL, ex.
what()};
 
  194        responder_.FinishWithError(status, 
this);
 
 
  198template <
class Request, 
class Response>
 
  205template <
class Request, 
class Response>
 
  212template <
class Request, 
class Response>
 
  222template <
class Request, 
class Response>
 
  226    if (
auto descriptor = Request::GetDescriptor()->FindFieldByName(
"user"))
 
  229            Request::GetReflection()->GetString(request_, descriptor);
 
 
  238template <
class Request, 
class Response>
 
  242    auto endpoint = getClientEndpoint();
 
  244        return endpoint->address();
 
 
  248template <
class Request, 
class Response>
 
  252    return ripple::getEndpoint(ctx_.peer());
 
 
  255template <
class Request, 
class Response>
 
  261    auto clientIp = getClientIpAddress();
 
 
  273template <
class Request, 
class Response>
 
  281        if (
auto descriptor =
 
  282                Response::GetDescriptor()->FindFieldByName(
"is_unlimited"))
 
  284            Response::GetReflection()->SetBool(&response, descriptor, 
true);
 
 
  289template <
class Request, 
class Response>
 
  293    auto endpoint = getClientEndpoint();
 
  297    Throw<std::runtime_error>(
"Failed to get client endpoint");
 
 
  308        auto const optIp = section.
get(
"ip");
 
  312        auto const optPort = section.
get(
"port");
 
  317            boost::asio::ip::tcp::endpoint endpoint(
 
  318                boost::asio::ip::make_address(*optIp), 
std::stoi(*optPort));
 
  326            JLOG(
journal_.
error()) << 
"Error setting grpc server address";
 
  327            Throw<std::runtime_error>(
"Error setting grpc server address");
 
  330        auto const optSecureGateway = section.
get(
"secure_gateway");
 
  331        if (optSecureGateway)
 
  339                    boost::algorithm::trim(ip);
 
  340                    auto const addr = boost::asio::ip::make_address(ip);
 
  342                    if (addr.is_unspecified())
 
  345                            << 
"Can't pass unspecified IP in " 
  346                            << 
"secure_gateway section of port_grpc";
 
  347                        Throw<std::runtime_error>(
 
  348                            "Unspecified IP in secure_gateway section");
 
  357                    << 
"Error parsing secure gateway IPs for grpc server";
 
  358                Throw<std::runtime_error>(
 
  359                    "Error parsing secure_gateway section");
 
 
  384    JLOG(
journal_.
debug()) << 
"Completion Queue has been shutdown";
 
 
  400                return sPtr.get() == ptr;
 
  402        BOOST_ASSERT(it != requests.
end());
 
  403        it->swap(requests.
back());
 
  423    while (
cq_->Next(&tag, &ok))
 
  427                               << 
" ptr = " << ptr << 
" ok = " << ok;
 
  432                                   << 
"Destroying object";
 
  437            if (!ptr->isFinished())
 
  439                JLOG(
journal_.
debug()) << 
"Received new request. Processing";
 
  442                auto cloned = ptr->clone();
 
  449                JLOG(
journal_.
debug()) << 
"Sent response. Destroying object";
 
 
  463    auto addToRequests = [&requests](
auto callData) {
 
  469            org::xrpl::rpc::v1::GetLedgerRequest,
 
  470            org::xrpl::rpc::v1::GetLedgerResponse>;
 
  476            &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
 
  479            &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedger,
 
  486            org::xrpl::rpc::v1::GetLedgerDataRequest,
 
  487            org::xrpl::rpc::v1::GetLedgerDataResponse>;
 
  493            &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
 
  494                RequestGetLedgerData,
 
  496            &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerData,
 
  503            org::xrpl::rpc::v1::GetLedgerDiffRequest,
 
  504            org::xrpl::rpc::v1::GetLedgerDiffResponse>;
 
  510            &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
 
  511                RequestGetLedgerDiff,
 
  513            &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerDiff,
 
  520            org::xrpl::rpc::v1::GetLedgerEntryRequest,
 
  521            org::xrpl::rpc::v1::GetLedgerEntryResponse>;
 
  527            &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
 
  528                RequestGetLedgerEntry,
 
  530            &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerEntry,
 
 
  547    grpc::ServerBuilder builder;
 
  552    builder.AddListeningPort(
 
  559    cq_ = builder.AddCompletionQueue();
 
  561    server_ = builder.BuildAndStart();
 
 
  567boost::asio::ip::tcp::endpoint
 
  572    return boost::asio::ip::tcp::endpoint(
 
 
  604    XRPL_ASSERT(!
running_, 
"ripple::GRPCServer::~GRPCServer : is not running");
 
 
  607boost::asio::ip::tcp::endpoint
 
constexpr char const * c_str() const
 
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
 
Stream trace() const
Severity stream access functions.
 
virtual Config & config()=0
 
virtual beast::Journal journal(std::string const &name)=0
 
virtual JobQueue & getJobQueue()=0
 
virtual Resource::Manager & getResourceManager()=0
 
virtual NetworkOPs & getOPs()=0
 
virtual LedgerMaster & getLedgerMaster()=0
 
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
 
Section & section(std::string const &name)
Returns the section with the given name.
 
Resource::Consumer getUsage()
 
std::optional< std::string > getUser()
 
grpc::ServerCompletionQueue & cq_
 
Resource::Charge getLoadType()
 
void setIsUnlimited(Response &response, bool isUnlimited)
 
std::optional< boost::asio::ip::address > getClientIpAddress()
 
BindListener< Request, Response > bindListener_
 
grpc::ServerAsyncResponseWriter< Response > responder_
 
Role getRole(bool isUnlimited)
 
std::shared_ptr< Processor > clone() override
 
CallData(org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &service, grpc::ServerCompletionQueue &cq, Application &app, BindListener< Request, Response > bindListener, Handler< Request, Response > handler, Forward< Request, Response > forward, RPC::Condition requiredCondition, Resource::Charge loadType, std::vector< boost::asio::ip::address > const &secureGatewayIPs)
 
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
 
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
 
virtual void process() override
 
virtual bool isFinished() override
 
std::string serverAddress_
 
std::vector< std::shared_ptr< Processor > > setupListeners()
 
std::unique_ptr< grpc::Server > server_
 
std::unique_ptr< grpc::ServerCompletionQueue > cq_
 
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
 
GRPCServerImpl(Application &app)
 
static unsigned constexpr apiVersion
 
boost::asio::ip::tcp::endpoint getEndpoint() const
 
std::vector< boost::asio::ip::address > secureGatewayIPs_
 
std::uint16_t serverPort_
 
boost::asio::ip::tcp::endpoint getEndpoint() const
 
std::shared_ptr< InfoSub > pointer
 
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
 
An endpoint that consumes resources.
 
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
 
Holds a collection of configuration values.
 
std::optional< T > get(std::string const &name) const
 
T emplace_back(T... args)
 
T find_first_of(T... args)
 
T find_last_of(T... args)
 
boost::asio::ip::tcp::endpoint to_asio_endpoint(Endpoint const &endpoint)
Convert to asio::ip::tcp::endpoint.
 
Endpoint from_asio(boost::asio::ip::address const &address)
Convert to Endpoint.
 
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
 
error_code_i conditionMet(Condition condition_required, T &context)
 
ErrorInfo const & get_error_info(error_code_i code)
Returns an ErrorInfo that reflects the error code.
 
Charge const feeMediumBurdenRPC
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
 
std::pair< org::xrpl::rpc::v1::GetLedgerResponse, grpc::Status > doLedgerGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerRequest > &context)
 
std::pair< org::xrpl::rpc::v1::GetLedgerEntryResponse, grpc::Status > doLedgerEntryGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerEntryRequest > &context)
 
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
 
Role
Indicates the level of administrative permission to grant.
 
std::pair< org::xrpl::rpc::v1::GetLedgerDiffResponse, grpc::Status > doLedgerDiffGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDiffRequest > &context)
 
std::pair< org::xrpl::rpc::v1::GetLedgerDataResponse, grpc::Status > doLedgerDataGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDataRequest > &context)
 
Maps an rpc error code to its token, default message, and HTTP status.
 
Json::StaticString message