3#include <xrpld/core/Config.h>
4#include <xrpld/overlay/Peer.h>
5#include <xrpld/overlay/ReduceRelayCommon.h>
7#include <xrpl/basics/Log.h>
8#include <xrpl/basics/chrono.h>
9#include <xrpl/basics/random.h>
10#include <xrpl/beast/container/aged_unordered_map.h>
11#include <xrpl/beast/utility/Journal.h>
12#include <xrpl/protocol/PublicKey.h>
13#include <xrpl/protocol/messages.h>
24template <
typename ClockType>
39template <
typename Unit,
typename TP>
79template <
typename ClockType>
83 friend class Slots<ClockType>;
127 protocol::MessageType type,
231template <
typename ClockType>
236 auto now = ClockType::now();
239 auto& peer = it->second;
242 if (now - peer.lastMessage >
kIdled)
245 <<
"deleteIdlePeer: " <<
Slice(validator) <<
" " <<
id <<
" idled "
253template <
typename ClockType>
258 protocol::MessageType type,
262 auto now = ClockType::now();
263 auto it =
peers_.find(
id);
267 JLOG(
journal_.trace()) <<
"update: adding peer " <<
Slice(validator) <<
" " << id;
275 JLOG(
journal_.trace()) <<
"update: squelch expired " <<
Slice(validator) <<
" " << id;
277 it->second.lastMessage = now;
282 auto& peer = it->second;
284 JLOG(
journal_.trace()) <<
"update: existing peer " <<
Slice(validator) <<
" " <<
id
285 <<
" slot state " <<
static_cast<int>(
state_) <<
" peer state "
286 <<
static_cast<int>(peer.state) <<
" count " << peer.count <<
" last "
289 <<
" " << (type == protocol::mtVALIDATION ?
"validation" :
"proposal");
291 peer.lastMessage = now;
307 JLOG(
journal_.trace()) <<
"update: resetting due to inactivity " <<
Slice(validator) <<
" "
322 auto const consideredPoolSize =
considered_.size();
329 auto const& itPeers =
peers_.find(
id);
330 if (itPeers ==
peers_.end())
333 <<
"update: peer not found " <<
Slice(validator) <<
" " << id;
336 if (now - itPeers->second.lastMessage <
kIdled)
342 JLOG(
journal_.trace()) <<
"update: selection failed " <<
Slice(validator) <<
" " << id;
349 auto s = selected.
begin();
350 JLOG(
journal_.trace()) <<
"update: " <<
Slice(validator) <<
" " <<
id <<
" pool size "
351 << consideredPoolSize <<
" selected " << *s <<
" "
360 for (
auto& [k, v] :
peers_)
364 if (selected.
find(k) != selected.
end())
379 JLOG(
journal_.trace()) <<
"update: squelching " <<
Slice(validator) <<
" " <<
id <<
" "
387template <
typename ClockType>
396 JLOG(
journal_.warn()) <<
"getSquelchDuration: unexpected squelch duration " << npeers;
401template <
typename ClockType>
405 auto it =
peers_.find(
id);
410 JLOG(
journal_.trace()) <<
"deletePeer: " <<
Slice(validator) <<
" " <<
id <<
" selected "
413 auto now = ClockType::now();
416 for (
auto& [k, v] :
peers_)
436 it->second.lastMessage = now;
437 it->second.count = 0;
443 for (
auto const& k : toUnsquelch)
448template <
typename ClockType>
452 for (
auto& [_, peer] :
peers_)
459template <
typename ClockType>
469template <
typename ClockType>
474 peers_.begin(),
peers_.end(), [&](
auto const& it) { return (it.second.state == state); });
477template <
typename ClockType>
482 peers_.begin(),
peers_.end(), [&](
auto const& it) { return (it.second.state != state); });
485template <
typename ClockType>
490 for (
auto const& [
id, info] :
peers_)
498template <
typename ClockType>
504 unordered_map<id_t, std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>>();
506 for (
auto const& [
id, info] :
peers_)
526template <
typename ClockType>
545 ,
logs_(registry.getLogs())
546 ,
journal_(registry.getJournal(
"Slots"))
585 protocol::MessageType type)
602 protocol::MessageType type,
615 auto const& it =
slots_.find(validator);
617 return it->second.inState(state);
625 auto const& it =
slots_.find(validator);
627 return it->second.notInState(state);
635 auto const& it =
slots_.find(validator);
637 return it->second.state_ == state;
645 auto const& it =
slots_.find(validator);
647 return it->second.getSelected();
657 auto const& it =
slots_.find(validator);
659 return it->second.getPeers();
667 auto const& it =
slots_.find(validator);
669 return it->second.getState();
706template <
typename ClockType>
722 if (it->second.find(
id) != it->second.end())
725 <<
"addPeerMessage: duplicate message " <<
to_string(key) <<
" " << id;
729 JLOG(
journal_.trace()) <<
"addPeerMessage: added " <<
to_string(key) <<
" " << id;
731 it->second.insert(
id);
737template <
typename ClockType>
743 protocol::MessageType type,
749 auto it =
slots_.find(validator);
752 JLOG(
journal_.trace()) <<
"updateSlotAndSquelch: new slot " <<
Slice(validator);
759 it->second.update(validator,
id, type, callback);
763 it->second.update(validator,
id, type, callback);
767template <
typename ClockType>
771 for (
auto& [validator, slot] :
slots_)
772 slot.deletePeer(validator,
id,
erase);
775template <
typename ClockType>
779 auto now = ClockType::now();
783 it->second.deleteIdlePeer(it->first);
786 JLOG(
journal_.trace()) <<
"deleteIdlePeers: deleting idle slot " <<
Slice(it->first);
A generic endpoint for log messages.
Seed functor once per construction.
Manages partitions for logging.
std::uint32_t id_t
Uniquely identifies a peer.
Service registry for dependency injection.
An immutable linear range of bytes.
Slot is associated with a specific validator via validator's public key.
ManualClock::time_point time_point
std::function< void()> ignored_squelch_callback
time_point const & getLastSelected() const
Get the time of the last peer selection round.
std::unordered_map< id_t, std::tuple< PeerState, uint16_t, uint32_t, uint32_t > > getPeers() const
Get peers info.
std::uint16_t inState(PeerState state) const
Return number of peers in state.
std::set< id_t > getSelected() const
Return selected peers.
SlotState getState() const
Return Slot's state.
std::uint16_t notInState(PeerState state) const
Return number of peers not in state.
Slot(SquelchHandler const &handler, beast::Journal journal, uint16_t maxSelectedPeers)
Constructor.
std::unordered_set< id_t > considered_
void resetCounts()
Reset counts of peers in Selected or Counting state.
std::uint16_t reachedThreshold_
std::chrono::seconds getSquelchDuration(std::size_t npeers)
Get random squelch duration between kMinUnsquelchExpire and min(max(kMaxUnsquelchExpireDefault,...
void initCounting()
Initialize slot to Counting state.
beast::Journal const journal_
void update(PublicKey const &validator, id_t id, protocol::MessageType type, ignored_squelch_callback callback)
Update peer info.
uint16_t const maxSelectedPeers_
void deleteIdlePeer(PublicKey const &validator)
Check if peers stopped relaying messages.
SquelchHandler const & handler_
void deletePeer(PublicKey const &validator, id_t id, bool erase)
Handle peer deletion when a peer disconnects.
ManualClock::time_point lastSelected_
std::unordered_map< id_t, PeerInfo > peers_
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
std::optional< SlotState > getState(PublicKey const &validator)
Get Slot's state.
Slots(ServiceRegistry &registry, SquelchHandler const &handler, Config const &config)
uint16_t const maxSelectedPeers_
void deletePeer(id_t id, bool erase)
Called when a peer is deleted.
beast::Journal const journal_
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
static messages peersWithMessage
bool addPeerMessage(uint256 const &key, id_t id)
Add the message/peer pair if this message has not already been seen from the peer.
bool reduceRelayReady()
Check whether the reduce_relay::kWaitOnBootup interval has elapsed since startup.
std::optional< std::uint16_t > inState(PublicKey const &validator, PeerState state) const
Return number of peers in state.
hash_map< PublicKey, Slot< ClockType > > slots_
std::optional< std::uint16_t > notInState(PublicKey const &validator, PeerState state) const
Return number of peers not in state.
std::atomic_bool reduceRelayReady_
std::set< id_t > getSelected(PublicKey const &validator)
Get selected peers.
bool inState(PublicKey const &validator, SlotState state) const
Return true if Slot is in state.
beast::aged_unordered_map< uint256, std::unordered_set< Peer::id_t >, ClockType, HardenedHash< strong_hash > > messages
SquelchHandler const & handler_
bool const baseSquelchEnabled_
bool baseSquelchReady()
Check if base squelching feature is enabled and ready.
ClockType::time_point time_point
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator, with a noop callback.
std::unordered_map< Peer::id_t, std::tuple< PeerState, uint16_t, uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
Get peers info.
virtual ~SquelchHandler()=default
virtual void unsquelch(PublicKey const &validator, Peer::id_t id) const =0
Unsquelch handler.
virtual void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t duration) const =0
Squelch handler.
T duration_cast(T... args)
std::enable_if_t< IsAgedContainer< AgedContainer >::value, std::size_t > expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
AbstractClock< Facade > & getAbstractClock()
Returns a global instance of an abstract clock.
detail::AgedUnorderedContainer< false, true, Key, T, Clock, Hash, KeyEqual, Allocator > aged_unordered_map
static constexpr uint16_t kMinMessageThreshold
static constexpr auto kSquelchPerPeer
static constexpr auto kMinUnsquelchExpire
static constexpr uint16_t kMaxMessageThreshold
static constexpr auto kWaitOnBootup
static constexpr auto kMaxUnsquelchExpireDefault
static constexpr auto kMaxUnsquelchExpirePeers
static constexpr auto kIdled
std::enable_if_t< std::is_integral_v< Integral >, Integral > randInt()
std::string to_string(BaseUInt< Bits, Tag > const &a)
std::unordered_map< Key, Value, Hash, Pred, Allocator > hash_map
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Data maintained for each peer.