1#ifndef XRPL_OVERLAY_SLOT_H_INCLUDED
2#define XRPL_OVERLAY_SLOT_H_INCLUDED
4#include <xrpld/core/Config.h>
5#include <xrpld/overlay/Peer.h>
6#include <xrpld/overlay/ReduceRelayCommon.h>
8#include <xrpl/basics/Log.h>
9#include <xrpl/basics/chrono.h>
10#include <xrpl/basics/random.h>
11#include <xrpl/beast/container/aged_unordered_map.h>
12#include <xrpl/beast/utility/Journal.h>
13#include <xrpl/protocol/PublicKey.h>
14#include <xrpl/protocol/messages.h>
25namespace reduce_relay {
27template <
typename clock_type>
42template <
typename Unit,
typename TP>
46 return std::chrono::duration_cast<Unit>(t.time_since_epoch());
85template <
typename clock_type>
89 friend class Slots<clock_type>;
105 uint16_t maxSelectedPeers)
138 protocol::MessageType type,
243template <
typename clock_type>
248 auto now = clock_type::now();
249 for (
auto it = peers_.begin(); it != peers_.end();)
251 auto& peer = it->second;
254 if (now - peer.lastMessage >
IDLED)
256 JLOG(journal_.trace())
257 <<
"deleteIdlePeer: " <<
Slice(validator) <<
" " <<
id
259 << duration_cast<seconds>(now - peer.lastMessage).count()
261 deletePeer(validator,
id,
false);
266template <
typename clock_type>
271 protocol::MessageType type,
275 auto now = clock_type::now();
276 auto it = peers_.find(
id);
278 if (it == peers_.end())
280 JLOG(journal_.trace())
281 <<
"update: adding peer " <<
Slice(validator) <<
" " << id;
290 JLOG(journal_.trace())
291 <<
"update: squelch expired " <<
Slice(validator) <<
" " << id;
293 it->second.lastMessage = now;
298 auto& peer = it->second;
300 JLOG(journal_.trace())
301 <<
"update: existing peer " <<
Slice(validator) <<
" " <<
id
302 <<
" slot state " <<
static_cast<int>(state_) <<
" peer state "
303 <<
static_cast<int>(peer.state) <<
" count " << peer.count <<
" last "
304 << duration_cast<milliseconds>(now - peer.lastMessage).count()
305 <<
" pool " << considered_.
size() <<
" threshold " << reachedThreshold_
306 <<
" " << (type == protocol::mtVALIDATION ?
"validation" :
"proposal");
308 peer.lastMessage = now;
318 considered_.insert(
id);
324 JLOG(journal_.trace())
325 <<
"update: resetting due to inactivity " <<
Slice(validator) <<
" "
326 <<
id <<
" " << duration_cast<seconds>(now - lastSelected_).count();
331 if (reachedThreshold_ == maxSelectedPeers_)
340 auto const consideredPoolSize = considered_.
size();
341 while (selected.
size() != maxSelectedPeers_ && considered_.size() != 0)
344 considered_.size() == 1 ? 0 :
rand_int(considered_.size() - 1);
345 auto it =
std::next(considered_.begin(), i);
347 considered_.erase(it);
348 auto const& itpeers = peers_.find(
id);
349 if (itpeers == peers_.end())
351 JLOG(journal_.error()) <<
"update: peer not found "
352 <<
Slice(validator) <<
" " << id;
355 if (now - itpeers->second.lastMessage <
IDLED)
359 if (selected.
size() != maxSelectedPeers_)
361 JLOG(journal_.trace())
362 <<
"update: selection failed " <<
Slice(validator) <<
" " << id;
369 auto s = selected.
begin();
370 JLOG(journal_.trace())
371 <<
"update: " <<
Slice(validator) <<
" " <<
id <<
" pool size "
372 << consideredPoolSize <<
" selected " << *s <<
" "
376 peers_.size() >= maxSelectedPeers_,
377 "ripple::reduce_relay::Slot::update : minimum peers");
382 for (
auto& [k, v] : peers_)
386 if (selected.
find(k) != selected.
end())
390 if (journal_.trace())
394 getSquelchDuration(peers_.size() - maxSelectedPeers_);
399 JLOG(journal_.trace()) <<
"update: squelching " <<
Slice(validator)
400 <<
" " <<
id <<
" " << str.
str();
402 reachedThreshold_ = 0;
407template <
typename clock_type>
417 JLOG(journal_.warn())
418 <<
"getSquelchDuration: unexpected squelch duration " << npeers;
423template <
typename clock_type>
427 auto it = peers_.find(
id);
428 if (it != peers_.end())
432 JLOG(journal_.trace())
433 <<
"deletePeer: " <<
Slice(validator) <<
" " <<
id <<
" selected "
435 << (considered_.find(
id) != considered_.end()) <<
" erase "
437 auto now = clock_type::now();
440 for (
auto& [k, v] : peers_)
450 reachedThreshold_ = 0;
453 else if (considered_.find(
id) != considered_.end())
457 considered_.erase(
id);
460 it->second.lastMessage = now;
461 it->second.count = 0;
467 for (
auto const& k : toUnsquelch)
468 handler_.unsquelch(validator, k);
472template <
typename clock_type>
476 for (
auto& [_, peer] : peers_)
483template <
typename clock_type>
489 reachedThreshold_ = 0;
493template <
typename clock_type>
497 return std::count_if(peers_.begin(), peers_.end(), [&](
auto const& it) {
498 return (it.second.state == state);
502template <
typename clock_type>
506 return std::count_if(peers_.begin(), peers_.end(), [&](
auto const& it) {
507 return (it.second.state != state);
511template <
typename clock_type>
516 for (
auto const& [
id, info] : peers_)
522template <
typename clock_type>
533 for (
auto const& [
id, info] : peers_)
539 epoch<milliseconds>(info.expire).count(),
540 epoch<milliseconds>(info.lastMessage).count()))));
549template <
typename clock_type>
589 reduce_relay::epoch<std::chrono::minutes>(clock_type::now()) >
607 protocol::MessageType type)
624 protocol::MessageType type,
637 auto const& it =
slots_.find(validator);
639 return it->second.inState(state);
647 auto const& it =
slots_.find(validator);
649 return it->second.notInState(state);
657 auto const& it =
slots_.find(validator);
659 return it->second.state_ == state;
667 auto const& it =
slots_.find(validator);
669 return it->second.getSelected();
681 auto const& it =
slots_.find(validator);
683 return it->second.getPeers();
691 auto const& it =
slots_.find(validator);
693 return it->second.getState();
728 beast::get_abstract_clock<clock_type>()};
731template <
typename clock_type>
739 auto it = peersWithMessage_.find(key);
740 if (it == peersWithMessage_.end())
742 JLOG(journal_.trace())
743 <<
"addPeerMessage: new " <<
to_string(key) <<
" " << id;
748 if (it->second.find(
id) != it->second.end())
750 JLOG(journal_.trace()) <<
"addPeerMessage: duplicate message "
755 JLOG(journal_.trace())
756 <<
"addPeerMessage: added " <<
to_string(key) <<
" " << id;
764template <
typename clock_type>
770 protocol::MessageType type,
773 if (!addPeerMessage(key,
id))
776 auto it = slots_.find(validator);
777 if (it == slots_.end())
779 JLOG(journal_.trace())
780 <<
"updateSlotAndSquelch: new slot " <<
Slice(validator);
786 handler_, logs_.journal(
"Slot"), maxSelectedPeers_)))
788 it->second.update(validator,
id, type, callback);
791 it->second.update(validator,
id, type, callback);
794template <
typename clock_type>
798 for (
auto& [validator, slot] : slots_)
799 slot.deletePeer(validator,
id,
erase);
802template <
typename clock_type>
806 auto now = clock_type::now();
808 for (
auto it = slots_.begin(); it != slots_.end();)
810 it->second.deleteIdlePeer(it->first);
813 JLOG(journal_.trace())
814 <<
"deleteIdlePeers: deleting idle slot " <<
Slice(it->first);
815 it = slots_.erase(it);
A generic endpoint for log messages.
Associative container where each element is also indexed by time.
Manages partitions for logging.
std::uint32_t id_t
Uniquely identifies a peer.
An immutable linear range of bytes.
std::size_t size() const noexcept
Returns the number of bytes in the storage.
Seed functor once per construction.
Slot is associated with a specific validator via validator's public key.
std::unordered_map< id_t, std::tuple< PeerState, uint16_t, uint32_t, uint32_t > > getPeers() const
Get peers info.
Slot(SquelchHandler const &handler, beast::Journal journal, uint16_t maxSelectedPeers)
Constructor.
void deletePeer(PublicKey const &validator, id_t id, bool erase)
Handle peer deletion when a peer disconnects.
std::uint16_t reachedThreshold_
uint16_t const maxSelectedPeers_
std::uint16_t notInState(PeerState state) const
Return number of peers not in state.
typename clock_type::time_point time_point
SquelchHandler const & handler_
void initCounting()
Initialize slot to Counting state.
time_point const & getLastSelected() const
Get the time of the last peer selection round.
clock_type::time_point lastSelected_
void update(PublicKey const &validator, id_t id, protocol::MessageType type, ignored_squelch_callback callback)
Update peer info.
std::set< id_t > getSelected() const
Return selected peers.
SlotState getState() const
Return Slot's state.
std::uint16_t inState(PeerState state) const
Return number of peers in state.
void deleteIdlePeer(PublicKey const &validator)
Check if peers stopped relaying messages.
std::unordered_set< id_t > considered_
std::chrono::seconds getSquelchDuration(std::size_t npeers)
Get random squelch duration between MIN_UNSQUELCH_EXPIRE and min(max(MAX_UNSQUELCH_EXPIRE_DEFAULT,...
std::unordered_map< id_t, PeerInfo > peers_
void resetCounts()
Reset counts of peers in Selected or Counting state.
beast::Journal const journal_
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
typename clock_type::time_point time_point
bool reduceRelayReady()
Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup.
static messages peersWithMessage_
void deletePeer(id_t id, bool erase)
Called when a peer is deleted.
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
bool addPeerMessage(uint256 const &key, id_t id)
Add message/peer if have not seen this message from the peer.
bool const baseSquelchEnabled_
beast::Journal const journal_
std::optional< std::uint16_t > notInState(PublicKey const &validator, PeerState state) const
Return number of peers not in state.
std::set< id_t > getSelected(PublicKey const &validator)
Get selected peers.
bool baseSquelchReady()
Check if base squelching feature is enabled and ready.
hash_map< PublicKey, Slot< clock_type > > slots_
std::optional< std::uint16_t > inState(PublicKey const &validator, PeerState state) const
Return number of peers in state.
bool inState(PublicKey const &validator, SlotState state) const
Return true if Slot is in state.
std::unordered_map< typename Peer::id_t, std::tuple< PeerState, uint16_t, uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
Get peers info.
uint16_t const maxSelectedPeers_
std::optional< SlotState > getState(PublicKey const &validator)
Get Slot's state.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type, typename Slot< clock_type >::ignored_squelch_callback callback)
Calls Slot::update of Slot associated with the validator.
std::atomic_bool reduceRelayReady_
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator, with a noop callback.
Slots(Logs &logs, SquelchHandler const &handler, Config const &config)
SquelchHandler const & handler_
virtual void unsquelch(PublicKey const &validator, Peer::id_t id) const =0
Unsquelch handler.
virtual ~SquelchHandler()
virtual void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t duration) const =0
Squelch handler.
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
static constexpr auto SQUELCH_PER_PEER
static constexpr auto MIN_UNSQUELCH_EXPIRE
static constexpr auto IDLED
static constexpr uint16_t MAX_MESSAGE_THRESHOLD
static constexpr auto WAIT_ON_BOOTUP
static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT
static constexpr uint16_t MIN_MESSAGE_THRESHOLD
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::enable_if_t< std::is_integral< Integral >::value, Integral > rand_int()
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
std::string to_string(base_uint< Bits, Tag > const &a)
Data maintained for each peer.