3#include <xrpld/peerfinder/PeerfinderManager.h>
4#include <xrpld/peerfinder/detail/Bootcache.h>
5#include <xrpld/peerfinder/detail/Counts.h>
6#include <xrpld/peerfinder/detail/Fixed.h>
7#include <xrpld/peerfinder/detail/Handouts.h>
8#include <xrpld/peerfinder/detail/Livecache.h>
9#include <xrpld/peerfinder/detail/SlotImp.h>
10#include <xrpld/peerfinder/detail/Source.h>
11#include <xrpld/peerfinder/detail/Store.h>
12#include <xrpld/peerfinder/detail/iosformat.h>
14#include <xrpl/basics/Log.h>
15#include <xrpl/basics/contract.h>
16#include <xrpl/basics/random.h>
17#include <xrpl/beast/net/IPAddressConversion.h>
18#include <xrpl/beast/utility/WrappedSink.h>
33template <
class Checker>
160 if (addresses.
empty())
162 JLOG(
m_journal.
info()) <<
"Could not resolve fixed slot '" << name <<
"'";
166 for (
auto const& remote_address : addresses)
168 if (remote_address.port() == 0)
170 Throw<std::runtime_error>(
"Port not specified for address:" + remote_address.to_string());
173 auto result(
fixed_.emplace(
191 boost::system::error_code ec)
193 if (ec == boost::asio::error::operation_aborted)
202 <<
" but the connection was closed";
217 JLOG(journal.error()) <<
"Logic testing " << iter->first <<
" with error, " << ec.message();
224 JLOG(journal.debug()) <<
"Logic testing " << checkedAddress <<
" succeeded";
238 if (is_public(remote_endpoint))
244 <<
" because of ip limits.";
253 <<
" as duplicate incoming";
261 auto const result(
slots_.
emplace(slot->remote_endpoint(), slot));
265 "xrpl::PeerFinder::Logic::new_inbound_slot : remote endpoint "
273 return {result.first->second, Result::success};
288 <<
" as duplicate connect";
296 auto const result =
slots_.
emplace(slot->remote_endpoint(), slot);
300 "xrpl::PeerFinder::Logic::new_outbound_slot : remote endpoint "
309 return {result.first->second, Result::success};
318 JLOG(journal.trace()) <<
"Logic connected on local " << local_endpoint;
325 "xrpl::PeerFinder::Logic::onConnected : valid slot input");
327 slot->local_endpoint(local_endpoint);
335 iter->second->local_endpoint() == slot->remote_endpoint(),
336 "xrpl::PeerFinder::Logic::onConnected : local and remote "
337 "endpoints do match");
338 JLOG(journal.warn()) <<
"Logic dropping as self connect";
356 JLOG(journal.debug()) <<
"Logic handshake " << slot->remote_endpoint() <<
" with "
357 << (reserved ?
"reserved " :
"") <<
"key " << key;
364 "xrpl::PeerFinder::Logic::activate : valid slot input");
368 "xrpl::PeerFinder::Logic::activate : valid slot state");
372 return Result::duplicatePeer;
377 slot->reserved(reserved);
383 if (!slot->inbound())
386 return Result::inboundDisabled;
392 slot->public_key(key);
394 [[maybe_unused]]
bool const inserted =
keys_.insert(key).second;
396 XRPL_ASSERT(inserted,
"xrpl::PeerFinder::Logic::activate : public key inserted");
404 if (!slot->inbound())
408 if (slot->fixed() && !slot->inbound())
410 auto iter(
fixed_.find(slot->remote_endpoint()));
413 "PeerFinder::Logic::activate(): remote_endpoint "
414 "missing from fixed_");
417 JLOG(journal.trace()) <<
"Logic fixed success";
420 return Result::success;
434 return std::move(h.
list());
459 for (
auto const& s :
slots_)
504 << ((h.
list().
size() > 1) ?
"endpoints" :
"endpoint");
536 << ((h.
list().
size() > 1) ?
"addresses" :
"address");
561 if (value.second->state() == Slot::active)
562 slots.emplace_back(value.second);
596 for (
auto& t : targets)
605 for (
auto const& t : targets)
608 auto const& list = t.list();
611 JLOG(journal.trace()) <<
"Logic sending " << list.size()
612 << ((list.size() == 1) ?
" endpoint" :
" endpoints");
631 for (
auto const& entry :
slots_)
632 entry.second->expire();
646 bool neighbor(
false);
647 for (
auto iter = list.
begin(); iter != list.
end();)
656 iter = list.
erase(iter);
672 iter = list.
erase(iter);
681 iter = list.
erase(iter);
686 if (
std::any_of(list.
begin(), iter, [ep](Endpoints::value_type
const& other) {
687 return ep.address == other.address;
691 iter = list.
erase(iter);
717 JLOG(journal.trace()) <<
"Endpoints contained " << list.
size() << ((list.
size() > 1) ?
" entries" :
" entry");
724 "xrpl::PeerFinder::Logic::on_endpoints : valid slot input");
727 XRPL_ASSERT(slot->state() ==
Slot::active,
"xrpl::PeerFinder::Logic::on_endpoints : valid slot state");
732 if (slot->whenAcceptEndpoints > now)
737 for (
auto const& ep : list)
739 XRPL_ASSERT(ep.hops,
"xrpl::PeerFinder::Logic::on_endpoints : nonzero hops");
741 slot->recent.insert(ep.address, ep.hops);
748 if (slot->connectivityCheckInProgress)
750 JLOG(journal.debug()) <<
"Logic testing " << ep.address <<
" already in progress";
757 slot->connectivityCheckInProgress =
true;
776 if (!slot->canAccept)
797 auto const iter =
slots_.
find(slot->remote_endpoint());
801 "PeerFinder::Logic::remove(): remote_endpoint "
802 "missing from slots_");
810 auto const iter =
keys_.find(*slot->public_key());
812 if (iter ==
keys_.end())
814 "PeerFinder::Logic::remove(): public_key missing "
825 "PeerFinder::Logic::remove(): remote_endpoint "
826 "address missing from connectedAddresses_");
846 if (slot->fixed() && !slot->inbound() && slot->state() !=
Slot::active)
848 auto iter(
fixed_.find(slot->remote_endpoint()));
851 "PeerFinder::Logic::on_closed(): remote_endpoint "
852 "missing from fixed_");
855 JLOG(journal.debug()) <<
"Logic fixed failed";
859 switch (slot->state())
862 JLOG(journal.trace()) <<
"Logic accept failed";
876 JLOG(journal.trace()) <<
"Logic close";
880 JLOG(journal.trace()) <<
"Logic finished";
886 "xrpl::PeerFinder::Logic::on_closed : invalid slot "
902 template <
class FwdIter>
904 onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint
const& remote_address);
913 for (
auto const& entry :
fixed_)
914 if (entry.first == endpoint)
925 for (
auto const& entry :
fixed_)
926 if (entry.first.address() == address)
938 template <
class Container>
943 for (
auto iter =
fixed_.begin(); needed && iter !=
fixed_.end(); ++iter)
945 auto const& address(iter->first.address());
946 if (iter->second.when() <= now && squelches.
find(address) == squelches.
end() &&
948 return address == v.first.address();
951 squelches.
insert(iter->first.address());
952 c.push_back(iter->first);
986 for (
auto addr : list)
1025 << ((count == 1) ?
"address" :
"addresses") <<
" from " << source->name();
1030 <<
"'" << source->name() <<
"' fetch, " << results.
error.message();
1044 if (is_unspecified(address))
1046 if (!is_public(address))
1048 if (address.
port() == 0)
1062 for (
auto const& entry : slots)
1065 SlotImp const& slot(*entry.second);
1070 item[
"inbound"] =
"yes";
1072 item[
"fixed"] =
"yes";
1074 item[
"reserved"] =
"yes";
1153template <
class Checker>
1154template <
class FwdIter>
1164 JLOG(m_journal.trace()) <<
beast::leftw(18) <<
"Logic add " << n <<
" redirect IPs from " << remote_address;
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Port port() const
Returns the port number on the endpoint.
A generic endpoint for log messages.
Sink & sink() const
Returns the Sink associated with this Journal.
Wraps a Journal::Sink to prefix its output with a string.
typename Clock::time_point time_point
virtual time_point now() const =0
Returns the current time.
Associative container where each element is also indexed by time.
auto insert(value_type const &value) -> typename std::enable_if<!maybe_multi, std::pair< iterator, bool > >::type
void touch(beast::detail::aged_container_iterator< is_const, Iterator > pos)
iterator find(K const &k)
Stores IP addresses useful for gaining initial connections.
void on_failure(beast::IP::Endpoint const &endpoint)
Called when an outbound connection attempt fails to handshake.
bool insert(beast::IP::Endpoint const &endpoint)
Add a newly-learned address to the cache.
void periodicActivity()
Stores the cache in the persistent database on a timer.
void on_success(beast::IP::Endpoint const &endpoint)
Called when an outbound connection handshake completes.
void onWrite(beast::PropertyStream::Map &map)
Write the cache state to the property stream.
const_iterator begin() const
IP::Endpoint iterators that traverse in decreasing valence.
void load()
Load the persisted data from the Store into the container.
bool insertStatic(beast::IP::Endpoint const &endpoint)
Add a statically configured address to the cache.
const_iterator end() const
map_type::size_type size() const
Returns the number of entries in the cache.
Tests remote listening sockets to make sure they are connectable.
void async_connect(beast::IP::Endpoint const &endpoint, Handler &&handler)
Performs an async connection test on the specified endpoint.
Receives handouts for making automatic connections.
bool try_insert(beast::IP::Endpoint const &endpoint)
Manages the count of available connections for the various slots.
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
void remove(Slot const &s)
Removes the slot state and properties from the slot counts.
std::size_t attempts() const
Returns the number of outbound connection attempts.
int out_max() const
Returns the total number of outbound slots.
bool can_activate(Slot const &s) const
Returns true if the slot can become active.
int in_max() const
Returns the total number of inbound slots.
std::size_t fixed_active() const
Returns the number of active fixed connections.
void add(Slot const &s)
Adds the slot state and properties to the slot counts.
void onConfig(Config const &config)
Called when the config is set or changed.
int out_active() const
Returns the number of outbound peers assigned an open slot.
std::size_t attempts_needed() const
Returns the number of attempts needed to bring us to the max.
void shuffle()
Shuffle each hop list.
reverse_iterator rbegin()
The Livecache holds the short-lived relayed Endpoint messages.
class xrpl::PeerFinder::Livecache::hops_t hops
void expire()
Erase entries whose time has expired.
void insert(Endpoint const &ep)
Creates or updates an existing Element based on a new message.
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
The Logic for maintaining the list of Slot addresses.
void get_fixed(std::size_t needed, Container &c, typename ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
void on_closed(SlotImp::ptr const &slot)
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
std::shared_ptr< Source > fetchSource_
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
void addFixedPeer(std::string const &name, std::vector< beast::IP::Endpoint > const &addresses)
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remote_address)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
void addFixedPeer(std::string const &name, beast::IP::Endpoint const &ep)
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
void config(Config const &c)
std::recursive_mutex lock_
ConnectHandouts::Squelches m_squelches
static std::string stateString(Slot::State state)
std::pair< SlotImp::ptr, Result > new_inbound_slot(beast::IP::Endpoint const &local_endpoint, beast::IP::Endpoint const &remote_endpoint)
void fetch(std::shared_ptr< Source > const &source)
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
void addSource(std::shared_ptr< Source > const &source)
void on_endpoints(SlotImp::ptr const &slot, Endpoints list)
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &local_endpoint)
bool is_valid_address(beast::IP::Endpoint const &address)
void stop()
Stop the logic.
std::vector< std::shared_ptr< Source > > m_sources
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
Counts const & counts() const
void remove(SlotImp::ptr const &slot)
void addStaticSource(std::shared_ptr< Source > const &source)
std::multiset< beast::IP::Address > connectedAddresses_
std::pair< SlotImp::ptr, Result > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)
clock_type::time_point m_whenBroadcast
std::set< PublicKey > keys_
std::map< beast::IP::Endpoint, Fixed > fixed_
void on_failure(SlotImp::ptr const &slot)
bool fixed(beast::IP::Endpoint const &endpoint) const
int addBootcacheAddresses(IPAddresses const &list)
void onWrite(beast::PropertyStream::Map &map)
bool fixed(beast::IP::Address const &address) const
Receives handouts for redirecting a connection.
std::vector< Endpoint > & list()
std::optional< beast::IP::Endpoint > const & local_endpoint() const override
The local endpoint of the socket, when known.
bool fixed() const override
Returns true if this is a fixed connection.
void set_listening_port(std::uint16_t port)
std::shared_ptr< SlotImp > ptr
bool reserved() const override
Returns true if this is a reserved connection.
beast::IP::Endpoint const & remote_endpoint() const override
The remote endpoint of socket.
std::string prefix() const
State state() const override
Returns the state of the connection.
bool connectivityCheckInProgress
bool inbound() const override
Returns true if this is an inbound connection.
Abstract persistence for PeerFinder data.
T forward_as_tuple(T... args)
boost::asio::ip::address_v6 AddressV6
boost::asio::ip::address Address
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
std::uint32_t constexpr numberOfEndpointsMax
std::uint32_t constexpr maxHops
std::chrono::seconds constexpr recentAttemptDuration(60)
std::chrono::seconds constexpr secondsPerMessage(151)
std::string_view to_string(Result result) noexcept
Converts a Result enum value to its string representation.
Result
Possible results from activating a slot.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seq_first, SeqFwdIter seq_last)
Distributes objects to targets according to business rules.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
beast::xor_shift_engine & default_prng()
Return the default random engine.
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Left justifies a field at the specified width.
PeerFinder configuration settings.
int ipLimit
Limit how many incoming connections we allow per IP.
void onWrite(beast::PropertyStream::Map &map)
Write the configuration into a property stream.
bool wantIncoming
true if we want to accept incoming connections.
bool autoConnect
true if we want to establish connections automatically.
std::uint16_t listeningPort
The listening port number.
Describes a connectable peer address along with some metadata.
beast::IP::Endpoint address
boost::system::error_code error