3#include <xrpld/peerfinder/PeerfinderManager.h>
4#include <xrpld/peerfinder/detail/Bootcache.h>
5#include <xrpld/peerfinder/detail/Counts.h>
6#include <xrpld/peerfinder/detail/Fixed.h>
7#include <xrpld/peerfinder/detail/Handouts.h>
8#include <xrpld/peerfinder/detail/Livecache.h>
9#include <xrpld/peerfinder/detail/SlotImp.h>
10#include <xrpld/peerfinder/detail/Source.h>
11#include <xrpld/peerfinder/detail/Store.h>
12#include <xrpld/peerfinder/detail/iosformat.h>
14#include <xrpl/basics/Log.h>
15#include <xrpl/basics/contract.h>
16#include <xrpl/basics/random.h>
17#include <xrpl/beast/net/IPAddressConversion.h>
18#include <xrpl/beast/utility/WrappedSink.h>
32template <
class Checker>
161 if (addresses.
empty())
163 JLOG(
journal.info()) <<
"Could not resolve fixed slot '" << name <<
"'";
167 for (
auto const& remoteAddress : addresses)
169 if (remoteAddress.port() == 0)
172 "Port not specified for address:" + remoteAddress.toString());
175 auto result(
fixed_.emplace(
183 <<
beast::Leftw(18) <<
"Logic add fixed '" << name <<
"' at " << remoteAddress;
196 boost::system::error_code ec)
198 if (ec == boost::asio::error::operation_aborted)
202 auto const iter(
slots.find(remoteAddress));
203 if (iter ==
slots.end())
207 <<
" but the connection was closed";
222 JLOG(
journal.error()) <<
"Logic testing " << iter->first <<
" with error, "
230 JLOG(
journal.debug()) <<
"Logic testing " << checkedAddress <<
" succeeded";
241 <<
" on local " << localEndpoint;
246 if (isPublic(remoteEndpoint))
249 if (count + 1 >
config_.ipLimit)
252 << remoteEndpoint <<
" because of ip limits.";
258 if (
slots.contains(remoteEndpoint))
261 <<
" as duplicate incoming";
270 auto const result(
slots.emplace(slot->remoteEndpoint(), slot));
274 "xrpl::PeerFinder::Logic::new_inbound_slot : remote endpoint "
282 return {result.first->second, Result::Success};
294 if (
slots.contains(remoteEndpoint))
297 <<
" as duplicate connect";
306 auto const result =
slots.emplace(slot->remoteEndpoint(), slot);
310 "xrpl::PeerFinder::Logic::new_outbound_slot : remote endpoint "
319 return {result.first->second, Result::Success};
328 JLOG(
journal.trace()) <<
"Logic connected on local " << localEndpoint;
334 slots.contains(slot->remoteEndpoint()),
335 "xrpl::PeerFinder::Logic::onConnected : valid slot input");
337 slot->localEndpoint(localEndpoint);
341 auto const iter(
slots.find(localEndpoint));
342 if (iter !=
slots.end())
345 iter->second->localEndpoint() == slot->remoteEndpoint(),
346 "xrpl::PeerFinder::Logic::onConnected : local and remote "
347 "endpoints do match");
348 JLOG(
journal.warn()) <<
"Logic dropping as self connect";
366 JLOG(
journal.debug()) <<
"Logic handshake " << slot->remoteEndpoint() <<
" with "
367 << (reserved ?
"reserved " :
"") <<
"key " << key;
373 slots.contains(slot->remoteEndpoint()),
374 "xrpl::PeerFinder::Logic::activate : valid slot input");
378 "xrpl::PeerFinder::Logic::activate : valid slot state");
381 if (
keys.contains(key))
382 return Result::DuplicatePeer;
387 slot->reserved(reserved);
391 if (!
counts_.canActivate(*slot))
393 if (!slot->inbound())
394 bootcache.onSuccess(slot->remoteEndpoint());
395 if (slot->inbound() &&
counts_.inMax() == 0)
396 return Result::InboundDisabled;
402 slot->publicKey(key);
404 [[maybe_unused]]
bool const inserted =
keys.insert(key).second;
406 XRPL_ASSERT(inserted,
"xrpl::PeerFinder::Logic::activate : public key inserted");
411 slot->activate(
clock.now());
414 if (!slot->inbound())
415 bootcache.onSuccess(slot->remoteEndpoint());
418 if (slot->fixed() && !slot->inbound())
420 auto iter(
fixed_.find(slot->remoteEndpoint()));
424 "PeerFinder::Logic::activate(): remote_endpoint "
425 "missing from fixed_");
428 iter->second.success(
clock.now());
429 JLOG(
journal.trace()) <<
"Logic fixed success";
432 return Result::Success;
446 return std::move(h.
list());
464 auto needed(
counts_.attemptsNeeded());
471 for (
auto const& s :
slots)
473 auto const result(
squelches.insert(s.second->remoteEndpoint().address()));
519 << ((h.
list().size() > 1) ?
"endpoints" :
"endpoint");
552 <<
" boot " << ((h.
list().size() > 1) ?
"addresses" :
"address");
614 for (
auto& t : targets)
623 for (
auto const& t : targets)
626 auto const& list = t.list();
629 JLOG(
journal.trace()) <<
"Logic sending " << list.size()
630 << ((list.size() == 1) ?
" endpoint" :
" endpoints");
649 for (
auto const& entry :
slots)
650 entry.second->expire();
664 bool neighbor(
false);
665 for (
auto iter = list.
begin(); iter != list.
end();)
673 <<
" for excess hops " << ep.
hops;
674 iter = list.
erase(iter);
691 iter = list.
erase(iter);
701 iter = list.
erase(iter);
706 if (
std::any_of(list.
begin(), iter, [ep](Endpoints::value_type
const& other) {
707 return ep.address == other.address;
712 iter = list.
erase(iter);
738 JLOG(
journal.trace()) <<
"Endpoints contained " << list.
size()
739 << ((list.
size() > 1) ?
" entries" :
" entry");
745 slots.contains(slot->remoteEndpoint()),
746 "xrpl::PeerFinder::Logic::onEndpoints : valid slot input");
751 "xrpl::PeerFinder::Logic::onEndpoints : valid slot state");
756 if (slot->whenAcceptEndpoints > now)
761 for (
auto const& ep : list)
763 XRPL_ASSERT(ep.hops,
"xrpl::PeerFinder::Logic::onEndpoints : nonzero hops");
765 slot->recent.insert(ep.address, ep.hops);
772 if (slot->connectivityCheckInProgress)
775 <<
"Logic testing " << ep.address <<
" already in progress";
782 slot->connectivityCheckInProgress =
true;
792 slot->remoteEndpoint(),
794 std::placeholders::_1));
805 if (!slot->canAccept)
826 auto const iter =
slots.find(slot->remoteEndpoint());
828 if (iter ==
slots.end())
831 "PeerFinder::Logic::remove(): remote_endpoint "
832 "missing from slots_");
839 if (slot->publicKey() != std::nullopt)
841 auto const iter =
keys.find(*slot->publicKey());
843 if (iter ==
keys.end())
846 "PeerFinder::Logic::remove(): public_key missing "
859 "PeerFinder::Logic::remove(): remote_endpoint "
860 "address missing from connectedAddresses_");
883 auto iter(
fixed_.find(slot->remoteEndpoint()));
887 "PeerFinder::Logic::on_closed(): remote_endpoint "
888 "missing from fixed_");
891 iter->second.failure(
clock.now());
892 JLOG(
journal.debug()) <<
"Logic fixed failed";
896 switch (slot->state())
899 JLOG(
journal.trace()) <<
"Logic accept failed";
904 bootcache.onFailure(slot->remoteEndpoint());
913 JLOG(
journal.trace()) <<
"Logic close";
917 JLOG(
journal.trace()) <<
"Logic finished";
923 "xrpl::PeerFinder::Logic::on_closed : invalid slot "
935 bootcache.onFailure(slot->remoteEndpoint());
939 template <
class FwdIter>
941 onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint
const& remoteAddress);
950 for (
auto const& entry :
fixed_)
952 if (entry.first == endpoint)
964 for (
auto const& entry :
fixed_)
966 if (entry.first.address() == address)
979 template <
class Container>
983 auto const now(
clock.now());
984 for (
auto iter =
fixed_.begin(); needed && iter !=
fixed_.end(); ++iter)
986 auto const& address(iter->first.address());
989 return address == v.first.address();
993 c.push_back(iter->first);
1027 for (
auto const& addr : list)
1052 source->fetch(results,
journal);
1066 << ((count == 1) ?
"address" :
"addresses") <<
" from "
1072 <<
"'" << source->name() <<
"' fetch, "
1073 << results.
error.message();
1087 if (isUnspecified(address))
1089 if (isLoopback(address))
1091 if (!isPublic(address))
1093 if (address.
port() == 0)
1107 for (
auto const& entry :
slots)
1110 SlotImp const& slot(*entry.second);
1115 item[
"inbound"] =
"yes";
1117 item[
"fixed"] =
"yes";
1119 item[
"reserved"] =
"yes";
1198template <
class Checker>
1199template <
class FwdIter>
1204 boost::asio::ip::tcp::endpoint
const& remoteAddress)
std::chrono::steady_clock::time_point time_point
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
Endpoint atPort(Port port) const
Returns a new Endpoint with a different port.
Port port() const
Returns the port number on the endpoint.
A generic endpoint for log messages.
Wraps a Journal::Sink to prefix its output with a string.
Stores IP addresses useful for gaining initial connections.
Tests remote listening sockets to make sure they are connectable.
Receives handouts for making automatic connections.
bool tryInsert(beast::IP::Endpoint const &endpoint)
beast::aged_set< beast::IP::Address > Squelches
Manages the count of available connections for the various slots.
The Livecache holds the short-lived relayed Endpoint messages.
std::vector< std::shared_ptr< Source > > sources
void addFixedPeer(std::string_view name, std::vector< beast::IP::Endpoint > const &addresses)
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remoteAddress)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
void addFixedPeer(std::string_view name, beast::IP::Endpoint const &ep)
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
void config(Config const &c)
bool isValidAddress(beast::IP::Endpoint const &address)
void getFixed(std::size_t needed, Container &c, ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
std::pair< SlotImp::ptr, Result > newOutboundSlot(beast::IP::Endpoint const &remoteEndpoint)
std::pair< SlotImp::ptr, Result > newInboundSlot(beast::IP::Endpoint const &localEndpoint, beast::IP::Endpoint const &remoteEndpoint)
static std::string stateString(Slot::State state)
void fetch(std::shared_ptr< Source > const &source)
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
std::multiset< beast::IP::Address > connectedAddresses
void addSource(std::shared_ptr< Source > const &source)
ConnectHandouts::Squelches squelches
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
clock_type::time_point whenBroadcast
void stop()
Stop the logic.
std::recursive_mutex lock
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
Counts const & counts() const
std::map< beast::IP::Endpoint, std::shared_ptr< SlotImp > > Slots
void onEndpoints(SlotImp::ptr const &slot, Endpoints list)
void remove(SlotImp::ptr const &slot)
void addStaticSource(std::shared_ptr< Source > const &source)
std::map< beast::IP::Endpoint, Fixed > fixed_
bool fixed(beast::IP::Endpoint const &endpoint) const
int addBootcacheAddresses(IPAddresses const &list)
void onClosed(SlotImp::ptr const &slot)
void onWrite(beast::PropertyStream::Map &map)
std::shared_ptr< Source > fetchSource
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &localEndpoint)
bool fixed(beast::IP::Address const &address) const
void onFailure(SlotImp::ptr const &slot)
std::set< PublicKey > keys
Receives handouts for redirecting a connection.
std::vector< Endpoint > & list()
bool fixed() const override
Returns true if this is a fixed connection.
std::shared_ptr< SlotImp > ptr
bool reserved() const override
Returns true if this is a reserved connection.
std::string prefix() const
void setListeningPort(std::uint16_t port)
std::optional< beast::IP::Endpoint > const & localEndpoint() const override
The local endpoint of the socket, when known.
State state() const override
Returns the state of the connection.
bool connectivityCheckInProgress
beast::IP::Endpoint const & remoteEndpoint() const override
The remote endpoint of the socket.
bool inbound() const override
Returns true if this is an inbound connection.
Abstract persistence for PeerFinder data.
T emplace_back(T... args)
T forward_as_tuple(T... args)
boost::asio::ip::address Address
boost::asio::ip::address_v6 AddressV6
std::enable_if_t< IsAgedContainer< AgedContainer >::value, std::size_t > expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
constexpr std::uint32_t kMaxHops
static constexpr auto kMaxRedirects
Max redirects we will accept from one connection.
constexpr std::uint32_t kNumberOfEndpointsMax
constexpr std::chrono::seconds kRecentAttemptDuration(60)
constexpr std::chrono::seconds kSecondsPerMessage(151)
std::vector< Endpoint > Endpoints
A set of Endpoint used for connecting.
std::string_view to_string(Result result) noexcept
Converts a Result enum value to its string representation.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seqFirst, SeqFwdIter seqLast)
Distributes objects to targets according to business rules.
std::vector< beast::IP::Endpoint > IPAddresses
Represents a set of addresses.
Result
Possible results from activating a slot.
beast::AbstractClock< std::chrono::steady_clock > clock_type
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
void logicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
beast::xor_shift_engine & defaultPrng()
Return the default random engine.
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
static IP::Endpoint fromAsio(boost::asio::ip::address const &address)
Left justifies a field at the specified width.
PeerFinder configuration settings.
Describes a connectable peer address along with some metadata.
beast::IP::Endpoint address
boost::system::error_code error