1#ifndef XRPL_PEERFINDER_LOGIC_H_INCLUDED 
    2#define XRPL_PEERFINDER_LOGIC_H_INCLUDED 
    4#include <xrpld/peerfinder/PeerfinderManager.h> 
    5#include <xrpld/peerfinder/detail/Bootcache.h> 
    6#include <xrpld/peerfinder/detail/Counts.h> 
    7#include <xrpld/peerfinder/detail/Fixed.h> 
    8#include <xrpld/peerfinder/detail/Handouts.h> 
    9#include <xrpld/peerfinder/detail/Livecache.h> 
   10#include <xrpld/peerfinder/detail/SlotImp.h> 
   11#include <xrpld/peerfinder/detail/Source.h> 
   12#include <xrpld/peerfinder/detail/Store.h> 
   13#include <xrpld/peerfinder/detail/iosformat.h> 
   15#include <xrpl/basics/Log.h> 
   16#include <xrpl/basics/contract.h> 
   17#include <xrpl/basics/random.h> 
   18#include <xrpl/beast/net/IPAddressConversion.h> 
   19#include <xrpl/beast/utility/WrappedSink.h> 
   34template <
class Checker>
 
  167        if (addresses.
empty())
 
  170                << 
"Could not resolve fixed slot '" << name << 
"'";
 
  174        for (
auto const& remote_address : addresses)
 
  176            if (remote_address.port() == 0)
 
  178                Throw<std::runtime_error>(
 
  179                    "Port not specified for address:" +
 
  180                    remote_address.to_string());
 
  183            auto result(
fixed_.emplace(
 
  192                    << 
"' at " << remote_address;
 
 
  205        boost::system::error_code ec)
 
  207        if (ec == boost::asio::error::operation_aborted)
 
  216                << 
beast::leftw(18) << 
"Logic tested " << checkedAddress
 
  217                << 
" but the connection was closed";
 
  232            JLOG(journal.error()) << 
"Logic testing " << iter->first
 
  233                                  << 
" with error, " << ec.message();
 
  240        JLOG(journal.debug())
 
  241            << 
"Logic testing " << checkedAddress << 
" succeeded";
 
 
  252            << 
beast::leftw(18) << 
"Logic accept" << remote_endpoint
 
  253            << 
" on local " << local_endpoint;
 
  258        if (is_public(remote_endpoint))
 
  266                    << remote_endpoint << 
" because of ip limits.";
 
  275                << 
beast::leftw(18) << 
"Logic dropping " << remote_endpoint
 
  276                << 
" as duplicate incoming";
 
  287        auto const result(
slots_.
emplace(slot->remote_endpoint(), slot));
 
  291            "ripple::PeerFinder::Logic::new_inbound_slot : remote endpoint " 
  299        return {result.first->second, Result::success};
 
 
  307            << 
beast::leftw(18) << 
"Logic connect " << remote_endpoint;
 
  315                << 
beast::leftw(18) << 
"Logic dropping " << remote_endpoint
 
  316                << 
" as duplicate connect";
 
  325        auto const result = 
slots_.
emplace(slot->remote_endpoint(), slot);
 
  329            "ripple::PeerFinder::Logic::new_outbound_slot : remote endpoint " 
  338        return {result.first->second, Result::success};
 
 
  349        JLOG(journal.trace()) << 
"Logic connected on local " << local_endpoint;
 
  356            "ripple::PeerFinder::Logic::onConnected : valid slot input");
 
  358        slot->local_endpoint(local_endpoint);
 
  366                    iter->second->local_endpoint() == slot->remote_endpoint(),
 
  367                    "ripple::PeerFinder::Logic::onConnected : local and remote " 
  368                    "endpoints do match");
 
  369                JLOG(journal.warn()) << 
"Logic dropping as self connect";
 
 
  387        JLOG(journal.debug())
 
  388            << 
"Logic handshake " << slot->remote_endpoint() << 
" with " 
  389            << (reserved ? 
"reserved " : 
"") << 
"key " << key;
 
  396            "ripple::PeerFinder::Logic::activate : valid slot input");
 
  400            "ripple::PeerFinder::Logic::activate : valid slot state");
 
  404            return Result::duplicatePeer;
 
  409        slot->reserved(reserved);
 
  415            if (!slot->inbound())
 
  418                return Result::inboundDisabled;
 
  424        slot->public_key(key);
 
  426            [[maybe_unused]] 
