rippled
Loading...
Searching...
No Matches
Consensus.h
1#pragma once
2
3#include <xrpld/consensus/ConsensusParms.h>
4#include <xrpld/consensus/ConsensusProposal.h>
5#include <xrpld/consensus/ConsensusTypes.h>
6#include <xrpld/consensus/DisputedTx.h>
7#include <xrpld/consensus/LedgerTiming.h>
8
9#include <xrpl/basics/Log.h>
10#include <xrpl/basics/chrono.h>
11#include <xrpl/beast/utility/Journal.h>
12#include <xrpl/json/json_writer.h>
13
14#include <algorithm>
15#include <chrono>
16#include <deque>
17#include <optional>
18#include <sstream>
19
20namespace xrpl {
21
41bool
43 bool anyTransactions,
44 std::size_t prevProposers,
45 std::size_t proposersClosed,
46 std::size_t proposersValidated,
47 std::chrono::milliseconds prevRoundTime,
48 std::chrono::milliseconds timeSincePrevClose,
50 std::chrono::milliseconds idleInterval,
51 ConsensusParms const& parms,
53 std::unique_ptr<std::stringstream> const& clog = {});
54
76 std::size_t prevProposers,
77 std::size_t currentProposers,
78 std::size_t currentAgree,
79 std::size_t currentFinished,
80 std::chrono::milliseconds previousAgreeTime,
81 std::chrono::milliseconds currentAgreeTime,
82 bool stalled,
83 ConsensusParms const& parms,
84 bool proposing,
86 std::unique_ptr<std::stringstream> const& clog = {});
87
276template <class Adaptor>
278{
279 using Ledger_t = typename Adaptor::Ledger_t;
280 using TxSet_t = typename Adaptor::TxSet_t;
281 using NodeID_t = typename Adaptor::NodeID_t;
282 using Tx_t = typename TxSet_t::Tx;
283 using PeerPosition_t = typename Adaptor::PeerPosition_t;
285
287
288 // Helper class to ensure adaptor is notified whenever the ConsensusMode
289 // changes
291 {
293
294 public:
296 {
297 }
299 get() const
300 {
301 return mode_;
302 }
303
304 void
305 set(ConsensusMode mode, Adaptor& a)
306 {
307 a.onModeChange(mode_, mode);
308 mode_ = mode;
309 }
310 };
311
312public:
315
316 Consensus(Consensus&&) noexcept = default;
317
324 Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j);
325
341 void
343 NetClock::time_point const& now,
344 typename Ledger_t::ID const& prevLedgerID,
345 Ledger_t prevLedger,
346 hash_set<NodeID_t> const& nowUntrusted,
347 bool proposing,
348 std::unique_ptr<std::stringstream> const& clog = {});
349
356 bool
357 peerProposal(NetClock::time_point const& now, PeerPosition_t const& newProposal);
358
364 void
366
372 void
373 gotTxSet(NetClock::time_point const& now, TxSet_t const& txSet);
374
391 void
393
401 typename Ledger_t::ID
403 {
404 return prevLedgerID_;
405 }
406
408 phase() const
409 {
410 return phase_;
411 }
412
421 getJson(bool full) const;
422
423private:
424 void
426 NetClock::time_point const& now,
427 typename Ledger_t::ID const& prevLedgerID,
428 Ledger_t const& prevLedger,
429 ConsensusMode mode,
431
432 // Change our view of the previous ledger
433 void
434 handleWrongLedger(typename Ledger_t::ID const& lgrId, std::unique_ptr<std::stringstream> const& clog);
435
441 void
443
447 void
449
452 bool
454
461 void
463
472 void
474
497 bool
499
500 // Close the open ledger and establish initial position.
501 void
503
504 // Adjust our positions to try to agree with other validators.
505 void
507
508 bool
510
511 // Create disputes between our position and the provided one.
512 void
514
515 // Update our disputes given that this node has adopted a new position.
516 // Will call createDisputes as needed.
517 void
518 updateDisputes(NodeID_t const& node, TxSet_t const& other);
519
520 // Revoke our outstanding proposal, if any, and cease proposing
521 // until this round ends.
522 void
524
525 // The rounded or effective close time estimate from a proposer
528
529private:
530 Adaptor& adaptor_;
531
534 bool firstRound_ = true;
536
538
539 // How long the consensus convergence has taken, expressed as
540 // a percentage of the time that we expected it to take.
542
543 // How long has this round been open
545
547
549
550 // Time it took for the last consensus round to converge
552
553 //-------------------------------------------------------------------------
554 // Network time measurements of consensus progress
555
556 // The current network adjusted time. This is the network time the
557 // ledger would close if it closed now
560
561 //-------------------------------------------------------------------------
562 // Non-peer (self) consensus data
563
564 // Last validated ledger ID provided to consensus
565 typename Ledger_t::ID prevLedgerID_;
566 // Last validated ledger seen by consensus
568
569 // Transaction Sets, indexed by hash of transaction tree
571
574
575 // The number of calls to phaseEstablish where none of our peers
576 // have changed any votes on disputed transactions.
578
579 // The total number of times we have called phaseEstablish
581
582 //-------------------------------------------------------------------------
583 // Peer related consensus data
584
585 // Peer proposed positions for the current round
587
588 // Recently received peer positions, available when transitioning between
589 // ledgers or rounds
591
592 // The number of proposers who participated in the last consensus round
594
595 // nodes that have bowed out of this consensus process
597
598 // Journal for debugging
600};
601
602template <class Adaptor>
603Consensus<Adaptor>::Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal journal)
604 : adaptor_(adaptor), clock_(clock), j_{journal}
605{
606 JLOG(j_.debug()) << "Creating consensus object";
607}
608
609template <class Adaptor>
610void
612 NetClock::time_point const& now,
613 typename Ledger_t::ID const& prevLedgerID,
614 Ledger_t prevLedger,
615 hash_set<NodeID_t> const& nowUntrusted,
616 bool proposing,
618{
619 if (firstRound_)
620 {
621 // take our initial view of closeTime_ from the seed ledger
622 prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
623 prevCloseTime_ = prevLedger.closeTime();
624 firstRound_ = false;
625 }
626 else
627 {
628 prevCloseTime_ = rawCloseTimes_.self;
629 }
630
631 for (NodeID_t const& n : nowUntrusted)
632 recentPeerPositions_.erase(n);
633
635
636 // We were handed the wrong ledger
637 if (prevLedger.id() != prevLedgerID)
638 {
639 // try to acquire the correct one
640 if (auto newLedger = adaptor_.acquireLedger(prevLedgerID))
641 {
642 prevLedger = *newLedger;
643 }
644 else // Unable to acquire the correct ledger
645 {
646 startMode = ConsensusMode::wrongLedger;
647 JLOG(j_.info()) << "Entering consensus with: " << previousLedger_.id();
648 JLOG(j_.info()) << "Correct LCL is: " << prevLedgerID;
649 }
650 }
651
652 startRoundInternal(now, prevLedgerID, prevLedger, startMode, clog);
653}
654template <class Adaptor>
655void
657 NetClock::time_point const& now,
658 typename Ledger_t::ID const& prevLedgerID,
659 Ledger_t const& prevLedger,
660 ConsensusMode mode,
662{
663 phase_ = ConsensusPhase::open;
664 JLOG(j_.debug()) << "transitioned to ConsensusPhase::open ";
665 CLOG(clog) << "startRoundInternal transitioned to ConsensusPhase::open, "
666 "previous ledgerID: "
667 << prevLedgerID << ", seq: " << prevLedger.seq() << ". ";
668 mode_.set(mode, adaptor_);
669 now_ = now;
670 prevLedgerID_ = prevLedgerID;
671 previousLedger_ = prevLedger;
672 result_.reset();
673 convergePercent_ = 0;
674 closeTimeAvalancheState_ = ConsensusParms::init;
675 haveCloseTimeConsensus_ = false;
676 openTime_.reset(clock_.now());
677 currPeerPositions_.clear();
678 acquired_.clear();
679 rawCloseTimes_.peers.clear();
680 rawCloseTimes_.self = {};
681 deadNodes_.clear();
682
683 closeResolution_ = getNextLedgerTimeResolution(
684 previousLedger_.closeTimeResolution(),
685 previousLedger_.closeAgree(),
686 previousLedger_.seq() + typename Ledger_t::Seq{1});
687
688 playbackProposals();
689 CLOG(clog) << "number of peer proposals,previous proposers: " << currPeerPositions_.size() << ',' << prevProposers_
690 << ". ";
691 if (currPeerPositions_.size() > (prevProposers_ / 2))
692 {
693 // We may be falling behind, don't wait for the timer
694 // consider closing the ledger immediately
695 CLOG(clog) << "consider closing the ledger immediately. ";
696 timerEntry(now_, clog);
697 }
698}
699
700template <class Adaptor>
701bool
703{
704 JLOG(j_.debug()) << "PROPOSAL " << newPeerPos.render();
705 auto const& peerID = newPeerPos.proposal().nodeID();
706
707 // Always need to store recent positions
708 {
709 auto& props = recentPeerPositions_[peerID];
710
711 if (props.size() >= 10)
712 props.pop_front();
713
714 props.push_back(newPeerPos);
715 }
716 return peerProposalInternal(now, newPeerPos);
717}
718
719template <class Adaptor>
720bool
722{
723 // Nothing to do for now if we are currently working on a ledger
724 if (phase_ == ConsensusPhase::accepted)
725 return false;
726
727 now_ = now;
728
729 auto const& newPeerProp = newPeerPos.proposal();
730
731 if (newPeerProp.prevLedger() != prevLedgerID_)
732 {
733 JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger() << " but we are on " << prevLedgerID_;
734 return false;
735 }
736
737 auto const& peerID = newPeerProp.nodeID();
738
739 if (deadNodes_.find(peerID) != deadNodes_.end())
740 {
741 JLOG(j_.info()) << "Position from dead node: " << peerID;
742 return false;
743 }
744
745 {
746 // update current position
747 auto peerPosIt = currPeerPositions_.find(peerID);
748
749 if (peerPosIt != currPeerPositions_.end())
750 {
751 if (newPeerProp.proposeSeq() <= peerPosIt->second.proposal().proposeSeq())
752 {
753 return false;
754 }
755 }
756
757 if (newPeerProp.isBowOut())
758 {
759 JLOG(j_.info()) << "Peer " << peerID << " bows out";
760 if (result_)
761 {
762 for (auto& it : result_->disputes)
763 it.second.unVote(peerID);
764 }
765 if (peerPosIt != currPeerPositions_.end())
766 currPeerPositions_.erase(peerID);
767 deadNodes_.insert(peerID);
768
769 return true;
770 }
771
772 if (peerPosIt != currPeerPositions_.end())
773 peerPosIt->second = newPeerPos;
774 else
775 currPeerPositions_.emplace(peerID, newPeerPos);
776 }
777
778 if (newPeerProp.isInitial())
779 {
780 // Record the close time estimate
781 JLOG(j_.trace()) << "Peer reports close time as " << newPeerProp.closeTime().time_since_epoch().count();
782 ++rawCloseTimes_.peers[newPeerProp.closeTime()];
783 }
784
785 JLOG(j_.trace()) << "Processing peer proposal " << newPeerProp.proposeSeq() << "/" << newPeerProp.position();
786
787 {
788 auto const ait = acquired_.find(newPeerProp.position());
789 if (ait == acquired_.end())
790 {
791 // acquireTxSet will return the set if it is available, or
792 // spawn a request for it and return nullopt/nullptr. It will call
793 // gotTxSet once it arrives
794 if (auto set = adaptor_.acquireTxSet(newPeerProp.position()))
795 gotTxSet(now_, *set);
796 else
797 JLOG(j_.debug()) << "Don't have tx set for peer";
798 }
799 else if (result_)
800 {
801 updateDisputes(newPeerProp.nodeID(), ait->second);
802 }
803 }
804
805 return true;
806}
807
808template <class Adaptor>
809void
811{
812 CLOG(clog) << "Consensus<Adaptor>::timerEntry. ";
813 // Nothing to do if we are currently working on a ledger
814 if (phase_ == ConsensusPhase::accepted)
815 {
816 CLOG(clog) << "Nothing to do during accepted phase. ";
817 return;
818 }
819
820 now_ = now;
821 CLOG(clog) << "Set network adjusted time to " << to_string(now) << ". ";
822
823 // Check we are on the proper ledger (this may change phase_)
824 auto const phaseOrig = phase_;
825 CLOG(clog) << "Phase " << to_string(phaseOrig) << ". ";
826 checkLedger(clog);
827 if (phaseOrig != phase_)
828 {
829 CLOG(clog) << "Changed phase to << " << to_string(phase_) << ". ";
830 }
831
832 if (phase_ == ConsensusPhase::open)
833 phaseOpen(clog);
834 else if (phase_ == ConsensusPhase::establish)
835 phaseEstablish(clog);
836 CLOG(clog) << "timerEntry finishing in phase " << to_string(phase_) << ". ";
837}
838
839template <class Adaptor>
840void
842{
843 // Nothing to do if we've finished work on a ledger
844 if (phase_ == ConsensusPhase::accepted)
845 return;
846
847 now_ = now;
848
849 auto id = txSet.id();
850
851 // If we've already processed this transaction set since requesting
852 // it from the network, there is nothing to do now
853 if (!acquired_.emplace(id, txSet).second)
854 return;
855
856 if (!result_)
857 {
858 JLOG(j_.debug()) << "Not creating disputes: no position yet.";
859 }
860 else
861 {
862 // Our position is added to acquired_ as soon as we create it,
863 // so this txSet must differ
864 XRPL_ASSERT(id != result_->position.position(), "xrpl::Consensus::gotTxSet : updated transaction set");
865 bool any = false;
866 for (auto const& [nodeId, peerPos] : currPeerPositions_)
867 {
868 if (peerPos.proposal().position() == id)
869 {
870 updateDisputes(nodeId, txSet);
871 any = true;
872 }
873 }
874
875 if (!any)
876 {
877 JLOG(j_.warn()) << "By the time we got " << id << " no peers were proposing it";
878 }
879 }
880}
881
882template <class Adaptor>
883void
885{
886 using namespace std::chrono_literals;
887 JLOG(j_.info()) << "Simulating consensus";
888 now_ = now;
889 closeLedger({});
890 result_->roundTime.tick(consensusDelay.value_or(100ms));
891 result_->proposers = prevProposers_ = currPeerPositions_.size();
892 prevRoundTime_ = result_->roundTime.read();
894 adaptor_.onForceAccept(*result_, previousLedger_, closeResolution_, rawCloseTimes_, mode_.get(), getJson(true));
895 JLOG(j_.info()) << "Simulation complete";
896}
897
898template <class Adaptor>
901{
902 using std::to_string;
903 using Int = Json::Value::Int;
904
906
907 ret["proposing"] = (mode_.get() == ConsensusMode::proposing);
908 ret["proposers"] = static_cast<int>(currPeerPositions_.size());
909
910 if (mode_.get() != ConsensusMode::wrongLedger)
911 {
912 ret["synched"] = true;
913 ret["ledger_seq"] = static_cast<std::uint32_t>(previousLedger_.seq()) + 1;
914 ret["close_granularity"] = static_cast<Int>(closeResolution_.count());
915 }
916 else
917 ret["synched"] = false;
918
919 ret["phase"] = to_string(phase_);
920
921 if (result_ && !result_->disputes.empty() && !full)
922 ret["disputes"] = static_cast<Int>(result_->disputes.size());
923
924 if (result_)
925 ret["our_position"] = result_->position.getJson();
926
927 if (full)
928 {
929 if (result_)
930 ret["current_ms"] = static_cast<Int>(result_->roundTime.read().count());
931 ret["converge_percent"] = convergePercent_;
932 ret["close_resolution"] = static_cast<Int>(closeResolution_.count());
933 ret["have_time_consensus"] = haveCloseTimeConsensus_;
934 ret["previous_proposers"] = static_cast<Int>(prevProposers_);
935 ret["previous_mseconds"] = static_cast<Int>(prevRoundTime_.count());
936
937 if (!currPeerPositions_.empty())
938 {
940
941 for (auto const& [nodeId, peerPos] : currPeerPositions_)
942 {
943 ppj[to_string(nodeId)] = peerPos.getJson();
944 }
945 ret["peer_positions"] = std::move(ppj);
946 }
947
948 if (!acquired_.empty())
949 {
951 for (auto const& at : acquired_)
952 {
953 acq.append(to_string(at.first));
954 }
955 ret["acquired"] = std::move(acq);
956 }
957
958 if (result_ && !result_->disputes.empty())
959 {
961 for (auto const& [txId, dispute] : result_->disputes)
962 {
963 dsj[to_string(txId)] = dispute.getJson();
964 }
965 ret["disputes"] = std::move(dsj);
966 }
967
968 if (!rawCloseTimes_.peers.empty())
969 {
971 for (auto const& ct : rawCloseTimes_.peers)
972 {
973 ctj[std::to_string(ct.first.time_since_epoch().count())] = ct.second;
974 }
975 ret["close_times"] = std::move(ctj);
976 }
977
978 if (!deadNodes_.empty())
979 {
981 for (auto const& dn : deadNodes_)
982 {
983 dnj.append(to_string(dn));
984 }
985 ret["dead_nodes"] = std::move(dnj);
986 }
987 }
988
989 return ret;
990}
991
992// Handle a change in the prior ledger during a consensus round
993template <class Adaptor>
994void
996 typename Ledger_t::ID const& lgrId,
998{
999 CLOG(clog) << "handleWrongLedger. ";
1000 XRPL_ASSERT(
1001 lgrId != prevLedgerID_ || previousLedger_.id() != lgrId,
1002 "xrpl::Consensus::handleWrongLedger : have wrong ledger");
1003
1004 // Stop proposing because we are out of sync
1005 leaveConsensus(clog);
1006
1007 // First time switching to this ledger
1008 if (prevLedgerID_ != lgrId)
1009 {
1010 prevLedgerID_ = lgrId;
1011
1012 // Clear out state
1013 if (result_)
1014 {
1015 result_->disputes.clear();
1016 result_->compares.clear();
1017 }
1018
1019 currPeerPositions_.clear();
1020 rawCloseTimes_.peers.clear();
1021 deadNodes_.clear();
1022
1023 // Get back in sync, this will also recreate disputes
1024 playbackProposals();
1025 }
1026
1027 if (previousLedger_.id() == prevLedgerID_)
1028 {
1029 CLOG(clog) << "previousLedger_.id() == prevLeverID_ " << prevLedgerID_ << ". ";
1030 return;
1031 }
1032
1033 // we need to switch the ledger we're working from
1034 if (auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
1035 {
1036 JLOG(j_.info()) << "Have the consensus ledger " << prevLedgerID_;
1037 CLOG(clog) << "Have the consensus ledger " << prevLedgerID_ << ". ";
1038 startRoundInternal(now_, lgrId, *newLedger, ConsensusMode::switchedLedger, clog);
1039 }
1040 else
1041 {
1042 CLOG(clog) << "Still on wrong ledger. ";
1043 mode_.set(ConsensusMode::wrongLedger, adaptor_);
1044 }
1045}
1046
1047template <class Adaptor>
1048void
1050{
1051 CLOG(clog) << "checkLedger. ";
1052
1053 auto netLgr = adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
1054 CLOG(clog) << "network ledgerid " << netLgr << ", "
1055 << "previous ledger " << prevLedgerID_ << ". ";
1056
1057 if (netLgr != prevLedgerID_)
1058 {
1060 ss << "View of consensus changed during " << to_string(phase_) << " mode=" << to_string(mode_.get()) << ", "
1061 << prevLedgerID_ << " to " << netLgr << ", " << Json::Compact{previousLedger_.getJson()} << ". ";
1062 JLOG(j_.warn()) << ss.str();
1063 CLOG(clog) << ss.str();
1064 CLOG(clog) << "State on consensus change " << Json::Compact{getJson(true)} << ". ";
1065 handleWrongLedger(netLgr, clog);
1066 }
1067 else if (previousLedger_.id() != prevLedgerID_)
1068 {
1069 CLOG(clog) << "previousLedger_.id() != prevLedgerID_: " << previousLedger_.id() << ','
1070 << to_string(prevLedgerID_) << ". ";
1071 handleWrongLedger(netLgr, clog);
1072 }
1073}
1074
1075template <class Adaptor>
1076void
1078{
1079 for (auto const& it : recentPeerPositions_)
1080 {
1081 for (auto const& pos : it.second)
1082 {
1083 if (pos.proposal().prevLedger() == prevLedgerID_)
1084 {
1085 if (peerProposalInternal(now_, pos))
1086 adaptor_.share(pos);
1087 }
1088 }
1089 }
1090}
1091
1092template <class Adaptor>
1093void
1095{
1096 CLOG(clog) << "phaseOpen. ";
1097 using namespace std::chrono;
1098
1099 // it is shortly before ledger close time
1100 bool anyTransactions = adaptor_.hasOpenTransactions();
1101 auto proposersClosed = currPeerPositions_.size();
1102 auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
1103
1104 openTime_.tick(clock_.now());
1105
1106 // This computes how long since last ledger's close time
1107 milliseconds sinceClose;
1108 {
1109 auto const mode = mode_.get();
1110 bool const closeAgree = previousLedger_.closeAgree();
1111 auto const prevCloseTime = previousLedger_.closeTime();
1112 auto const prevParentCloseTimePlus1 = previousLedger_.parentCloseTime() + 1s;
1113 bool const previousCloseCorrect =
1114 (mode != ConsensusMode::wrongLedger) && closeAgree && (prevCloseTime != prevParentCloseTimePlus1);
1115
1116 auto const lastCloseTime = previousCloseCorrect ? prevCloseTime // use consensus timing
1117 : prevCloseTime_; // use the time we saw internally
1118
1119 if (now_ >= lastCloseTime)
1120 sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
1121 else
1122 sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
1123 CLOG(clog) << "calculating how long since last ledger's close time "
1124 "based on mode : "
1125 << to_string(mode) << ", previous closeAgree: " << closeAgree
1126 << ", previous close time: " << to_string(prevCloseTime)
1127 << ", previous parent close time + 1s: " << to_string(prevParentCloseTimePlus1)
1128 << ", previous close time seen internally: " << to_string(prevCloseTime_)
1129 << ", last close time: " << to_string(lastCloseTime) << ", since close: " << sinceClose.count()
1130 << ". ";
1131 }
1132
1133 auto const idleInterval =
1134 std::max<milliseconds>(adaptor_.parms().ledgerIDLE_INTERVAL, 2 * previousLedger_.closeTimeResolution());
1135 CLOG(clog) << "idle interval set to " << idleInterval.count() << "ms based on "
1136 << "ledgerIDLE_INTERVAL: " << adaptor_.parms().ledgerIDLE_INTERVAL.count()
1137 << ", previous ledger close time resolution: " << previousLedger_.closeTimeResolution().count()
1138 << "ms. ";
1139
1140 // Decide if we should close the ledger
1142 anyTransactions,
1143 prevProposers_,
1144 proposersClosed,
1145 proposersValidated,
1146 prevRoundTime_,
1147 sinceClose,
1148 openTime_.read(),
1149 idleInterval,
1150 adaptor_.parms(),
1151 j_,
1152 clog))
1153 {
1154 CLOG(clog) << "closing ledger. ";
1155 closeLedger(clog);
1156 }
1157}
1158
1159template <class Adaptor>
1160bool
1162{
1163 CLOG(clog) << "shouldPause? ";
1164 auto const& parms = adaptor_.parms();
1165 std::uint32_t const ahead(previousLedger_.seq() - std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
1166 auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
1167 std::size_t const totalValidators = trustedKeys.size();
1168 std::size_t laggards = adaptor_.laggards(previousLedger_.seq(), trustedKeys);
1169 std::size_t const offline = trustedKeys.size();
1170
1171 std::stringstream vars;
1172 vars << " consensuslog (working seq: " << previousLedger_.seq() << ", "
1173 << "validated seq: " << adaptor_.getValidLedgerIndex() << ", "
1174 << "am validator: " << adaptor_.validator() << ", "
1175 << "have validated: " << adaptor_.haveValidated() << ", "
1176 << "roundTime: " << result_->roundTime.read().count() << ", "
1177 << "max consensus time: " << parms.ledgerMAX_CONSENSUS.count() << ", "
1178 << "validators: " << totalValidators << ", "
1179 << "laggards: " << laggards << ", "
1180 << "offline: " << offline << ", "
1181 << "quorum: " << quorum << ")";
1182
1183 if (!ahead || !laggards || !totalValidators || !adaptor_.validator() || !adaptor_.haveValidated() ||
1184 result_->roundTime.read() > parms.ledgerMAX_CONSENSUS)
1185 {
1186 j_.debug() << "not pausing (early)" << vars.str();
1187 CLOG(clog) << "Not pausing (early). ";
1188 return false;
1189 }
1190
1191 bool willPause = false;
1192
1206 constexpr static std::size_t maxPausePhase = 4;
1207
1227 std::size_t const phase = (ahead - 1) % (maxPausePhase + 1);
1228
1229 // validators that remain after the laggards() function are considered
1230 // offline, and should be considered as laggards for purposes of
1231 // evaluating whether the threshold for non-laggards has been reached.
1232 switch (phase)
1233 {
1234 case 0:
1235 // Laggards and offline shouldn't preclude consensus.
1236 if (laggards + offline > totalValidators - quorum)
1237 willPause = true;
1238 break;
1239 case maxPausePhase:
1240 // No tolerance.
1241 willPause = true;
1242 break;
1243 default:
1244 // Ensure that sufficient validators are known to be not lagging.
1245 // Their sufficiently most recent validation sequence was equal to
1246 // or greater than our own.
1247 //
1248 // The threshold is the amount required for quorum plus
1249 // the proportion of the remainder based on number of intermediate
1250 // phases between 0 and max.
1251 float const nonLaggards = totalValidators - (laggards + offline);
1252 float const quorumRatio = static_cast<float>(quorum) / totalValidators;
1253 float const allowedDissent = 1.0f - quorumRatio;
1254 float const phaseFactor = static_cast<float>(phase) / maxPausePhase;
1255
1256 if (nonLaggards / totalValidators < quorumRatio + (allowedDissent * phaseFactor))
1257 {
1258 willPause = true;
1259 }
1260 }
1261
1262 if (willPause)
1263 {
1264 j_.warn() << "pausing" << vars.str();
1265 CLOG(clog) << "pausing " << vars.str() << ". ";
1266 }
1267 else
1268 {
1269 j_.debug() << "not pausing" << vars.str();
1270 CLOG(clog) << "not pausing. ";
1271 }
1272 return willPause;
1273}
1274
1275template <class Adaptor>
1276void
1278{
1279 CLOG(clog) << "phaseEstablish. ";
1280 // can only establish consensus if we already took a stance
1281 XRPL_ASSERT(result_, "xrpl::Consensus::phaseEstablish : result is set");
1282
1283 ++peerUnchangedCounter_;
1284 ++establishCounter_;
1285
1286 using namespace std::chrono;
1287 ConsensusParms const& parms = adaptor_.parms();
1288
1289 result_->roundTime.tick(clock_.now());
1290 result_->proposers = currPeerPositions_.size();
1291
1292 convergePercent_ =
1293 result_->roundTime.read() * 100 / std::max<milliseconds>(prevRoundTime_, parms.avMIN_CONSENSUS_TIME);
1294 CLOG(clog) << "convergePercent_ " << convergePercent_
1295 << " is based on round duration so far: " << result_->roundTime.read().count() << "ms, "
1296 << "previous round duration: " << prevRoundTime_.count() << "ms, "
1297 << "avMIN_CONSENSUS_TIME: " << parms.avMIN_CONSENSUS_TIME.count() << "ms. ";
1298
1299 // Give everyone a chance to take an initial position
1300 if (result_->roundTime.read() < parms.ledgerMIN_CONSENSUS)
1301 {
1302 CLOG(clog) << "ledgerMIN_CONSENSUS not reached: " << parms.ledgerMIN_CONSENSUS.count() << "ms. ";
1303 return;
1304 }
1305
1306 updateOurPositions(clog);
1307
1308 // Nothing to do if too many laggards or we don't have consensus.
1309 if (shouldPause(clog) || !haveConsensus(clog))
1310 return;
1311
1312 if (!haveCloseTimeConsensus_)
1313 {
1314 JLOG(j_.info()) << "We have TX consensus but not CT consensus";
1315 CLOG(clog) << "We have TX consensus but not CT consensus. ";
1316 return;
1317 }
1318
1319 JLOG(j_.info()) << "Converge cutoff (" << currPeerPositions_.size() << " participants)";
1320 CLOG(clog) << "Converge cutoff (" << currPeerPositions_.size()
1321 << " participants). Transitioned to ConsensusPhase::accepted. ";
1322 adaptor_.updateOperatingMode(currPeerPositions_.size());
1323 prevProposers_ = currPeerPositions_.size();
1324 prevRoundTime_ = result_->roundTime.read();
1325 phase_ = ConsensusPhase::accepted;
1326 JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted";
1327 adaptor_.onAccept(
1328 *result_, previousLedger_, closeResolution_, rawCloseTimes_, mode_.get(), getJson(true), adaptor_.validating());
1329}
1330
1331template <class Adaptor>
1332void
1334{
1335 // We should not be closing if we already have a position
1336 XRPL_ASSERT(!result_, "xrpl::Consensus::closeLedger : result is not set");
1337
1339 JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish";
1340 rawCloseTimes_.self = now_;
1341 peerUnchangedCounter_ = 0;
1342 establishCounter_ = 0;
1343
1344 result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
1345 result_->roundTime.reset(clock_.now());
1346 // Share the newly created transaction set if we haven't already
1347 // received it from a peer
1348 if (acquired_.emplace(result_->txns.id(), result_->txns).second)
1349 adaptor_.share(result_->txns);
1350
1351 auto const mode = mode_.get();
1352 CLOG(clog) << "closeLedger transitioned to ConsensusPhase::establish, mode: " << to_string(mode)
1353 << ", number of peer positions: " << currPeerPositions_.size() << ". ";
1354 if (mode == ConsensusMode::proposing)
1355 adaptor_.propose(result_->position);
1356
1357 // Create disputes with any peer positions we have transactions for
1358 for (auto const& pit : currPeerPositions_)
1359 {
1360 auto const& pos = pit.second.proposal().position();
1361 auto const it = acquired_.find(pos);
1362 if (it != acquired_.end())
1363 createDisputes(it->second, clog);
1364 }
1365}
1366
1379inline int
1380participantsNeeded(int participants, int percent)
1381{
1382 int result = ((participants * percent) + (percent / 2)) / 100;
1383
1384 return (result == 0) ? 1 : result;
1385}
1386
1387template <class Adaptor>
1388void
1390{
1391 // We must have a position if we are updating it
1392 XRPL_ASSERT(result_, "xrpl::Consensus::updateOurPositions : result is set");
1393 ConsensusParms const& parms = adaptor_.parms();
1394
1395 // Compute a cutoff time
1396 auto const peerCutoff = now_ - parms.proposeFRESHNESS;
1397 auto const ourCutoff = now_ - parms.proposeINTERVAL;
1398 CLOG(clog) << "updateOurPositions. peerCutoff " << to_string(peerCutoff) << ", ourCutoff " << to_string(ourCutoff)
1399 << ". ";
1400
1401 // Verify freshness of peer positions and compute close times
1403 {
1404 auto it = currPeerPositions_.begin();
1405 while (it != currPeerPositions_.end())
1406 {
1407 Proposal_t const& peerProp = it->second.proposal();
1408 if (peerProp.isStale(peerCutoff))
1409 {
1410 // peer's proposal is stale, so remove it
1411 NodeID_t const& peerID = peerProp.nodeID();
1412 JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
1413 for (auto& dt : result_->disputes)
1414 dt.second.unVote(peerID);
1415 it = currPeerPositions_.erase(it);
1416 }
1417 else
1418 {
1419 // proposal is still fresh
1420 ++closeTimeVotes[asCloseTime(peerProp.closeTime())];
1421 ++it;
1422 }
1423 }
1424 }
1425
1426 // This will stay unseated unless there are any changes
1427 std::optional<TxSet_t> ourNewSet;
1428
1429 // Update votes on disputed transactions
1430 {
1432 for (auto& [txId, dispute] : result_->disputes)
1433 {
1434 // Because the threshold for inclusion increases,
1435 // time can change our position on a dispute
1436 if (dispute.updateVote(convergePercent_, mode_.get() == ConsensusMode::proposing, parms))
1437 {
1438 if (!mutableSet)
1439 mutableSet.emplace(result_->txns);
1440
1441 if (dispute.getOurVote())
1442 {
1443 // now a yes
1444 mutableSet->insert(dispute.tx());
1445 }
1446 else
1447 {
1448 // now a no
1449 mutableSet->erase(txId);
1450 }
1451 }
1452 }
1453
1454 if (mutableSet)
1455 ourNewSet.emplace(std::move(*mutableSet));
1456 }
1457
1458 NetClock::time_point consensusCloseTime = {};
1459 haveCloseTimeConsensus_ = false;
1460
1461 if (currPeerPositions_.empty())
1462 {
1463 // no other times
1464 haveCloseTimeConsensus_ = true;
1465 consensusCloseTime = asCloseTime(result_->position.closeTime());
1466 }
1467 else
1468 {
1469 // We don't track rounds for close time, so just pass 0s
1470 auto const [neededWeight, newState] = getNeededWeight(parms, closeTimeAvalancheState_, convergePercent_, 0, 0);
1471 if (newState)
1472 closeTimeAvalancheState_ = *newState;
1473 CLOG(clog) << "neededWeight " << neededWeight << ". ";
1474
1475 int participants = currPeerPositions_.size();
1476 if (mode_.get() == ConsensusMode::proposing)
1477 {
1478 ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
1479 ++participants;
1480 }
1481
1482 // Threshold for non-zero vote
1483 int threshVote = participantsNeeded(participants, neededWeight);
1484
1485 // Threshold to declare consensus
1486 int const threshConsensus = participantsNeeded(participants, parms.avCT_CONSENSUS_PCT);
1487
1489 ss << "Proposers:" << currPeerPositions_.size() << " nw:" << neededWeight << " thrV:" << threshVote
1490 << " thrC:" << threshConsensus;
1491 JLOG(j_.info()) << ss.str();
1492 CLOG(clog) << ss.str();
1493
1494 for (auto const& [t, v] : closeTimeVotes)
1495 {
1496 JLOG(j_.debug()) << "CCTime: seq " << static_cast<std::uint32_t>(previousLedger_.seq()) + 1 << ": "
1497 << t.time_since_epoch().count() << " has " << v << ", " << threshVote << " required";
1498
1499 if (v >= threshVote)
1500 {
1501 // A close time has enough votes for us to try to agree
1502 consensusCloseTime = t;
1503 threshVote = v;
1504
1505 if (threshVote >= threshConsensus)
1506 haveCloseTimeConsensus_ = true;
1507 }
1508 }
1509
1510 if (!haveCloseTimeConsensus_)
1511 {
1512 JLOG(j_.debug()) << "No CT consensus:"
1513 << " Proposers:" << currPeerPositions_.size() << " Mode:" << to_string(mode_.get())
1514 << " Thresh:" << threshConsensus
1515 << " Pos:" << consensusCloseTime.time_since_epoch().count();
1516 CLOG(clog) << "No close time consensus. ";
1517 }
1518 }
1519
1520 if (!ourNewSet &&
1521 ((consensusCloseTime != asCloseTime(result_->position.closeTime())) || result_->position.isStale(ourCutoff)))
1522 {
1523 // close time changed or our position is stale
1524 ourNewSet.emplace(result_->txns);
1525 }
1526
1527 if (ourNewSet)
1528 {
1529 auto newID = ourNewSet->id();
1530
1531 result_->txns = std::move(*ourNewSet);
1532
1534 ss << "Position change: CTime " << consensusCloseTime.time_since_epoch().count() << ", tx " << newID;
1535 JLOG(j_.info()) << ss.str();
1536 CLOG(clog) << ss.str();
1537
1538 result_->position.changePosition(newID, consensusCloseTime, now_);
1539
1540 // Share our new transaction set and update disputes
1541 // if we haven't already received it
1542 if (acquired_.emplace(newID, result_->txns).second)
1543 {
1544 if (!result_->position.isBowOut())
1545 adaptor_.share(result_->txns);
1546
1547 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1548 {
1549 Proposal_t const& p = peerPos.proposal();
1550 if (p.position() == newID)
1551 updateDisputes(nodeId, result_->txns);
1552 }
1553 }
1554
1555 // Share our new position if we are still participating this round
1556 if (!result_->position.isBowOut() && (mode_.get() == ConsensusMode::proposing))
1557 adaptor_.propose(result_->position);
1558 }
1559}
1560
1561template <class Adaptor>
1562bool
1564{
1565 // Must have a stance if we are checking for consensus
1566 XRPL_ASSERT(result_, "xrpl::Consensus::haveConsensus : has result");
1567
1568 // CHECKME: should possibly count unacquired TX sets as disagreeing
1569 int agree = 0, disagree = 0;
1570
1571 auto ourPosition = result_->position.position();
1572
1573 // Count number of agreements/disagreements with our position
1574 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1575 {
1576 Proposal_t const& peerProp = peerPos.proposal();
1577 if (peerProp.position() == ourPosition)
1578 {
1579 ++agree;
1580 }
1581 else
1582 {
1583 JLOG(j_.debug()) << "Proposal disagreement: Peer " << nodeId << " has " << peerProp.position();
1584 ++disagree;
1585 }
1586 }
1587 auto currentFinished = adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
1588
1589 JLOG(j_.debug()) << "Checking for TX consensus: agree=" << agree << ", disagree=" << disagree;
1590
1591 ConsensusParms const& parms = adaptor_.parms();
1592 // Stalling is BAD. It means that we have a consensus on the close time, so
1593 // peers are talking, but we have disputed transactions that peers are
1594 // unable or unwilling to come to agreement on one way or the other.
1595 bool const stalled = haveCloseTimeConsensus_ && !result_->disputes.empty() &&
1596 std::ranges::all_of(result_->disputes, [this, &parms, &clog](auto const& dispute) {
1597 return dispute.second.stalled(
1598 parms, mode_.get() == ConsensusMode::proposing, peerUnchangedCounter_, j_, clog);
1599 });
1600 if (stalled)
1601 {
1603 ss << "Consensus detects as stalled with " << (agree + disagree) << "/" << prevProposers_ << " proposers, and "
1604 << result_->disputes.size() << " stalled disputed transactions.";
1605 JLOG(j_.error()) << ss.str();
1606 CLOG(clog) << ss.str();
1607 }
1608
1609 // Determine if we actually have consensus or not
1610 result_->state = checkConsensus(
1611 prevProposers_,
1612 agree + disagree,
1613 agree,
1614 currentFinished,
1615 prevRoundTime_,
1616 result_->roundTime.read(),
1617 stalled,
1618 parms,
1619 mode_.get() == ConsensusMode::proposing,
1620 j_,
1621 clog);
1622
1623 if (result_->state == ConsensusState::No)
1624 {
1625 CLOG(clog) << "No consensus. ";
1626 return false;
1627 }
1628
1629 // Consensus has taken far too long. Drop out of the round.
1630 if (result_->state == ConsensusState::Expired)
1631 {
1632 static auto const minimumCounter = parms.avalancheCutoffs.size() * parms.avMIN_ROUNDS;
1634 if (establishCounter_ < minimumCounter)
1635 {
1636 // If each round of phaseEstablish takes a very long time, we may
1637 // "expire" before we've given consensus enough time at each
1638 // avalanche level to actually come to a consensus. In that case,
1639 // keep trying. This should only happen if there are an extremely
1640 // large number of disputes such that each round takes an inordinate
1641 // amount of time.
1642
1643 ss << "Consensus time has expired in round " << establishCounter_ << "; continue until round "
1644 << minimumCounter << ". " << Json::Compact{getJson(false)};
1645 JLOG(j_.error()) << ss.str();
1646 CLOG(clog) << ss.str() << ". ";
1647 return false;
1648 }
1649 ss << "Consensus expired. " << Json::Compact{getJson(true)};
1650 JLOG(j_.error()) << ss.str();
1651 CLOG(clog) << ss.str() << ". ";
1652 leaveConsensus(clog);
1653 }
1654 // There is consensus, but we need to track if the network moved on
1655 // without us.
1656 if (result_->state == ConsensusState::MovedOn)
1657 {
1658 JLOG(j_.error()) << "Unable to reach consensus";
1659 JLOG(j_.error()) << Json::Compact{getJson(true)};
1660 CLOG(clog) << "Unable to reach consensus " << Json::Compact{getJson(true)} << ". ";
1661 }
1662
1663 CLOG(clog) << "Consensus has been reached. ";
1664 return true;
1665}
1666
1667template <class Adaptor>
1668void
1670{
1671 if (mode_.get() == ConsensusMode::proposing)
1672 {
1673 if (result_ && !result_->position.isBowOut())
1674 {
1675 result_->position.bowOut(now_);
1676 adaptor_.propose(result_->position);
1677 }
1678
1679 mode_.set(ConsensusMode::observing, adaptor_);
1680 JLOG(j_.info()) << "Bowing out of consensus";
1681 CLOG(clog) << "Bowing out of consensus. ";
1682 }
1683}
1684
1685template <class Adaptor>
1686void
1688{
1689 // Cannot create disputes without our stance
1690 XRPL_ASSERT(result_, "xrpl::Consensus::createDisputes : result is set");
1691
1692 // Only create disputes if this is a new set
1693 auto const emplaced = result_->compares.emplace(o.id()).second;
1694 CLOG(clog) << "createDisputes: new set? " << !emplaced << ". ";
1695 if (!emplaced)
1696 return;
1697
1698 // Nothing to dispute if we agree
1699 if (result_->txns.id() == o.id())
1700 {
1701 CLOG(clog) << "both sets are identical. ";
1702 return;
1703 }
1704
1705 CLOG(clog) << "comparing existing with new set: " << result_->txns.id() << ',' << o.id() << ". ";
1706 JLOG(j_.debug()) << "createDisputes " << result_->txns.id() << " to " << o.id();
1707
1708 auto differences = result_->txns.compare(o);
1709
1710 int dc = 0;
1711
1712 for (auto const& [txId, inThisSet] : differences)
1713 {
1714 ++dc;
1715 // create disputed transactions (from the ledger that has them)
1716 XRPL_ASSERT(
1717 (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
1718 (!inThisSet && !result_->txns.find(txId) && o.find(txId)),
1719 "xrpl::Consensus::createDisputes : has disputed transactions");
1720
1721 Tx_t tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
1722 auto txID = tx.id();
1723
1724 if (result_->disputes.find(txID) != result_->disputes.end())
1725 continue;
1726
1727 JLOG(j_.debug()) << "Transaction " << txID << " is disputed";
1728
1729 typename Result::Dispute_t dtx{
1730 tx, result_->txns.exists(txID), std::max(prevProposers_, currPeerPositions_.size()), j_};
1731
1732 // Update all of the available peer's votes on the disputed transaction
1733 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1734 {
1735 Proposal_t const& peerProp = peerPos.proposal();
1736 auto const cit = acquired_.find(peerProp.position());
1737 if (cit != acquired_.end() && dtx.setVote(nodeId, cit->second.exists(txID)))
1738 peerUnchangedCounter_ = 0;
1739 }
1740 adaptor_.share(dtx.tx());
1741
1742 result_->disputes.emplace(txID, std::move(dtx));
1743 }
1744 JLOG(j_.debug()) << dc << " differences found";
1745 CLOG(clog) << "disputes: " << dc << ". ";
1746}
1747
1748template <class Adaptor>
1749void
1751{
1752 // Cannot updateDisputes without our stance
1753 XRPL_ASSERT(result_, "xrpl::Consensus::updateDisputes : result is set");
1754
1755 // Ensure we have created disputes against this set if we haven't seen
1756 // it before
1757 if (result_->compares.find(other.id()) == result_->compares.end())
1758 createDisputes(other);
1759
1760 for (auto& it : result_->disputes)
1761 {
1762 auto& d = it.second;
1763 if (d.setVote(node, other.exists(d.tx().id())))
1764 peerUnchangedCounter_ = 0;
1765 }
1766}
1767
1768template <class Adaptor>
1771{
1772 return roundCloseTime(raw, closeResolution_);
1773}
1774
1775} // namespace xrpl
T all_of(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
Json::Int Int
Definition json_value.h:138
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:318
Stream debug() const
Definition Journal.h:300
Stream info() const
Definition Journal.h:306
Stream trace() const
Severity stream access functions.
Definition Journal.h:294
Stream warn() const
Definition Journal.h:312
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
bool isStale(NetClock::time_point cutoff) const
Get whether this position is stale relative to the provided cutoff.
Position_t const & position() const
Get the proposed position.
NodeID_t const & nodeID() const
Identifying which peer took this position.
Measures the duration of phases of consensus.
ConsensusMode get() const
Definition Consensus.h:299
MonitoredMode(ConsensusMode m)
Definition Consensus.h:295
void set(ConsensusMode mode, Adaptor &a)
Definition Consensus.h:305
Generic implementation of consensus algorithm.
Definition Consensus.h:278
typename Adaptor::TxSet_t TxSet_t
Definition Consensus.h:280
bool peerProposalInternal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
Handle a replayed or a new peer proposal.
Definition Consensus.h:721
void updateDisputes(NodeID_t const &node, TxSet_t const &other)
Definition Consensus.h:1750
typename Adaptor::Ledger_t Ledger_t
Definition Consensus.h:279
bool haveCloseTimeConsensus_
Definition Consensus.h:535
MonitoredMode mode_
Definition Consensus.h:533
hash_map< NodeID_t, std::deque< PeerPosition_t > > recentPeerPositions_
Definition Consensus.h:590
Json::Value getJson(bool full) const
Get the Json state of the consensus process.
Definition Consensus.h:900
NetClock::duration closeResolution_
Definition Consensus.h:546
ConsensusTimer openTime_
Definition Consensus.h:544
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Simulate the consensus process without any network traffic.
Definition Consensus.h:884
typename TxSet_t::Tx Tx_t
Definition Consensus.h:282
void gotTxSet(NetClock::time_point const &now, TxSet_t const &txSet)
Process a transaction set acquired from the network.
Definition Consensus.h:841
bool shouldPause(std::unique_ptr< std::stringstream > const &clog) const
Evaluate whether pausing increases likelihood of validation.
Definition Consensus.h:1161
void phaseEstablish(std::unique_ptr< std::stringstream > const &clog)
Handle establish phase.
Definition Consensus.h:1277
hash_set< NodeID_t > deadNodes_
Definition Consensus.h:596
std::size_t peerUnchangedCounter_
Definition Consensus.h:577
void leaveConsensus(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1669
NetClock::time_point prevCloseTime_
Definition Consensus.h:559
Ledger_t::ID prevLedgerID() const
Get the previous ledger ID.
Definition Consensus.h:402
hash_map< NodeID_t, PeerPosition_t > currPeerPositions_
Definition Consensus.h:586
std::chrono::milliseconds prevRoundTime_
Definition Consensus.h:551
typename Adaptor::PeerPosition_t PeerPosition_t
Definition Consensus.h:283
void startRound(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t prevLedger, hash_set< NodeID_t > const &nowUntrusted, bool proposing, std::unique_ptr< std::stringstream > const &clog={})
Kick-off the next round of consensus.
Definition Consensus.h:611
void handleWrongLedger(typename Ledger_t::ID const &lgrId, std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:995
void checkLedger(std::unique_ptr< std::stringstream > const &clog)
Check if our previous ledger matches the network's.
Definition Consensus.h:1049
Consensus(Consensus &&) noexcept=default
NetClock::time_point asCloseTime(NetClock::time_point raw) const
Definition Consensus.h:1770
ConsensusParms::AvalancheState closeTimeAvalancheState_
Definition Consensus.h:548
ConsensusPhase phase() const
Definition Consensus.h:408
ConsensusPhase phase_
Definition Consensus.h:532
void playbackProposals()
If we radically changed our consensus context for some reason, we need to replay recent proposals so ...
Definition Consensus.h:1077
Ledger_t previousLedger_
Definition Consensus.h:567
beast::Journal const j_
Definition Consensus.h:599
Ledger_t::ID prevLedgerID_
Definition Consensus.h:565
hash_map< typename TxSet_t::ID, TxSet_t const > acquired_
Definition Consensus.h:570
void phaseOpen(std::unique_ptr< std::stringstream > const &clog)
Handle pre-close phase.
Definition Consensus.h:1094
void closeLedger(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1333
std::size_t prevProposers_
Definition Consensus.h:593
void startRoundInternal(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t const &prevLedger, ConsensusMode mode, std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:656
Adaptor & adaptor_
Definition Consensus.h:530
std::size_t establishCounter_
Definition Consensus.h:580
NetClock::time_point now_
Definition Consensus.h:558
clock_type const & clock_
Definition Consensus.h:537
void createDisputes(TxSet_t const &o, std::unique_ptr< std::stringstream > const &clog={})
Definition Consensus.h:1687
bool peerProposal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
A peer has proposed a new position, adjust our tracking.
Definition Consensus.h:702
typename Adaptor::NodeID_t NodeID_t
Definition Consensus.h:281
std::optional< Result > result_
Definition Consensus.h:572
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Call periodically to drive consensus forward.
Definition Consensus.h:810
bool haveConsensus(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1563
void updateOurPositions(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1389
ConsensusCloseTimes rawCloseTimes_
Definition Consensus.h:573
A transaction discovered to be in dispute during consensus.
Definition DisputedTx.h:29
T emplace(T... args)
T is_same_v
T max(T... args)
T min(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
ConsensusMode
Represents how a node currently participates in Consensus.
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
@ proposing
We are normal participant in consensus and propose our position.
@ switchedLedger
We switched ledgers since we started this consensus round but are now running on what we believe is t...
@ observing
We are observing peer positions, but not proposing our position.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:597
auto constexpr ledgerDefaultTimeResolution
Initial resolution of ledger close time.
std::chrono::duration< Rep, Period > getNextLedgerTimeResolution(std::chrono::duration< Rep, Period > previousResolution, bool previousAgree, Seq ledgerSeq)
Calculates the close time resolution for the specified ledger.
ConsensusState checkConsensus(std::size_t prevProposers, std::size_t currentProposers, std::size_t currentAgree, std::size_t currentFinished, std::chrono::milliseconds previousAgreeTime, std::chrono::milliseconds currentAgreeTime, bool stalled, ConsensusParms const &parms, bool proposing, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determine whether the network reached consensus and whether we joined.
ConsensusState
Whether we have or don't have a consensus.
@ Expired
Consensus time limit has hard-expired.
@ MovedOn
The network has consensus without us.
@ No
We do not have consensus.
std::chrono::time_point< Clock, Duration > roundCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > closeResolution)
Calculates the close time for a ledger, given a close time resolution.
ConsensusPhase
Phases of consensus for a single ledger round.
@ accepted
We have accepted a new last closed ledger and are waiting on a call to startRound to begin the next c...
@ open
We haven't closed our ledger yet, but others might have.
@ establish
Establishing consensus by exchanging proposals with our peers.
int participantsNeeded(int participants, int percent)
How many of the participants must agree to reach a given threshold?
Definition Consensus.h:1380
std::pair< std::size_t, std::optional< ConsensusParms::AvalancheState > > getNeededWeight(ConsensusParms const &p, ConsensusParms::AvalancheState currentState, int percentTime, std::size_t currentRounds, std::size_t minimumRounds)
bool shouldCloseLedger(bool anyTransactions, std::size_t prevProposers, std::size_t proposersClosed, std::size_t proposersValidated, std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, std::chrono::milliseconds idleInterval, ConsensusParms const &parms, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determines whether the current ledger should close at this time.
Definition Consensus.cpp:8
T read(T... args)
T size(T... args)
T str(T... args)
Stores the set of initial close times.
Consensus algorithm parameters.
std::size_t const avCT_CONSENSUS_PCT
Percentage of nodes required to reach agreement on ledger close time.
std::chrono::seconds const proposeFRESHNESS
How long we consider a proposal fresh.
std::chrono::milliseconds const ledgerMIN_CONSENSUS
The number of seconds we wait minimum to ensure participation.
std::chrono::seconds const proposeINTERVAL
How often we force generating a new proposal to keep ours fresh.
std::size_t const avMIN_ROUNDS
Number of rounds before certain actions can happen.
std::map< AvalancheState, AvalancheCutoff > const avalancheCutoffs
Map the consensus requirement avalanche state to the amount of time that must pass before moving to t...
std::chrono::milliseconds const avMIN_CONSENSUS_TIME
The minimum amount of time to consider the previous round to have taken.
Encapsulates the result of consensus.
T time_since_epoch(T... args)
T to_string(T... args)
T value_or(T... args)