1#include <xrpld/overlay/Cluster.h> 
    2#include <xrpld/overlay/detail/ConnectAttempt.h> 
    3#include <xrpld/overlay/detail/PeerImp.h> 
    4#include <xrpld/overlay/detail/ProtocolVersion.h> 
    6#include <xrpl/json/json_reader.h> 
   14    boost::asio::io_context& io_context,
 
   27    , remote_endpoint_(remote_endpoint)
 
   29    , strand_(
boost::asio::make_strand(io_context))
 
   31    , stepTimer_(io_context)
 
   35    , socket_(stream_ptr_->next_layer().socket())
 
   36    , stream_(*stream_ptr_)
 
 
   52    if (!
strand_.running_in_this_thread())
 
   53        return boost::asio::post(
 
 
   67    if (!
strand_.running_in_this_thread())
 
   68        return boost::asio::post(
 
   78    stream_.next_layer().async_connect(
 
   80        boost::asio::bind_executor(
 
   85                std::placeholders::_1)));
 
 
   94        strand_.running_in_this_thread(),
 
   95        "ripple::ConnectAttempt::shutdown: strand in this thread");
 
  101    boost::beast::get_lowest_layer(
stream_).cancel();
 
 
  110        strand_.running_in_this_thread(),
 
  111        "ripple::ConnectAttempt::tryAsyncShutdown : strand in this thread");
 
  124        return stream_.async_shutdown(bind_executor(
 
  129                std::placeholders::_1)));
 
 
  149            (ec != boost::asio::error::eof &&
 
  150             ec != boost::asio::error::operation_aborted &&
 
  151             ec.message().find(
"application data after close notify") ==
 
 
  167        strand_.running_in_this_thread(),
 
  168        "ripple::ConnectAttempt::close : strand in this thread");
 
 
  198    if (
timer_.expiry() == std::chrono::steady_clock::time_point{})
 
  203            timer_.async_wait(boost::asio::bind_executor(
 
  208                    std::placeholders::_1)));
 
  245        stepTimer_.async_wait(boost::asio::bind_executor(
 
  250                std::placeholders::_1)));
 
  253                               << 
" timeout=" << stepTimeout.
count() << 
"s";
 
 
  271    catch (boost::system::system_error 
const&)
 
 
  286        if (ec == boost::asio::error::operation_aborted)
 
  296    bool globalExpired = (
timer_.expiry() <= now);
 
  297    bool stepExpired = (
stepTimer_.expiry() <= now);
 
  304    else if (stepExpired)
 
  311        JLOG(
journal_.
warn()) << 
"onTimer: Unexpected timer callback";
 
 
  324        if (ec == boost::asio::error::operation_aborted)
 
  327        return fail(
"onConnect", ec);
 
  336        return fail(
"onConnect", ec);
 
  345    stream_.set_verify_mode(boost::asio::ssl::verify_none);
 
  347        boost::asio::ssl::stream_base::client,
 
  348        boost::asio::bind_executor(
 
  353                std::placeholders::_1)));
 
 
  363        if (ec == boost::asio::error::operation_aborted)
 
  366        return fail(
"onHandshake", ec);
 
  369    auto const local_endpoint = 
socket_.local_endpoint(ec);
 
  371        return fail(
"onHandshake", ec);
 
  378        return fail(
"Self connection");
 
  404    boost::beast::http::async_write(
 
  407        boost::asio::bind_executor(
 
  412                std::placeholders::_1)));
 
 
  422        if (ec == boost::asio::error::operation_aborted)
 
  425        return fail(
"onWrite", ec);
 
  435    boost::beast::http::async_read(
 
  439        boost::asio::bind_executor(
 
  444                std::placeholders::_1)));
 
 
  456        if (ec == boost::asio::error::eof)
 
  462        if (ec == boost::asio::error::operation_aborted)
 
  465        return fail(
"onRead", ec);
 
 
  484            boost::beast::http::status::service_unavailable)
 
  487                << 
"Unable to upgrade to peer protocol: " << 
response_.result()
 
  496        for (
auto const buffer : 
response_.body().data())
 
  498                static_cast<char const*
>(buffer.data()),
 
  499                boost::asio::buffer_size(buffer));
 
  503        auto const isValidJson = reader.
parse(responseBody, json);
 
  506        auto const isRedirect =
 
  513                << 
" failed to upgrade to peer protocol: " << 
response_.result()
 
  521            return fail(
"processResponse: invalid peer-ips format");
 
  527        for (
auto const& ipValue : peerIps)
 
  529            if (!ipValue.isString())
 
  541        return fail(
"processResponse: failed to connect to peer: redirected");
 
  552            negotiatedProtocol = pvs[0];
 
  554        if (!negotiatedProtocol)
 
  556                "processResponse: Unable to negotiate protocol version");
 
  576            << 
