rippled
Loading...
Searching...
No Matches
overlay/Slot.h
1#pragma once
2
3#include <xrpld/core/Config.h>
4#include <xrpld/overlay/Peer.h>
5#include <xrpld/overlay/ReduceRelayCommon.h>
6
7#include <xrpl/basics/Log.h>
8#include <xrpl/basics/chrono.h>
9#include <xrpl/basics/random.h>
10#include <xrpl/beast/container/aged_unordered_map.h>
11#include <xrpl/beast/utility/Journal.h>
12#include <xrpl/protocol/PublicKey.h>
13#include <xrpl/protocol/messages.h>
14
15#include <algorithm>
16#include <optional>
17#include <set>
18#include <tuple>
19#include <unordered_map>
20#include <unordered_set>
21
22namespace xrpl {
23
24namespace reduce_relay {
25
26template <typename clock_type>
27class Slots;
28
30enum class PeerState : uint8_t {
31 Counting, // counting messages
32 Selected, // selected to relay, counting if Slot in Counting
33 Squelched, // squelched, doesn't relay
34};
36enum class SlotState : uint8_t {
37 Counting, // counting messages
38 Selected, // peers selected, stop counting
39};
40
41template <typename Unit, typename TP>
42Unit
43epoch(TP const& t)
44{
45 return std::chrono::duration_cast<Unit>(t.time_since_epoch());
46}
47
53{
54public:
56 {
57 }
63 virtual void
64 squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration) const = 0;
69 virtual void
70 unsquelch(PublicKey const& validator, Peer::id_t id) const = 0;
71};
72
83template <typename clock_type>
84class Slot final
85{
86private:
87 friend class Slots<clock_type>;
89 using time_point = typename clock_type::time_point;
90
91 // a callback to report ignored squelches
93
100 Slot(SquelchHandler const& handler, beast::Journal journal, uint16_t maxSelectedPeers)
101 : lastSelected_(clock_type::now())
102 , handler_(handler)
103 , journal_(journal)
104 , maxSelectedPeers_(maxSelectedPeers)
105 {
106 }
107
127 void
129 PublicKey const& validator,
130 id_t id,
131 protocol::MessageType type,
132 ignored_squelch_callback callback);
133
144 void
145 deletePeer(PublicKey const& validator, id_t id, bool erase);
146
148 time_point const&
150 {
151 return lastSelected_;
152 }
153
156 inState(PeerState state) const;
157
160 notInState(PeerState state) const;
161
164 getState() const
165 {
166 return state_;
167 }
168
171 getSelected() const;
172
177 getPeers() const;
178
185 void
186 deleteIdlePeer(PublicKey const& validator);
187
195
196private:
198 void
200
202 void
204
206 struct PeerInfo
207 {
208 PeerState state; // peer's state
209 std::size_t count; // message count
210 time_point expire; // squelch expiration time
211 time_point lastMessage; // time last message received
212 };
213
215
216 // pool of peers considered as the source of messages
217 // from validator - peers that reached MIN_MESSAGE_THRESHOLD
219
220 // number of peers that reached MAX_MESSAGE_THRESHOLD
222
223 // last time peers were selected, used to age the slot
224 typename clock_type::time_point lastSelected_;
225
227 SquelchHandler const& handler_; // squelch/unsquelch handler
228 beast::Journal const journal_; // logging
229
230 // the maximum number of peers that should be selected as a validator
231 // message source
232 uint16_t const maxSelectedPeers_;
233};
234
235template <typename clock_type>
236void
238{
239 using namespace std::chrono;
240 auto now = clock_type::now();
241 for (auto it = peers_.begin(); it != peers_.end();)
242 {
243 auto& peer = it->second;
244 auto id = it->first;
245 ++it;
246 if (now - peer.lastMessage > IDLED)
247 {
248 JLOG(journal_.trace())
249 << "deleteIdlePeer: " << Slice(validator) << " " << id << " idled "
250 << duration_cast<seconds>(now - peer.lastMessage).count() << " selected "
251 << (peer.state == PeerState::Selected);
252 deletePeer(validator, id, false);
253 }
254 }
255}
256
257template <typename clock_type>
258void
260 PublicKey const& validator,
261 id_t id,
262 protocol::MessageType type,
264{
265 using namespace std::chrono;
266 auto now = clock_type::now();
267 auto it = peers_.find(id);
268 // First message from this peer
269 if (it == peers_.end())
270 {
271 JLOG(journal_.trace()) << "update: adding peer " << Slice(validator) << " " << id;
272 peers_.emplace(std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now}));
273 initCounting();
274 return;
275 }
276 // Message from a peer with expired squelch
277 if (it->second.state == PeerState::Squelched && now > it->second.expire)
278 {
279 JLOG(journal_.trace()) << "update: squelch expired " << Slice(validator) << " " << id;
280 it->second.state = PeerState::Counting;
281 it->second.lastMessage = now;
282 initCounting();
283 return;
284 }
285
286 auto& peer = it->second;
287
288 JLOG(journal_.trace()) << "update: existing peer " << Slice(validator) << " " << id
289 << " slot state " << static_cast<int>(state_) << " peer state "
290 << static_cast<int>(peer.state) << " count " << peer.count << " last "
291 << duration_cast<milliseconds>(now - peer.lastMessage).count()
292 << " pool " << considered_.size() << " threshold " << reachedThreshold_
293 << " " << (type == protocol::mtVALIDATION ? "validation" : "proposal");
294
295 peer.lastMessage = now;
296
297 // report if we received a message from a squelched peer
298 if (peer.state == PeerState::Squelched)
299 callback();
300
301 if (state_ != SlotState::Counting || peer.state == PeerState::Squelched)
302 return;
303
304 if (++peer.count > MIN_MESSAGE_THRESHOLD)
305 considered_.insert(id);
306 if (peer.count == (MAX_MESSAGE_THRESHOLD + 1))
307 ++reachedThreshold_;
308
309 if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE_DEFAULT)
310 {
311 JLOG(journal_.trace()) << "update: resetting due to inactivity " << Slice(validator) << " "
312 << id << " " << duration_cast<seconds>(now - lastSelected_).count();
313 initCounting();
314 return;
315 }
316
317 if (reachedThreshold_ == maxSelectedPeers_)
318 {
319 // Randomly select maxSelectedPeers_ peers from considered.
320 // Exclude peers that have been idling > IDLED -
321 // it's possible that deleteIdlePeer() has not been called yet.
322 // If number of remaining peers != maxSelectedPeers_
323 // then reset the Counting state and let deleteIdlePeer() handle
324 // idled peers.
326 auto const consideredPoolSize = considered_.size();
327 while (selected.size() != maxSelectedPeers_ && considered_.size() != 0)
328 {
329 auto i = considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1);
330 auto it = std::next(considered_.begin(), i);
331 auto id = *it;
332 considered_.erase(it);
333 auto const& itPeers = peers_.find(id);
334 if (itPeers == peers_.end())
335 {
336 JLOG(journal_.error())
337 << "update: peer not found " << Slice(validator) << " " << id;
338 continue;
339 }
340 if (now - itPeers->second.lastMessage < IDLED)
341 selected.insert(id);
342 }
343
344 if (selected.size() != maxSelectedPeers_)
345 {
346 JLOG(journal_.trace()) << "update: selection failed " << Slice(validator) << " " << id;
347 initCounting();
348 return;
349 }
350
351 lastSelected_ = now;
352
353 auto s = selected.begin();
354 JLOG(journal_.trace()) << "update: " << Slice(validator) << " " << id << " pool size "
355 << consideredPoolSize << " selected " << *s << " "
356 << *std::next(s, 1) << " " << *std::next(s, 2);
357
358 XRPL_ASSERT(
359 peers_.size() >= maxSelectedPeers_, "xrpl::reduce_relay::Slot::update : minimum peers");
360
361 // squelch peers which are not selected and
362 // not already squelched
364 for (auto& [k, v] : peers_)
365 {
366 v.count = 0;
367
368 if (selected.find(k) != selected.end())
369 v.state = PeerState::Selected;
370 else if (v.state != PeerState::Squelched)
371 {
372 if (journal_.trace())
373 str << k << " ";
374 v.state = PeerState::Squelched;
376 getSquelchDuration(peers_.size() - maxSelectedPeers_);
377 v.expire = now + duration;
378 handler_.squelch(validator, k, duration.count());
379 }
380 }
381 JLOG(journal_.trace()) << "update: squelching " << Slice(validator) << " " << id << " "
382 << str.str();
383 considered_.clear();
384 reachedThreshold_ = 0;
385 state_ = SlotState::Selected;
386 }
387}
388
389template <typename clock_type>
392{
393 using namespace std::chrono;
396 {
398 JLOG(journal_.warn()) << "getSquelchDuration: unexpected squelch duration " << npeers;
399 }
400 return seconds{xrpl::rand_int(MIN_UNSQUELCH_EXPIRE / 1s, m / 1s)};
401}
402
403template <typename clock_type>
404void
406{
407 auto it = peers_.find(id);
408 if (it != peers_.end())
409 {
410 std::vector<Peer::id_t> toUnsquelch;
411
412 JLOG(journal_.trace()) << "deletePeer: " << Slice(validator) << " " << id << " selected "
413 << (it->second.state == PeerState::Selected) << " considered "
414 << (considered_.find(id) != considered_.end()) << " erase " << erase;
415 auto now = clock_type::now();
416 if (it->second.state == PeerState::Selected)
417 {
418 for (auto& [k, v] : peers_)
419 {
420 if (v.state == PeerState::Squelched)
421 toUnsquelch.push_back(k);
422 v.state = PeerState::Counting;
423 v.count = 0;
424 v.expire = now;
425 }
426
427 considered_.clear();
428 reachedThreshold_ = 0;
429 state_ = SlotState::Counting;
430 }
431 else if (considered_.find(id) != considered_.end())
432 {
433 if (it->second.count > MAX_MESSAGE_THRESHOLD)
434 --reachedThreshold_;
435 considered_.erase(id);
436 }
437
438 it->second.lastMessage = now;
439 it->second.count = 0;
440
441 if (erase)
442 peers_.erase(it);
443
444 // Must be after peers_.erase(it)
445 for (auto const& k : toUnsquelch)
446 handler_.unsquelch(validator, k);
447 }
448}
449
450template <typename clock_type>
451void
453{
454 for (auto& [_, peer] : peers_)
455 {
456 (void)_;
457 peer.count = 0;
458 }
459}
460
461template <typename clock_type>
462void
464{
465 state_ = SlotState::Counting;
466 considered_.clear();
467 reachedThreshold_ = 0;
468 resetCounts();
469}
470
471template <typename clock_type>
474{
475 return std::count_if(
476 peers_.begin(), peers_.end(), [&](auto const& it) { return (it.second.state == state); });
477}
478
479template <typename clock_type>
482{
483 return std::count_if(
484 peers_.begin(), peers_.end(), [&](auto const& it) { return (it.second.state != state); });
485}
486
487template <typename clock_type>
490{
492 for (auto const& [id, info] : peers_)
493 if (info.state == PeerState::Selected)
494 r.insert(id);
495 return r;
496}
497
498template <typename clock_type>
501{
502 using namespace std::chrono;
503 auto r = std::
505
506 for (auto const& [id, info] : peers_)
507 r.emplace(
509 id,
510 std::move(
512 info.state,
513 info.count,
514 epoch<milliseconds>(info.expire).count(),
515 epoch<milliseconds>(info.lastMessage).count()))));
516
517 return r;
518}
519
524template <typename clock_type>
525class Slots final
526{
527 using time_point = typename clock_type::time_point;
528 using id_t = typename Peer::id_t;
530 uint256,
532 clock_type,
534
535public:
541 Slots(ServiceRegistry& registry, SquelchHandler const& handler, Config const& config)
542 : handler_(handler)
543 , logs_(registry.getLogs())
544 , journal_(registry.getJournal("Slots"))
545 , baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
546 , maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS)
547 {
548 }
549 ~Slots() = default;
550
552 bool
557
559 bool
561 {
563 reduceRelayReady_ = reduce_relay::epoch<std::chrono::minutes>(clock_type::now()) >
565
566 return reduceRelayReady_;
567 }
568
576 void
578 uint256 const& key,
579 PublicKey const& validator,
580 id_t id,
581 protocol::MessageType type)
582 {
583 updateSlotAndSquelch(key, validator, id, type, []() {});
584 }
585
593 void
595 uint256 const& key,
596 PublicKey const& validator,
597 id_t id,
598 protocol::MessageType type,
600
604 void
606
609 inState(PublicKey const& validator, PeerState state) const
610 {
611 auto const& it = slots_.find(validator);
612 if (it != slots_.end())
613 return it->second.inState(state);
614 return {};
615 }
616
619 notInState(PublicKey const& validator, PeerState state) const
620 {
621 auto const& it = slots_.find(validator);
622 if (it != slots_.end())
623 return it->second.notInState(state);
624 return {};
625 }
626
628 bool
629 inState(PublicKey const& validator, SlotState state) const
630 {
631 auto const& it = slots_.find(validator);
632 if (it != slots_.end())
633 return it->second.state_ == state;
634 return false;
635 }
636
639 getSelected(PublicKey const& validator)
640 {
641 auto const& it = slots_.find(validator);
642 if (it != slots_.end())
643 return it->second.getSelected();
644 return {};
645 }
646
650 std::
652 getPeers(PublicKey const& validator)
653 {
654 auto const& it = slots_.find(validator);
655 if (it != slots_.end())
656 return it->second.getPeers();
657 return {};
658 }
659
662 getState(PublicKey const& validator)
663 {
664 auto const& it = slots_.find(validator);
665 if (it != slots_.end())
666 return it->second.getState();
667 return {};
668 }
669
676 void
678
679private:
683 bool
684 addPeerMessage(uint256 const& key, id_t id);
685
687
689 SquelchHandler const& handler_; // squelch/unsquelch handler
692
695
696 // Maintain aged container of message/peers. This is required
697 // to discard duplicate message from the same peer. A message
698 // is aged after IDLED seconds. A message received IDLED seconds
699 // after it was relayed is ignored by PeerImp.
700 inline static messages peersWithMessage_{beast::get_abstract_clock<clock_type>()};
701};
702
703template <typename clock_type>
704bool
706{
707 beast::expire(peersWithMessage_, reduce_relay::IDLED);
708
709 if (key.isNonZero())
710 {
711 auto it = peersWithMessage_.find(key);
712 if (it == peersWithMessage_.end())
713 {
714 JLOG(journal_.trace()) << "addPeerMessage: new " << to_string(key) << " " << id;
715 peersWithMessage_.emplace(key, std::unordered_set<id_t>{id});
716 return true;
717 }
718
719 if (it->second.find(id) != it->second.end())
720 {
721 JLOG(journal_.trace())
722 << "addPeerMessage: duplicate message " << to_string(key) << " " << id;
723 return false;
724 }
725
726 JLOG(journal_.trace()) << "addPeerMessage: added " << to_string(key) << " " << id;
727
728 it->second.insert(id);
729 }
730
731 return true;
732}
733
734template <typename clock_type>
735void
737 uint256 const& key,
738 PublicKey const& validator,
739 id_t id,
740 protocol::MessageType type,
742{
743 if (!addPeerMessage(key, id))
744 return;
745
746 auto it = slots_.find(validator);
747 if (it == slots_.end())
748 {
749 JLOG(journal_.trace()) << "updateSlotAndSquelch: new slot " << Slice(validator);
750 auto it = slots_
751 .emplace(
753 validator,
754 Slot<clock_type>(handler_, logs_.journal("Slot"), maxSelectedPeers_)))
755 .first;
756 it->second.update(validator, id, type, callback);
757 }
758 else
759 it->second.update(validator, id, type, callback);
760}
761
762template <typename clock_type>
763void
765{
766 for (auto& [validator, slot] : slots_)
767 slot.deletePeer(validator, id, erase);
768}
769
770template <typename clock_type>
771void
773{
774 auto now = clock_type::now();
775
776 for (auto it = slots_.begin(); it != slots_.end();)
777 {
778 it->second.deleteIdlePeer(it->first);
779 if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT)
780 {
781 JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot " << Slice(it->first);
782 it = slots_.erase(it);
783 }
784 else
785 ++it;
786 }
787}
788
789} // namespace reduce_relay
790
791} // namespace xrpl
T begin(T... args)
A generic endpoint for log messages.
Definition Journal.h:40
Associative container where each element is also indexed by time.
Manages partitions for logging.
Definition Log.h:32
std::uint32_t id_t
Uniquely identifies a peer.
A public key.
Definition PublicKey.h:42
Service registry for dependency injection.
An immutable linear range of bytes.
Definition Slice.h:26
std::size_t size() const noexcept
Returns the number of bytes in the storage.
Definition Slice.h:61
bool isNonZero() const
Definition base_uint.h:518
Seed functor once per construction.
Slot is associated with a specific validator via validator's public key.
beast::Journal const journal_
std::uint16_t reachedThreshold_
std::set< id_t > getSelected() const
Return selected peers.
std::uint16_t inState(PeerState state) const
Return number of peers in state.
uint16_t const maxSelectedPeers_
void update(PublicKey const &validator, id_t id, protocol::MessageType type, ignored_squelch_callback callback)
Update peer info.
typename clock_type::time_point time_point
void initCounting()
Initialize slot to Counting state.
std::uint16_t notInState(PeerState state) const
Return number of peers not in state.
std::unordered_map< id_t, PeerInfo > peers_
void deleteIdlePeer(PublicKey const &validator)
Check if peers stopped relaying messages.
time_point const & getLastSelected() const
Get the time of the last peer selection round.
std::unordered_map< id_t, std::tuple< PeerState, uint16_t, uint32_t, uint32_t > > getPeers() const
Get peers info.
SquelchHandler const & handler_
void resetCounts()
Reset counts of peers in Selected or Counting state.
Slot(SquelchHandler const &handler, beast::Journal journal, uint16_t maxSelectedPeers)
Constructor.
void deletePeer(PublicKey const &validator, id_t id, bool erase)
Handle peer deletion when a peer disconnects.
std::unordered_set< id_t > considered_
clock_type::time_point lastSelected_
std::chrono::seconds getSquelchDuration(std::size_t npeers)
Get random squelch duration between MIN_UNSQUELCH_EXPIRE and min(max(MAX_UNSQUELCH_EXPIRE_DEFAULT,...
SlotState getState() const
Return Slot's state.
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
hash_map< PublicKey, Slot< clock_type > > slots_
bool addPeerMessage(uint256 const &key, id_t id)
Add message/peer if have not seen this message from the peer.
std::optional< SlotState > getState(PublicKey const &validator)
Get Slot's state.
bool baseSquelchReady()
Check if base squelching feature is enabled and ready.
std::unordered_map< typename Peer::id_t, std::tuple< PeerState, uint16_t, uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
Get peers info.
std::optional< std::uint16_t > notInState(PublicKey const &validator, PeerState state) const
Return number of peers not in state.
std::set< id_t > getSelected(PublicKey const &validator)
Get selected peers.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type, typename Slot< clock_type >::ignored_squelch_callback callback)
Calls Slot::update of Slot associated with the validator.
bool reduceRelayReady()
Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup.
beast::Journal const journal_
SquelchHandler const & handler_
std::atomic_bool reduceRelayReady_
std::optional< std::uint16_t > inState(PublicKey const &validator, PeerState state) const
Return number of peers in state.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator, with a noop callback.
Slots(ServiceRegistry &registry, SquelchHandler const &handler, Config const &config)
bool inState(PublicKey const &validator, SlotState state) const
Return true if Slot is in state.
void deletePeer(id_t id, bool erase)
Called when a peer is deleted.
static messages peersWithMessage_
typename Peer::id_t id_t
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
uint16_t const maxSelectedPeers_
typename clock_type::time_point time_point
virtual void unsquelch(PublicKey const &validator, Peer::id_t id) const =0
Unsquelch handler.
virtual void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t duration) const =0
Squelch handler.
T end(T... args)
T find(T... args)
T insert(T... args)
T is_same_v
T make_pair(T... args)
T make_tuple(T... args)
T max(T... args)
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
static constexpr auto IDLED
static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT
static constexpr uint16_t MIN_MESSAGE_THRESHOLD
static constexpr auto WAIT_ON_BOOTUP
static constexpr auto SQUELCH_PER_PEER
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS
static constexpr auto MIN_UNSQUELCH_EXPIRE
Unit epoch(TP const &t)
PeerState
Peer's State.
SlotState
Slot's State.
static constexpr uint16_t MAX_MESSAGE_THRESHOLD
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
std::enable_if_t< std::is_integral< Integral >::value, Integral > rand_int()
base_uint< 256 > uint256
Definition base_uint.h:531
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition STExchange.h:148
T next(T... args)
T push_back(T... args)
T size(T... args)
T str(T... args)
Data maintained for each peer.