bool const inserted = 
keys_.insert(key).second;
 
  430                "ripple::PeerFinder::Logic::activate : public key inserted");
 
  438        if (!slot->inbound())
 
  442        if (slot->fixed() && !slot->inbound())
 
  444            auto iter(
fixed_.find(slot->remote_endpoint()));
 
  447                    "PeerFinder::Logic::activate(): remote_endpoint " 
  448                    "missing from fixed_");
 
  451            JLOG(journal.trace()) << 
"Logic fixed success";
 
  454        return Result::success;
 
 
  468        return std::move(h.
list());
 
 
  493        for (
auto const& s : 
slots_)
 
  545                    << ((h.
list().
size() > 1) ? 
"endpoints" : 
"endpoint");
 
  583                << ((h.
list().
size() > 1) ? 
"addresses" : 
"address");
 
 
  611                    [&slots](Slots::value_type 
const& value) {
 
  612                        if (value.second->state() == Slot::active)
 
  613                            slots.emplace_back(value.second);
 
  623                        targets.emplace_back(slot);
 
  652                for (
auto& t : targets)
 
  665            for (
auto const& t : targets)
 
  668                auto const& list = t.list();
 
  671                JLOG(journal.trace())
 
  672                    << 
"Logic sending " << list.size()
 
  673                    << ((list.size() == 1) ? 
" endpoint" : 
" endpoints");
 
 
  692        for (
auto const& entry : 
slots_)
 
  693            entry.second->expire();
 
 
  707        bool neighbor(
false);
 
  708        for (
auto iter = list.
begin(); iter != list.
end();)
 
  717                    << 
" for excess hops " << ep.
hops;
 
  718                iter = list.
erase(iter);
 
  730                        slot->remote_endpoint().at_port(ep.
address.
port());
 
  736                        << 
" for extra self";
 
  737                    iter = list.
erase(iter);
 
  746                                        << ep.
address << 
" as invalid";
 
  747                iter = list.
erase(iter);
 
  755                    [ep](Endpoints::value_type 
const& other) {
 
  756                        return ep.address == other.address;
 
  760                                        << ep.
address << 
" as duplicate";
 
  761                iter = list.
erase(iter);
 
 
  787        JLOG(journal.trace()) << 
"Endpoints contained " << list.
size()
 
  788                              << ((list.
size() > 1) ? 
" entries" : 
" entry");
 
  795            "ripple::PeerFinder::Logic::on_endpoints : valid slot input");
 
  800            "ripple::PeerFinder::Logic::on_endpoints : valid slot state");
 
  805        if (slot->whenAcceptEndpoints > now)
 
  810        for (
auto const& ep : list)
 
  814                "ripple::PeerFinder::Logic::on_endpoints : nonzero hops");
 
  816            slot->recent.insert(ep.address, ep.hops);
 
  823                if (slot->connectivityCheckInProgress)
 
  825                    JLOG(journal.debug()) << 
"Logic testing " << ep.address
 
  826                                          << 
" already in progress";
 
  833                    slot->connectivityCheckInProgress = 
true;
 
  843                            slot->remote_endpoint(),
 
  845                            std::placeholders::_1));
 
  856                if (!slot->canAccept)
 
 
  877            auto const iter = 
slots_.
find(slot->remote_endpoint());
 
  881                    "PeerFinder::Logic::remove(): remote_endpoint " 
  882                    "missing from slots_");
 
  890            auto const iter = 
keys_.find(*slot->public_key());
 
  892            if (iter == 
keys_.end())
 
  894                    "PeerFinder::Logic::remove(): public_key missing " 
  906                    "PeerFinder::Logic::remove(): remote_endpont " 
  907                    "address missing from connectedAddresses_");
 
 
  927        if (slot->fixed() && !slot->inbound() && slot->state() != 
Slot::active)
 
  929            auto iter(
fixed_.find(slot->remote_endpoint()));
 
  932                    "PeerFinder::Logic::on_closed(): remote_endpont " 
  933                    "missing from fixed_");
 
  936            JLOG(journal.debug()) << 
"Logic fixed failed";
 
  940        switch (slot->state())
 
  943                JLOG(journal.trace()) << 
"Logic accept failed";
 
  957                JLOG(journal.trace()) << 
"Logic close";
 
  961                JLOG(journal.trace()) << 
"Logic finished";
 
  967                    "ripple::PeerFinder::Logic::on_closed : invalid slot " 
 
  983    template <
class FwdIter>
 
  988        boost::asio::ip::tcp::endpoint 
