rippled
Loading...
Searching...
No Matches
overlay/Slot.h
1#pragma once
2
3#include <xrpld/core/Config.h>
4#include <xrpld/overlay/Peer.h>
5#include <xrpld/overlay/ReduceRelayCommon.h>
6
7#include <xrpl/basics/Log.h>
8#include <xrpl/basics/chrono.h>
9#include <xrpl/basics/random.h>
10#include <xrpl/beast/container/aged_unordered_map.h>
11#include <xrpl/beast/utility/Journal.h>
12#include <xrpl/protocol/PublicKey.h>
13#include <xrpl/protocol/messages.h>
14
15#include <algorithm>
16#include <optional>
17#include <set>
18#include <tuple>
19#include <unordered_map>
20#include <unordered_set>
21
22namespace xrpl {
23
24namespace reduce_relay {
25
26template <typename clock_type>
27class Slots;
28
30enum class PeerState : uint8_t {
31 Counting, // counting messages
32 Selected, // selected to relay, counting if Slot in Counting
33 Squelched, // squelched, doesn't relay
34};
36enum class SlotState : uint8_t {
37 Counting, // counting messages
38 Selected, // peers selected, stop counting
39};
40
41template <typename Unit, typename TP>
42Unit
43epoch(TP const& t)
44{
45 return std::chrono::duration_cast<Unit>(t.time_since_epoch());
46}
47
53{
54public:
56 {
57 }
63 virtual void
64 squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration) const = 0;
69 virtual void
70 unsquelch(PublicKey const& validator, Peer::id_t id) const = 0;
71};
72
83template <typename clock_type>
84class Slot final
85{
86private:
87 friend class Slots<clock_type>;
89 using time_point = typename clock_type::time_point;
90
91 // a callback to report ignored squelches
93
100 Slot(SquelchHandler const& handler, beast::Journal journal, uint16_t maxSelectedPeers)
102 , lastSelected_(clock_type::now())
104 , handler_(handler)
105 , journal_(journal)
106 , maxSelectedPeers_(maxSelectedPeers)
107 {
108 }
109
129 void
130 update(PublicKey const& validator, id_t id, protocol::MessageType type, ignored_squelch_callback callback);
131
142 void
143 deletePeer(PublicKey const& validator, id_t id, bool erase);
144
146 time_point const&
148 {
149 return lastSelected_;
150 }
151
154 inState(PeerState state) const;
155
158 notInState(PeerState state) const;
159
162 getState() const
163 {
164 return state_;
165 }
166
169 getSelected() const;
170
175 getPeers() const;
176
183 void
184 deleteIdlePeer(PublicKey const& validator);
185
193
194private:
196 void
198
200 void
202
204 struct PeerInfo
205 {
206 PeerState state; // peer's state
207 std::size_t count; // message count
208 time_point expire; // squelch expiration time
209 time_point lastMessage; // time last message received
210 };
211
213
214 // pool of peers considered as the source of messages
215 // from validator - peers that reached MIN_MESSAGE_THRESHOLD
217
218 // number of peers that reached MAX_MESSAGE_THRESHOLD
220
221 // last time peers were selected, used to age the slot
222 typename clock_type::time_point lastSelected_;
223
224 SlotState state_; // slot's state
225 SquelchHandler const& handler_; // squelch/unsquelch handler
226 beast::Journal const journal_; // logging
227
228 // the maximum number of peers that should be selected as a validator
229 // message source
230 uint16_t const maxSelectedPeers_;
231};
232
233template <typename clock_type>
234void
236{
237 using namespace std::chrono;
238 auto now = clock_type::now();
239 for (auto it = peers_.begin(); it != peers_.end();)
240 {
241 auto& peer = it->second;
242 auto id = it->first;
243 ++it;
244 if (now - peer.lastMessage > IDLED)
245 {
246 JLOG(journal_.trace()) << "deleteIdlePeer: " << Slice(validator) << " " << id << " idled "
247 << duration_cast<seconds>(now - peer.lastMessage).count() << " selected "
248 << (peer.state == PeerState::Selected);
249 deletePeer(validator, id, false);
250 }
251 }
252}
253
254template <typename clock_type>
255void
257 PublicKey const& validator,
258 id_t id,
259 protocol::MessageType type,
261{
262 using namespace std::chrono;
263 auto now = clock_type::now();
264 auto it = peers_.find(id);
265 // First message from this peer
266 if (it == peers_.end())
267 {
268 JLOG(journal_.trace()) << "update: adding peer " << Slice(validator) << " " << id;
269 peers_.emplace(std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now}));
270 initCounting();
271 return;
272 }
273 // Message from a peer with expired squelch
274 if (it->second.state == PeerState::Squelched && now > it->second.expire)
275 {
276 JLOG(journal_.trace()) << "update: squelch expired " << Slice(validator) << " " << id;
277 it->second.state = PeerState::Counting;
278 it->second.lastMessage = now;
279 initCounting();
280 return;
281 }
282
283 auto& peer = it->second;
284
285 JLOG(journal_.trace()) << "update: existing peer " << Slice(validator) << " " << id << " slot state "
286 << static_cast<int>(state_) << " peer state " << static_cast<int>(peer.state) << " count "
287 << peer.count << " last " << duration_cast<milliseconds>(now - peer.lastMessage).count()
288 << " pool " << considered_.size() << " threshold " << reachedThreshold_ << " "
289 << (type == protocol::mtVALIDATION ? "validation" : "proposal");
290
291 peer.lastMessage = now;
292
293 // report if we received a message from a squelched peer
294 if (peer.state == PeerState::Squelched)
295 callback();
296
297 if (state_ != SlotState::Counting || peer.state == PeerState::Squelched)
298 return;
299
300 if (++peer.count > MIN_MESSAGE_THRESHOLD)
301 considered_.insert(id);
302 if (peer.count == (MAX_MESSAGE_THRESHOLD + 1))
303 ++reachedThreshold_;
304
305 if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE_DEFAULT)
306 {
307 JLOG(journal_.trace()) << "update: resetting due to inactivity " << Slice(validator) << " " << id << " "
308 << duration_cast<seconds>(now - lastSelected_).count();
309 initCounting();
310 return;
311 }
312
313 if (reachedThreshold_ == maxSelectedPeers_)
314 {
315 // Randomly select maxSelectedPeers_ peers from considered.
316 // Exclude peers that have been idling > IDLED -
317 // it's possible that deleteIdlePeer() has not been called yet.
318 // If number of remaining peers != maxSelectedPeers_
319 // then reset the Counting state and let deleteIdlePeer() handle
320 // idled peers.
322 auto const consideredPoolSize = considered_.size();
323 while (selected.size() != maxSelectedPeers_ && considered_.size() != 0)
324 {
325 auto i = considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1);
326 auto it = std::next(considered_.begin(), i);
327 auto id = *it;
328 considered_.erase(it);
329 auto const& itPeers = peers_.find(id);
330 if (itPeers == peers_.end())
331 {
332 JLOG(journal_.error()) << "update: peer not found " << Slice(validator) << " " << id;
333 continue;
334 }
335 if (now - itPeers->second.lastMessage < IDLED)
336 selected.insert(id);
337 }
338
339 if (selected.size() != maxSelectedPeers_)
340 {
341 JLOG(journal_.trace()) << "update: selection failed " << Slice(validator) << " " << id;
342 initCounting();
343 return;
344 }
345
346 lastSelected_ = now;
347
348 auto s = selected.begin();
349 JLOG(journal_.trace()) << "update: " << Slice(validator) << " " << id << " pool size " << consideredPoolSize
350 << " selected " << *s << " " << *std::next(s, 1) << " " << *std::next(s, 2);
351
352 XRPL_ASSERT(peers_.size() >= maxSelectedPeers_, "xrpl::reduce_relay::Slot::update : minimum peers");
353
354 // squelch peers which are not selected and
355 // not already squelched
357 for (auto& [k, v] : peers_)
358 {
359 v.count = 0;
360
361 if (selected.find(k) != selected.end())
362 v.state = PeerState::Selected;
363 else if (v.state != PeerState::Squelched)
364 {
365 if (journal_.trace())
366 str << k << " ";
367 v.state = PeerState::Squelched;
368 std::chrono::seconds duration = getSquelchDuration(peers_.size() - maxSelectedPeers_);
369 v.expire = now + duration;
370 handler_.squelch(validator, k, duration.count());
371 }
372 }
373 JLOG(journal_.trace()) << "update: squelching " << Slice(validator) << " " << id << " " << str.str();
374 considered_.clear();
375 reachedThreshold_ = 0;
376 state_ = SlotState::Selected;
377 }
378}
379
380template <typename clock_type>
383{
384 using namespace std::chrono;
387 {
389 JLOG(journal_.warn()) << "getSquelchDuration: unexpected squelch duration " << npeers;
390 }
391 return seconds{xrpl::rand_int(MIN_UNSQUELCH_EXPIRE / 1s, m / 1s)};
392}
393
394template <typename clock_type>
395void
397{
398 auto it = peers_.find(id);
399 if (it != peers_.end())
400 {
401 std::vector<Peer::id_t> toUnsquelch;
402
403 JLOG(journal_.trace()) << "deletePeer: " << Slice(validator) << " " << id << " selected "
404 << (it->second.state == PeerState::Selected) << " considered "
405 << (considered_.find(id) != considered_.end()) << " erase " << erase;
406 auto now = clock_type::now();
407 if (it->second.state == PeerState::Selected)
408 {
409 for (auto& [k, v] : peers_)
410 {
411 if (v.state == PeerState::Squelched)
412 toUnsquelch.push_back(k);
413 v.state = PeerState::Counting;
414 v.count = 0;
415 v.expire = now;
416 }
417
418 considered_.clear();
419 reachedThreshold_ = 0;
420 state_ = SlotState::Counting;
421 }
422 else if (considered_.find(id) != considered_.end())
423 {
424 if (it->second.count > MAX_MESSAGE_THRESHOLD)
425 --reachedThreshold_;
426 considered_.erase(id);
427 }
428
429 it->second.lastMessage = now;
430 it->second.count = 0;
431
432 if (erase)
433 peers_.erase(it);
434
435 // Must be after peers_.erase(it)
436 for (auto const& k : toUnsquelch)
437 handler_.unsquelch(validator, k);
438 }
439}
440
441template <typename clock_type>
442void
444{
445 for (auto& [_, peer] : peers_)
446 {
447 (void)_;
448 peer.count = 0;
449 }
450}
451
452template <typename clock_type>
453void
455{
456 state_ = SlotState::Counting;
457 considered_.clear();
458 reachedThreshold_ = 0;
459 resetCounts();
460}
461
462template <typename clock_type>
465{
466 return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) { return (it.second.state == state); });
467}
468
469template <typename clock_type>
472{
473 return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) { return (it.second.state != state); });
474}
475
476template <typename clock_type>
479{
481 for (auto const& [id, info] : peers_)
482 if (info.state == PeerState::Selected)
483 r.insert(id);
484 return r;
485}
486
487template <typename clock_type>
490{
491 using namespace std::chrono;
493
494 for (auto const& [id, info] : peers_)
496 id,
497 std::move(std::make_tuple(
498 info.state,
499 info.count,
500 epoch<milliseconds>(info.expire).count(),
501 epoch<milliseconds>(info.lastMessage).count()))));
502
503 return r;
504}
505
510template <typename clock_type>
511class Slots final
512{
513 using time_point = typename clock_type::time_point;
514 using id_t = typename Peer::id_t;
515 using messages =
517
518public:
524 Slots(Logs& logs, SquelchHandler const& handler, Config const& config)
525 : handler_(handler)
526 , logs_(logs)
527 , journal_(logs.journal("Slots"))
528 , baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
529 , maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS)
530 {
531 }
532 ~Slots() = default;
533
535 bool
540
542 bool
544 {
547 reduce_relay::epoch<std::chrono::minutes>(clock_type::now()) > reduce_relay::WAIT_ON_BOOTUP;
548
549 return reduceRelayReady_;
550 }
551
559 void
560 updateSlotAndSquelch(uint256 const& key, PublicKey const& validator, id_t id, protocol::MessageType type)
561 {
562 updateSlotAndSquelch(key, validator, id, type, []() {});
563 }
564
572 void
574 uint256 const& key,
575 PublicKey const& validator,
576 id_t id,
577 protocol::MessageType type,
579
583 void
585
588 inState(PublicKey const& validator, PeerState state) const
589 {
590 auto const& it = slots_.find(validator);
591 if (it != slots_.end())
592 return it->second.inState(state);
593 return {};
594 }
595
598 notInState(PublicKey const& validator, PeerState state) const
599 {
600 auto const& it = slots_.find(validator);
601 if (it != slots_.end())
602 return it->second.notInState(state);
603 return {};
604 }
605
607 bool
608 inState(PublicKey const& validator, SlotState state) const
609 {
610 auto const& it = slots_.find(validator);
611 if (it != slots_.end())
612 return it->second.state_ == state;
613 return false;
614 }
615
618 getSelected(PublicKey const& validator)
619 {
620 auto const& it = slots_.find(validator);
621 if (it != slots_.end())
622 return it->second.getSelected();
623 return {};
624 }
625
630 getPeers(PublicKey const& validator)
631 {
632 auto const& it = slots_.find(validator);
633 if (it != slots_.end())
634 return it->second.getPeers();
635 return {};
636 }
637
640 getState(PublicKey const& validator)
641 {
642 auto const& it = slots_.find(validator);
643 if (it != slots_.end())
644 return it->second.getState();
645 return {};
646 }
647
654 void
656
657private:
661 bool
662 addPeerMessage(uint256 const& key, id_t id);
663
665
667 SquelchHandler const& handler_; // squelch/unsquelch handler
670
673
674 // Maintain aged container of message/peers. This is required
675 // to discard duplicate message from the same peer. A message
676 // is aged after IDLED seconds. A message received IDLED seconds
677 // after it was relayed is ignored by PeerImp.
678 inline static messages peersWithMessage_{beast::get_abstract_clock<clock_type>()};
679};
680
681template <typename clock_type>
682bool
684{
685 beast::expire(peersWithMessage_, reduce_relay::IDLED);
686
687 if (key.isNonZero())
688 {
689 auto it = peersWithMessage_.find(key);
690 if (it == peersWithMessage_.end())
691 {
692 JLOG(journal_.trace()) << "addPeerMessage: new " << to_string(key) << " " << id;
693 peersWithMessage_.emplace(key, std::unordered_set<id_t>{id});
694 return true;
695 }
696
697 if (it->second.find(id) != it->second.end())
698 {
699 JLOG(journal_.trace()) << "addPeerMessage: duplicate message " << to_string(key) << " " << id;
700 return false;
701 }
702
703 JLOG(journal_.trace()) << "addPeerMessage: added " << to_string(key) << " " << id;
704
705 it->second.insert(id);
706 }
707
708 return true;
709}
710
711template <typename clock_type>
712void
714 uint256 const& key,
715 PublicKey const& validator,
716 id_t id,
717 protocol::MessageType type,
719{
720 if (!addPeerMessage(key, id))
721 return;
722
723 auto it = slots_.find(validator);
724 if (it == slots_.end())
725 {
726 JLOG(journal_.trace()) << "updateSlotAndSquelch: new slot " << Slice(validator);
727 auto it = slots_
728 .emplace(std::make_pair(
729 validator, Slot<clock_type>(handler_, logs_.journal("Slot"), maxSelectedPeers_)))
730 .first;
731 it->second.update(validator, id, type, callback);
732 }
733 else
734 it->second.update(validator, id, type, callback);
735}
736
737template <typename clock_type>
738void
740{
741 for (auto& [validator, slot] : slots_)
742 slot.deletePeer(validator, id, erase);
743}
744
745template <typename clock_type>
746void
748{
749 auto now = clock_type::now();
750
751 for (auto it = slots_.begin(); it != slots_.end();)
752 {
753 it->second.deleteIdlePeer(it->first);
754 if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT)
755 {
756 JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot " << Slice(it->first);
757 it = slots_.erase(it);
758 }
759 else
760 ++it;
761 }
762}
763
764} // namespace reduce_relay
765
766} // namespace xrpl
T begin(T... args)
A generic endpoint for log messages.
Definition Journal.h:40
Associative container where each element is also indexed by time.
Manages partitions for logging.
Definition Log.h:32
std::uint32_t id_t
Uniquely identifies a peer.
A public key.
Definition PublicKey.h:42
An immutable linear range of bytes.
Definition Slice.h:26
std::size_t size() const noexcept
Returns the number of bytes in the storage.
Definition Slice.h:60
bool isNonZero() const
Definition base_uint.h:513
Seed functor once per construction.
Slot is associated with a specific validator via validator's public key.
beast::Journal const journal_
std::uint16_t reachedThreshold_
std::set< id_t > getSelected() const
Return selected peers.
std::uint16_t inState(PeerState state) const
Return number of peers in state.
uint16_t const maxSelectedPeers_
void update(PublicKey const &validator, id_t id, protocol::MessageType type, ignored_squelch_callback callback)
Update peer info.
typename clock_type::time_point time_point
void initCounting()
Initialize slot to Counting state.
std::uint16_t notInState(PeerState state) const
Return number of peers not in state.
std::unordered_map< id_t, PeerInfo > peers_
void deleteIdlePeer(PublicKey const &validator)
Check if peers stopped relaying messages.
time_point const & getLastSelected() const
Get the time of the last peer selection round.
std::unordered_map< id_t, std::tuple< PeerState, uint16_t, uint32_t, uint32_t > > getPeers() const
Get peers info.
SquelchHandler const & handler_
void resetCounts()
Reset counts of peers in Selected or Counting state.
Slot(SquelchHandler const &handler, beast::Journal journal, uint16_t maxSelectedPeers)
Constructor.
void deletePeer(PublicKey const &validator, id_t id, bool erase)
Handle peer deletion when a peer disconnects.
std::unordered_set< id_t > considered_
clock_type::time_point lastSelected_
std::chrono::seconds getSquelchDuration(std::size_t npeers)
Get random squelch duration between MIN_UNSQUELCH_EXPIRE and min(max(MAX_UNSQUELCH_EXPIRE_DEFAULT,...
SlotState getState() const
Return Slot's state.
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
hash_map< PublicKey, Slot< clock_type > > slots_
bool addPeerMessage(uint256 const &key, id_t id)
Add message/peer if have not seen this message from the peer.
std::optional< SlotState > getState(PublicKey const &validator)
Get Slot's state.
bool baseSquelchReady()
Check if base squelching feature is enabled and ready.
std::optional< std::uint16_t > notInState(PublicKey const &validator, PeerState state) const
Return number of peers not in state.
std::set< id_t > getSelected(PublicKey const &validator)
Get selected peers.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type, typename Slot< clock_type >::ignored_squelch_callback callback)
Calls Slot::update of Slot associated with the validator.
bool reduceRelayReady()
Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup.
beast::Journal const journal_
Slots(Logs &logs, SquelchHandler const &handler, Config const &config)
SquelchHandler const & handler_
std::atomic_bool reduceRelayReady_
std::optional< std::uint16_t > inState(PublicKey const &validator, PeerState state) const
Return number of peers in state.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator, with a noop callback.
bool inState(PublicKey const &validator, SlotState state) const
Return true if Slot is in state.
void deletePeer(id_t id, bool erase)
Called when a peer is deleted.
std::unordered_map< typename Peer::id_t, std::tuple< PeerState, uint16_t, uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
Get peers info.
static messages peersWithMessage_
typename Peer::id_t id_t
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
uint16_t const maxSelectedPeers_
typename clock_type::time_point time_point
virtual void unsquelch(PublicKey const &validator, Peer::id_t id) const =0
Unsquelch handler.
virtual void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t duration) const =0
Squelch handler.
T emplace(T... args)
T end(T... args)
T find(T... args)
T insert(T... args)
T make_pair(T... args)
T make_tuple(T... args)
T max(T... args)
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
static constexpr auto IDLED
static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT
static constexpr uint16_t MIN_MESSAGE_THRESHOLD
static constexpr auto WAIT_ON_BOOTUP
static constexpr auto SQUELCH_PER_PEER
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS
static constexpr auto MIN_UNSQUELCH_EXPIRE
Unit epoch(TP const &t)
PeerState
Peer's State.
SlotState
Slot's State.
static constexpr uint16_t MAX_MESSAGE_THRESHOLD
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:597
std::enable_if_t< std::is_integral< Integral >::value, Integral > rand_int()
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition STExchange.h:148
T next(T... args)
T push_back(T... args)
T size(T... args)
T str(T... args)
Data maintained for each peer.