xrpld
Loading...
Searching...
No Matches
Consensus.h
1#pragma once
2
3#include <xrpld/consensus/ConsensusParms.h>
4#include <xrpld/consensus/ConsensusProposal.h>
5#include <xrpld/consensus/ConsensusTypes.h>
6#include <xrpld/consensus/DisputedTx.h>
7
8#include <xrpl/basics/Log.h>
9#include <xrpl/basics/chrono.h>
10#include <xrpl/beast/utility/Journal.h>
11#include <xrpl/json/json_writer.h>
12#include <xrpl/ledger/LedgerTiming.h>
13
14#include <algorithm>
15#include <chrono>
16#include <deque>
17#include <optional>
18#include <sstream>
19
20namespace xrpl {
21
41bool
43 bool anyTransactions,
44 std::size_t prevProposers,
45 std::size_t proposersClosed,
46 std::size_t proposersValidated,
47 std::chrono::milliseconds prevRoundTime,
48 std::chrono::milliseconds timeSincePrevClose,
49 std::chrono::milliseconds openTime,
50 std::chrono::milliseconds idleInterval,
51 ConsensusParms const& parms,
52 beast::Journal j,
53 std::unique_ptr<std::stringstream> const& clog = {});
54
76 std::size_t prevProposers,
77 std::size_t currentProposers,
78 std::size_t currentAgree,
79 std::size_t currentFinished,
80 std::chrono::milliseconds previousAgreeTime,
81 std::chrono::milliseconds currentAgreeTime,
82 bool stalled,
83 ConsensusParms const& parms,
84 bool proposing,
85 beast::Journal j,
86 std::unique_ptr<std::stringstream> const& clog = {});
87
276template <class Adaptor>
278{
279 using Ledger_t = Adaptor::Ledger_t;
280 using TxSet_t = Adaptor::TxSet_t;
281 using NodeID_t = Adaptor::NodeID_t;
282 using Tx_t = TxSet_t::Tx;
283 using PeerPosition_t = Adaptor::PeerPosition_t;
285
287
288 // Helper class to ensure adaptor is notified whenever the ConsensusMode
289 // changes
291 {
293
294 public:
296 {
297 }
298 [[nodiscard]] ConsensusMode
299 get() const
300 {
301 return mode_;
302 }
303
304 void
305 set(ConsensusMode mode, Adaptor& a)
306 {
307 a.onModeChange(mode_, mode);
308 mode_ = mode;
309 }
310 };
311
312public:
315
316 Consensus(Consensus&&) noexcept = default;
317
324 Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j);
325
341 void
343 NetClock::time_point const& now,
344 Ledger_t::ID const& prevLedgerID,
345 Ledger_t prevLedger,
346 hash_set<NodeID_t> const& nowUntrusted,
347 bool proposing,
348 std::unique_ptr<std::stringstream> const& clog = {});
349
356 bool
357 peerProposal(NetClock::time_point const& now, PeerPosition_t const& newProposal);
358
364 void
366 NetClock::time_point const& now,
367 std::unique_ptr<std::stringstream> const& clog = {});
368
374 void
375 gotTxSet(NetClock::time_point const& now, TxSet_t const& txSet);
376
393 void
395 NetClock::time_point const& now,
397
405 Ledger_t::ID
407 {
408 return prevLedgerID_;
409 }
410
411 [[nodiscard]] ConsensusPhase
412 phase() const
413 {
414 return phase_;
415 }
416
424 [[nodiscard]] json::Value
425 getJson(bool full) const;
426
427private:
428 void
430 NetClock::time_point const& now,
431 Ledger_t::ID const& prevLedgerID,
432 Ledger_t const& prevLedger,
433 ConsensusMode mode,
435
436 // Change our view of the previous ledger
437 void
438 handleWrongLedger(Ledger_t::ID const& lgrId, std::unique_ptr<std::stringstream> const& clog);
439
445 void
447
451 void
453
456 bool
458
465 void
467
476 void
478
501 [[nodiscard]] bool
503
504 // Close the open ledger and establish initial position.
505 void
507
508 // Adjust our positions to try to agree with other validators.
509 void
511
512 bool
514
515 // Create disputes between our position and the provided one.
516 void
518
519 // Update our disputes given that this node has adopted a new position.
520 // Will call createDisputes as needed.
521 void
522 updateDisputes(NodeID_t const& node, TxSet_t const& other);
523
524 // Revoke our outstanding proposal, if any, and cease proposing
525 // until this round ends.
526 void
528
529 // The rounded or effective close time estimate from a proposer
530 [[nodiscard]] NetClock::time_point
532
533private:
534 Adaptor& adaptor_;
535
538 bool firstRound_ = true;
540
542
543 // How long the consensus convergence has taken, expressed as
544 // a percentage of the time that we expected it to take.
546
547 // How long has this round been open
549
551
553
554 // Time it took for the last consensus round to converge
556
557 //-------------------------------------------------------------------------
558 // Network time measurements of consensus progress
559
560 // The current network adjusted time. This is the network time the
561 // ledger would close if it closed now
564
565 //-------------------------------------------------------------------------
566 // Non-peer (self) consensus data
567
568 // Last validated ledger ID provided to consensus
569 Ledger_t::ID prevLedgerID_;
570 // Last validated ledger seen by consensus
572
573 // Transaction Sets, indexed by hash of transaction tree
575
578
579 // The number of calls to phaseEstablish where none of our peers
580 // have changed any votes on disputed transactions.
582
583 // The total number of times we have called phaseEstablish
585
586 //-------------------------------------------------------------------------
587 // Peer related consensus data
588
589 // Peer proposed positions for the current round
591
592 // Recently received peer positions, available when transitioning between
593 // ledgers or rounds
595
596 // The number of proposers who participated in the last consensus round
598
599 // nodes that have bowed out of this consensus process
601
602 // Journal for debugging
604};
605
606template <class Adaptor>
607Consensus<Adaptor>::Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal journal)
608 : adaptor_(adaptor), clock_(clock), j_{journal}
609{
610 JLOG(j_.debug()) << "Creating consensus object";
611}
612
613template <class Adaptor>
614void
616 NetClock::time_point const& now,
617 Ledger_t::ID const& prevLedgerID,
618 Ledger_t prevLedger,
619 hash_set<NodeID_t> const& nowUntrusted,
620 bool proposing,
622{
623 if (firstRound_)
624 {
625 // take our initial view of closeTime_ from the seed ledger
626 prevRoundTime_ = adaptor_.parms().ledgerIdleInterval;
627 prevCloseTime_ = prevLedger.closeTime();
628 firstRound_ = false;
629 }
630 else
631 {
633 }
634
635 for (NodeID_t const& n : nowUntrusted)
636 recentPeerPositions_.erase(n);
637
639
640 // We were handed the wrong ledger
641 if (prevLedger.id() != prevLedgerID)
642 {
643 // try to acquire the correct one
644 if (auto newLedger = adaptor_.acquireLedger(prevLedgerID))
645 {
646 prevLedger = *newLedger;
647 }
648 else // Unable to acquire the correct ledger
649 {
650 startMode = ConsensusMode::WrongLedger;
651 JLOG(j_.info()) << "Entering consensus with: " << previousLedger_.id();
652 JLOG(j_.info()) << "Correct LCL is: " << prevLedgerID;
653 }
654 }
655
656 startRoundInternal(now, prevLedgerID, prevLedger, startMode, clog);
657}
658template <class Adaptor>
659void
661 NetClock::time_point const& now,
662 Ledger_t::ID const& prevLedgerID,
663 Ledger_t const& prevLedger,
664 ConsensusMode mode,
666{
668 JLOG(j_.debug()) << "transitioned to ConsensusPhase::Open ";
669 CLOG(clog) << "startRoundInternal transitioned to ConsensusPhase::Open, "
670 "previous ledgerID: "
671 << prevLedgerID << ", seq: " << prevLedger.seq() << ". ";
672 mode_.set(mode, adaptor_);
673 now_ = now;
675 previousLedger_ = prevLedger;
676 result_.reset();
680 openTime_.reset(clock_.now());
681 currPeerPositions_.clear();
682 acquired_.clear();
683 rawCloseTimes_.peers.clear();
684 rawCloseTimes_.self = {};
685 deadNodes_.clear();
686
688 previousLedger_.closeTimeResolution(),
689 previousLedger_.closeAgree(),
690 previousLedger_.seq() + typename Ledger_t::Seq{1});
691
693 CLOG(clog) << "number of peer proposals,previous proposers: " << currPeerPositions_.size()
694 << ',' << prevProposers_ << ". ";
695 if (currPeerPositions_.size() > (prevProposers_ / 2))
696 {
697 // We may be falling behind, don't wait for the timer
698 // consider closing the ledger immediately
699 CLOG(clog) << "consider closing the ledger immediately. ";
700 timerEntry(now_, clog);
701 }
702}
703
704template <class Adaptor>
705bool
707{
708 JLOG(j_.debug()) << "PROPOSAL " << newPeerPos.render();
709 auto const& peerID = newPeerPos.proposal().nodeID();
710
711 // Always need to store recent positions
712 {
713 auto& props = recentPeerPositions_[peerID];
714
715 if (props.size() >= 10)
716 props.pop_front();
717
718 props.push_back(newPeerPos);
719 }
720 return peerProposalInternal(now, newPeerPos);
721}
722
723template <class Adaptor>
724bool
726 NetClock::time_point const& now,
727 PeerPosition_t const& newPeerPos)
728{
729 // Nothing to do for now if we are currently working on a ledger
731 return false;
732
733 now_ = now;
734
735 auto const& newPeerProp = newPeerPos.proposal();
736
737 if (newPeerProp.prevLedger() != prevLedgerID_)
738 {
739 JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger() << " but we are on "
740 << prevLedgerID_;
741 return false;
742 }
743
744 auto const& peerID = newPeerProp.nodeID();
745
746 if (deadNodes_.find(peerID) != deadNodes_.end())
747 {
748 JLOG(j_.info()) << "Position from dead node: " << peerID;
749 return false;
750 }
751
752 {
753 // update current position
754 auto peerPosIt = currPeerPositions_.find(peerID);
755
756 if (peerPosIt != currPeerPositions_.end())
757 {
758 if (newPeerProp.proposeSeq() <= peerPosIt->second.proposal().proposeSeq())
759 {
760 return false;
761 }
762 }
763
764 if (newPeerProp.isBowOut())
765 {
766 JLOG(j_.info()) << "Peer " << peerID << " bows out";
767 if (result_)
768 {
769 for (auto& it : result_->disputes)
770 it.second.unVote(peerID);
771 }
772 if (peerPosIt != currPeerPositions_.end())
773 currPeerPositions_.erase(peerID);
774 deadNodes_.insert(peerID);
775
776 return true;
777 }
778
779 if (peerPosIt != currPeerPositions_.end())
780 {
781 peerPosIt->second = newPeerPos;
782 }
783 else
784 {
785 currPeerPositions_.emplace(peerID, newPeerPos);
786 }
787 }
788
789 if (newPeerProp.isInitial())
790 {
791 // Record the close time estimate
792 JLOG(j_.trace()) << "Peer reports close time as "
793 << newPeerProp.closeTime().time_since_epoch().count();
794 ++rawCloseTimes_.peers[newPeerProp.closeTime()];
795 }
796
797 JLOG(j_.trace()) << "Processing peer proposal " << newPeerProp.proposeSeq() << "/"
798 << newPeerProp.position();
799
800 {
801 auto const ait = acquired_.find(newPeerProp.position());
802 if (ait == acquired_.end())
803 {
804 // acquireTxSet will return the set if it is available, or
805 // spawn a request for it and return nullopt/nullptr. It will call
806 // gotTxSet once it arrives
807 if (auto set = adaptor_.acquireTxSet(newPeerProp.position()))
808 {
809 gotTxSet(now_, *set);
810 }
811 else
812 {
813 JLOG(j_.debug()) << "Don't have tx set for peer";
814 }
815 }
816 else if (result_)
817 {
818 updateDisputes(newPeerProp.nodeID(), ait->second);
819 }
820 }
821
822 return true;
823}
824
825template <class Adaptor>
826void
828 NetClock::time_point const& now,
830{
831 CLOG(clog) << "Consensus<Adaptor>::timerEntry. ";
832 // Nothing to do if we are currently working on a ledger
834 {
835 CLOG(clog) << "Nothing to do during accepted phase. ";
836 return;
837 }
838
839 now_ = now;
840 CLOG(clog) << "Set network adjusted time to " << to_string(now) << ". ";
841
842 // Check we are on the proper ledger (this may change phase_)
843 auto const phaseOrig = phase_;
844 CLOG(clog) << "Phase " << to_string(phaseOrig) << ". ";
845 checkLedger(clog);
846 if (phaseOrig != phase_)
847 {
848 CLOG(clog) << "Changed phase to << " << to_string(phase_) << ". ";
849 }
850
852 {
853 phaseOpen(clog);
854 }
856 {
857 phaseEstablish(clog);
858 }
859 CLOG(clog) << "timerEntry finishing in phase " << to_string(phase_) << ". ";
860}
861
862template <class Adaptor>
863void
865{
866 // Nothing to do if we've finished work on a ledger
868 return;
869
870 now_ = now;
871
872 auto id = txSet.id();
873
874 // If we've already processed this transaction set since requesting
875 // it from the network, there is nothing to do now
876 if (!acquired_.emplace(id, txSet).second)
877 return;
878
879 if (!result_)
880 {
881 JLOG(j_.debug()) << "Not creating disputes: no position yet.";
882 }
883 else
884 {
885 // Our position is added to acquired_ as soon as we create it,
886 // so this txSet must differ
887 XRPL_ASSERT(
888 id != result_->position.position(),
889 "xrpl::Consensus::gotTxSet : updated transaction set");
890 bool any = false;
891 for (auto const& [nodeId, peerPos] : currPeerPositions_)
892 {
893 if (peerPos.proposal().position() == id)
894 {
895 updateDisputes(nodeId, txSet);
896 any = true;
897 }
898 }
899
900 if (!any)
901 {
902 JLOG(j_.warn()) << "By the time we got " << id << " no peers were proposing it";
903 }
904 }
905}
906
907template <class Adaptor>
908void
910 NetClock::time_point const& now,
912{
913 using namespace std::chrono_literals;
914 JLOG(j_.info()) << "Simulating consensus";
915 now_ = now;
916 closeLedger({});
917 // NOLINTBEGIN(bugprone-unchecked-optional-access) closeLedger sets result_
918 result_->roundTime.tick(consensusDelay.value_or(100ms));
919 result_->proposers = prevProposers_ = currPeerPositions_.size();
920 prevRoundTime_ = result_->roundTime.read();
922 adaptor_.onForceAccept(
924 // NOLINTEND(bugprone-unchecked-optional-access)
925 JLOG(j_.info()) << "Simulation complete";
926}
927
928template <class Adaptor>
931{
932 using std::to_string;
933 using Int = json::Value::Int;
934
936
937 ret["proposing"] = (mode_.get() == ConsensusMode::Proposing);
938 ret["proposers"] = static_cast<int>(currPeerPositions_.size());
939
941 {
942 ret["synched"] = true;
943 ret["ledger_seq"] = static_cast<std::uint32_t>(previousLedger_.seq()) + 1;
944 ret["close_granularity"] = static_cast<Int>(closeResolution_.count());
945 }
946 else
947 {
948 ret["synched"] = false;
949 }
950
951 ret["phase"] = to_string(phase_);
952
953 if (result_ && !result_->disputes.empty() && !full)
954 ret["disputes"] = static_cast<Int>(result_->disputes.size());
955
956 if (result_)
957 ret["our_position"] = result_->position.getJson();
958
959 if (full)
960 {
961 if (result_)
962 ret["current_ms"] = static_cast<Int>(result_->roundTime.read().count());
963 ret["converge_percent"] = convergePercent_;
964 ret["close_resolution"] = static_cast<Int>(closeResolution_.count());
965 ret["have_time_consensus"] = haveCloseTimeConsensus_;
966 ret["previous_proposers"] = static_cast<Int>(prevProposers_);
967 ret["previous_mseconds"] = static_cast<Int>(prevRoundTime_.count());
968
969 if (!currPeerPositions_.empty())
970 {
972
973 for (auto const& [nodeId, peerPos] : currPeerPositions_)
974 {
975 ppj[to_string(nodeId)] = peerPos.getJson();
976 }
977 ret["peer_positions"] = std::move(ppj);
978 }
979
980 if (!acquired_.empty())
981 {
983 for (auto const& at : acquired_)
984 {
985 acq.append(to_string(at.first));
986 }
987 ret["acquired"] = std::move(acq);
988 }
989
990 if (result_ && !result_->disputes.empty())
991 {
993 for (auto const& [txId, dispute] : result_->disputes)
994 {
995 dsj[to_string(txId)] = dispute.getJson();
996 }
997 ret["disputes"] = std::move(dsj);
998 }
999
1000 if (!rawCloseTimes_.peers.empty())
1001 {
1003 for (auto const& ct : rawCloseTimes_.peers)
1004 {
1005 ctj[std::to_string(ct.first.time_since_epoch().count())] = ct.second;
1006 }
1007 ret["close_times"] = std::move(ctj);
1008 }
1009
1010 if (!deadNodes_.empty())
1011 {
1013 for (auto const& dn : deadNodes_)
1014 {
1015 dnj.append(to_string(dn));
1016 }
1017 ret["dead_nodes"] = std::move(dnj);
1018 }
1019 }
1020
1021 return ret;
1022}
1023
1024// Handle a change in the prior ledger during a consensus round
1025template <class Adaptor>
1026void
1028 Ledger_t::ID const& lgrId,
1030{
1031 CLOG(clog) << "handleWrongLedger. ";
1032 XRPL_ASSERT(
1033 lgrId != prevLedgerID_ || previousLedger_.id() != lgrId,
1034 "xrpl::Consensus::handleWrongLedger : have wrong ledger");
1035
1036 // Stop proposing because we are out of sync
1037 leaveConsensus(clog);
1038
1039 // First time switching to this ledger
1040 if (prevLedgerID_ != lgrId)
1041 {
1042 prevLedgerID_ = lgrId;
1043
1044 // Clear out state
1045 if (result_)
1046 {
1047 result_->disputes.clear();
1048 result_->compares.clear();
1049 }
1050
1051 currPeerPositions_.clear();
1052 rawCloseTimes_.peers.clear();
1053 deadNodes_.clear();
1054
1055 // Get back in sync, this will also recreate disputes
1057 }
1058
1059 if (previousLedger_.id() == prevLedgerID_)
1060 {
1061 CLOG(clog) << "previousLedger_.id() == prevLeverID_ " << prevLedgerID_ << ". ";
1062 return;
1063 }
1064
1065 // we need to switch the ledger we're working from
1066 if (auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
1067 {
1068 JLOG(j_.info()) << "Have the consensus ledger " << prevLedgerID_;
1069 CLOG(clog) << "Have the consensus ledger " << prevLedgerID_ << ". ";
1070 startRoundInternal(now_, lgrId, *newLedger, ConsensusMode::SwitchedLedger, clog);
1071 }
1072 else
1073 {
1074 CLOG(clog) << "Still on wrong ledger. ";
1076 }
1077}
1078
1079template <class Adaptor>
1080void
1082{
1083 CLOG(clog) << "checkLedger. ";
1084
1085 auto netLgr = adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
1086 CLOG(clog) << "network ledgerid " << netLgr << ", "
1087 << "previous ledger " << prevLedgerID_ << ". ";
1088
1089 if (netLgr != prevLedgerID_)
1090 {
1092 ss << "View of consensus changed during " << to_string(phase_)
1093 << " mode=" << to_string(mode_.get()) << ", " << prevLedgerID_ << " to " << netLgr
1094 << ", " << json::Compact{previousLedger_.getJson()} << ". ";
1095 JLOG(j_.warn()) << ss.str();
1096 CLOG(clog) << ss.str();
1097 CLOG(clog) << "State on consensus change " << json::Compact{getJson(true)} << ". ";
1098 handleWrongLedger(netLgr, clog);
1099 }
1100 else if (previousLedger_.id() != prevLedgerID_)
1101 {
1102 CLOG(clog) << "previousLedger_.id() != prevLedgerID_: " << previousLedger_.id() << ','
1103 << to_string(prevLedgerID_) << ". ";
1104 handleWrongLedger(netLgr, clog);
1105 }
1106}
1107
1108template <class Adaptor>
1109void
1111{
1112 for (auto const& it : recentPeerPositions_)
1113 {
1114 for (auto const& pos : it.second)
1115 {
1116 if (pos.proposal().prevLedger() == prevLedgerID_)
1117 {
1118 if (peerProposalInternal(now_, pos))
1119 adaptor_.share(pos);
1120 }
1121 }
1122 }
1123}
1124
1125template <class Adaptor>
1126void
1128{
1129 CLOG(clog) << "phaseOpen. ";
1130 using namespace std::chrono;
1131
1132 // it is shortly before ledger close time
1133 bool const anyTransactions = adaptor_.hasOpenTransactions();
1134 auto proposersClosed = currPeerPositions_.size();
1135 auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
1136
1137 openTime_.tick(clock_.now());
1138
1139 // This computes how long since last ledger's close time
1140 milliseconds sinceClose;
1141 {
1142 auto const mode = mode_.get();
1143 bool const closeAgree = previousLedger_.closeAgree();
1144 auto const prevCloseTime = previousLedger_.closeTime();
1145 auto const prevParentCloseTimePlus1 = previousLedger_.parentCloseTime() + 1s;
1146 bool const previousCloseCorrect = (mode != ConsensusMode::WrongLedger) && closeAgree &&
1147 (prevCloseTime != prevParentCloseTimePlus1);
1148
1149 auto const lastCloseTime = previousCloseCorrect
1150 ? prevCloseTime // use consensus timing
1151 : prevCloseTime_; // use the time we saw internally
1152
1153 if (now_ >= lastCloseTime)
1154 {
1155 sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
1156 }
1157 else
1158 {
1159 sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
1160 }
1161 CLOG(clog) << "calculating how long since last ledger's close time "
1162 "based on mode : "
1163 << to_string(mode) << ", previous closeAgree: " << closeAgree
1164 << ", previous close time: " << to_string(prevCloseTime)
1165 << ", previous parent close time + 1s: " << to_string(prevParentCloseTimePlus1)
1166 << ", previous close time seen internally: " << to_string(prevCloseTime_)
1167 << ", last close time: " << to_string(lastCloseTime)
1168 << ", since close: " << sinceClose.count() << ". ";
1169 }
1170
1171 auto const idleInterval = std::max<milliseconds>(
1172 adaptor_.parms().ledgerIdleInterval, 2 * previousLedger_.closeTimeResolution());
1173 CLOG(clog) << "idle interval set to " << idleInterval.count() << "ms based on "
1174 << "ledgerIDLE_INTERVAL: " << adaptor_.parms().ledgerIdleInterval.count()
1175 << ", previous ledger close time resolution: "
1176 << previousLedger_.closeTimeResolution().count() << "ms. ";
1177
1178 // Decide if we should close the ledger
1180 anyTransactions,
1182 proposersClosed,
1183 proposersValidated,
1185 sinceClose,
1186 openTime_.read(),
1187 idleInterval,
1188 adaptor_.parms(),
1189 j_,
1190 clog))
1191 {
1192 CLOG(clog) << "closing ledger. ";
1194 }
1195}
1196
1197template <class Adaptor>
1198bool
1200{
1201 CLOG(clog) << "shouldPause? ";
1202 auto const& parms = adaptor_.parms();
1203 std::uint32_t const ahead(
1204 previousLedger_.seq() - std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
1205 auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
1206 std::size_t const totalValidators = trustedKeys.size();
1207 std::size_t const laggards = adaptor_.laggards(previousLedger_.seq(), trustedKeys);
1208 std::size_t const offline = trustedKeys.size();
1209
1210 std::stringstream vars;
1211 vars << " consensuslog (working seq: " << previousLedger_.seq() << ", "
1212 << "validated seq: " << adaptor_.getValidLedgerIndex() << ", "
1213 << "am validator: " << adaptor_.validator() << ", "
1214 << "have validated: " << adaptor_.haveValidated()
1215 << ", "
1216 // NOLINTBEGIN(bugprone-unchecked-optional-access) result_ is always set when shouldPause
1217 // is called (from phaseEstablish after assert)
1218 << "roundTime: " << result_->roundTime.read().count()
1219 << ", "
1220 // NOLINTEND(bugprone-unchecked-optional-access)
1221 << "max consensus time: " << parms.ledgerMaxConsensus.count() << ", "
1222 << "validators: " << totalValidators << ", "
1223 << "laggards: " << laggards << ", "
1224 << "offline: " << offline << ", "
1225 << "quorum: " << quorum << ")";
1226
1227 if ((ahead == 0u) || (laggards == 0u) || (totalValidators == 0u) || !adaptor_.validator() ||
1228 !adaptor_.haveValidated() ||
1229 // NOLINTNEXTLINE(bugprone-unchecked-optional-access) result_ set as shouldPause called
1230 result_->roundTime.read() > parms.ledgerMaxConsensus)
1231 {
1232 j_.debug() << "not pausing (early)" << vars.str();
1233 CLOG(clog) << "Not pausing (early). ";
1234 return false;
1235 }
1236
1237 bool willPause = false;
1238
1252 static constexpr std::size_t kMaxPausePhase = 4;
1253
1273 std::size_t const phase = (ahead - 1) % (kMaxPausePhase + 1);
1274
1275 // validators that remain after the laggards() function are considered
1276 // offline, and should be considered as laggards for purposes of
1277 // evaluating whether the threshold for non-laggards has been reached.
1278 switch (phase)
1279 {
1280 case 0:
1281 // Laggards and offline shouldn't preclude consensus.
1282 if (laggards + offline > totalValidators - quorum)
1283 willPause = true;
1284 break;
1285 case kMaxPausePhase:
1286 // No tolerance.
1287 willPause = true;
1288 break;
1289 default:
1290 // Ensure that sufficient validators are known to be not lagging.
1291 // Their sufficiently most recent validation sequence was equal to
1292 // or greater than our own.
1293 //
1294 // The threshold is the amount required for quorum plus
1295 // the proportion of the remainder based on number of intermediate
1296 // phases between 0 and max.
1297 float const nonLaggards = totalValidators - (laggards + offline);
1298 float const quorumRatio = static_cast<float>(quorum) / totalValidators;
1299 float const allowedDissent = 1.0f - quorumRatio;
1300 float const phaseFactor = static_cast<float>(phase) / kMaxPausePhase;
1301
1302 if (nonLaggards / totalValidators < quorumRatio + (allowedDissent * phaseFactor))
1303 {
1304 willPause = true;
1305 }
1306 }
1307
1308 if (willPause)
1309 {
1310 j_.warn() << "pausing" << vars.str();
1311 CLOG(clog) << "pausing " << vars.str() << ". ";
1312 }
1313 else
1314 {
1315 j_.debug() << "not pausing" << vars.str();
1316 CLOG(clog) << "not pausing. ";
1317 }
1318 return willPause;
1319}
1320
1321template <class Adaptor>
1322void
1324{
1325 CLOG(clog) << "phaseEstablish. ";
1326 // can only establish consensus if we already took a stance
1327 XRPL_ASSERT(result_, "xrpl::Consensus::phaseEstablish : result is set");
1328 // NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
1329
1332
1333 using namespace std::chrono;
1334 ConsensusParms const& parms = adaptor_.parms();
1335
1336 result_->roundTime.tick(clock_.now());
1337 result_->proposers = currPeerPositions_.size();
1338
1339 convergePercent_ = result_->roundTime.read() * 100 /
1341 CLOG(clog) << "convergePercent_ " << convergePercent_
1342 << " is based on round duration so far: " << result_->roundTime.read().count()
1343 << "ms, "
1344 << "previous round duration: " << prevRoundTime_.count() << "ms, "
1345 << "avMIN_CONSENSUS_TIME: " << parms.avMinConsensusTime.count() << "ms. ";
1346
1347 // Give everyone a chance to take an initial position
1348 if (result_->roundTime.read() < parms.ledgerMinConsensus)
1349 {
1350 CLOG(clog) << "ledgerMIN_CONSENSUS not reached: " << parms.ledgerMinConsensus.count()
1351 << "ms. ";
1352 return;
1353 }
1354
1356
1357 // Nothing to do if too many laggards or we don't have consensus.
1359 return;
1360
1362 {
1363 JLOG(j_.info()) << "We have TX consensus but not CT consensus";
1364 CLOG(clog) << "We have TX consensus but not CT consensus. ";
1365 return;
1366 }
1367
1368 JLOG(j_.info()) << "Converge cutoff (" << currPeerPositions_.size() << " participants)";
1369 CLOG(clog) << "Converge cutoff (" << currPeerPositions_.size()
1370 << " participants). Transitioned to ConsensusPhase::Accepted. ";
1371 adaptor_.updateOperatingMode(currPeerPositions_.size());
1373 prevRoundTime_ = result_->roundTime.read();
1375 JLOG(j_.debug()) << "transitioned to ConsensusPhase::Accepted";
1376 adaptor_.onAccept(
1377 *result_,
1381 mode_.get(),
1382 getJson(true),
1383 adaptor_.validating());
1384 // NOLINTEND(bugprone-unchecked-optional-access)
1385}
1386
1387template <class Adaptor>
1388void
1390{
1391 // We should not be closing if we already have a position
1392 XRPL_ASSERT(!result_, "xrpl::Consensus::closeLedger : result is not set");
1393
1395 JLOG(j_.debug()) << "transitioned to ConsensusPhase::Establish";
1396 rawCloseTimes_.self = now_;
1399
1400 result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
1401 result_->roundTime.reset(clock_.now());
1402 // Share the newly created transaction set if we haven't already
1403 // received it from a peer
1404 if (acquired_.emplace(result_->txns.id(), result_->txns).second)
1405 adaptor_.share(result_->txns);
1406
1407 auto const mode = mode_.get();
1408 CLOG(clog) << "closeLedger transitioned to ConsensusPhase::Establish, mode: " << to_string(mode)
1409 << ", number of peer positions: " << currPeerPositions_.size() << ". ";
1410 if (mode == ConsensusMode::Proposing)
1411 adaptor_.propose(result_->position);
1412
1413 // Create disputes with any peer positions we have transactions for
1414 for (auto const& pit : currPeerPositions_)
1415 {
1416 auto const& pos = pit.second.proposal().position();
1417 auto const it = acquired_.find(pos);
1418 if (it != acquired_.end())
1419 createDisputes(it->second, clog);
1420 }
1421}
1422
/**
 * Number of participants that must agree to meet a percentage threshold.
 *
 * Computes participants * percent / 100 using integer arithmetic, adding a
 * bias of percent / 2 before the division. The result is never reported as
 * zero: a computed value of 0 is raised to 1.
 *
 * The returned count does not always correspond exactly to the requested
 * percentage. For example, 5 participants at 70% yields 3, although 3 of 5
 * is only 60%.
 *
 * @param participants Number of participants taking part in the vote.
 * @param percent      Required level of agreement, as a percentage.
 * @return The computed count, or 1 when the computation yields 0.
 */
inline constexpr int
participantsNeeded(int participants, int percent) noexcept
{
    int const needed = ((participants * percent) + (percent / 2)) / 100;

    // A threshold of zero is replaced by one.
    return (needed == 0) ? 1 : needed;
}
1442
1443template <class Adaptor>
1444void
1446{
1447 // We must have a position if we are updating it
1448 XRPL_ASSERT(result_, "xrpl::Consensus::updateOurPositions : result is set");
1449 // NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
1450 ConsensusParms const& parms = adaptor_.parms();
1451
1452 // Compute a cutoff time
1453 auto const peerCutoff = now_ - parms.proposeFRESHNESS;
1454 auto const ourCutoff = now_ - parms.proposeINTERVAL;
1455 CLOG(clog) << "updateOurPositions. peerCutoff " << to_string(peerCutoff) << ", ourCutoff "
1456 << to_string(ourCutoff) << ". ";
1457
1458 // Verify freshness of peer positions and compute close times
1460 {
1461 auto it = currPeerPositions_.begin();
1462 while (it != currPeerPositions_.end())
1463 {
1464 Proposal_t const& peerProp = it->second.proposal();
1465 if (peerProp.isStale(peerCutoff))
1466 {
1467 // peer's proposal is stale, so remove it
1468 NodeID_t const& peerID = peerProp.nodeID();
1469 JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
1470 for (auto& dt : result_->disputes)
1471 dt.second.unVote(peerID);
1472 it = currPeerPositions_.erase(it);
1473 }
1474 else
1475 {
1476 // proposal is still fresh
1477 ++closeTimeVotes[asCloseTime(peerProp.closeTime())];
1478 ++it;
1479 }
1480 }
1481 }
1482
1483 // This will stay unseated unless there are any changes
1484 std::optional<TxSet_t> ourNewSet;
1485
1486 // Update votes on disputed transactions
1487 {
1489 for (auto& [txId, dispute] : result_->disputes)
1490 {
1491 // Because the threshold for inclusion increases,
1492 // time can change our position on a dispute
1493 if (dispute.updateVote(
1495 {
1496 if (!mutableSet)
1497 mutableSet.emplace(result_->txns);
1498
1499 if (dispute.getOurVote())
1500 {
1501 // now a yes
1502 mutableSet->insert(dispute.tx());
1503 }
1504 else
1505 {
1506 // now a no
1507 mutableSet->erase(txId);
1508 }
1509 }
1510 }
1511
1512 if (mutableSet)
1513 ourNewSet.emplace(std::move(*mutableSet));
1514 }
1515
1516 NetClock::time_point consensusCloseTime = {};
1518
1519 if (currPeerPositions_.empty())
1520 {
1521 // no other times
1523 consensusCloseTime = asCloseTime(result_->position.closeTime());
1524 }
1525 else
1526 {
1527 // We don't track rounds for close time, so just pass 0s
1528 auto const [neededWeight, newState] =
1530 if (newState)
1531 closeTimeAvalancheState_ = *newState;
1532 CLOG(clog) << "neededWeight " << neededWeight << ". ";
1533
1534 int participants = currPeerPositions_.size();
1535 if (mode_.get() == ConsensusMode::Proposing)
1536 {
1537 ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
1538 ++participants;
1539 }
1540
1541 // Threshold for non-zero vote
1542 int threshVote = participantsNeeded(participants, neededWeight);
1543
1544 // Threshold to declare consensus
1545 int const threshConsensus = participantsNeeded(participants, parms.avCtConsensusPct);
1546
1548 ss << "Proposers:" << currPeerPositions_.size() << " nw:" << neededWeight
1549 << " thrV:" << threshVote << " thrC:" << threshConsensus;
1550 JLOG(j_.info()) << ss.str();
1551 CLOG(clog) << ss.str();
1552
1553 for (auto const& [t, v] : closeTimeVotes)
1554 {
1555 JLOG(j_.debug()) << "CCTime: seq "
1556 << static_cast<std::uint32_t>(previousLedger_.seq()) + 1 << ": "
1557 << t.time_since_epoch().count() << " has " << v << ", " << threshVote
1558 << " required";
1559
1560 if (v >= threshVote)
1561 {
1562 // A close time has enough votes for us to try to agree
1563 consensusCloseTime = t;
1564 threshVote = v;
1565
1566 if (threshVote >= threshConsensus)
1568 }
1569 }
1570
1572 {
1573 JLOG(j_.debug()) << "No CT consensus:"
1574 << " Proposers:" << currPeerPositions_.size()
1575 << " Mode:" << to_string(mode_.get()) << " Thresh:" << threshConsensus
1576 << " Pos:" << consensusCloseTime.time_since_epoch().count();
1577 CLOG(clog) << "No close time consensus. ";
1578 }
1579 }
1580
1581 if (!ourNewSet &&
1582 ((consensusCloseTime != asCloseTime(result_->position.closeTime())) ||
1583 result_->position.isStale(ourCutoff)))
1584 {
1585 // close time changed or our position is stale
1586 ourNewSet.emplace(result_->txns);
1587 }
1588
1589 if (ourNewSet)
1590 {
1591 auto newID = ourNewSet->id();
1592
1593 result_->txns = std::move(*ourNewSet);
1594
1596 ss << "Position change: CTime " << consensusCloseTime.time_since_epoch().count() << ", tx "
1597 << newID;
1598 JLOG(j_.info()) << ss.str();
1599 CLOG(clog) << ss.str();
1600
1601 result_->position.changePosition(newID, consensusCloseTime, now_);
1602
1603 // Share our new transaction set and update disputes
1604 // if we haven't already received it
1605 if (acquired_.emplace(newID, result_->txns).second)
1606 {
1607 if (!result_->position.isBowOut())
1608 adaptor_.share(result_->txns);
1609
1610 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1611 {
1612 Proposal_t const& p = peerPos.proposal();
1613 if (p.position() == newID)
1614 updateDisputes(nodeId, result_->txns);
1615 }
1616 }
1617
1618 // Share our new position if we are still participating this round
1619 if (!result_->position.isBowOut() && (mode_.get() == ConsensusMode::Proposing))
1620 adaptor_.propose(result_->position);
1621 }
1622 // NOLINTEND(bugprone-unchecked-optional-access)
1623}
1624
1625template <class Adaptor>
1626bool
1628{
1629 // Must have a stance if we are checking for consensus
1630 XRPL_ASSERT(result_, "xrpl::Consensus::haveConsensus : has result");
1631 // NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
1632
1633 // CHECKME: should possibly count unacquired TX sets as disagreeing
1634 int agree = 0, disagree = 0;
1635
1636 auto ourPosition = result_->position.position();
1637
1638 // Count number of agreements/disagreements with our position
1639 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1640 {
1641 Proposal_t const& peerProp = peerPos.proposal();
1642 if (peerProp.position() == ourPosition)
1643 {
1644 ++agree;
1645 }
1646 else
1647 {
1648 JLOG(j_.debug()) << "Proposal disagreement: Peer " << nodeId << " has "
1649 << peerProp.position();
1650 ++disagree;
1651 }
1652 }
1653 auto currentFinished = adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
1654
1655 JLOG(j_.debug()) << "Checking for TX consensus: agree=" << agree << ", disagree=" << disagree;
1656
1657 ConsensusParms const& parms = adaptor_.parms();
1658 // Stalling is BAD. It means that we have a consensus on the close time, so
1659 // peers are talking, but we have disputed transactions that peers are
1660 // unable or unwilling to come to agreement on one way or the other.
1661 bool const stalled =
1662 haveCloseTimeConsensus_ && !result_->disputes.empty() &&
1663 std::ranges::all_of(result_->disputes, [this, &parms, &clog](auto const& dispute) {
1664 return dispute.second.stalled(
1665 parms, mode_.get() == ConsensusMode::Proposing, peerUnchangedCounter_, j_, clog);
1666 });
1667 if (stalled)
1668 {
1670 ss << "Consensus detects as stalled with " << (agree + disagree) << "/" << prevProposers_
1671 << " proposers, and " << result_->disputes.size() << " stalled disputed transactions.";
1672 JLOG(j_.error()) << ss.str();
1673 CLOG(clog) << ss.str();
1674 }
1675
1676 // Determine if we actually have consensus or not
1677 result_->state = checkConsensus(
1679 agree + disagree,
1680 agree,
1681 currentFinished,
1683 result_->roundTime.read(),
1684 stalled,
1685 parms,
1687 j_,
1688 clog);
1689
1690 if (result_->state == ConsensusState::No)
1691 {
1692 CLOG(clog) << "No consensus. ";
1693 return false;
1694 }
1695
1696 // Consensus has taken far too long. Drop out of the round.
1697 if (result_->state == ConsensusState::Expired)
1698 {
1699 static auto const kMinimumCounter = parms.avalancheCutoffs.size() * parms.avMinRounds;
1701 if (establishCounter_ < kMinimumCounter)
1702 {
1703 // If each round of phaseEstablish takes a very long time, we may
1704 // "expire" before we've given consensus enough time at each
1705 // avalanche level to actually come to a consensus. In that case,
1706 // keep trying. This should only happen if there are an extremely
1707 // large number of disputes such that each round takes an inordinate
1708 // amount of time.
1709
1710 ss << "Consensus time has expired in round " << establishCounter_
1711 << "; continue until round " << kMinimumCounter << ". "
1712 << json::Compact{getJson(false)};
1713 JLOG(j_.error()) << ss.str();
1714 CLOG(clog) << ss.str() << ". ";
1715 return false;
1716 }
1717 ss << "Consensus expired. " << json::Compact{getJson(true)};
1718 JLOG(j_.error()) << ss.str();
1719 CLOG(clog) << ss.str() << ". ";
1721 }
1722 // There is consensus, but we need to track if the network moved on
1723 // without us.
1724 if (result_->state == ConsensusState::MovedOn)
1725 {
1726 JLOG(j_.error()) << "Unable to reach consensus";
1727 JLOG(j_.error()) << json::Compact{getJson(true)};
1728 CLOG(clog) << "Unable to reach consensus " << json::Compact{getJson(true)} << ". ";
1729 }
1730
1731 CLOG(clog) << "Consensus has been reached. ";
1732 // NOLINTEND(bugprone-unchecked-optional-access)
1733 return true;
1734}
1735
1736template <class Adaptor>
1737void
1739{
1740 if (mode_.get() == ConsensusMode::Proposing)
1741 {
1742 if (result_ && !result_->position.isBowOut())
1743 {
1744 result_->position.bowOut(now_);
1745 adaptor_.propose(result_->position);
1746 }
1747
1749 JLOG(j_.info()) << "Bowing out of consensus";
1750 CLOG(clog) << "Bowing out of consensus. ";
1751 }
1752}
1753
1754template <class Adaptor>
1755void
1757{
1758 // Cannot create disputes without our stance
1759 XRPL_ASSERT(result_, "xrpl::Consensus::createDisputes : result is set");
1760 // NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
1761
1762 // Only create disputes if this is a new set
1763 auto const emplaced = result_->compares.emplace(o.id()).second;
1764 CLOG(clog) << "createDisputes: new set? " << !emplaced << ". ";
1765 if (!emplaced)
1766 return;
1767
1768 // Nothing to dispute if we agree
1769 if (result_->txns.id() == o.id())
1770 {
1771 CLOG(clog) << "both sets are identical. ";
1772 return;
1773 }
1774
1775 CLOG(clog) << "comparing existing with new set: " << result_->txns.id() << ',' << o.id()
1776 << ". ";
1777 JLOG(j_.debug()) << "createDisputes " << result_->txns.id() << " to " << o.id();
1778
1779 auto differences = result_->txns.compare(o);
1780
1781 int dc = 0;
1782
1783 for (auto const& [txId, inThisSet] : differences)
1784 {
1785 ++dc;
1786 // create disputed transactions (from the ledger that has them)
1787 XRPL_ASSERT(
1788 (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
1789 (!inThisSet && !result_->txns.find(txId) && o.find(txId)),
1790 "xrpl::Consensus::createDisputes : has disputed transactions");
1791
1792 Tx_t const tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
1793 auto txID = tx.id();
1794
1795 if (result_->disputes.find(txID) != result_->disputes.end())
1796 continue;
1797
1798 JLOG(j_.debug()) << "Transaction " << txID << " is disputed";
1799
1800 typename Result::Dispute_t dtx{
1801 tx,
1802 result_->txns.exists(txID),
1804 j_};
1805
1806 // Update all of the available peer's votes on the disputed transaction
1807 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1808 {
1809 Proposal_t const& peerProp = peerPos.proposal();
1810 auto const cit = acquired_.find(peerProp.position());
1811 if (cit != acquired_.end() && dtx.setVote(nodeId, cit->second.exists(txID)))
1813 }
1814 adaptor_.share(dtx.tx());
1815
1816 result_->disputes.emplace(txID, std::move(dtx));
1817 }
1818 JLOG(j_.debug()) << dc << " differences found";
1819 CLOG(clog) << "disputes: " << dc << ". ";
1820 // NOLINTEND(bugprone-unchecked-optional-access)
1821}
1822
1823template <class Adaptor>
1824void
1826{
1827 // Cannot updateDisputes without our stance
1828 XRPL_ASSERT(result_, "xrpl::Consensus::updateDisputes : result is set");
1829 // NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
1830
1831 // Ensure we have created disputes against this set if we haven't seen
1832 // it before
1833 if (result_->compares.find(other.id()) == result_->compares.end())
1834 createDisputes(other);
1835
1836 for (auto& it : result_->disputes)
1837 {
1838 auto& d = it.second;
1839 if (d.setVote(node, other.exists(d.tx().id())))
1841 }
1842 // NOLINTEND(bugprone-unchecked-optional-access)
1843}
1844
1845template <class Adaptor>
1851
1852} // namespace xrpl
T all_of(T... args)
Abstract interface to a clock.
A generic endpoint for log messages.
Definition Journal.h:38
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:130
json::Int Int
Definition json_value.h:138
Value & append(Value const &value)
Append value to array at the end.
Represents a proposed position taken during a round of consensus.
Position const & position() const
Get the proposed position.
NodeId const & nodeID() const
Identifying which peer took this position.
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
bool isStale(NetClock::time_point cutoff) const
Get whether this position is stale relative to the provided cutoff.
Measures the duration of phases of consensus.
ConsensusMode get() const
Definition Consensus.h:299
MonitoredMode(ConsensusMode m)
Definition Consensus.h:295
void set(ConsensusMode mode, Adaptor &a)
Definition Consensus.h:305
bool peerProposalInternal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
Handle a replayed or a new peer proposal.
Definition Consensus.h:725
void updateDisputes(NodeID_t const &node, TxSet_t const &other)
Definition Consensus.h:1825
TxSet_t::Tx Tx_t
Definition Consensus.h:282
bool haveCloseTimeConsensus_
Definition Consensus.h:539
ConsensusProposal< NodeID_t, typename Ledger_t::ID, typename TxSet_t::ID > Proposal_t
Definition Consensus.h:284
MonitoredMode mode_
Definition Consensus.h:537
hash_map< NodeID_t, std::deque< PeerPosition_t > > recentPeerPositions_
Definition Consensus.h:594
NetClock::duration closeResolution_
Definition Consensus.h:550
ConsensusTimer openTime_
Definition Consensus.h:548
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Simulate the consensus process without any network traffic.
Definition Consensus.h:909
void gotTxSet(NetClock::time_point const &now, TxSet_t const &txSet)
Process a transaction set acquired from the network.
Definition Consensus.h:864
bool shouldPause(std::unique_ptr< std::stringstream > const &clog) const
Evaluate whether pausing increases likelihood of validation.
Definition Consensus.h:1199
void phaseEstablish(std::unique_ptr< std::stringstream > const &clog)
Handle establish phase.
Definition Consensus.h:1323
hash_set< NodeID_t > deadNodes_
Definition Consensus.h:600
Adaptor::Ledger_t Ledger_t
Definition Consensus.h:279
std::size_t peerUnchangedCounter_
Definition Consensus.h:581
void leaveConsensus(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1738
beast::AbstractClock< std::chrono::steady_clock > clock_type
Clock type for measuring time within the consensus code.
Definition Consensus.h:314
NetClock::time_point prevCloseTime_
Definition Consensus.h:563
json::Value getJson(bool full) const
Get the Json state of the consensus process.
Definition Consensus.h:930
ConsensusResult< Adaptor > Result
Definition Consensus.h:286
hash_map< NodeID_t, PeerPosition_t > currPeerPositions_
Definition Consensus.h:590
std::chrono::milliseconds prevRoundTime_
Definition Consensus.h:555
void checkLedger(std::unique_ptr< std::stringstream > const &clog)
Check if our previous ledger matches the network's.
Definition Consensus.h:1081
Consensus(Consensus &&) noexcept=default
void startRound(NetClock::time_point const &now, Ledger_t::ID const &prevLedgerID, Ledger_t prevLedger, hash_set< NodeID_t > const &nowUntrusted, bool proposing, std::unique_ptr< std::stringstream > const &clog={})
Definition Consensus.h:615
NetClock::time_point asCloseTime(NetClock::time_point raw) const
Definition Consensus.h:1847
ConsensusParms::AvalancheState closeTimeAvalancheState_
Definition Consensus.h:552
ConsensusPhase phase() const
Definition Consensus.h:412
ConsensusPhase phase_
Definition Consensus.h:536
Adaptor::PeerPosition_t PeerPosition_t
Definition Consensus.h:283
void playbackProposals()
If we radically changed our consensus context for some reason, we need to replay recent proposals so ...
Definition Consensus.h:1110
void handleWrongLedger(Ledger_t::ID const &lgrId, std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1027
Ledger_t previousLedger_
Definition Consensus.h:571
beast::Journal const j_
Definition Consensus.h:603
Ledger_t::ID prevLedgerID_
Definition Consensus.h:569
hash_map< typename TxSet_t::ID, TxSet_t const > acquired_
Definition Consensus.h:574
void startRoundInternal(NetClock::time_point const &now, Ledger_t::ID const &prevLedgerID, Ledger_t const &prevLedger, ConsensusMode mode, std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:660
void phaseOpen(std::unique_ptr< std::stringstream > const &clog)
Handle pre-close phase.
Definition Consensus.h:1127
void closeLedger(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1389
std::size_t prevProposers_
Definition Consensus.h:597
Adaptor & adaptor_
Definition Consensus.h:534
std::size_t establishCounter_
Definition Consensus.h:584
NetClock::time_point now_
Definition Consensus.h:562
clock_type const & clock_
Definition Consensus.h:541
void createDisputes(TxSet_t const &o, std::unique_ptr< std::stringstream > const &clog={})
Definition Consensus.h:1756
bool peerProposal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
A peer has proposed a new position, adjust our tracking.
Definition Consensus.h:706
Adaptor::TxSet_t TxSet_t
Definition Consensus.h:280
std::optional< Result > result_
Definition Consensus.h:576
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Call periodically to drive consensus forward.
Definition Consensus.h:827
bool haveConsensus(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1627
void updateOurPositions(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1445
ConsensusCloseTimes rawCloseTimes_
Definition Consensus.h:577
Adaptor::NodeID_t NodeID_t
Definition Consensus.h:281
bool setVote(NodeId const &peer, bool votesYes)
Change a peer's vote.
Definition DisputedTx.h:197
Tx const & tx() const
The disputed transaction.
Definition DisputedTx.h:131
std::chrono::time_point< NetClock > time_point
Definition chrono.h:46
std::chrono::duration< rep, period > duration
Definition chrono.h:45
T duration_cast(T... args)
T emplace(T... args)
T max(T... args)
T min(T... args)
@ Array
array value (ordered list)
Definition json_value.h:25
@ Object
object value (collection of name/value pairs).
Definition json_value.h:26
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
ConsensusMode
Represents how a node currently participates in Consensus.
@ WrongLedger
We have the wrong ledger and are attempting to acquire it.
@ Proposing
We are a normal participant in consensus and propose our position.
@ Observing
We are observing peer positions, but not proposing our position.
std::unordered_set< Value, Hash, Pred, Allocator > hash_set
std::chrono::duration< Rep, Period > getNextLedgerTimeResolution(std::chrono::duration< Rep, Period > previousResolution, bool previousAgree, Seq ledgerSeq)
Calculates the close time resolution for the specified ledger.
ConsensusState checkConsensus(std::size_t prevProposers, std::size_t currentProposers, std::size_t currentAgree, std::size_t currentFinished, std::chrono::milliseconds previousAgreeTime, std::chrono::milliseconds currentAgreeTime, bool stalled, ConsensusParms const &parms, bool proposing, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determine whether the network reached consensus and whether we joined.
std::string to_string(BaseUInt< Bits, Tag > const &a)
Definition base_uint.h:633
ConsensusState
Whether we have or don't have a consensus.
@ Expired
Consensus time limit has hard-expired.
@ MovedOn
The network has consensus without us.
@ No
We do not have consensus.
std::chrono::time_point< Clock, Duration > roundCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > closeResolution)
Calculates the close time for a ledger, given a close time resolution.
ConsensusPhase
Phases of consensus for a single ledger round.
@ Establish
Establishing consensus by exchanging proposals with our peers.
@ Open
We haven't closed our ledger yet, but others might have.
json::Value getJson(LedgerFill const &fill)
Return a new json::Value representing the ledger with given options.
int participantsNeeded(int participants, int percent)
How many of the participants must agree to reach a given threshold?
Definition Consensus.h:1436
std::unordered_map< Key, Value, Hash, Pred, Allocator > hash_map
std::pair< std::size_t, std::optional< ConsensusParms::AvalancheState > > getNeededWeight(ConsensusParms const &p, ConsensusParms::AvalancheState currentState, int percentTime, std::size_t currentRounds, std::size_t minimumRounds)
constexpr auto kLedgerDefaultTimeResolution
Initial resolution of ledger close time.
bool shouldCloseLedger(bool anyTransactions, std::size_t prevProposers, std::size_t proposersClosed, std::size_t proposersValidated, std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, std::chrono::milliseconds idleInterval, ConsensusParms const &parms, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determines whether the current ledger should close at this time.
Definition Consensus.cpp:18
constexpr bool any(HashRouterFlags flags)
Definition HashRouter.h:63
T str(T... args)
Stores the set of initial close times.
Consensus algorithm parameters.
std::size_t const avCtConsensusPct
Percentage of nodes required to reach agreement on ledger close time.
std::chrono::milliseconds const ledgerMinConsensus
The minimum number of seconds we wait to ensure participation.
std::size_t const avMinRounds
Number of rounds before certain actions can happen.
std::chrono::seconds const proposeFRESHNESS
How long we consider a proposal fresh.
std::chrono::seconds const proposeINTERVAL
How often we force generating a new proposal to keep ours fresh.
std::map< AvalancheState, AvalancheCutoff > const avalancheCutoffs
std::chrono::milliseconds const avMinConsensusTime
The minimum amount of time to consider the previous round to have taken.
Encapsulates the result of consensus.
DisputedTx< Tx_t, NodeID_t > Dispute_t
T time_since_epoch(T... args)
T to_string(T... args)
T value_or(T... args)