const& remote_address);
 
  997        for (
auto const& entry : 
fixed_)
 
  998            if (entry.first == endpoint)
 
 
 1009        for (
auto const& entry : 
fixed_)
 
 1010            if (entry.first.address() == address)
 
 
 1022    template <
class Container>
 
 1030        for (
auto iter = 
fixed_.begin(); needed && iter != 
fixed_.end(); ++iter)
 
 1032            auto const& address(iter->first.address());
 
 1033            if (iter->second.when() <= now &&
 
 1034                squelches.
find(address) == squelches.
end() &&
 
 1038                    [address](Slots::value_type 
const& v) {
 
 1039                        return address == v.first.address();
 
 1042                squelches.
insert(iter->first.address());
 
 1043                c.push_back(iter->first);
 
 
 1077        for (
auto addr : list)
 
 
 1116                << 
beast::leftw(18) << 
"Logic added " << count << 
" new " 
 1117                << ((count == 1) ? 
"address" : 
"addresses") << 
" from " 
 1123                                    << 
"'" << source->name() << 
"' fetch, " 
 1124                                    << results.
error.message();
 
 
 1138        if (is_unspecified(address))
 
 1140        if (!is_public(address))
 
 1142        if (address.
port() == 0)
 
 
 1156        for (
auto const& entry : slots)
 
 1159            SlotImp const& slot(*entry.second);
 
 1164                item[
"inbound"] = 
"yes";
 
 1166                item[
"fixed"] = 
"yes";
 
 1168                item[
"reserved"] = 
"yes";
 
 
 1247template <
class Checker>
 
 1248template <
class FwdIter>
 
 1253    boost::asio::ip::tcp::endpoint 
const& remote_address)
 
 1261        JLOG(m_journal.trace()) << 
beast::leftw(18) << 
"Logic add " << n
 
 1262                                << 
" redirect IPs from " << remote_address;
 
 
 
 
A version-independent IP address and port combination.
 
Address const & address() const
Returns the address portion of this endpoint.
 
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
 
Port port() const
Returns the port number on the endpoint.
 
A generic endpoint for log messages.
 
Sink & sink() const
Returns the Sink associated with this Journal.
 
Wraps a Journal::Sink to prefix its output with a string.
 
typename Clock::time_point time_point
 
virtual time_point now() const =0
Returns the current time.
 
Associative container where each element is also indexed by time.
 
auto insert(value_type const &value) -> typename std::enable_if<!maybe_multi, std::pair< iterator, bool > >::type
 
void touch(beast::detail::aged_container_iterator< is_const, Iterator > pos)
 
iterator find(K const &k)
 
Stores IP addresses useful for gaining initial connections.
 
map_type::size_type size() const
Returns the number of entries in the cache.
 
const_iterator end() const
 
void periodicActivity()
Stores the cache in the persistent database on a timer.
 
bool insert(beast::IP::Endpoint const &endpoint)
Add a newly-learned address to the cache.
 
void on_failure(beast::IP::Endpoint const &endpoint)
Called when an outbound connection attempt fails to handshake.
 
const_iterator begin() const
IP::Endpoint iterators that traverse in decreasing valence.
 
void onWrite(beast::PropertyStream::Map &map)
Write the cache state to the property stream.
 
void on_success(beast::IP::Endpoint const &endpoint)
Called when an outbound connection handshake completes.
 
bool insertStatic(beast::IP::Endpoint const &endpoint)
Add a staticallyconfigured address to the cache.
 
void load()
Load the persisted data from the Store into the container.
 
Tests remote listening sockets to make sure they are connectible.
 
void async_connect(beast::IP::Endpoint const &endpoint, Handler &&handler)
Performs an async connection test on the specified endpoint.
 
Receives handouts for making automatic connections.
 
bool try_insert(beast::IP::Endpoint const &endpoint)
 
Manages the count of available connections for the various slots.
 
std::size_t fixed_active() const
Returns the number of active fixed connections.
 
int out_active() const
Returns the number of outbound peers assigned an open slot.
 
std::size_t attempts_needed() const
Returns the number of attempts needed to bring us to the max.
 
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
 
int out_max() const
Returns the total number of outbound slots.
 
void add(Slot const &s)
Adds the slot state and properties to the slot counts.
 
int in_max() const
Returns the total number of inbound slots.
 
void remove(Slot const &s)
Removes the slot state and properties from the slot counts.
 
void onConfig(Config const &config)
Called when the config is set or changed.
 
bool can_activate(Slot const &s) const
Returns true if the slot can become active.
 
std::size_t attempts() const
Returns the number of outbound connection attempts.
 
void shuffle()
Shuffle each hop list.
 
reverse_iterator rbegin()
 
The Livecache holds the short-lived relayed Endpoint messages.
 
void expire()
Erase entries whose time has expired.
 
void insert(Endpoint const &ep)
Creates or updates an existing Element based on a new message.
 
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
 
class ripple::PeerFinder::Livecache::hops_t hops
 
The Logic for maintaining the list of Slot addresses.
 
void stop()
Stop the logic.
 
bool fixed(beast::IP::Endpoint const &endpoint) const
 
int addBootcacheAddresses(IPAddresses const &list)
 
void onWrite(beast::PropertyStream::Map &map)
 
std::recursive_mutex lock_
 
void fetch(std::shared_ptr< Source > const &source)
 
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &local_endpoint)
 
