xrpld
Loading...
Searching...
No Matches
overlay/Slot.h
1#pragma once
2
3#include <xrpld/core/Config.h>
4#include <xrpld/overlay/Peer.h>
5#include <xrpld/overlay/ReduceRelayCommon.h>
6
7#include <xrpl/basics/Log.h>
8#include <xrpl/basics/chrono.h>
9#include <xrpl/basics/random.h>
10#include <xrpl/beast/container/aged_unordered_map.h>
11#include <xrpl/beast/utility/Journal.h>
12#include <xrpl/protocol/PublicKey.h>
13#include <xrpl/protocol/messages.h>
14
15#include <algorithm>
16#include <optional>
17#include <set>
18#include <tuple>
19#include <unordered_map>
20#include <unordered_set>
21
22namespace xrpl::reduce_relay {
23
24template <typename ClockType>
25class Slots;
26
28enum class PeerState : uint8_t {
29 Counting, // counting messages
30 Selected, // selected to relay, counting if Slot in Counting
31 Squelched, // squelched, doesn't relay
32};
33
34enum class SlotState : uint8_t {
35 Counting, // counting messages
36 Selected, // peers selected, stop counting
37};
38
39template <typename Unit, typename TP>
40Unit
41epoch(TP const& t)
42{
43 return std::chrono::duration_cast<Unit>(t.time_since_epoch());
44}
45
51{
52public:
53 virtual ~SquelchHandler() = default;
59 virtual void
60 squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration) const = 0;
65 virtual void
66 unsquelch(PublicKey const& validator, Peer::id_t id) const = 0;
67};
68
79template <typename ClockType>
80class Slot final
81{
82private:
83 friend class Slots<ClockType>;
85 using time_point = ClockType::time_point;
86
87 // a callback to report ignored squelches
89
96 Slot(SquelchHandler const& handler, beast::Journal journal, uint16_t maxSelectedPeers)
97 : lastSelected_(ClockType::now())
98 , handler_(handler)
99 , journal_(journal)
100 , maxSelectedPeers_(maxSelectedPeers)
101 {
102 }
103
123 void
125 PublicKey const& validator,
126 id_t id,
127 protocol::MessageType type,
128 ignored_squelch_callback callback);
129
140 void
141 deletePeer(PublicKey const& validator, id_t id, bool erase);
142
144 [[nodiscard]] time_point const&
146 {
147 return lastSelected_;
148 }
149
151 [[nodiscard]] std::uint16_t
152 inState(PeerState state) const;
153
155 [[nodiscard]] std::uint16_t
156 notInState(PeerState state) const;
157
159 [[nodiscard]] SlotState
160 getState() const
161 {
162 return state_;
163 }
164
166 [[nodiscard]] std::set<id_t>
167 getSelected() const;
168
173 getPeers() const;
174
181 void
182 deleteIdlePeer(PublicKey const& validator);
183
191
192private:
194 void
196
198 void
200
202 struct PeerInfo
203 {
204 PeerState state; // peer's state
205 std::size_t count; // message count
206 time_point expire; // squelch expiration time
207 time_point lastMessage; // time last message received
208 };
209
211
212 // pool of peers considered as the source of messages
213 // from validator - peers that reached kMinMessageThreshold
215
216 // number of peers that reached kMaxMessageThreshold
218
219 // last time peers were selected, used to age the slot
220 ClockType::time_point lastSelected_;
221
223 SquelchHandler const& handler_; // squelch/unsquelch handler
224 beast::Journal const journal_; // logging
225
226 // the maximum number of peers that should be selected as a validator
227 // message source
228 uint16_t const maxSelectedPeers_;
229};
230
231template <typename ClockType>
232void
234{
235 using namespace std::chrono;
236 auto now = ClockType::now();
237 for (auto it = peers_.begin(); it != peers_.end();)
238 {
239 auto& peer = it->second;
240 auto id = it->first;
241 ++it;
242 if (now - peer.lastMessage > kIdled)
243 {
244 JLOG(journal_.trace())
245 << "deleteIdlePeer: " << Slice(validator) << " " << id << " idled "
246 << duration_cast<seconds>(now - peer.lastMessage).count() << " selected "
247 << (peer.state == PeerState::Selected);
248 deletePeer(validator, id, false);
249 }
250 }
251}
252
253template <typename ClockType>
254void
256 PublicKey const& validator,
257 id_t id,
258 protocol::MessageType type,
260{
261 using namespace std::chrono;
262 auto now = ClockType::now();
263 auto it = peers_.find(id);
264 // First message from this peer
265 if (it == peers_.end())
266 {
267 JLOG(journal_.trace()) << "update: adding peer " << Slice(validator) << " " << id;
268 peers_.emplace(std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now}));
269 initCounting();
270 return;
271 }
272 // Message from a peer with expired squelch
273 if (it->second.state == PeerState::Squelched && now > it->second.expire)
274 {
275 JLOG(journal_.trace()) << "update: squelch expired " << Slice(validator) << " " << id;
276 it->second.state = PeerState::Counting;
277 it->second.lastMessage = now;
278 initCounting();
279 return;
280 }
281
282 auto& peer = it->second;
283
284 JLOG(journal_.trace()) << "update: existing peer " << Slice(validator) << " " << id
285 << " slot state " << static_cast<int>(state_) << " peer state "
286 << static_cast<int>(peer.state) << " count " << peer.count << " last "
287 << duration_cast<milliseconds>(now - peer.lastMessage).count()
288 << " pool " << considered_.size() << " threshold " << reachedThreshold_
289 << " " << (type == protocol::mtVALIDATION ? "validation" : "proposal");
290
291 peer.lastMessage = now;
292
293 // report if we received a message from a squelched peer
294 if (peer.state == PeerState::Squelched)
295 callback();
296
297 if (state_ != SlotState::Counting || peer.state == PeerState::Squelched)
298 return;
299
300 if (++peer.count > kMinMessageThreshold)
301 considered_.insert(id);
302 if (peer.count == (kMaxMessageThreshold + 1))
304
306 {
307 JLOG(journal_.trace()) << "update: resetting due to inactivity " << Slice(validator) << " "
308 << id << " " << duration_cast<seconds>(now - lastSelected_).count();
309 initCounting();
310 return;
311 }
312
314 {
315 // Randomly select maxSelectedPeers_ peers from considered.
316 // Exclude peers that have been idling > IDLED -
317 // it's possible that deleteIdlePeer() has not been called yet.
318 // If number of remaining peers != maxSelectedPeers_
319 // then reset the Counting state and let deleteIdlePeer() handle
320 // idled peers.
322 auto const consideredPoolSize = considered_.size();
323 while (selected.size() != maxSelectedPeers_ && !considered_.empty())
324 {
325 auto i = considered_.size() == 1 ? 0 : randInt(considered_.size() - 1);
326 auto it = std::next(considered_.begin(), i);
327 auto id = *it;
328 considered_.erase(it);
329 auto const& itPeers = peers_.find(id);
330 if (itPeers == peers_.end())
331 {
332 JLOG(journal_.error())
333 << "update: peer not found " << Slice(validator) << " " << id;
334 continue;
335 }
336 if (now - itPeers->second.lastMessage < kIdled)
337 selected.insert(id);
338 }
339
340 if (selected.size() != maxSelectedPeers_)
341 {
342 JLOG(journal_.trace()) << "update: selection failed " << Slice(validator) << " " << id;
343 initCounting();
344 return;
345 }
346
347 lastSelected_ = now;
348
349 auto s = selected.begin();
350 JLOG(journal_.trace()) << "update: " << Slice(validator) << " " << id << " pool size "
351 << consideredPoolSize << " selected " << *s << " "
352 << *std::next(s, 1) << " " << *std::next(s, 2);
353
354 XRPL_ASSERT(
355 peers_.size() >= maxSelectedPeers_, "xrpl::reduce_relay::Slot::update : minimum peers");
356
357 // squelch peers which are not selected and
358 // not already squelched
360 for (auto& [k, v] : peers_)
361 {
362 v.count = 0;
363
364 if (selected.find(k) != selected.end())
365 {
366 v.state = PeerState::Selected;
367 }
368 else if (v.state != PeerState::Squelched)
369 {
370 if (journal_.trace())
371 str << k << " ";
372 v.state = PeerState::Squelched;
375 v.expire = now + duration;
376 handler_.squelch(validator, k, duration.count());
377 }
378 }
379 JLOG(journal_.trace()) << "update: squelching " << Slice(validator) << " " << id << " "
380 << str.str();
381 considered_.clear();
384 }
385}
386
387template <typename ClockType>
390{
391 using namespace std::chrono;
394 {
396 JLOG(journal_.warn()) << "getSquelchDuration: unexpected squelch duration " << npeers;
397 }
398 return seconds{xrpl::randInt(kMinUnsquelchExpire / 1s, m / 1s)};
399}
400
401template <typename ClockType>
402void
404{
405 auto it = peers_.find(id);
406 if (it != peers_.end())
407 {
408 std::vector<Peer::id_t> toUnsquelch;
409
410 JLOG(journal_.trace()) << "deletePeer: " << Slice(validator) << " " << id << " selected "
411 << (it->second.state == PeerState::Selected) << " considered "
412 << (considered_.contains(id)) << " erase " << erase;
413 auto now = ClockType::now();
414 if (it->second.state == PeerState::Selected)
415 {
416 for (auto& [k, v] : peers_)
417 {
418 if (v.state == PeerState::Squelched)
419 toUnsquelch.push_back(k);
420 v.state = PeerState::Counting;
421 v.count = 0;
422 v.expire = now;
423 }
424
425 considered_.clear();
428 }
429 else if (considered_.contains(id))
430 {
431 if (it->second.count > kMaxMessageThreshold)
433 considered_.erase(id);
434 }
435
436 it->second.lastMessage = now;
437 it->second.count = 0;
438
439 if (erase)
440 peers_.erase(it);
441
442 // Must be after peers_.erase(it)
443 for (auto const& k : toUnsquelch)
444 handler_.unsquelch(validator, k);
445 }
446}
447
448template <typename ClockType>
449void
451{
452 for (auto& [_, peer] : peers_)
453 {
454 (void)_;
455 peer.count = 0;
456 }
457}
458
459template <typename ClockType>
460void
468
469template <typename ClockType>
472{
473 return std::count_if(
474 peers_.begin(), peers_.end(), [&](auto const& it) { return (it.second.state == state); });
475}
476
477template <typename ClockType>
480{
481 return std::count_if(
482 peers_.begin(), peers_.end(), [&](auto const& it) { return (it.second.state != state); });
483}
484
485template <typename ClockType>
488{
490 for (auto const& [id, info] : peers_)
491 {
492 if (info.state == PeerState::Selected)
493 r.insert(id);
494 }
495 return r;
496}
497
498template <typename ClockType>
501{
502 using namespace std::chrono;
503 auto r = std::
504 unordered_map<id_t, std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>>();
505
506 for (auto const& [id, info] : peers_)
507 {
508 r.emplace(
510 id,
511 std::move(
513 info.state,
514 info.count,
515 epoch<milliseconds>(info.expire).count(),
516 epoch<milliseconds>(info.lastMessage).count()))));
517 }
518
519 return r;
520}
521
526template <typename ClockType>
527class Slots final
528{
529 using time_point = ClockType::time_point;
532 uint256,
534 ClockType,
536
537public:
543 Slots(ServiceRegistry& registry, SquelchHandler const& handler, Config const& config)
544 : handler_(handler)
545 , logs_(registry.getLogs())
546 , journal_(registry.getJournal("Slots"))
547 , baseSquelchEnabled_(config.vpReduceRelayBaseSquelchEnable)
548 , maxSelectedPeers_(config.vpReduceRelaySquelchMaxSelectedPeers)
549 {
550 }
551 ~Slots() = default;
552
554 bool
559
561 bool
572
580 void
582 uint256 const& key,
583 PublicKey const& validator,
584 id_t id,
585 protocol::MessageType type)
586 {
587 updateSlotAndSquelch(key, validator, id, type, []() {});
588 }
589
597 void
599 uint256 const& key,
600 PublicKey const& validator,
601 id_t id,
602 protocol::MessageType type,
604
608 void
610
612 [[nodiscard]] std::optional<std::uint16_t>
613 inState(PublicKey const& validator, PeerState state) const
614 {
615 auto const& it = slots_.find(validator);
616 if (it != slots_.end())
617 return it->second.inState(state);
618 return {};
619 }
620
622 [[nodiscard]] std::optional<std::uint16_t>
623 notInState(PublicKey const& validator, PeerState state) const
624 {
625 auto const& it = slots_.find(validator);
626 if (it != slots_.end())
627 return it->second.notInState(state);
628 return {};
629 }
630
632 [[nodiscard]] bool
633 inState(PublicKey const& validator, SlotState state) const
634 {
635 auto const& it = slots_.find(validator);
636 if (it != slots_.end())
637 return it->second.state_ == state;
638 return false;
639 }
640
643 getSelected(PublicKey const& validator)
644 {
645 auto const& it = slots_.find(validator);
646 if (it != slots_.end())
647 return it->second.getSelected();
648 return {};
649 }
650
655 getPeers(PublicKey const& validator)
656 {
657 auto const& it = slots_.find(validator);
658 if (it != slots_.end())
659 return it->second.getPeers();
660 return {};
661 }
662
665 getState(PublicKey const& validator)
666 {
667 auto const& it = slots_.find(validator);
668 if (it != slots_.end())
669 return it->second.getState();
670 return {};
671 }
672
679 void
681
682private:
686 bool
687 addPeerMessage(uint256 const& key, id_t id);
688
690
692 SquelchHandler const& handler_; // squelch/unsquelch handler
695
698
699 // Maintain aged container of message/peers. This is required
700 // to discard duplicate message from the same peer. A message
701 // is aged after IDLED seconds. A message received IDLED seconds
702 // after it was relayed is ignored by PeerImp.
704};
705
706template <typename ClockType>
707bool
709{
711
712 if (key.isNonZero())
713 {
714 auto it = peersWithMessage.find(key);
715 if (it == peersWithMessage.end())
716 {
717 JLOG(journal_.trace()) << "addPeerMessage: new " << to_string(key) << " " << id;
719 return true;
720 }
721
722 if (it->second.find(id) != it->second.end())
723 {
724 JLOG(journal_.trace())
725 << "addPeerMessage: duplicate message " << to_string(key) << " " << id;
726 return false;
727 }
728
729 JLOG(journal_.trace()) << "addPeerMessage: added " << to_string(key) << " " << id;
730
731 it->second.insert(id);
732 }
733
734 return true;
735}
736
737template <typename ClockType>
738void
740 uint256 const& key,
741 PublicKey const& validator,
742 id_t id,
743 protocol::MessageType type,
745{
746 if (!addPeerMessage(key, id))
747 return;
748
749 auto it = slots_.find(validator);
750 if (it == slots_.end())
751 {
752 JLOG(journal_.trace()) << "updateSlotAndSquelch: new slot " << Slice(validator);
753 auto it = slots_
754 .emplace(
756 validator,
758 .first;
759 it->second.update(validator, id, type, callback);
760 }
761 else
762 {
763 it->second.update(validator, id, type, callback);
764 }
765}
766
767template <typename ClockType>
768void
770{
771 for (auto& [validator, slot] : slots_)
772 slot.deletePeer(validator, id, erase);
773}
774
775template <typename ClockType>
776void
778{
779 auto now = ClockType::now();
780
781 for (auto it = slots_.begin(); it != slots_.end();)
782 {
783 it->second.deleteIdlePeer(it->first);
784 if (now - it->second.getLastSelected() > kMaxUnsquelchExpireDefault)
785 {
786 JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot " << Slice(it->first);
787 it = slots_.erase(it);
788 }
789 else
790 {
791 ++it;
792 }
793 }
794}
795
796} // namespace xrpl::reduce_relay
T begin(T... args)
A generic endpoint for log messages.
Definition Journal.h:38
bool isNonZero() const
Definition base_uint.h:549
Seed functor once per construction.
Manages partitions for logging.
Definition Log.h:20
std::uint32_t id_t
Uniquely identifies a peer.
A public key.
Definition PublicKey.h:42
Service registry for dependency injection.
An immutable linear range of bytes.
Definition Slice.h:26
Slot is associated with a specific validator via validator's public key.
std::function< void()> ignored_squelch_callback
time_point const & getLastSelected() const
Get the time of the last peer selection round.
std::unordered_map< id_t, std::tuple< PeerState, uint16_t, uint32_t, uint32_t > > getPeers() const
Get peers info.
std::uint16_t inState(PeerState state) const
Return number of peers in state.
std::set< id_t > getSelected() const
Return selected peers.
SlotState getState() const
Return Slot's state.
std::uint16_t notInState(PeerState state) const
Return number of peers not in state.
Slot(SquelchHandler const &handler, beast::Journal journal, uint16_t maxSelectedPeers)
Constructor.
std::unordered_set< id_t > considered_
void resetCounts()
Reset counts of peers in Selected or Counting state.
std::chrono::seconds getSquelchDuration(std::size_t npeers)
Get random squelch duration between kMinUnsquelchExpire and min(max(kMaxUnsquelchExpireDefault,...
void initCounting()
Initialize slot to Counting state.
void update(PublicKey const &validator, id_t id, protocol::MessageType type, ignored_squelch_callback callback)
Update peer info.
void deleteIdlePeer(PublicKey const &validator)
Check if peers stopped relaying messages.
void deletePeer(PublicKey const &validator, id_t id, bool erase)
Handle peer deletion when a peer disconnects.
ManualClock::time_point lastSelected_
std::unordered_map< id_t, PeerInfo > peers_
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
std::optional< SlotState > getState(PublicKey const &validator)
Get Slot's state.
Slots(ServiceRegistry &registry, SquelchHandler const &handler, Config const &config)
uint16_t const maxSelectedPeers_
void deletePeer(id_t id, bool erase)
Called when a peer is deleted.
beast::Journal const journal_
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
static messages peersWithMessage
bool addPeerMessage(uint256 const &key, id_t id)
Add message/peer if have not seen this message from the peer.
bool reduceRelayReady()
Check if reduce_relay::kWaitOnBootup time passed since startup.
std::optional< std::uint16_t > inState(PublicKey const &validator, PeerState state) const
Return number of peers in state.
hash_map< PublicKey, Slot< ClockType > > slots_
std::optional< std::uint16_t > notInState(PublicKey const &validator, PeerState state) const
Return number of peers not in state.
std::atomic_bool reduceRelayReady_
std::set< id_t > getSelected(PublicKey const &validator)
Get selected peers.
bool inState(PublicKey const &validator, SlotState state) const
Return true if Slot is in state.
beast::aged_unordered_map< uint256, std::unordered_set< Peer::id_t >, ClockType, HardenedHash< strong_hash > > messages
SquelchHandler const & handler_
bool baseSquelchReady()
Check if base squelching feature is enabled and ready.
ClockType::time_point time_point
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator, with a noop callback.
std::unordered_map< Peer::id_t, std::tuple< PeerState, uint16_t, uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
Get peers info.
virtual void unsquelch(PublicKey const &validator, Peer::id_t id) const =0
Unsquelch handler.
virtual void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t duration) const =0
Squelch handler.
T duration_cast(T... args)
T end(T... args)
T find(T... args)
T insert(T... args)
T make_pair(T... args)
T make_tuple(T... args)
T max(T... args)
std::enable_if_t< IsAgedContainer< AgedContainer >::value, std::size_t > expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
AbstractClock< Facade > & getAbstractClock()
Returns a global instance of an abstract clock.
detail::AgedUnorderedContainer< false, true, Key, T, Clock, Hash, KeyEqual, Allocator > aged_unordered_map
static constexpr uint16_t kMinMessageThreshold
static constexpr auto kSquelchPerPeer
static constexpr auto kMinUnsquelchExpire
static constexpr uint16_t kMaxMessageThreshold
Unit epoch(TP const &t)
static constexpr auto kWaitOnBootup
PeerState
Peer's State.
static constexpr auto kMaxUnsquelchExpireDefault
SlotState
Slot's State.
static constexpr auto kMaxUnsquelchExpirePeers
static constexpr auto kIdled
std::enable_if_t< std::is_integral_v< Integral >, Integral > randInt()
std::string to_string(BaseUInt< Bits, Tag > const &a)
Definition base_uint.h:633
std::unordered_map< Key, Value, Hash, Pred, Allocator > hash_map
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition STExchange.h:148
BaseUInt< 256 > uint256
Definition base_uint.h:562
T next(T... args)
T push_back(T... args)
T size(T... args)
T str(T... args)
Data maintained for each peer.