rippled
Loading...
Searching...
No Matches
overlay/Slot.h
1#ifndef XRPL_OVERLAY_SLOT_H_INCLUDED
2#define XRPL_OVERLAY_SLOT_H_INCLUDED
3
#include <xrpld/core/Config.h>
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/ReduceRelayCommon.h>

#include <xrpl/basics/Log.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/basics/random.h>
#include <xrpl/beast/container/aged_unordered_map.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/PublicKey.h>
#include <xrpl/protocol/messages.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <optional>
#include <set>
#include <sstream>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
22
23namespace ripple {
24
25namespace reduce_relay {
26
27template <typename clock_type>
28class Slots;
29
/** Peer's State. */
enum class PeerState : std::uint8_t {
    Counting,   // counting messages
    Selected,   // selected to relay, counting if Slot in Counting
    Squelched,  // squelched, doesn't relay
};
/** Slot's State. */
enum class SlotState : std::uint8_t {
    Counting,  // counting messages
    Selected,  // peers selected, stop counting
};
41
/** Convert a time point to the duration elapsed since its clock's epoch.
 *
 * @tparam Unit std::chrono duration type to express the result in
 * @tparam TP time point type
 * @param t time point to convert
 * @return t.time_since_epoch() converted to Unit with duration_cast
 *         (i.e. truncated toward zero when Unit is coarser)
 */
template <typename Unit, typename TP>
Unit
epoch(TP const& t)
{
    return std::chrono::duration_cast<Unit>(t.time_since_epoch());
}
48
54{
55public:
57 {
58 }
64 virtual void
65 squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration)
66 const = 0;
71 virtual void
72 unsquelch(PublicKey const& validator, Peer::id_t id) const = 0;
73};
74
85template <typename clock_type>
86class Slot final
87{
88private:
89 friend class Slots<clock_type>;
91 using time_point = typename clock_type::time_point;
92
93 // a callback to report ignored squelches
95
103 SquelchHandler const& handler,
104 beast::Journal journal,
105 uint16_t maxSelectedPeers)
107 , lastSelected_(clock_type::now())
109 , handler_(handler)
110 , journal_(journal)
111 , maxSelectedPeers_(maxSelectedPeers)
112 {
113 }
114
134 void
136 PublicKey const& validator,
137 id_t id,
138 protocol::MessageType type,
139 ignored_squelch_callback callback);
140
151 void
152 deletePeer(PublicKey const& validator, id_t id, bool erase);
153
155 time_point const&
157 {
158 return lastSelected_;
159 }
160
163 inState(PeerState state) const;
164
167 notInState(PeerState state) const;
168
171 getState() const
172 {
173 return state_;
174 }
175
178 getSelected() const;
179
183 std::
185 getPeers() const;
186
193 void
194 deleteIdlePeer(PublicKey const& validator);
195
203
204private:
206 void
208
210 void
212
214 struct PeerInfo
215 {
216 PeerState state; // peer's state
217 std::size_t count; // message count
218 time_point expire; // squelch expiration time
219 time_point lastMessage; // time last message received
220 };
221
223
224 // pool of peers considered as the source of messages
225 // from validator - peers that reached MIN_MESSAGE_THRESHOLD
227
228 // number of peers that reached MAX_MESSAGE_THRESHOLD
230
231 // last time peers were selected, used to age the slot
232 typename clock_type::time_point lastSelected_;
233
234 SlotState state_; // slot's state
235 SquelchHandler const& handler_; // squelch/unsquelch handler
236 beast::Journal const journal_; // logging
237
238 // the maximum number of peers that should be selected as a validator
239 // message source
240 uint16_t const maxSelectedPeers_;
241};
242
243template <typename clock_type>
244void
246{
247 using namespace std::chrono;
248 auto now = clock_type::now();
249 for (auto it = peers_.begin(); it != peers_.end();)
250 {
251 auto& peer = it->second;
252 auto id = it->first;
253 ++it;
254 if (now - peer.lastMessage > IDLED)
255 {
256 JLOG(journal_.trace())
257 << "deleteIdlePeer: " << Slice(validator) << " " << id
258 << " idled "
259 << duration_cast<seconds>(now - peer.lastMessage).count()
260 << " selected " << (peer.state == PeerState::Selected);
261 deletePeer(validator, id, false);
262 }
263 }
264}
265
266template <typename clock_type>
267void
269 PublicKey const& validator,
270 id_t id,
271 protocol::MessageType type,
273{
274 using namespace std::chrono;
275 auto now = clock_type::now();
276 auto it = peers_.find(id);
277 // First message from this peer
278 if (it == peers_.end())
279 {
280 JLOG(journal_.trace())
281 << "update: adding peer " << Slice(validator) << " " << id;
282 peers_.emplace(
283 std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now}));
284 initCounting();
285 return;
286 }
287 // Message from a peer with expired squelch
288 if (it->second.state == PeerState::Squelched && now > it->second.expire)
289 {
290 JLOG(journal_.trace())
291 << "update: squelch expired " << Slice(validator) << " " << id;
292 it->second.state = PeerState::Counting;
293 it->second.lastMessage = now;
294 initCounting();
295 return;
296 }
297
298 auto& peer = it->second;
299
300 JLOG(journal_.trace())
301 << "update: existing peer " << Slice(validator) << " " << id
302 << " slot state " << static_cast<int>(state_) << " peer state "
303 << static_cast<int>(peer.state) << " count " << peer.count << " last "
304 << duration_cast<milliseconds>(now - peer.lastMessage).count()
305 << " pool " << considered_.size() << " threshold " << reachedThreshold_
306 << " " << (type == protocol::mtVALIDATION ? "validation" : "proposal");
307
308 peer.lastMessage = now;
309
310 // report if we received a message from a squelched peer
311 if (peer.state == PeerState::Squelched)
312 callback();
313
314 if (state_ != SlotState::Counting || peer.state == PeerState::Squelched)
315 return;
316
317 if (++peer.count > MIN_MESSAGE_THRESHOLD)
318 considered_.insert(id);
319 if (peer.count == (MAX_MESSAGE_THRESHOLD + 1))
320 ++reachedThreshold_;
321
322 if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE_DEFAULT)
323 {
324 JLOG(journal_.trace())
325 << "update: resetting due to inactivity " << Slice(validator) << " "
326 << id << " " << duration_cast<seconds>(now - lastSelected_).count();
327 initCounting();
328 return;
329 }
330
331 if (reachedThreshold_ == maxSelectedPeers_)
332 {
333 // Randomly select maxSelectedPeers_ peers from considered.
334 // Exclude peers that have been idling > IDLED -
335 // it's possible that deleteIdlePeer() has not been called yet.
336 // If number of remaining peers != maxSelectedPeers_
337 // then reset the Counting state and let deleteIdlePeer() handle
338 // idled peers.
340 auto const consideredPoolSize = considered_.size();
341 while (selected.size() != maxSelectedPeers_ && considered_.size() != 0)
342 {
343 auto i =
344 considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1);
345 auto it = std::next(considered_.begin(), i);
346 auto id = *it;
347 considered_.erase(it);
348 auto const& itpeers = peers_.find(id);
349 if (itpeers == peers_.end())
350 {
351 JLOG(journal_.error()) << "update: peer not found "
352 << Slice(validator) << " " << id;
353 continue;
354 }
355 if (now - itpeers->second.lastMessage < IDLED)
356 selected.insert(id);
357 }
358
359 if (selected.size() != maxSelectedPeers_)
360 {
361 JLOG(journal_.trace())
362 << "update: selection failed " << Slice(validator) << " " << id;
363 initCounting();
364 return;
365 }
366
367 lastSelected_ = now;
368
369 auto s = selected.begin();
370 JLOG(journal_.trace())
371 << "update: " << Slice(validator) << " " << id << " pool size "
372 << consideredPoolSize << " selected " << *s << " "
373 << *std::next(s, 1) << " " << *std::next(s, 2);
374
375 XRPL_ASSERT(
376 peers_.size() >= maxSelectedPeers_,
377 "ripple::reduce_relay::Slot::update : minimum peers");
378
379 // squelch peers which are not selected and
380 // not already squelched
382 for (auto& [k, v] : peers_)
383 {
384 v.count = 0;
385
386 if (selected.find(k) != selected.end())
387 v.state = PeerState::Selected;
388 else if (v.state != PeerState::Squelched)
389 {
390 if (journal_.trace())
391 str << k << " ";
392 v.state = PeerState::Squelched;
394 getSquelchDuration(peers_.size() - maxSelectedPeers_);
395 v.expire = now + duration;
396 handler_.squelch(validator, k, duration.count());
397 }
398 }
399 JLOG(journal_.trace()) << "update: squelching " << Slice(validator)
400 << " " << id << " " << str.str();
401 considered_.clear();
402 reachedThreshold_ = 0;
403 state_ = SlotState::Selected;
404 }
405}
406
407template <typename clock_type>
410{
411 using namespace std::chrono;
412 auto m = std::max(
415 {
417 JLOG(journal_.warn())
418 << "getSquelchDuration: unexpected squelch duration " << npeers;
419 }
420 return seconds{ripple::rand_int(MIN_UNSQUELCH_EXPIRE / 1s, m / 1s)};
421}
422
423template <typename clock_type>
424void
426{
427 auto it = peers_.find(id);
428 if (it != peers_.end())
429 {
430 std::vector<Peer::id_t> toUnsquelch;
431
432 JLOG(journal_.trace())
433 << "deletePeer: " << Slice(validator) << " " << id << " selected "
434 << (it->second.state == PeerState::Selected) << " considered "
435 << (considered_.find(id) != considered_.end()) << " erase "
436 << erase;
437 auto now = clock_type::now();
438 if (it->second.state == PeerState::Selected)
439 {
440 for (auto& [k, v] : peers_)
441 {
442 if (v.state == PeerState::Squelched)
443 toUnsquelch.push_back(k);
444 v.state = PeerState::Counting;
445 v.count = 0;
446 v.expire = now;
447 }
448
449 considered_.clear();
450 reachedThreshold_ = 0;
451 state_ = SlotState::Counting;
452 }
453 else if (considered_.find(id) != considered_.end())
454 {
455 if (it->second.count > MAX_MESSAGE_THRESHOLD)
456 --reachedThreshold_;
457 considered_.erase(id);
458 }
459
460 it->second.lastMessage = now;
461 it->second.count = 0;
462
463 if (erase)
464 peers_.erase(it);
465
466 // Must be after peers_.erase(it)
467 for (auto const& k : toUnsquelch)
468 handler_.unsquelch(validator, k);
469 }
470}
471
472template <typename clock_type>
473void
475{
476 for (auto& [_, peer] : peers_)
477 {
478 (void)_;
479 peer.count = 0;
480 }
481}
482
483template <typename clock_type>
484void
486{
487 state_ = SlotState::Counting;
488 considered_.clear();
489 reachedThreshold_ = 0;
490 resetCounts();
491}
492
493template <typename clock_type>
496{
497 return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) {
498 return (it.second.state == state);
499 });
500}
501
502template <typename clock_type>
505{
506 return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) {
507 return (it.second.state != state);
508 });
509}
510
511template <typename clock_type>
514{
516 for (auto const& [id, info] : peers_)
517 if (info.state == PeerState::Selected)
518 r.insert(id);
519 return r;
520}
521
522template <typename clock_type>
524 typename Peer::id_t,
527{
528 using namespace std::chrono;
529 auto r = std::unordered_map<
530 id_t,
532
533 for (auto const& [id, info] : peers_)
534 r.emplace(std::make_pair(
535 id,
536 std::move(std::make_tuple(
537 info.state,
538 info.count,
539 epoch<milliseconds>(info.expire).count(),
540 epoch<milliseconds>(info.lastMessage).count()))));
541
542 return r;
543}
544
549template <typename clock_type>
550class Slots final
551{
552 using time_point = typename clock_type::time_point;
553 using id_t = typename Peer::id_t;
555 uint256,
557 clock_type,
559
560public:
566 Slots(Logs& logs, SquelchHandler const& handler, Config const& config)
567 : handler_(handler)
568 , logs_(logs)
569 , journal_(logs.journal("Slots"))
570 , baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
571 , maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS)
572 {
573 }
574 ~Slots() = default;
575
577 bool
582
584 bool
586 {
589 reduce_relay::epoch<std::chrono::minutes>(clock_type::now()) >
591
592 return reduceRelayReady_;
593 }
594
602 void
604 uint256 const& key,
605 PublicKey const& validator,
606 id_t id,
607 protocol::MessageType type)
608 {
609 updateSlotAndSquelch(key, validator, id, type, []() {});
610 }
611
619 void
621 uint256 const& key,
622 PublicKey const& validator,
623 id_t id,
624 protocol::MessageType type,
626
630 void
632
635 inState(PublicKey const& validator, PeerState state) const
636 {
637 auto const& it = slots_.find(validator);
638 if (it != slots_.end())
639 return it->second.inState(state);
640 return {};
641 }
642
645 notInState(PublicKey const& validator, PeerState state) const
646 {
647 auto const& it = slots_.find(validator);
648 if (it != slots_.end())
649 return it->second.notInState(state);
650 return {};
651 }
652
654 bool
655 inState(PublicKey const& validator, SlotState state) const
656 {
657 auto const& it = slots_.find(validator);
658 if (it != slots_.end())
659 return it->second.state_ == state;
660 return false;
661 }
662
665 getSelected(PublicKey const& validator)
666 {
667 auto const& it = slots_.find(validator);
668 if (it != slots_.end())
669 return it->second.getSelected();
670 return {};
671 }
672
677 typename Peer::id_t,
679 getPeers(PublicKey const& validator)
680 {
681 auto const& it = slots_.find(validator);
682 if (it != slots_.end())
683 return it->second.getPeers();
684 return {};
685 }
686
689 getState(PublicKey const& validator)
690 {
691 auto const& it = slots_.find(validator);
692 if (it != slots_.end())
693 return it->second.getState();
694 return {};
695 }
696
703 void
705
706private:
710 bool
711 addPeerMessage(uint256 const& key, id_t id);
712
714
716 SquelchHandler const& handler_; // squelch/unsquelch handler
719
722
723 // Maintain aged container of message/peers. This is required
724 // to discard duplicate message from the same peer. A message
725 // is aged after IDLED seconds. A message received IDLED seconds
726 // after it was relayed is ignored by PeerImp.
728 beast::get_abstract_clock<clock_type>()};
729};
730
731template <typename clock_type>
732bool
734{
735 beast::expire(peersWithMessage_, reduce_relay::IDLED);
736
737 if (key.isNonZero())
738 {
739 auto it = peersWithMessage_.find(key);
740 if (it == peersWithMessage_.end())
741 {
742 JLOG(journal_.trace())
743 << "addPeerMessage: new " << to_string(key) << " " << id;
744 peersWithMessage_.emplace(key, std::unordered_set<id_t>{id});
745 return true;
746 }
747
748 if (it->second.find(id) != it->second.end())
749 {
750 JLOG(journal_.trace()) << "addPeerMessage: duplicate message "
751 << to_string(key) << " " << id;
752 return false;
753 }
754
755 JLOG(journal_.trace())
756 << "addPeerMessage: added " << to_string(key) << " " << id;
757
758 it->second.insert(id);
759 }
760
761 return true;
762}
763
764template <typename clock_type>
765void
767 uint256 const& key,
768 PublicKey const& validator,
769 id_t id,
770 protocol::MessageType type,
772{
773 if (!addPeerMessage(key, id))
774 return;
775
776 auto it = slots_.find(validator);
777 if (it == slots_.end())
778 {
779 JLOG(journal_.trace())
780 << "updateSlotAndSquelch: new slot " << Slice(validator);
781 auto it =
782 slots_
783 .emplace(std::make_pair(
784 validator,
786 handler_, logs_.journal("Slot"), maxSelectedPeers_)))
787 .first;
788 it->second.update(validator, id, type, callback);
789 }
790 else
791 it->second.update(validator, id, type, callback);
792}
793
794template <typename clock_type>
795void
797{
798 for (auto& [validator, slot] : slots_)
799 slot.deletePeer(validator, id, erase);
800}
801
802template <typename clock_type>
803void
805{
806 auto now = clock_type::now();
807
808 for (auto it = slots_.begin(); it != slots_.end();)
809 {
810 it->second.deleteIdlePeer(it->first);
811 if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT)
812 {
813 JLOG(journal_.trace())
814 << "deleteIdlePeers: deleting idle slot " << Slice(it->first);
815 it = slots_.erase(it);
816 }
817 else
818 ++it;
819 }
820}
821
822} // namespace reduce_relay
823
824} // namespace ripple
825
826#endif // XRPL_OVERLAY_SLOT_H_INCLUDED
T begin(T... args)
A generic endpoint for log messages.
Definition Journal.h:41
Associative container where each element is also indexed by time.
Manages partitions for logging.
Definition Log.h:33
std::uint32_t id_t
Uniquely identifies a peer.
A public key.
Definition PublicKey.h:43
An immutable linear range of bytes.
Definition Slice.h:27
std::size_t size() const noexcept
Returns the number of bytes in the storage.
Definition Slice.h:62
bool isNonZero() const
Definition base_uint.h:526
Seed functor once per construction.
Slot is associated with a specific validator via validator's public key.
std::unordered_map< id_t, std::tuple< PeerState, uint16_t, uint32_t, uint32_t > > getPeers() const
Get peers info.
Slot(SquelchHandler const &handler, beast::Journal journal, uint16_t maxSelectedPeers)
Constructor.
void deletePeer(PublicKey const &validator, id_t id, bool erase)
Handle peer deletion when a peer disconnects.
std::uint16_t reachedThreshold_
uint16_t const maxSelectedPeers_
std::uint16_t notInState(PeerState state) const
Return number of peers not in state.
typename clock_type::time_point time_point
SquelchHandler const & handler_
void initCounting()
Initialize slot to Counting state.
time_point const & getLastSelected() const
Get the time of the last peer selection round.
clock_type::time_point lastSelected_
void update(PublicKey const &validator, id_t id, protocol::MessageType type, ignored_squelch_callback callback)
Update peer info.
std::set< id_t > getSelected() const
Return selected peers.
SlotState getState() const
Return Slot's state.
std::uint16_t inState(PeerState state) const
Return number of peers in state.
void deleteIdlePeer(PublicKey const &validator)
Check if peers stopped relaying messages.
std::unordered_set< id_t > considered_
std::chrono::seconds getSquelchDuration(std::size_t npeers)
Get random squelch duration between MIN_UNSQUELCH_EXPIRE and min(max(MAX_UNSQUELCH_EXPIRE_DEFAULT, SQUELCH_PER_PEER * npeers), MAX_UNSQUELCH_EXPIRE_PEERS).
std::unordered_map< id_t, PeerInfo > peers_
void resetCounts()
Reset counts of peers in Selected or Counting state.
beast::Journal const journal_
Slots is a container for validator's Slot and handles Slot update when a message is received from a validator.
typename clock_type::time_point time_point
bool reduceRelayReady()
Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup.
static messages peersWithMessage_
void deletePeer(id_t id, bool erase)
Called when a peer is deleted.
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
bool addPeerMessage(uint256 const &key, id_t id)
Add message/peer if have not seen this message from the peer.
beast::Journal const journal_
std::optional< std::uint16_t > notInState(PublicKey const &validator, PeerState state) const
Return number of peers not in state.
std::set< id_t > getSelected(PublicKey const &validator)
Get selected peers.
bool baseSquelchReady()
Check if base squelching feature is enabled and ready.
hash_map< PublicKey, Slot< clock_type > > slots_
std::optional< std::uint16_t > inState(PublicKey const &validator, PeerState state) const
Return number of peers in state.
bool inState(PublicKey const &validator, SlotState state) const
Return true if Slot is in state.
std::unordered_map< typename Peer::id_t, std::tuple< PeerState, uint16_t, uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
Get peers info.
uint16_t const maxSelectedPeers_
std::optional< SlotState > getState(PublicKey const &validator)
Get Slot's state.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type, typename Slot< clock_type >::ignored_squelch_callback callback)
Calls Slot::update of Slot associated with the validator.
std::atomic_bool reduceRelayReady_
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator, with a noop callback.
Slots(Logs &logs, SquelchHandler const &handler, Config const &config)
SquelchHandler const & handler_
typename Peer::id_t id_t
virtual void unsquelch(PublicKey const &validator, Peer::id_t id) const =0
Unsquelch handler.
virtual void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t duration) const =0
Squelch handler.
T end(T... args)
T find(T... args)
T insert(T... args)
T is_same_v
T make_pair(T... args)
T make_tuple(T... args)
T max(T... args)
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
static constexpr auto SQUELCH_PER_PEER
Unit epoch(TP const &t)
static constexpr auto MIN_UNSQUELCH_EXPIRE
SlotState
Slot's State.
static constexpr auto IDLED
PeerState
Peer's State.
static constexpr uint16_t MAX_MESSAGE_THRESHOLD
static constexpr auto WAIT_ON_BOOTUP
static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT
static constexpr uint16_t MIN_MESSAGE_THRESHOLD
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::enable_if_t< std::is_integral< Integral >::value, Integral > rand_int()
base_uint< 256 > uint256
Definition base_uint.h:539
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition STExchange.h:153
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
T next(T... args)
T push_back(T... args)
T size(T... args)
T str(T... args)
Data maintained for each peer.