bool is_valid_address(beast::IP::Endpoint const &address)
 
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
 
void on_closed(SlotImp::ptr const &slot)
 
void addFixedPeer(std::string const &name, std::vector< beast::IP::Endpoint > const &addresses)
 
void on_failure(SlotImp::ptr const &slot)
 
std::set< PublicKey > keys_
 
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
 
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
 
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
 
bool fixed(beast::IP::Address const &address) const
 
std::shared_ptr< Source > fetchSource_
 
void remove(SlotImp::ptr const &slot)
 
std::pair< SlotImp::ptr, Result > new_inbound_slot(beast::IP::Endpoint const &local_endpoint, beast::IP::Endpoint const &remote_endpoint)
 
clock_type::time_point m_whenBroadcast
 
std::multiset< beast::IP::Address > connectedAddresses_
 
void get_fixed(std::size_t needed, Container &c, typename ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
 
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
 
std::vector< std::shared_ptr< Source > > m_sources
 
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remote_address)
 
void addSource(std::shared_ptr< Source > const &source)
 
void on_endpoints(SlotImp::ptr const &slot, Endpoints list)
 
void addStaticSource(std::shared_ptr< Source > const &source)
 
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
 
Counts const & counts() const
 
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
 
std::pair< SlotImp::ptr, Result > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)
 
ConnectHandouts::Squelches m_squelches
 
void addFixedPeer(std::string const &name, beast::IP::Endpoint const &ep)
 
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
 
std::map< beast::IP::Endpoint, Fixed > fixed_
 
void config(Config const &c)
 
static std::string stateString(Slot::State state)
 
Receives handouts for redirecting a connection.
 
std::vector< Endpoint > & list()
 
std::optional< beast::IP::Endpoint > const & local_endpoint() const override
The local endpoint of the socket, when known.
 
bool fixed() const override
Returns true if this is a fixed connection.
 
bool inbound() const override
Returns true if this is an inbound connection.
 
bool connectivityCheckInProgress
 
std::shared_ptr< SlotImp > ptr
 
void set_listening_port(std::uint16_t port)
 
State state() const override
Returns the state of the connection.
 
std::string prefix() const
 
beast::IP::Endpoint const & remote_endpoint() const override
The remote endpoint of socket.
 
bool reserved() const override
Returns true if this is a reserved connection.
 
Abstract persistence for PeerFinder data.
 
T forward_as_tuple(T... args)
 
boost::asio::ip::address_v6 AddressV6
 
boost::asio::ip::address Address
 
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
 
std::chrono::seconds constexpr secondsPerMessage(151)
 
std::uint32_t constexpr numberOfEndpointsMax
 
std::chrono::seconds constexpr recentAttemptDuration(60)
 
std::uint32_t constexpr maxHops
 
std::string_view to_string(Result result) noexcept
Converts a Result enum value to its string representation.
 
Result
Possible results from activating a slot.
 
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seq_first, SeqFwdIter seq_last)
Distributes objects to targets according to business rules.
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
 
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
 
beast::xor_shift_engine & default_prng()
Return the default random engine.
 
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
 
Left justifies a field at the specified width.
 
PeerFinder configuration settings.
 
void onWrite(beast::PropertyStream::Map &map)
Write the configuration into a property stream.
 
bool autoConnect
true if we want to establish connections automatically
 
int ipLimit
Limit how many incoming connections we allow per IP.
 
bool wantIncoming
true if we want to accept incoming connections.
 
std::uint16_t listeningPort
The listening port number.
 
Describes a connectible peer address along with some metadata.
 
beast::IP::Endpoint address
 
boost::system::error_code error