3#include <xrpld/peerfinder/PeerfinderManager.h>
4#include <xrpld/peerfinder/detail/Bootcache.h>
5#include <xrpld/peerfinder/detail/Counts.h>
6#include <xrpld/peerfinder/detail/Fixed.h>
7#include <xrpld/peerfinder/detail/Handouts.h>
8#include <xrpld/peerfinder/detail/Livecache.h>
9#include <xrpld/peerfinder/detail/SlotImp.h>
10#include <xrpld/peerfinder/detail/Source.h>
11#include <xrpld/peerfinder/detail/Store.h>
12#include <xrpld/peerfinder/detail/iosformat.h>
14#include <xrpl/basics/Log.h>
15#include <xrpl/basics/contract.h>
16#include <xrpl/basics/random.h>
17#include <xrpl/beast/net/IPAddressConversion.h>
18#include <xrpl/beast/utility/WrappedSink.h>
33template <
class Checker>
160 if (addresses.
empty())
162 JLOG(
m_journal.
info()) <<
"Could not resolve fixed slot '" << name <<
"'";
166 for (
auto const& remote_address : addresses)
168 if (remote_address.port() == 0)
170 Throw<std::runtime_error>(
171 "Port not specified for address:" + remote_address.to_string());
174 auto result(
fixed_.emplace(
182 <<
beast::leftw(18) <<
"Logic add fixed '" << name <<
"' at " << remote_address;
195 boost::system::error_code ec)
197 if (ec == boost::asio::error::operation_aborted)
206 <<
" but the connection was closed";
221 JLOG(journal.error()) <<
"Logic testing " << iter->first <<
" with error, "
229 JLOG(journal.debug()) <<
"Logic testing " << checkedAddress <<
" succeeded";
240 <<
" on local " << local_endpoint;
245 if (is_public(remote_endpoint))
251 << remote_endpoint <<
" because of ip limits.";
260 <<
" as duplicate incoming";
269 auto const result(
slots_.
emplace(slot->remote_endpoint(), slot));
273 "xrpl::PeerFinder::Logic::new_inbound_slot : remote endpoint "
281 return {result.first->second, Result::success};
296 <<
" as duplicate connect";
305 auto const result =
slots_.
emplace(slot->remote_endpoint(), slot);
309 "xrpl::PeerFinder::Logic::new_outbound_slot : remote endpoint "
318 return {result.first->second, Result::success};
327 JLOG(journal.trace()) <<
"Logic connected on local " << local_endpoint;
334 "xrpl::PeerFinder::Logic::onConnected : valid slot input");
336 slot->local_endpoint(local_endpoint);
344 iter->second->local_endpoint() == slot->remote_endpoint(),
345 "xrpl::PeerFinder::Logic::onConnected : local and remote "
346 "endpoints do match");
347 JLOG(journal.warn()) <<
"Logic dropping as self connect";
365 JLOG(journal.debug()) <<
"Logic handshake " << slot->remote_endpoint() <<
" with "
366 << (reserved ?
"reserved " :
"") <<
"key " << key;
373 "xrpl::PeerFinder::Logic::activate : valid slot input");
377 "xrpl::PeerFinder::Logic::activate : valid slot state");
381 return Result::duplicatePeer;
386 slot->reserved(reserved);
392 if (!slot->inbound())
395 return Result::inboundDisabled;
401 slot->public_key(key);
403 [[maybe_unused]]
bool const inserted =
keys_.insert(key).second;
405 XRPL_ASSERT(inserted,
"xrpl::PeerFinder::Logic::activate : public key inserted");
413 if (!slot->inbound())
417 if (slot->fixed() && !slot->inbound())
419 auto iter(
fixed_.find(slot->remote_endpoint()));
422 "PeerFinder::Logic::activate(): remote_endpoint "
423 "missing from fixed_");
426 JLOG(journal.trace()) <<
"Logic fixed success";
429 return Result::success;
443 return std::move(h.
list());
468 for (
auto const& s :
slots_)
516 << ((h.
list().
size() > 1) ?
"endpoints" :
"endpoint");
550 << ((h.
list().
size() > 1) ?
"addresses" :
"address");
576 if (value.second->state() == Slot::active)
577 slots.emplace_back(value.second);
584 targets.emplace_back(slot);
613 for (
auto& t : targets)
622 for (
auto const& t : targets)
625 auto const& list = t.list();
628 JLOG(journal.trace()) <<
"Logic sending " << list.size()
629 << ((list.size() == 1) ?
" endpoint" :
" endpoints");
648 for (
auto const& entry :
slots_)
649 entry.second->expire();
663 bool neighbor(
false);
664 for (
auto iter = list.
begin(); iter != list.
end();)
672 <<
" for excess hops " << ep.
hops;
673 iter = list.
erase(iter);
690 iter = list.
erase(iter);
700 iter = list.
erase(iter);
705 if (
std::any_of(list.
begin(), iter, [ep](Endpoints::value_type
const& other) {
706 return ep.address == other.address;
711 iter = list.
erase(iter);
737 JLOG(journal.trace()) <<
"Endpoints contained " << list.
size()
738 << ((list.
size() > 1) ?
" entries" :
" entry");
745 "xrpl::PeerFinder::Logic::on_endpoints : valid slot input");
750 "xrpl::PeerFinder::Logic::on_endpoints : valid slot state");
755 if (slot->whenAcceptEndpoints > now)
760 for (
auto const& ep : list)
762 XRPL_ASSERT(ep.hops,
"xrpl::PeerFinder::Logic::on_endpoints : nonzero hops");
764 slot->recent.insert(ep.address, ep.hops);
771 if (slot->connectivityCheckInProgress)
773 JLOG(journal.debug())
774 <<
"Logic testing " << ep.address <<
" already in progress";
781 slot->connectivityCheckInProgress =
true;
791 slot->remote_endpoint(),
793 std::placeholders::_1));
804 if (!slot->canAccept)
825 auto const iter =
slots_.
find(slot->remote_endpoint());
829 "PeerFinder::Logic::remove(): remote_endpoint "
830 "missing from slots_");
838 auto const iter =
keys_.find(*slot->public_key());
840 if (iter ==
keys_.end())
842 "PeerFinder::Logic::remove(): public_key missing "
853 "PeerFinder::Logic::remove(): remote_endpoint "
854 "address missing from connectedAddresses_");
874 if (slot->fixed() && !slot->inbound() && slot->state() !=
Slot::active)
876 auto iter(
fixed_.find(slot->remote_endpoint()));
879 "PeerFinder::Logic::on_closed(): remote_endpoint "
880 "missing from fixed_");
883 JLOG(journal.debug()) <<
"Logic fixed failed";
887 switch (slot->state())
890 JLOG(journal.trace()) <<
"Logic accept failed";
904 JLOG(journal.trace()) <<
"Logic close";
908 JLOG(journal.trace()) <<
"Logic finished";
914 "xrpl::PeerFinder::Logic::on_closed : invalid slot "
930 template <
class FwdIter>
932 onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint
const& remote_address);
941 for (
auto const& entry :
fixed_)
942 if (entry.first == endpoint)
953 for (
auto const& entry :
fixed_)
954 if (entry.first.address() == address)
966 template <
class Container>
971 for (
auto iter =
fixed_.begin(); needed && iter !=
fixed_.end(); ++iter)
973 auto const& address(iter->first.address());
974 if (iter->second.when() <= now && squelches.
find(address) == squelches.
end() &&
976 return address == v.first.address();
979 squelches.
insert(iter->first.address());
980 c.push_back(iter->first);
1014 for (
auto const& addr : list)
1053 <<
beast::leftw(18) <<
"Logic added " << count <<
" new "
1054 << ((count == 1) ?
"address" :
"addresses") <<
" from " << source->name();
1060 <<
"'" << source->name() <<
"' fetch, " << results.
error.message();
1074 if (is_unspecified(address))
1076 if (!is_public(address))
1078 if (address.
port() == 0)
1092 for (
auto const& entry : slots)
1095 SlotImp const& slot(*entry.second);
1100 item[
"inbound"] =
"yes";
1102 item[
"fixed"] =
"yes";
1104 item[
"reserved"] =
"yes";
1183template <
class Checker>
1184template <
class FwdIter>
1189 boost::asio::ip::tcp::endpoint
const& remote_address)
1197 JLOG(m_journal.trace()) <<
beast::leftw(18) <<
"Logic add " << n <<
" redirect IPs from "
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Port port() const
Returns the port number on the endpoint.
A generic endpoint for log messages.
Sink & sink() const
Returns the Sink associated with this Journal.
Wraps a Journal::Sink to prefix its output with a string.
typename Clock::time_point time_point
virtual time_point now() const =0
Returns the current time.
Associative container where each element is also indexed by time.
auto insert(value_type const &value) -> typename std::enable_if<!maybe_multi, std::pair< iterator, bool > >::type
void touch(beast::detail::aged_container_iterator< is_const, Iterator > pos)
iterator find(K const &k)
Stores IP addresses useful for gaining initial connections.
void on_failure(beast::IP::Endpoint const &endpoint)
Called when an outbound connection attempt fails to handshake.
bool insert(beast::IP::Endpoint const &endpoint)
Add a newly-learned address to the cache.
void periodicActivity()
Stores the cache in the persistent database on a timer.
void on_success(beast::IP::Endpoint const &endpoint)
Called when an outbound connection handshake completes.
void onWrite(beast::PropertyStream::Map &map)
Write the cache state to the property stream.
const_iterator begin() const
IP::Endpoint iterators that traverse in decreasing valence.
void load()
Load the persisted data from the Store into the container.
bool insertStatic(beast::IP::Endpoint const &endpoint)
Add a staticallyconfigured address to the cache.
const_iterator end() const
map_type::size_type size() const
Returns the number of entries in the cache.
Tests remote listening sockets to make sure they are connectable.
void async_connect(beast::IP::Endpoint const &endpoint, Handler &&handler)
Performs an async connection test on the specified endpoint.
Receives handouts for making automatic connections.
bool try_insert(beast::IP::Endpoint const &endpoint)
Manages the count of available connections for the various slots.
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
void remove(Slot const &s)
Removes the slot state and properties from the slot counts.
std::size_t attempts() const
Returns the number of outbound connection attempts.
int out_max() const
Returns the total number of outbound slots.
bool can_activate(Slot const &s) const
Returns true if the slot can become active.
int in_max() const
Returns the total number of inbound slots.
std::size_t fixed_active() const
Returns the number of active fixed connections.
void add(Slot const &s)
Adds the slot state and properties to the slot counts.
void onConfig(Config const &config)
Called when the config is set or changed.
int out_active() const
Returns the number of outbound peers assigned an open slot.
std::size_t attempts_needed() const
Returns the number of attempts needed to bring us to the max.
void shuffle()
Shuffle each hop list.
reverse_iterator rbegin()
The Livecache holds the short-lived relayed Endpoint messages.
class xrpl::PeerFinder::Livecache::hops_t hops
void expire()
Erase entries whose time has expired.
void insert(Endpoint const &ep)
Creates or updates an existing Element based on a new message.
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
The Logic for maintaining the list of Slot addresses.
void get_fixed(std::size_t needed, Container &c, typename ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
void on_closed(SlotImp::ptr const &slot)
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
std::shared_ptr< Source > fetchSource_
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
void addFixedPeer(std::string const &name, std::vector< beast::IP::Endpoint > const &addresses)
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remote_address)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
void addFixedPeer(std::string const &name, beast::IP::Endpoint const &ep)
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
void config(Config const &c)
std::recursive_mutex lock_
ConnectHandouts::Squelches m_squelches
static std::string stateString(Slot::State state)
std::pair< SlotImp::ptr, Result > new_inbound_slot(beast::IP::Endpoint const &local_endpoint, beast::IP::Endpoint const &remote_endpoint)
void fetch(std::shared_ptr< Source > const &source)
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
void addSource(std::shared_ptr< Source > const &source)
void on_endpoints(SlotImp::ptr const &slot, Endpoints list)
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &local_endpoint)
bool is_valid_address(beast::IP::Endpoint const &address)
void stop()
Stop the logic.
std::vector< std::shared_ptr< Source > > m_sources
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
Counts const & counts() const
void remove(SlotImp::ptr const &slot)
void addStaticSource(std::shared_ptr< Source > const &source)
std::multiset< beast::IP::Address > connectedAddresses_
std::pair< SlotImp::ptr, Result > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)
clock_type::time_point m_whenBroadcast
std::set< PublicKey > keys_
std::map< beast::IP::Endpoint, Fixed > fixed_
void on_failure(SlotImp::ptr const &slot)
bool fixed(beast::IP::Endpoint const &endpoint) const
int addBootcacheAddresses(IPAddresses const &list)
void onWrite(beast::PropertyStream::Map &map)
bool fixed(beast::IP::Address const &address) const
Receives handouts for redirecting a connection.
std::vector< Endpoint > & list()
std::optional< beast::IP::Endpoint > const & local_endpoint() const override
The local endpoint of the socket, when known.
bool fixed() const override
Returns true if this is a fixed connection.
void set_listening_port(std::uint16_t port)
std::shared_ptr< SlotImp > ptr
bool reserved() const override
Returns true if this is a reserved connection.
beast::IP::Endpoint const & remote_endpoint() const override
The remote endpoint of socket.
std::string prefix() const
State state() const override
Returns the state of the connection.
bool connectivityCheckInProgress
bool inbound() const override
Returns true if this is an inbound connection.
Abstract persistence for PeerFinder data.
T forward_as_tuple(T... args)
boost::asio::ip::address_v6 AddressV6
boost::asio::ip::address Address
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
std::uint32_t constexpr numberOfEndpointsMax
std::uint32_t constexpr maxHops
std::chrono::seconds constexpr recentAttemptDuration(60)
std::chrono::seconds constexpr secondsPerMessage(151)
std::string_view to_string(Result result) noexcept
Converts a Result enum value to its string representation.
Result
Possible results from activating a slot.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seq_first, SeqFwdIter seq_last)
Distributes objects to targets according to business rules.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
beast::xor_shift_engine & default_prng()
Return the default random engine.
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Left justifies a field at the specified width.
PeerFinder configuration settings.
int ipLimit
Limit how many incoming connections we allow per IP.
void onWrite(beast::PropertyStream::Map &map) const
Write the configuration into a property stream.
bool wantIncoming
true if we want to accept incoming connections.
bool autoConnect
true if we want to establish connections automatically
std::uint16_t listeningPort
The listening port number.
Describes a connectable peer address along with some metadata.
beast::IP::Endpoint address
boost::system::error_code error