rippled
Loading...
Searching...
No Matches
Consensus.h
1#pragma once
2
3#include <xrpld/consensus/ConsensusParms.h>
4#include <xrpld/consensus/ConsensusProposal.h>
5#include <xrpld/consensus/ConsensusTypes.h>
6#include <xrpld/consensus/DisputedTx.h>
7
8#include <xrpl/basics/Log.h>
9#include <xrpl/basics/chrono.h>
10#include <xrpl/beast/utility/Journal.h>
11#include <xrpl/json/json_writer.h>
12#include <xrpl/ledger/LedgerTiming.h>
13
14#include <algorithm>
15#include <chrono>
16#include <deque>
17#include <optional>
18#include <sstream>
19
20namespace xrpl {
21
41bool
43 bool anyTransactions,
44 std::size_t prevProposers,
45 std::size_t proposersClosed,
46 std::size_t proposersValidated,
47 std::chrono::milliseconds prevRoundTime,
48 std::chrono::milliseconds timeSincePrevClose,
50 std::chrono::milliseconds idleInterval,
51 ConsensusParms const& parms,
53 std::unique_ptr<std::stringstream> const& clog = {});
54
76 std::size_t prevProposers,
77 std::size_t currentProposers,
78 std::size_t currentAgree,
79 std::size_t currentFinished,
80 std::chrono::milliseconds previousAgreeTime,
81 std::chrono::milliseconds currentAgreeTime,
82 bool stalled,
83 ConsensusParms const& parms,
84 bool proposing,
86 std::unique_ptr<std::stringstream> const& clog = {});
87
276template <class Adaptor>
278{
279 using Ledger_t = typename Adaptor::Ledger_t;
280 using TxSet_t = typename Adaptor::TxSet_t;
281 using NodeID_t = typename Adaptor::NodeID_t;
282 using Tx_t = typename TxSet_t::Tx;
283 using PeerPosition_t = typename Adaptor::PeerPosition_t;
285
287
288 // Helper class to ensure adaptor is notified whenever the ConsensusMode
289 // changes
291 {
293
294 public:
296 {
297 }
299 get() const
300 {
301 return mode_;
302 }
303
304 void
305 set(ConsensusMode mode, Adaptor& a)
306 {
307 a.onModeChange(mode_, mode);
308 mode_ = mode;
309 }
310 };
311
312public:
315
316 Consensus(Consensus&&) noexcept = default;
317
324 Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j);
325
341 void
343 NetClock::time_point const& now,
344 typename Ledger_t::ID const& prevLedgerID,
345 Ledger_t prevLedger,
346 hash_set<NodeID_t> const& nowUntrusted,
347 bool proposing,
348 std::unique_ptr<std::stringstream> const& clog = {});
349
356 bool
357 peerProposal(NetClock::time_point const& now, PeerPosition_t const& newProposal);
358
364 void
366 NetClock::time_point const& now,
367 std::unique_ptr<std::stringstream> const& clog = {});
368
374 void
375 gotTxSet(NetClock::time_point const& now, TxSet_t const& txSet);
376
393 void
395 NetClock::time_point const& now,
397
405 typename Ledger_t::ID
407 {
408 return prevLedgerID_;
409 }
410
412 phase() const
413 {
414 return phase_;
415 }
416
425 getJson(bool full) const;
426
427private:
428 void
430 NetClock::time_point const& now,
431 typename Ledger_t::ID const& prevLedgerID,
432 Ledger_t const& prevLedger,
433 ConsensusMode mode,
435
436 // Change our view of the previous ledger
437 void
439 typename Ledger_t::ID const& lgrId,
441
447 void
449
453 void
455
458 bool
460
467 void
469
478 void
480
503 bool
505
506 // Close the open ledger and establish initial position.
507 void
509
510 // Adjust our positions to try to agree with other validators.
511 void
513
514 bool
516
517 // Create disputes between our position and the provided one.
518 void
520
521 // Update our disputes given that this node has adopted a new position.
522 // Will call createDisputes as needed.
523 void
524 updateDisputes(NodeID_t const& node, TxSet_t const& other);
525
526 // Revoke our outstanding proposal, if any, and cease proposing
527 // until this round ends.
528 void
530
531 // The rounded or effective close time estimate from a proposer
534
535private:
536 Adaptor& adaptor_;
537
540 bool firstRound_ = true;
542
544
545 // How long the consensus convergence has taken, expressed as
546 // a percentage of the time that we expected it to take.
548
549 // How long has this round been open
551
553
555
556 // Time it took for the last consensus round to converge
558
559 //-------------------------------------------------------------------------
560 // Network time measurements of consensus progress
561
562 // The current network adjusted time. This is the network time the
563 // ledger would close if it closed now
566
567 //-------------------------------------------------------------------------
568 // Non-peer (self) consensus data
569
570 // Last validated ledger ID provided to consensus
571 typename Ledger_t::ID prevLedgerID_;
572 // Last validated ledger seen by consensus
574
575 // Transaction Sets, indexed by hash of transaction tree
577
580
581 // The number of calls to phaseEstablish where none of our peers
582 // have changed any votes on disputed transactions.
584
585 // The total number of times we have called phaseEstablish
587
588 //-------------------------------------------------------------------------
589 // Peer related consensus data
590
591 // Peer proposed positions for the current round
593
594 // Recently received peer positions, available when transitioning between
595 // ledgers or rounds
597
598 // The number of proposers who participated in the last consensus round
600
601 // nodes that have bowed out of this consensus process
603
604 // Journal for debugging
606};
607
608template <class Adaptor>
609Consensus<Adaptor>::Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal journal)
610 : adaptor_(adaptor), clock_(clock), j_{journal}
611{
612 JLOG(j_.debug()) << "Creating consensus object";
613}
614
615template <class Adaptor>
616void
618 NetClock::time_point const& now,
619 typename Ledger_t::ID const& prevLedgerID,
620 Ledger_t prevLedger,
621 hash_set<NodeID_t> const& nowUntrusted,
622 bool proposing,
624{
625 if (firstRound_)
626 {
627 // take our initial view of closeTime_ from the seed ledger
628 prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
629 prevCloseTime_ = prevLedger.closeTime();
630 firstRound_ = false;
631 }
632 else
633 {
634 prevCloseTime_ = rawCloseTimes_.self;
635 }
636
637 for (NodeID_t const& n : nowUntrusted)
638 recentPeerPositions_.erase(n);
639
641
642 // We were handed the wrong ledger
643 if (prevLedger.id() != prevLedgerID)
644 {
645 // try to acquire the correct one
646 if (auto newLedger = adaptor_.acquireLedger(prevLedgerID))
647 {
648 prevLedger = *newLedger;
649 }
650 else // Unable to acquire the correct ledger
651 {
652 startMode = ConsensusMode::wrongLedger;
653 JLOG(j_.info()) << "Entering consensus with: " << previousLedger_.id();
654 JLOG(j_.info()) << "Correct LCL is: " << prevLedgerID;
655 }
656 }
657
658 startRoundInternal(now, prevLedgerID, prevLedger, startMode, clog);
659}
660template <class Adaptor>
661void
663 NetClock::time_point const& now,
664 typename Ledger_t::ID const& prevLedgerID,
665 Ledger_t const& prevLedger,
666 ConsensusMode mode,
668{
669 phase_ = ConsensusPhase::open;
670 JLOG(j_.debug()) << "transitioned to ConsensusPhase::open ";
671 CLOG(clog) << "startRoundInternal transitioned to ConsensusPhase::open, "
672 "previous ledgerID: "
673 << prevLedgerID << ", seq: " << prevLedger.seq() << ". ";
674 mode_.set(mode, adaptor_);
675 now_ = now;
676 prevLedgerID_ = prevLedgerID;
677 previousLedger_ = prevLedger;
678 result_.reset();
679 convergePercent_ = 0;
680 closeTimeAvalancheState_ = ConsensusParms::init;
681 haveCloseTimeConsensus_ = false;
682 openTime_.reset(clock_.now());
683 currPeerPositions_.clear();
684 acquired_.clear();
685 rawCloseTimes_.peers.clear();
686 rawCloseTimes_.self = {};
687 deadNodes_.clear();
688
689 closeResolution_ = getNextLedgerTimeResolution(
690 previousLedger_.closeTimeResolution(),
691 previousLedger_.closeAgree(),
692 previousLedger_.seq() + typename Ledger_t::Seq{1});
693
694 playbackProposals();
695 CLOG(clog) << "number of peer proposals,previous proposers: " << currPeerPositions_.size()
696 << ',' << prevProposers_ << ". ";
697 if (currPeerPositions_.size() > (prevProposers_ / 2))
698 {
699 // We may be falling behind, don't wait for the timer
700 // consider closing the ledger immediately
701 CLOG(clog) << "consider closing the ledger immediately. ";
702 timerEntry(now_, clog);
703 }
704}
705
706template <class Adaptor>
707bool
709{
710 JLOG(j_.debug()) << "PROPOSAL " << newPeerPos.render();
711 auto const& peerID = newPeerPos.proposal().nodeID();
712
713 // Always need to store recent positions
714 {
715 auto& props = recentPeerPositions_[peerID];
716
717 if (props.size() >= 10)
718 props.pop_front();
719
720 props.push_back(newPeerPos);
721 }
722 return peerProposalInternal(now, newPeerPos);
723}
724
725template <class Adaptor>
726bool
728 NetClock::time_point const& now,
729 PeerPosition_t const& newPeerPos)
730{
731 // Nothing to do for now if we are currently working on a ledger
732 if (phase_ == ConsensusPhase::accepted)
733 return false;
734
735 now_ = now;
736
737 auto const& newPeerProp = newPeerPos.proposal();
738
739 if (newPeerProp.prevLedger() != prevLedgerID_)
740 {
741 JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger() << " but we are on "
742 << prevLedgerID_;
743 return false;
744 }
745
746 auto const& peerID = newPeerProp.nodeID();
747
748 if (deadNodes_.find(peerID) != deadNodes_.end())
749 {
750 JLOG(j_.info()) << "Position from dead node: " << peerID;
751 return false;
752 }
753
754 {
755 // update current position
756 auto peerPosIt = currPeerPositions_.find(peerID);
757
758 if (peerPosIt != currPeerPositions_.end())
759 {
760 if (newPeerProp.proposeSeq() <= peerPosIt->second.proposal().proposeSeq())
761 {
762 return false;
763 }
764 }
765
766 if (newPeerProp.isBowOut())
767 {
768 JLOG(j_.info()) << "Peer " << peerID << " bows out";
769 if (result_)
770 {
771 for (auto& it : result_->disputes)
772 it.second.unVote(peerID);
773 }
774 if (peerPosIt != currPeerPositions_.end())
775 currPeerPositions_.erase(peerID);
776 deadNodes_.insert(peerID);
777
778 return true;
779 }
780
781 if (peerPosIt != currPeerPositions_.end())
782 peerPosIt->second = newPeerPos;
783 else
784 currPeerPositions_.emplace(peerID, newPeerPos);
785 }
786
787 if (newPeerProp.isInitial())
788 {
789 // Record the close time estimate
790 JLOG(j_.trace()) << "Peer reports close time as "
791 << newPeerProp.closeTime().time_since_epoch().count();
792 ++rawCloseTimes_.peers[newPeerProp.closeTime()];
793 }
794
795 JLOG(j_.trace()) << "Processing peer proposal " << newPeerProp.proposeSeq() << "/"
796 << newPeerProp.position();
797
798 {
799 auto const ait = acquired_.find(newPeerProp.position());
800 if (ait == acquired_.end())
801 {
802 // acquireTxSet will return the set if it is available, or
803 // spawn a request for it and return nullopt/nullptr. It will call
804 // gotTxSet once it arrives
805 if (auto set = adaptor_.acquireTxSet(newPeerProp.position()))
806 gotTxSet(now_, *set);
807 else
808 JLOG(j_.debug()) << "Don't have tx set for peer";
809 }
810 else if (result_)
811 {
812 updateDisputes(newPeerProp.nodeID(), ait->second);
813 }
814 }
815
816 return true;
817}
818
819template <class Adaptor>
820void
822 NetClock::time_point const& now,
824{
825 CLOG(clog) << "Consensus<Adaptor>::timerEntry. ";
826 // Nothing to do if we are currently working on a ledger
827 if (phase_ == ConsensusPhase::accepted)
828 {
829 CLOG(clog) << "Nothing to do during accepted phase. ";
830 return;
831 }
832
833 now_ = now;
834 CLOG(clog) << "Set network adjusted time to " << to_string(now) << ". ";
835
836 // Check we are on the proper ledger (this may change phase_)
837 auto const phaseOrig = phase_;
838 CLOG(clog) << "Phase " << to_string(phaseOrig) << ". ";
839 checkLedger(clog);
840 if (phaseOrig != phase_)
841 {
842 CLOG(clog) << "Changed phase to << " << to_string(phase_) << ". ";
843 }
844
845 if (phase_ == ConsensusPhase::open)
846 phaseOpen(clog);
847 else if (phase_ == ConsensusPhase::establish)
848 phaseEstablish(clog);
849 CLOG(clog) << "timerEntry finishing in phase " << to_string(phase_) << ". ";
850}
851
852template <class Adaptor>
853void
855{
856 // Nothing to do if we've finished work on a ledger
857 if (phase_ == ConsensusPhase::accepted)
858 return;
859
860 now_ = now;
861
862 auto id = txSet.id();
863
864 // If we've already processed this transaction set since requesting
865 // it from the network, there is nothing to do now
866 if (!acquired_.emplace(id, txSet).second)
867 return;
868
869 if (!result_)
870 {
871 JLOG(j_.debug()) << "Not creating disputes: no position yet.";
872 }
873 else
874 {
875 // Our position is added to acquired_ as soon as we create it,
876 // so this txSet must differ
877 XRPL_ASSERT(
878 id != result_->position.position(),
879 "xrpl::Consensus::gotTxSet : updated transaction set");
880 bool any = false;
881 for (auto const& [nodeId, peerPos] : currPeerPositions_)
882 {
883 if (peerPos.proposal().position() == id)
884 {
885 updateDisputes(nodeId, txSet);
886 any = true;
887 }
888 }
889
890 if (!any)
891 {
892 JLOG(j_.warn()) << "By the time we got " << id << " no peers were proposing it";
893 }
894 }
895}
896
897template <class Adaptor>
898void
900 NetClock::time_point const& now,
902{
903 using namespace std::chrono_literals;
904 JLOG(j_.info()) << "Simulating consensus";
905 now_ = now;
906 closeLedger({});
907 result_->roundTime.tick(consensusDelay.value_or(100ms));
908 result_->proposers = prevProposers_ = currPeerPositions_.size();
909 prevRoundTime_ = result_->roundTime.read();
911 adaptor_.onForceAccept(
912 *result_, previousLedger_, closeResolution_, rawCloseTimes_, mode_.get(), getJson(true));
913 JLOG(j_.info()) << "Simulation complete";
914}
915
916template <class Adaptor>
919{
920 using std::to_string;
921 using Int = Json::Value::Int;
922
924
925 ret["proposing"] = (mode_.get() == ConsensusMode::proposing);
926 ret["proposers"] = static_cast<int>(currPeerPositions_.size());
927
928 if (mode_.get() != ConsensusMode::wrongLedger)
929 {
930 ret["synched"] = true;
931 ret["ledger_seq"] = static_cast<std::uint32_t>(previousLedger_.seq()) + 1;
932 ret["close_granularity"] = static_cast<Int>(closeResolution_.count());
933 }
934 else
935 ret["synched"] = false;
936
937 ret["phase"] = to_string(phase_);
938
939 if (result_ && !result_->disputes.empty() && !full)
940 ret["disputes"] = static_cast<Int>(result_->disputes.size());
941
942 if (result_)
943 ret["our_position"] = result_->position.getJson();
944
945 if (full)
946 {
947 if (result_)
948 ret["current_ms"] = static_cast<Int>(result_->roundTime.read().count());
949 ret["converge_percent"] = convergePercent_;
950 ret["close_resolution"] = static_cast<Int>(closeResolution_.count());
951 ret["have_time_consensus"] = haveCloseTimeConsensus_;
952 ret["previous_proposers"] = static_cast<Int>(prevProposers_);
953 ret["previous_mseconds"] = static_cast<Int>(prevRoundTime_.count());
954
955 if (!currPeerPositions_.empty())
956 {
958
959 for (auto const& [nodeId, peerPos] : currPeerPositions_)
960 {
961 ppj[to_string(nodeId)] = peerPos.getJson();
962 }
963 ret["peer_positions"] = std::move(ppj);
964 }
965
966 if (!acquired_.empty())
967 {
969 for (auto const& at : acquired_)
970 {
971 acq.append(to_string(at.first));
972 }
973 ret["acquired"] = std::move(acq);
974 }
975
976 if (result_ && !result_->disputes.empty())
977 {
979 for (auto const& [txId, dispute] : result_->disputes)
980 {
981 dsj[to_string(txId)] = dispute.getJson();
982 }
983 ret["disputes"] = std::move(dsj);
984 }
985
986 if (!rawCloseTimes_.peers.empty())
987 {
989 for (auto const& ct : rawCloseTimes_.peers)
990 {
991 ctj[std::to_string(ct.first.time_since_epoch().count())] = ct.second;
992 }
993 ret["close_times"] = std::move(ctj);
994 }
995
996 if (!deadNodes_.empty())
997 {
999 for (auto const& dn : deadNodes_)
1000 {
1001 dnj.append(to_string(dn));
1002 }
1003 ret["dead_nodes"] = std::move(dnj);
1004 }
1005 }
1006
1007 return ret;
1008}
1009
1010// Handle a change in the prior ledger during a consensus round
1011template <class Adaptor>
1012void
1014 typename Ledger_t::ID const& lgrId,
1016{
1017 CLOG(clog) << "handleWrongLedger. ";
1018 XRPL_ASSERT(
1019 lgrId != prevLedgerID_ || previousLedger_.id() != lgrId,
1020 "xrpl::Consensus::handleWrongLedger : have wrong ledger");
1021
1022 // Stop proposing because we are out of sync
1023 leaveConsensus(clog);
1024
1025 // First time switching to this ledger
1026 if (prevLedgerID_ != lgrId)
1027 {
1028 prevLedgerID_ = lgrId;
1029
1030 // Clear out state
1031 if (result_)
1032 {
1033 result_->disputes.clear();
1034 result_->compares.clear();
1035 }
1036
1037 currPeerPositions_.clear();
1038 rawCloseTimes_.peers.clear();
1039 deadNodes_.clear();
1040
1041 // Get back in sync, this will also recreate disputes
1042 playbackProposals();
1043 }
1044
1045 if (previousLedger_.id() == prevLedgerID_)
1046 {
1047 CLOG(clog) << "previousLedger_.id() == prevLeverID_ " << prevLedgerID_ << ". ";
1048 return;
1049 }
1050
1051 // we need to switch the ledger we're working from
1052 if (auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
1053 {
1054 JLOG(j_.info()) << "Have the consensus ledger " << prevLedgerID_;
1055 CLOG(clog) << "Have the consensus ledger " << prevLedgerID_ << ". ";
1056 startRoundInternal(now_, lgrId, *newLedger, ConsensusMode::switchedLedger, clog);
1057 }
1058 else
1059 {
1060 CLOG(clog) << "Still on wrong ledger. ";
1061 mode_.set(ConsensusMode::wrongLedger, adaptor_);
1062 }
1063}
1064
1065template <class Adaptor>
1066void
1068{
1069 CLOG(clog) << "checkLedger. ";
1070
1071 auto netLgr = adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
1072 CLOG(clog) << "network ledgerid " << netLgr << ", "
1073 << "previous ledger " << prevLedgerID_ << ". ";
1074
1075 if (netLgr != prevLedgerID_)
1076 {
1078 ss << "View of consensus changed during " << to_string(phase_)
1079 << " mode=" << to_string(mode_.get()) << ", " << prevLedgerID_ << " to " << netLgr
1080 << ", " << Json::Compact{previousLedger_.getJson()} << ". ";
1081 JLOG(j_.warn()) << ss.str();
1082 CLOG(clog) << ss.str();
1083 CLOG(clog) << "State on consensus change " << Json::Compact{getJson(true)} << ". ";
1084 handleWrongLedger(netLgr, clog);
1085 }
1086 else if (previousLedger_.id() != prevLedgerID_)
1087 {
1088 CLOG(clog) << "previousLedger_.id() != prevLedgerID_: " << previousLedger_.id() << ','
1089 << to_string(prevLedgerID_) << ". ";
1090 handleWrongLedger(netLgr, clog);
1091 }
1092}
1093
1094template <class Adaptor>
1095void
1097{
1098 for (auto const& it : recentPeerPositions_)
1099 {
1100 for (auto const& pos : it.second)
1101 {
1102 if (pos.proposal().prevLedger() == prevLedgerID_)
1103 {
1104 if (peerProposalInternal(now_, pos))
1105 adaptor_.share(pos);
1106 }
1107 }
1108 }
1109}
1110
1111template <class Adaptor>
1112void
1114{
1115 CLOG(clog) << "phaseOpen. ";
1116 using namespace std::chrono;
1117
1118 // it is shortly before ledger close time
1119 bool const anyTransactions = adaptor_.hasOpenTransactions();
1120 auto proposersClosed = currPeerPositions_.size();
1121 auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
1122
1123 openTime_.tick(clock_.now());
1124
1125 // This computes how long since last ledger's close time
1126 milliseconds sinceClose;
1127 {
1128 auto const mode = mode_.get();
1129 bool const closeAgree = previousLedger_.closeAgree();
1130 auto const prevCloseTime = previousLedger_.closeTime();
1131 auto const prevParentCloseTimePlus1 = previousLedger_.parentCloseTime() + 1s;
1132 bool const previousCloseCorrect = (mode != ConsensusMode::wrongLedger) && closeAgree &&
1133 (prevCloseTime != prevParentCloseTimePlus1);
1134
1135 auto const lastCloseTime = previousCloseCorrect
1136 ? prevCloseTime // use consensus timing
1137 : prevCloseTime_; // use the time we saw internally
1138
1139 if (now_ >= lastCloseTime)
1140 sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
1141 else
1142 sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
1143 CLOG(clog) << "calculating how long since last ledger's close time "
1144 "based on mode : "
1145 << to_string(mode) << ", previous closeAgree: " << closeAgree
1146 << ", previous close time: " << to_string(prevCloseTime)
1147 << ", previous parent close time + 1s: " << to_string(prevParentCloseTimePlus1)
1148 << ", previous close time seen internally: " << to_string(prevCloseTime_)
1149 << ", last close time: " << to_string(lastCloseTime)
1150 << ", since close: " << sinceClose.count() << ". ";
1151 }
1152
1153 auto const idleInterval = std::max<milliseconds>(
1154 adaptor_.parms().ledgerIDLE_INTERVAL, 2 * previousLedger_.closeTimeResolution());
1155 CLOG(clog) << "idle interval set to " << idleInterval.count() << "ms based on "
1156 << "ledgerIDLE_INTERVAL: " << adaptor_.parms().ledgerIDLE_INTERVAL.count()
1157 << ", previous ledger close time resolution: "
1158 << previousLedger_.closeTimeResolution().count() << "ms. ";
1159
1160 // Decide if we should close the ledger
1162 anyTransactions,
1163 prevProposers_,
1164 proposersClosed,
1165 proposersValidated,
1166 prevRoundTime_,
1167 sinceClose,
1168 openTime_.read(),
1169 idleInterval,
1170 adaptor_.parms(),
1171 j_,
1172 clog))
1173 {
1174 CLOG(clog) << "closing ledger. ";
1175 closeLedger(clog);
1176 }
1177}
1178
1179template <class Adaptor>
1180bool
1182{
1183 CLOG(clog) << "shouldPause? ";
1184 auto const& parms = adaptor_.parms();
1185 std::uint32_t const ahead(
1186 previousLedger_.seq() - std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
1187 auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
1188 std::size_t const totalValidators = trustedKeys.size();
1189 std::size_t const laggards = adaptor_.laggards(previousLedger_.seq(), trustedKeys);
1190 std::size_t const offline = trustedKeys.size();
1191
1192 std::stringstream vars;
1193 vars << " consensuslog (working seq: " << previousLedger_.seq() << ", "
1194 << "validated seq: " << adaptor_.getValidLedgerIndex() << ", "
1195 << "am validator: " << adaptor_.validator() << ", "
1196 << "have validated: " << adaptor_.haveValidated() << ", "
1197 << "roundTime: " << result_->roundTime.read().count() << ", "
1198 << "max consensus time: " << parms.ledgerMAX_CONSENSUS.count() << ", "
1199 << "validators: " << totalValidators << ", "
1200 << "laggards: " << laggards << ", "
1201 << "offline: " << offline << ", "
1202 << "quorum: " << quorum << ")";
1203
1204 if (!ahead || !laggards || !totalValidators || !adaptor_.validator() ||
1205 !adaptor_.haveValidated() || result_->roundTime.read() > parms.ledgerMAX_CONSENSUS)
1206 {
1207 j_.debug() << "not pausing (early)" << vars.str();
1208 CLOG(clog) << "Not pausing (early). ";
1209 return false;
1210 }
1211
1212 bool willPause = false;
1213
1227 constexpr static std::size_t maxPausePhase = 4;
1228
1248 std::size_t const phase = (ahead - 1) % (maxPausePhase + 1);
1249
1250 // validators that remain after the laggards() function are considered
1251 // offline, and should be considered as laggards for purposes of
1252 // evaluating whether the threshold for non-laggards has been reached.
1253 switch (phase)
1254 {
1255 case 0:
1256 // Laggards and offline shouldn't preclude consensus.
1257 if (laggards + offline > totalValidators - quorum)
1258 willPause = true;
1259 break;
1260 case maxPausePhase:
1261 // No tolerance.
1262 willPause = true;
1263 break;
1264 default:
1265 // Ensure that sufficient validators are known to be not lagging.
1266 // Their sufficiently most recent validation sequence was equal to
1267 // or greater than our own.
1268 //
1269 // The threshold is the amount required for quorum plus
1270 // the proportion of the remainder based on number of intermediate
1271 // phases between 0 and max.
1272 float const nonLaggards = totalValidators - (laggards + offline);
1273 float const quorumRatio = static_cast<float>(quorum) / totalValidators;
1274 float const allowedDissent = 1.0f - quorumRatio;
1275 float const phaseFactor = static_cast<float>(phase) / maxPausePhase;
1276
1277 if (nonLaggards / totalValidators < quorumRatio + (allowedDissent * phaseFactor))
1278 {
1279 willPause = true;
1280 }
1281 }
1282
1283 if (willPause)
1284 {
1285 j_.warn() << "pausing" << vars.str();
1286 CLOG(clog) << "pausing " << vars.str() << ". ";
1287 }
1288 else
1289 {
1290 j_.debug() << "not pausing" << vars.str();
1291 CLOG(clog) << "not pausing. ";
1292 }
1293 return willPause;
1294}
1295
1296template <class Adaptor>
1297void
1299{
1300 CLOG(clog) << "phaseEstablish. ";
1301 // can only establish consensus if we already took a stance
1302 XRPL_ASSERT(result_, "xrpl::Consensus::phaseEstablish : result is set");
1303
1304 ++peerUnchangedCounter_;
1305 ++establishCounter_;
1306
1307 using namespace std::chrono;
1308 ConsensusParms const& parms = adaptor_.parms();
1309
1310 result_->roundTime.tick(clock_.now());
1311 result_->proposers = currPeerPositions_.size();
1312
1313 convergePercent_ = result_->roundTime.read() * 100 /
1314 std::max<milliseconds>(prevRoundTime_, parms.avMIN_CONSENSUS_TIME);
1315 CLOG(clog) << "convergePercent_ " << convergePercent_
1316 << " is based on round duration so far: " << result_->roundTime.read().count()
1317 << "ms, "
1318 << "previous round duration: " << prevRoundTime_.count() << "ms, "
1319 << "avMIN_CONSENSUS_TIME: " << parms.avMIN_CONSENSUS_TIME.count() << "ms. ";
1320
1321 // Give everyone a chance to take an initial position
1322 if (result_->roundTime.read() < parms.ledgerMIN_CONSENSUS)
1323 {
1324 CLOG(clog) << "ledgerMIN_CONSENSUS not reached: " << parms.ledgerMIN_CONSENSUS.count()
1325 << "ms. ";
1326 return;
1327 }
1328
1329 updateOurPositions(clog);
1330
1331 // Nothing to do if too many laggards or we don't have consensus.
1332 if (shouldPause(clog) || !haveConsensus(clog))
1333 return;
1334
1335 if (!haveCloseTimeConsensus_)
1336 {
1337 JLOG(j_.info()) << "We have TX consensus but not CT consensus";
1338 CLOG(clog) << "We have TX consensus but not CT consensus. ";
1339 return;
1340 }
1341
1342 JLOG(j_.info()) << "Converge cutoff (" << currPeerPositions_.size() << " participants)";
1343 CLOG(clog) << "Converge cutoff (" << currPeerPositions_.size()
1344 << " participants). Transitioned to ConsensusPhase::accepted. ";
1345 adaptor_.updateOperatingMode(currPeerPositions_.size());
1346 prevProposers_ = currPeerPositions_.size();
1347 prevRoundTime_ = result_->roundTime.read();
1348 phase_ = ConsensusPhase::accepted;
1349 JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted";
1350 adaptor_.onAccept(
1351 *result_,
1352 previousLedger_,
1353 closeResolution_,
1354 rawCloseTimes_,
1355 mode_.get(),
1356 getJson(true),
1357 adaptor_.validating());
1358}
1359
1360template <class Adaptor>
1361void
1363{
1364 // We should not be closing if we already have a position
1365 XRPL_ASSERT(!result_, "xrpl::Consensus::closeLedger : result is not set");
1366
1368 JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish";
1369 rawCloseTimes_.self = now_;
1370 peerUnchangedCounter_ = 0;
1371 establishCounter_ = 0;
1372
1373 result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
1374 result_->roundTime.reset(clock_.now());
1375 // Share the newly created transaction set if we haven't already
1376 // received it from a peer
1377 if (acquired_.emplace(result_->txns.id(), result_->txns).second)
1378 adaptor_.share(result_->txns);
1379
1380 auto const mode = mode_.get();
1381 CLOG(clog) << "closeLedger transitioned to ConsensusPhase::establish, mode: " << to_string(mode)
1382 << ", number of peer positions: " << currPeerPositions_.size() << ". ";
1383 if (mode == ConsensusMode::proposing)
1384 adaptor_.propose(result_->position);
1385
1386 // Create disputes with any peer positions we have transactions for
1387 for (auto const& pit : currPeerPositions_)
1388 {
1389 auto const& pos = pit.second.proposal().position();
1390 auto const it = acquired_.find(pos);
1391 if (it != acquired_.end())
1392 createDisputes(it->second, clog);
1393 }
1394}
1395
1408inline int
1409participantsNeeded(int participants, int percent)
1410{
1411 int const result = ((participants * percent) + (percent / 2)) / 100;
1412
1413 return (result == 0) ? 1 : result;
1414}
1415
1416template <class Adaptor>
1417void
1419{
1420 // We must have a position if we are updating it
1421 XRPL_ASSERT(result_, "xrpl::Consensus::updateOurPositions : result is set");
1422 ConsensusParms const& parms = adaptor_.parms();
1423
1424 // Compute a cutoff time
1425 auto const peerCutoff = now_ - parms.proposeFRESHNESS;
1426 auto const ourCutoff = now_ - parms.proposeINTERVAL;
1427 CLOG(clog) << "updateOurPositions. peerCutoff " << to_string(peerCutoff) << ", ourCutoff "
1428 << to_string(ourCutoff) << ". ";
1429
1430 // Verify freshness of peer positions and compute close times
1432 {
1433 auto it = currPeerPositions_.begin();
1434 while (it != currPeerPositions_.end())
1435 {
1436 Proposal_t const& peerProp = it->second.proposal();
1437 if (peerProp.isStale(peerCutoff))
1438 {
1439 // peer's proposal is stale, so remove it
1440 NodeID_t const& peerID = peerProp.nodeID();
1441 JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
1442 for (auto& dt : result_->disputes)
1443 dt.second.unVote(peerID);
1444 it = currPeerPositions_.erase(it);
1445 }
1446 else
1447 {
1448 // proposal is still fresh
1449 ++closeTimeVotes[asCloseTime(peerProp.closeTime())];
1450 ++it;
1451 }
1452 }
1453 }
1454
1455 // This will stay unseated unless there are any changes
1456 std::optional<TxSet_t> ourNewSet;
1457
1458 // Update votes on disputed transactions
1459 {
1461 for (auto& [txId, dispute] : result_->disputes)
1462 {
1463 // Because the threshold for inclusion increases,
1464 // time can change our position on a dispute
1465 if (dispute.updateVote(
1466 convergePercent_, mode_.get() == ConsensusMode::proposing, parms))
1467 {
1468 if (!mutableSet)
1469 mutableSet.emplace(result_->txns);
1470
1471 if (dispute.getOurVote())
1472 {
1473 // now a yes
1474 mutableSet->insert(dispute.tx());
1475 }
1476 else
1477 {
1478 // now a no
1479 mutableSet->erase(txId);
1480 }
1481 }
1482 }
1483
1484 if (mutableSet)
1485 ourNewSet.emplace(std::move(*mutableSet));
1486 }
1487
1488 NetClock::time_point consensusCloseTime = {};
1489 haveCloseTimeConsensus_ = false;
1490
1491 if (currPeerPositions_.empty())
1492 {
1493 // no other times
1494 haveCloseTimeConsensus_ = true;
1495 consensusCloseTime = asCloseTime(result_->position.closeTime());
1496 }
1497 else
1498 {
1499 // We don't track rounds for close time, so just pass 0s
1500 auto const [neededWeight, newState] =
1501 getNeededWeight(parms, closeTimeAvalancheState_, convergePercent_, 0, 0);
1502 if (newState)
1503 closeTimeAvalancheState_ = *newState;
1504 CLOG(clog) << "neededWeight " << neededWeight << ". ";
1505
1506 int participants = currPeerPositions_.size();
1507 if (mode_.get() == ConsensusMode::proposing)
1508 {
1509 ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
1510 ++participants;
1511 }
1512
1513 // Threshold for non-zero vote
1514 int threshVote = participantsNeeded(participants, neededWeight);
1515
1516 // Threshold to declare consensus
1517 int const threshConsensus = participantsNeeded(participants, parms.avCT_CONSENSUS_PCT);
1518
1520 ss << "Proposers:" << currPeerPositions_.size() << " nw:" << neededWeight
1521 << " thrV:" << threshVote << " thrC:" << threshConsensus;
1522 JLOG(j_.info()) << ss.str();
1523 CLOG(clog) << ss.str();
1524
1525 for (auto const& [t, v] : closeTimeVotes)
1526 {
1527 JLOG(j_.debug()) << "CCTime: seq "
1528 << static_cast<std::uint32_t>(previousLedger_.seq()) + 1 << ": "
1529 << t.time_since_epoch().count() << " has " << v << ", " << threshVote
1530 << " required";
1531
1532 if (v >= threshVote)
1533 {
1534 // A close time has enough votes for us to try to agree
1535 consensusCloseTime = t;
1536 threshVote = v;
1537
1538 if (threshVote >= threshConsensus)
1539 haveCloseTimeConsensus_ = true;
1540 }
1541 }
1542
1543 if (!haveCloseTimeConsensus_)
1544 {
1545 JLOG(j_.debug()) << "No CT consensus:"
1546 << " Proposers:" << currPeerPositions_.size()
1547 << " Mode:" << to_string(mode_.get()) << " Thresh:" << threshConsensus
1548 << " Pos:" << consensusCloseTime.time_since_epoch().count();
1549 CLOG(clog) << "No close time consensus. ";
1550 }
1551 }
1552
1553 if (!ourNewSet &&
1554 ((consensusCloseTime != asCloseTime(result_->position.closeTime())) ||
1555 result_->position.isStale(ourCutoff)))
1556 {
1557 // close time changed or our position is stale
1558 ourNewSet.emplace(result_->txns);
1559 }
1560
1561 if (ourNewSet)
1562 {
1563 auto newID = ourNewSet->id();
1564
1565 result_->txns = std::move(*ourNewSet);
1566
1568 ss << "Position change: CTime " << consensusCloseTime.time_since_epoch().count() << ", tx "
1569 << newID;
1570 JLOG(j_.info()) << ss.str();
1571 CLOG(clog) << ss.str();
1572
1573 result_->position.changePosition(newID, consensusCloseTime, now_);
1574
1575 // Share our new transaction set and update disputes
1576 // if we haven't already received it
1577 if (acquired_.emplace(newID, result_->txns).second)
1578 {
1579 if (!result_->position.isBowOut())
1580 adaptor_.share(result_->txns);
1581
1582 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1583 {
1584 Proposal_t const& p = peerPos.proposal();
1585 if (p.position() == newID)
1586 updateDisputes(nodeId, result_->txns);
1587 }
1588 }
1589
1590 // Share our new position if we are still participating this round
1591 if (!result_->position.isBowOut() && (mode_.get() == ConsensusMode::proposing))
1592 adaptor_.propose(result_->position);
1593 }
1594}
1595
1596template <class Adaptor>
1597bool
1599{
1600 // Must have a stance if we are checking for consensus
1601 XRPL_ASSERT(result_, "xrpl::Consensus::haveConsensus : has result");
1602
1603 // CHECKME: should possibly count unacquired TX sets as disagreeing
1604 int agree = 0, disagree = 0;
1605
1606 auto ourPosition = result_->position.position();
1607
1608 // Count number of agreements/disagreements with our position
1609 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1610 {
1611 Proposal_t const& peerProp = peerPos.proposal();
1612 if (peerProp.position() == ourPosition)
1613 {
1614 ++agree;
1615 }
1616 else
1617 {
1618 JLOG(j_.debug()) << "Proposal disagreement: Peer " << nodeId << " has "
1619 << peerProp.position();
1620 ++disagree;
1621 }
1622 }
1623 auto currentFinished = adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
1624
1625 JLOG(j_.debug()) << "Checking for TX consensus: agree=" << agree << ", disagree=" << disagree;
1626
1627 ConsensusParms const& parms = adaptor_.parms();
1628 // Stalling is BAD. It means that we have a consensus on the close time, so
1629 // peers are talking, but we have disputed transactions that peers are
1630 // unable or unwilling to come to agreement on one way or the other.
1631 bool const stalled =
1632 haveCloseTimeConsensus_ && !result_->disputes.empty() &&
1633 std::ranges::all_of(result_->disputes, [this, &parms, &clog](auto const& dispute) {
1634 return dispute.second.stalled(
1635 parms, mode_.get() == ConsensusMode::proposing, peerUnchangedCounter_, j_, clog);
1636 });
1637 if (stalled)
1638 {
1640 ss << "Consensus detects as stalled with " << (agree + disagree) << "/" << prevProposers_
1641 << " proposers, and " << result_->disputes.size() << " stalled disputed transactions.";
1642 JLOG(j_.error()) << ss.str();
1643 CLOG(clog) << ss.str();
1644 }
1645
1646 // Determine if we actually have consensus or not
1647 result_->state = checkConsensus(
1648 prevProposers_,
1649 agree + disagree,
1650 agree,
1651 currentFinished,
1652 prevRoundTime_,
1653 result_->roundTime.read(),
1654 stalled,
1655 parms,
1656 mode_.get() == ConsensusMode::proposing,
1657 j_,
1658 clog);
1659
1660 if (result_->state == ConsensusState::No)
1661 {
1662 CLOG(clog) << "No consensus. ";
1663 return false;
1664 }
1665
1666 // Consensus has taken far too long. Drop out of the round.
1667 if (result_->state == ConsensusState::Expired)
1668 {
1669 static auto const minimumCounter = parms.avalancheCutoffs.size() * parms.avMIN_ROUNDS;
1671 if (establishCounter_ < minimumCounter)
1672 {
1673 // If each round of phaseEstablish takes a very long time, we may
1674 // "expire" before we've given consensus enough time at each
1675 // avalanche level to actually come to a consensus. In that case,
1676 // keep trying. This should only happen if there are an extremely
1677 // large number of disputes such that each round takes an inordinate
1678 // amount of time.
1679
1680 ss << "Consensus time has expired in round " << establishCounter_
1681 << "; continue until round " << minimumCounter << ". "
1682 << Json::Compact{getJson(false)};
1683 JLOG(j_.error()) << ss.str();
1684 CLOG(clog) << ss.str() << ". ";
1685 return false;
1686 }
1687 ss << "Consensus expired. " << Json::Compact{getJson(true)};
1688 JLOG(j_.error()) << ss.str();
1689 CLOG(clog) << ss.str() << ". ";
1690 leaveConsensus(clog);
1691 }
1692 // There is consensus, but we need to track if the network moved on
1693 // without us.
1694 if (result_->state == ConsensusState::MovedOn)
1695 {
1696 JLOG(j_.error()) << "Unable to reach consensus";
1697 JLOG(j_.error()) << Json::Compact{getJson(true)};
1698 CLOG(clog) << "Unable to reach consensus " << Json::Compact{getJson(true)} << ". ";
1699 }
1700
1701 CLOG(clog) << "Consensus has been reached. ";
1702 return true;
1703}
1704
1705template <class Adaptor>
1706void
1708{
1709 if (mode_.get() == ConsensusMode::proposing)
1710 {
1711 if (result_ && !result_->position.isBowOut())
1712 {
1713 result_->position.bowOut(now_);
1714 adaptor_.propose(result_->position);
1715 }
1716
1717 mode_.set(ConsensusMode::observing, adaptor_);
1718 JLOG(j_.info()) << "Bowing out of consensus";
1719 CLOG(clog) << "Bowing out of consensus. ";
1720 }
1721}
1722
1723template <class Adaptor>
1724void
1726{
1727 // Cannot create disputes without our stance
1728 XRPL_ASSERT(result_, "xrpl::Consensus::createDisputes : result is set");
1729
1730 // Only create disputes if this is a new set
1731 auto const emplaced = result_->compares.emplace(o.id()).second;
1732 CLOG(clog) << "createDisputes: new set? " << !emplaced << ". ";
1733 if (!emplaced)
1734 return;
1735
1736 // Nothing to dispute if we agree
1737 if (result_->txns.id() == o.id())
1738 {
1739 CLOG(clog) << "both sets are identical. ";
1740 return;
1741 }
1742
1743 CLOG(clog) << "comparing existing with new set: " << result_->txns.id() << ',' << o.id()
1744 << ". ";
1745 JLOG(j_.debug()) << "createDisputes " << result_->txns.id() << " to " << o.id();
1746
1747 auto differences = result_->txns.compare(o);
1748
1749 int dc = 0;
1750
1751 for (auto const& [txId, inThisSet] : differences)
1752 {
1753 ++dc;
1754 // create disputed transactions (from the ledger that has them)
1755 XRPL_ASSERT(
1756 (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
1757 (!inThisSet && !result_->txns.find(txId) && o.find(txId)),
1758 "xrpl::Consensus::createDisputes : has disputed transactions");
1759
1760 Tx_t const tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
1761 auto txID = tx.id();
1762
1763 if (result_->disputes.find(txID) != result_->disputes.end())
1764 continue;
1765
1766 JLOG(j_.debug()) << "Transaction " << txID << " is disputed";
1767
1768 typename Result::Dispute_t dtx{
1769 tx,
1770 result_->txns.exists(txID),
1771 std::max(prevProposers_, currPeerPositions_.size()),
1772 j_};
1773
1774 // Update all of the available peer's votes on the disputed transaction
1775 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1776 {
1777 Proposal_t const& peerProp = peerPos.proposal();
1778 auto const cit = acquired_.find(peerProp.position());
1779 if (cit != acquired_.end() && dtx.setVote(nodeId, cit->second.exists(txID)))
1780 peerUnchangedCounter_ = 0;
1781 }
1782 adaptor_.share(dtx.tx());
1783
1784 result_->disputes.emplace(txID, std::move(dtx));
1785 }
1786 JLOG(j_.debug()) << dc << " differences found";
1787 CLOG(clog) << "disputes: " << dc << ". ";
1788}
1789
1790template <class Adaptor>
1791void
1793{
1794 // Cannot updateDisputes without our stance
1795 XRPL_ASSERT(result_, "xrpl::Consensus::updateDisputes : result is set");
1796
1797 // Ensure we have created disputes against this set if we haven't seen
1798 // it before
1799 if (result_->compares.find(other.id()) == result_->compares.end())
1800 createDisputes(other);
1801
1802 for (auto& it : result_->disputes)
1803 {
1804 auto& d = it.second;
1805 if (d.setVote(node, other.exists(d.tx().id())))
1806 peerUnchangedCounter_ = 0;
1807 }
1808}
1809
1810template <class Adaptor>
1813{
1814 return roundCloseTime(raw, closeResolution_);
1815}
1816
1817} // namespace xrpl
T all_of(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
Json::Int Int
Definition json_value.h:138
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
bool isStale(NetClock::time_point cutoff) const
Get whether this position is stale relative to the provided cutoff.
Position_t const & position() const
Get the proposed position.
NodeID_t const & nodeID() const
Identifying which peer took this position.
Measures the duration of phases of consensus.
ConsensusMode get() const
Definition Consensus.h:299
MonitoredMode(ConsensusMode m)
Definition Consensus.h:295
void set(ConsensusMode mode, Adaptor &a)
Definition Consensus.h:305
Generic implementation of consensus algorithm.
Definition Consensus.h:278
typename Adaptor::TxSet_t TxSet_t
Definition Consensus.h:280
bool peerProposalInternal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
Handle a replayed or a new peer proposal.
Definition Consensus.h:727
void updateDisputes(NodeID_t const &node, TxSet_t const &other)
Definition Consensus.h:1792
typename Adaptor::Ledger_t Ledger_t
Definition Consensus.h:279
bool haveCloseTimeConsensus_
Definition Consensus.h:541
MonitoredMode mode_
Definition Consensus.h:539
hash_map< NodeID_t, std::deque< PeerPosition_t > > recentPeerPositions_
Definition Consensus.h:596
Json::Value getJson(bool full) const
Get the Json state of the consensus process.
Definition Consensus.h:918
NetClock::duration closeResolution_
Definition Consensus.h:552
ConsensusTimer openTime_
Definition Consensus.h:550
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Simulate the consensus process without any network traffic.
Definition Consensus.h:899
typename TxSet_t::Tx Tx_t
Definition Consensus.h:282
void gotTxSet(NetClock::time_point const &now, TxSet_t const &txSet)
Process a transaction set acquired from the network.
Definition Consensus.h:854
bool shouldPause(std::unique_ptr< std::stringstream > const &clog) const
Evaluate whether pausing increases likelihood of validation.
Definition Consensus.h:1181
void phaseEstablish(std::unique_ptr< std::stringstream > const &clog)
Handle establish phase.
Definition Consensus.h:1298
hash_set< NodeID_t > deadNodes_
Definition Consensus.h:602
std::size_t peerUnchangedCounter_
Definition Consensus.h:583
void leaveConsensus(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1707
NetClock::time_point prevCloseTime_
Definition Consensus.h:565
Ledger_t::ID prevLedgerID() const
Get the previous ledger ID.
Definition Consensus.h:406
hash_map< NodeID_t, PeerPosition_t > currPeerPositions_
Definition Consensus.h:592
std::chrono::milliseconds prevRoundTime_
Definition Consensus.h:557
typename Adaptor::PeerPosition_t PeerPosition_t
Definition Consensus.h:283
void startRound(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t prevLedger, hash_set< NodeID_t > const &nowUntrusted, bool proposing, std::unique_ptr< std::stringstream > const &clog={})
Kick-off the next round of consensus.
Definition Consensus.h:617
void handleWrongLedger(typename Ledger_t::ID const &lgrId, std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1013
void checkLedger(std::unique_ptr< std::stringstream > const &clog)
Check if our previous ledger matches the network's.
Definition Consensus.h:1067
Consensus(Consensus &&) noexcept=default
NetClock::time_point asCloseTime(NetClock::time_point raw) const
Definition Consensus.h:1812
ConsensusParms::AvalancheState closeTimeAvalancheState_
Definition Consensus.h:554
ConsensusPhase phase() const
Definition Consensus.h:412
ConsensusPhase phase_
Definition Consensus.h:538
void playbackProposals()
If we radically changed our consensus context for some reason, we need to replay recent proposals so ...
Definition Consensus.h:1096
Ledger_t previousLedger_
Definition Consensus.h:573
beast::Journal const j_
Definition Consensus.h:605
Ledger_t::ID prevLedgerID_
Definition Consensus.h:571
hash_map< typename TxSet_t::ID, TxSet_t const > acquired_
Definition Consensus.h:576
void phaseOpen(std::unique_ptr< std::stringstream > const &clog)
Handle pre-close phase.
Definition Consensus.h:1113
void closeLedger(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1362
std::size_t prevProposers_
Definition Consensus.h:599
void startRoundInternal(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t const &prevLedger, ConsensusMode mode, std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:662
Adaptor & adaptor_
Definition Consensus.h:536
std::size_t establishCounter_
Definition Consensus.h:586
NetClock::time_point now_
Definition Consensus.h:564
clock_type const & clock_
Definition Consensus.h:543
void createDisputes(TxSet_t const &o, std::unique_ptr< std::stringstream > const &clog={})
Definition Consensus.h:1725
bool peerProposal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
A peer has proposed a new position, adjust our tracking.
Definition Consensus.h:708
typename Adaptor::NodeID_t NodeID_t
Definition Consensus.h:281
std::optional< Result > result_
Definition Consensus.h:578
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Call periodically to drive consensus forward.
Definition Consensus.h:821
bool haveConsensus(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1598
void updateOurPositions(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1418
ConsensusCloseTimes rawCloseTimes_
Definition Consensus.h:579
A transaction discovered to be in dispute during consensus.
Definition DisputedTx.h:29
T emplace(T... args)
T is_same_v
T max(T... args)
T min(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
ConsensusMode
Represents how a node currently participates in Consensus.
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
@ proposing
We are normal participant in consensus and propose our position.
@ switchedLedger
We switched ledgers since we started this consensus round but are now running on what we believe is t...
@ observing
We are observing peer positions, but not proposing our position.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
auto constexpr ledgerDefaultTimeResolution
Initial resolution of ledger close time.
std::chrono::duration< Rep, Period > getNextLedgerTimeResolution(std::chrono::duration< Rep, Period > previousResolution, bool previousAgree, Seq ledgerSeq)
Calculates the close time resolution for the specified ledger.
ConsensusState checkConsensus(std::size_t prevProposers, std::size_t currentProposers, std::size_t currentAgree, std::size_t currentFinished, std::chrono::milliseconds previousAgreeTime, std::chrono::milliseconds currentAgreeTime, bool stalled, ConsensusParms const &parms, bool proposing, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determine whether the network reached consensus and whether we joined.
ConsensusState
Whether we have or don't have a consensus.
@ Expired
Consensus time limit has hard-expired.
@ MovedOn
The network has consensus without us.
@ No
We do not have consensus.
std::chrono::time_point< Clock, Duration > roundCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > closeResolution)
Calculates the close time for a ledger, given a close time resolution.
ConsensusPhase
Phases of consensus for a single ledger round.
@ accepted
We have accepted a new last closed ledger and are waiting on a call to startRound to begin the next c...
@ open
We haven't closed our ledger yet, but others might have.
@ establish
Establishing consensus by exchanging proposals with our peers.
int participantsNeeded(int participants, int percent)
How many of the participants must agree to reach a given threshold?
Definition Consensus.h:1409
std::pair< std::size_t, std::optional< ConsensusParms::AvalancheState > > getNeededWeight(ConsensusParms const &p, ConsensusParms::AvalancheState currentState, int percentTime, std::size_t currentRounds, std::size_t minimumRounds)
bool shouldCloseLedger(bool anyTransactions, std::size_t prevProposers, std::size_t proposersClosed, std::size_t proposersValidated, std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, std::chrono::milliseconds idleInterval, ConsensusParms const &parms, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determines whether the current ledger should close at this time.
Definition Consensus.cpp:8
T read(T... args)
T size(T... args)
T str(T... args)
Stores the set of initial close times.
Consensus algorithm parameters.
std::size_t const avCT_CONSENSUS_PCT
Percentage of nodes required to reach agreement on ledger close time.
std::chrono::seconds const proposeFRESHNESS
How long we consider a proposal fresh.
std::chrono::milliseconds const ledgerMIN_CONSENSUS
The number of seconds we wait minimum to ensure participation.
std::chrono::seconds const proposeINTERVAL
How often we force generating a new proposal to keep ours fresh.
std::size_t const avMIN_ROUNDS
Number of rounds before certain actions can happen.
std::map< AvalancheState, AvalancheCutoff > const avalancheCutoffs
Map the consensus requirement avalanche state to the amount of time that must pass before moving to t...
std::chrono::milliseconds const avMIN_CONSENSUS_TIME
The minimum amount of time to consider the previous round to have taken.
Encapsulates the result of consensus.
T time_since_epoch(T... args)
T to_string(T... args)
T value_or(T... args)