1#ifndef XRPL_PEERFINDER_LOGIC_H_INCLUDED
2#define XRPL_PEERFINDER_LOGIC_H_INCLUDED
4#include <xrpld/peerfinder/PeerfinderManager.h>
5#include <xrpld/peerfinder/detail/Bootcache.h>
6#include <xrpld/peerfinder/detail/Counts.h>
7#include <xrpld/peerfinder/detail/Fixed.h>
8#include <xrpld/peerfinder/detail/Handouts.h>
9#include <xrpld/peerfinder/detail/Livecache.h>
10#include <xrpld/peerfinder/detail/SlotImp.h>
11#include <xrpld/peerfinder/detail/Source.h>
12#include <xrpld/peerfinder/detail/Store.h>
13#include <xrpld/peerfinder/detail/iosformat.h>
15#include <xrpl/basics/Log.h>
16#include <xrpl/basics/contract.h>
17#include <xrpl/basics/random.h>
18#include <xrpl/beast/net/IPAddressConversion.h>
19#include <xrpl/beast/utility/WrappedSink.h>
34template <
class Checker>
167 if (addresses.
empty())
170 <<
"Could not resolve fixed slot '" << name <<
"'";
174 for (
auto const& remote_address : addresses)
176 if (remote_address.port() == 0)
178 Throw<std::runtime_error>(
179 "Port not specified for address:" +
180 remote_address.to_string());
183 auto result(
fixed_.emplace(
192 <<
"' at " << remote_address;
205 boost::system::error_code ec)
207 if (ec == boost::asio::error::operation_aborted)
216 <<
beast::leftw(18) <<
"Logic tested " << checkedAddress
217 <<
" but the connection was closed";
232 JLOG(journal.error()) <<
"Logic testing " << iter->first
233 <<
" with error, " << ec.message();
240 JLOG(journal.debug())
241 <<
"Logic testing " << checkedAddress <<
" succeeded";
252 <<
beast::leftw(18) <<
"Logic accept" << remote_endpoint
253 <<
" on local " << local_endpoint;
258 if (is_public(remote_endpoint))
266 << remote_endpoint <<
" because of ip limits.";
275 <<
beast::leftw(18) <<
"Logic dropping " << remote_endpoint
276 <<
" as duplicate incoming";
287 auto const result(
slots_.
emplace(slot->remote_endpoint(), slot));
291 "ripple::PeerFinder::Logic::new_inbound_slot : remote endpoint "
299 return {result.first->second, Result::success};
307 <<
beast::leftw(18) <<
"Logic connect " << remote_endpoint;
315 <<
beast::leftw(18) <<
"Logic dropping " << remote_endpoint
316 <<
" as duplicate connect";
325 auto const result =
slots_.
emplace(slot->remote_endpoint(), slot);
329 "ripple::PeerFinder::Logic::new_outbound_slot : remote endpoint "
338 return {result.first->second, Result::success};
349 JLOG(journal.trace()) <<
"Logic connected on local " << local_endpoint;
356 "ripple::PeerFinder::Logic::onConnected : valid slot input");
358 slot->local_endpoint(local_endpoint);
366 iter->second->local_endpoint() == slot->remote_endpoint(),
367 "ripple::PeerFinder::Logic::onConnected : local and remote "
368 "endpoints do match");
369 JLOG(journal.warn()) <<
"Logic dropping as self connect";
387 JLOG(journal.debug())
388 <<
"Logic handshake " << slot->remote_endpoint() <<
" with "
389 << (reserved ?
"reserved " :
"") <<
"key " << key;
396 "ripple::PeerFinder::Logic::activate : valid slot input");
400 "ripple::PeerFinder::Logic::activate : valid slot state");
404 return Result::duplicatePeer;
409 slot->reserved(reserved);
415 if (!slot->inbound())
418 return Result::inboundDisabled;
424 slot->public_key(key);
426 [[maybe_unused]]
bool const inserted =
keys_.insert(key).second;
430 "ripple::PeerFinder::Logic::activate : public key inserted");
438 if (!slot->inbound())
442 if (slot->fixed() && !slot->inbound())
444 auto iter(
fixed_.find(slot->remote_endpoint()));
447 "PeerFinder::Logic::activate(): remote_endpoint "
448 "missing from fixed_");
451 JLOG(journal.trace()) <<
"Logic fixed success";
454 return Result::success;
468 return std::move(h.
list());
493 for (
auto const& s :
slots_)
545 << ((h.
list().
size() > 1) ?
"endpoints" :
"endpoint");
583 << ((h.
list().
size() > 1) ?
"addresses" :
"address");
611 [&slots](Slots::value_type
const& value) {
612 if (value.second->state() == Slot::active)
613 slots.emplace_back(value.second);
623 targets.emplace_back(slot);
652 for (
auto& t : targets)
665 for (
auto const& t : targets)
668 auto const& list = t.list();
671 JLOG(journal.trace())
672 <<
"Logic sending " << list.size()
673 << ((list.size() == 1) ?
" endpoint" :
" endpoints");
692 for (
auto const& entry :
slots_)
693 entry.second->expire();
707 bool neighbor(
false);
708 for (
auto iter = list.
begin(); iter != list.
end();)
717 <<
" for excess hops " << ep.
hops;
718 iter = list.
erase(iter);
730 slot->remote_endpoint().at_port(ep.
address.
port());
736 <<
" for extra self";
737 iter = list.
erase(iter);
746 << ep.
address <<
" as invalid";
747 iter = list.
erase(iter);
755 [ep](Endpoints::value_type
const& other) {
756 return ep.address == other.address;
760 << ep.
address <<
" as duplicate";
761 iter = list.
erase(iter);
787 JLOG(journal.trace()) <<
"Endpoints contained " << list.
size()
788 << ((list.
size() > 1) ?
" entries" :
" entry");
795 "ripple::PeerFinder::Logic::on_endpoints : valid slot input");
800 "ripple::PeerFinder::Logic::on_endpoints : valid slot state");
805 if (slot->whenAcceptEndpoints > now)
810 for (
auto const& ep : list)
814 "ripple::PeerFinder::Logic::on_endpoints : nonzero hops");
816 slot->recent.insert(ep.address, ep.hops);
823 if (slot->connectivityCheckInProgress)
825 JLOG(journal.debug()) <<
"Logic testing " << ep.address
826 <<
" already in progress";
833 slot->connectivityCheckInProgress =
true;
843 slot->remote_endpoint(),
845 std::placeholders::_1));
856 if (!slot->canAccept)
877 auto const iter =
slots_.
find(slot->remote_endpoint());
881 "PeerFinder::Logic::remove(): remote_endpoint "
882 "missing from slots_");
890 auto const iter =
keys_.find(*slot->public_key());
892 if (iter ==
keys_.end())
894 "PeerFinder::Logic::remove(): public_key missing "
906 "PeerFinder::Logic::remove(): remote_endpont "
907 "address missing from connectedAddresses_");
927 if (slot->fixed() && !slot->inbound() && slot->state() !=
Slot::active)
929 auto iter(
fixed_.find(slot->remote_endpoint()));
932 "PeerFinder::Logic::on_closed(): remote_endpont "
933 "missing from fixed_");
936 JLOG(journal.debug()) <<
"Logic fixed failed";
940 switch (slot->state())
943 JLOG(journal.trace()) <<
"Logic accept failed";
957 JLOG(journal.trace()) <<
"Logic close";
961 JLOG(journal.trace()) <<
"Logic finished";
967 "ripple::PeerFinder::Logic::on_closed : invalid slot "
983 template <
class FwdIter>
988 boost::asio::ip::tcp::endpoint
const& remote_address);
997 for (
auto const& entry :
fixed_)
998 if (entry.first == endpoint)
1009 for (
auto const& entry :
fixed_)
1010 if (entry.first.address() == address)
1022 template <
class Container>
1030 for (
auto iter =
fixed_.begin(); needed && iter !=
fixed_.end(); ++iter)
1032 auto const& address(iter->first.address());
1033 if (iter->second.when() <= now &&
1034 squelches.
find(address) == squelches.
end() &&
1038 [address](Slots::value_type
const& v) {
1039 return address == v.first.address();
1042 squelches.
insert(iter->first.address());
1043 c.push_back(iter->first);
1077 for (
auto addr : list)
1116 <<
beast::leftw(18) <<
"Logic added " << count <<
" new "
1117 << ((count == 1) ?
"address" :
"addresses") <<
" from "
1123 <<
"'" << source->name() <<
"' fetch, "
1124 << results.
error.message();
1138 if (is_unspecified(address))
1140 if (!is_public(address))
1142 if (address.
port() == 0)
1156 for (
auto const& entry : slots)
1159 SlotImp const& slot(*entry.second);
1164 item[
"inbound"] =
"yes";
1166 item[
"fixed"] =
"yes";
1168 item[
"reserved"] =
"yes";
1247template <
class Checker>
1248template <
class FwdIter>
1253 boost::asio::ip::tcp::endpoint
const& remote_address)
1261 JLOG(m_journal.trace()) <<
beast::leftw(18) <<
"Logic add " << n
1262 <<
" redirect IPs from " << remote_address;
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Port port() const
Returns the port number on the endpoint.
A generic endpoint for log messages.
Sink & sink() const
Returns the Sink associated with this Journal.
Wraps a Journal::Sink to prefix its output with a string.
typename Clock::time_point time_point
virtual time_point now() const =0
Returns the current time.
Associative container where each element is also indexed by time.
auto insert(value_type const &value) -> typename std::enable_if<!maybe_multi, std::pair< iterator, bool > >::type
void touch(beast::detail::aged_container_iterator< is_const, Iterator > pos)
iterator find(K const &k)
Stores IP addresses useful for gaining initial connections.
map_type::size_type size() const
Returns the number of entries in the cache.
const_iterator end() const
void periodicActivity()
Stores the cache in the persistent database on a timer.
bool insert(beast::IP::Endpoint const &endpoint)
Add a newly-learned address to the cache.
void on_failure(beast::IP::Endpoint const &endpoint)
Called when an outbound connection attempt fails to handshake.
const_iterator begin() const
IP::Endpoint iterators that traverse in decreasing valence.
void onWrite(beast::PropertyStream::Map &map)
Write the cache state to the property stream.
void on_success(beast::IP::Endpoint const &endpoint)
Called when an outbound connection handshake completes.
bool insertStatic(beast::IP::Endpoint const &endpoint)
Add a staticallyconfigured address to the cache.
void load()
Load the persisted data from the Store into the container.
Tests remote listening sockets to make sure they are connectible.
void async_connect(beast::IP::Endpoint const &endpoint, Handler &&handler)
Performs an async connection test on the specified endpoint.
Receives handouts for making automatic connections.
bool try_insert(beast::IP::Endpoint const &endpoint)
Manages the count of available connections for the various slots.
std::size_t fixed_active() const
Returns the number of active fixed connections.
int out_active() const
Returns the number of outbound peers assigned an open slot.
std::size_t attempts_needed() const
Returns the number of attempts needed to bring us to the max.
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
int out_max() const
Returns the total number of outbound slots.
void add(Slot const &s)
Adds the slot state and properties to the slot counts.
int in_max() const
Returns the total number of inbound slots.
void remove(Slot const &s)
Removes the slot state and properties from the slot counts.
void onConfig(Config const &config)
Called when the config is set or changed.
bool can_activate(Slot const &s) const
Returns true if the slot can become active.
std::size_t attempts() const
Returns the number of outbound connection attempts.
void shuffle()
Shuffle each hop list.
reverse_iterator rbegin()
The Livecache holds the short-lived relayed Endpoint messages.
void expire()
Erase entries whose time has expired.
void insert(Endpoint const &ep)
Creates or updates an existing Element based on a new message.
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
class ripple::PeerFinder::Livecache::hops_t hops
The Logic for maintaining the list of Slot addresses.
void stop()
Stop the logic.
bool fixed(beast::IP::Endpoint const &endpoint) const
int addBootcacheAddresses(IPAddresses const &list)
void onWrite(beast::PropertyStream::Map &map)
std::recursive_mutex lock_
void fetch(std::shared_ptr< Source > const &source)
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &local_endpoint)
bool is_valid_address(beast::IP::Endpoint const &address)
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
void on_closed(SlotImp::ptr const &slot)
void addFixedPeer(std::string const &name, std::vector< beast::IP::Endpoint > const &addresses)
void on_failure(SlotImp::ptr const &slot)
std::set< PublicKey > keys_
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
bool fixed(beast::IP::Address const &address) const
std::shared_ptr< Source > fetchSource_
void remove(SlotImp::ptr const &slot)
std::pair< SlotImp::ptr, Result > new_inbound_slot(beast::IP::Endpoint const &local_endpoint, beast::IP::Endpoint const &remote_endpoint)
clock_type::time_point m_whenBroadcast
std::multiset< beast::IP::Address > connectedAddresses_
void get_fixed(std::size_t needed, Container &c, typename ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
std::vector< std::shared_ptr< Source > > m_sources
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remote_address)
void addSource(std::shared_ptr< Source > const &source)
void on_endpoints(SlotImp::ptr const &slot, Endpoints list)
void addStaticSource(std::shared_ptr< Source > const &source)
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
Counts const & counts() const
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
std::pair< SlotImp::ptr, Result > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)
ConnectHandouts::Squelches m_squelches
void addFixedPeer(std::string const &name, beast::IP::Endpoint const &ep)
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
std::map< beast::IP::Endpoint, Fixed > fixed_
void config(Config const &c)
static std::string stateString(Slot::State state)
Receives handouts for redirecting a connection.
std::vector< Endpoint > & list()
std::optional< beast::IP::Endpoint > const & local_endpoint() const override
The local endpoint of the socket, when known.
bool fixed() const override
Returns true if this is a fixed connection.
bool inbound() const override
Returns true if this is an inbound connection.
bool connectivityCheckInProgress
std::shared_ptr< SlotImp > ptr
void set_listening_port(std::uint16_t port)
State state() const override
Returns the state of the connection.
std::string prefix() const
beast::IP::Endpoint const & remote_endpoint() const override
The remote endpoint of socket.
bool reserved() const override
Returns true if this is a reserved connection.
Abstract persistence for PeerFinder data.
T forward_as_tuple(T... args)
boost::asio::ip::address_v6 AddressV6
boost::asio::ip::address Address
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
std::chrono::seconds constexpr secondsPerMessage(151)
std::uint32_t constexpr numberOfEndpointsMax
std::chrono::seconds constexpr recentAttemptDuration(60)
std::uint32_t constexpr maxHops
std::string_view to_string(Result result) noexcept
Converts a Result enum value to its string representation.
Result
Possible results from activating a slot.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seq_first, SeqFwdIter seq_last)
Distributes objects to targets according to business rules.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
beast::xor_shift_engine & default_prng()
Return the default random engine.
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Left justifies a field at the specified width.
PeerFinder configuration settings.
void onWrite(beast::PropertyStream::Map &map)
Write the configuration into a property stream.
bool autoConnect
true if we want to establish connections automatically
int ipLimit
Limit how many incoming connections we allow per IP.
bool wantIncoming
true if we want to accept incoming connections.
std::uint16_t listeningPort
The listening port number.
Describes a connectible peer address along with some metadata.
beast::IP::Endpoint address
boost::system::error_code error