"Protocol: " << 
to_string(*negotiatedProtocol);
 
  587            slot_, publicKey, member.has_value());
 
  588        if (result != PeerFinder::Result::success)
 
 
Unserialize a JSON document into a Value.
 
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
 
UInt size() const
Number of values in array or object.
 
bool isMember(char const *key) const
Return true if the object has a member named key.
 
A generic endpoint for log messages.
 
Stream trace() const
Severity stream access functions.
 
virtual Config & config()=0
 
virtual Cluster & cluster()=0
 
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
 
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
 
bool TX_REDUCE_RELAY_ENABLE
 
static constexpr std::chrono::seconds connectTimeout
 
void stop() override
Stop the connection attempt.
 
boost::asio::ip::tcp::socket socket_type
 
void cancelTimer()
Cancel both global and step timers.
 
boost::system::error_code error_code
 
std::unique_ptr< stream_type > stream_ptr_
 
std::shared_ptr< PeerFinder::Slot > slot_
 
ConnectAttempt(Application &app, boost::asio::io_context &io_context, endpoint_type const &remote_endpoint, Resource::Consumer usage, shared_context const &context, Peer::id_t id, std::shared_ptr< PeerFinder::Slot > const &slot, beast::Journal journal, OverlayImpl &overlay)
Construct a new ConnectAttempt object.
 
ConnectionStep currentStep_
 
Resource::Consumer usage_
 
boost::asio::strand< boost::asio::io_context::executor_type > strand_
 
static std::string stepToString(ConnectionStep step)
 
void run()
Begin the connection attempt.
 
void onHandshake(error_code ec)
 
boost::asio::ip::tcp::endpoint endpoint_type
 
void onWrite(error_code ec)
 
void processResponse()
Process the HTTP upgrade response from peer.
 
void onTimer(error_code ec)
Handle timer expiration events.
 
void setTimer(ConnectionStep step)
Set timers for the specified connection step.
 
void onShutdown(error_code ec)
 
boost::beast::ssl_stream< middle_type > stream_type
 
boost::beast::multi_buffer read_buf_
 
void onConnect(error_code ec)
 
void fail(std::string const &reason)
 
void onRead(error_code ec)
 
static boost::asio::ip::tcp::endpoint parse_endpoint(std::string const &s, boost::system::error_code &ec)
 
beast::Journal const journal_
 
ConnectionStep
Represents the current phase of the connection establishment process.
 
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
 
endpoint_type remote_endpoint_
 
boost::asio::basic_waitable_timer< std::chrono::steady_clock > stepTimer_
 
PeerFinder::Manager & peerFinder()
 
void add_active(std::shared_ptr< PeerImp > const &peer)
 
static bool isPeerUpgrade(http_request_type const &request)
 
Setup const & setup() const
 
virtual bool onConnected(std::shared_ptr< Slot > const &slot, beast::IP::Endpoint const &local_endpoint)=0
Called when an outbound connection attempt succeeds.
 
virtual Config config()=0
Returns the configuration for the manager.
 
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
 
virtual void onRedirects(boost::asio::ip::tcp::endpoint const &remote_address, std::vector< boost::asio::ip::tcp::endpoint > const &eps)=0
Called when we received redirect IPs from a busy peer.
 
virtual Result activate(std::shared_ptr< Slot > const &slot, PublicKey const &key, bool reserved)=0
Request an active slot type.
 
An endpoint that consumes resources.
 
void setPublicKey(PublicKey const &publicKey)
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
 
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
 
void buildHandshake(boost::beast::http::fields &h, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote_ip, Application &app)
Insert fields headers necessary for upgrading the link to the peer protocol.
 
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
 
std::string to_string(base_uint< Bits, Tag > const &a)
 
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
 
bool isProtocolSupported(ProtocolVersion const &v)
Determine whether we support a specific protocol version.
 
PublicKey verifyHandshake(boost::beast::http::fields const &headers, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
 
T shared_from_this(T... args)
 
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
 
static constexpr std::chrono::seconds tcpConnect
 
static constexpr std::chrono::seconds tlsShutdown
 
static constexpr std::chrono::seconds tlsHandshake
 
static constexpr std::chrono::seconds httpWrite
 
static constexpr std::chrono::seconds httpRead
 
beast::IP::Address public_ip
 
std::optional< std::uint32_t > networkID
 
bool peerPrivate
true if we want our IP address kept private.