rippled
Loading...
Searching...
No Matches
Consensus.h
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012-2017 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#ifndef RIPPLE_CONSENSUS_CONSENSUS_H_INCLUDED
21#define RIPPLE_CONSENSUS_CONSENSUS_H_INCLUDED
22
23#include <xrpld/consensus/ConsensusParms.h>
24#include <xrpld/consensus/ConsensusProposal.h>
25#include <xrpld/consensus/ConsensusTypes.h>
26#include <xrpld/consensus/DisputedTx.h>
27#include <xrpld/consensus/LedgerTiming.h>
28
29#include <xrpl/basics/Log.h>
30#include <xrpl/basics/chrono.h>
31#include <xrpl/beast/utility/Journal.h>
32#include <xrpl/json/json_writer.h>
33
34#include <algorithm>
35#include <chrono>
36#include <deque>
37#include <optional>
38#include <sstream>
39
40namespace ripple {
41
61bool
63 bool anyTransactions,
64 std::size_t prevProposers,
65 std::size_t proposersClosed,
66 std::size_t proposersValidated,
67 std::chrono::milliseconds prevRoundTime,
68 std::chrono::milliseconds timeSincePrevClose,
70 std::chrono::milliseconds idleInterval,
71 ConsensusParms const& parms,
73 std::unique_ptr<std::stringstream> const& clog = {});
74
96 std::size_t prevProposers,
97 std::size_t currentProposers,
98 std::size_t currentAgree,
99 std::size_t currentFinished,
100 std::chrono::milliseconds previousAgreeTime,
101 std::chrono::milliseconds currentAgreeTime,
102 bool stalled,
103 ConsensusParms const& parms,
104 bool proposing,
106 std::unique_ptr<std::stringstream> const& clog = {});
107
296template <class Adaptor>
298{
299 using Ledger_t = typename Adaptor::Ledger_t;
300 using TxSet_t = typename Adaptor::TxSet_t;
301 using NodeID_t = typename Adaptor::NodeID_t;
302 using Tx_t = typename TxSet_t::Tx;
303 using PeerPosition_t = typename Adaptor::PeerPosition_t;
305 NodeID_t,
306 typename Ledger_t::ID,
307 typename TxSet_t::ID>;
308
310
311 // Helper class to ensure adaptor is notified whenever the ConsensusMode
312 // changes
314 {
316
317 public:
319 {
320 }
322 get() const
323 {
324 return mode_;
325 }
326
327 void
328 set(ConsensusMode mode, Adaptor& a)
329 {
330 a.onModeChange(mode_, mode);
331 mode_ = mode;
332 }
333 };
334
335public:
338
339 Consensus(Consensus&&) noexcept = default;
340
347 Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j);
348
364 void
366 NetClock::time_point const& now,
367 typename Ledger_t::ID const& prevLedgerID,
368 Ledger_t prevLedger,
369 hash_set<NodeID_t> const& nowUntrusted,
370 bool proposing,
371 std::unique_ptr<std::stringstream> const& clog = {});
372
379 bool
381 NetClock::time_point const& now,
382 PeerPosition_t const& newProposal);
383
389 void
391 NetClock::time_point const& now,
392 std::unique_ptr<std::stringstream> const& clog = {});
393
399 void
400 gotTxSet(NetClock::time_point const& now, TxSet_t const& txSet);
401
418 void
420 NetClock::time_point const& now,
422
430 typename Ledger_t::ID
432 {
433 return prevLedgerID_;
434 }
435
437 phase() const
438 {
439 return phase_;
440 }
441
450 getJson(bool full) const;
451
452private:
453 void
455 NetClock::time_point const& now,
456 typename Ledger_t::ID const& prevLedgerID,
457 Ledger_t const& prevLedger,
458 ConsensusMode mode,
460
461 // Change our view of the previous ledger
462 void
464 typename Ledger_t::ID const& lgrId,
466
472 void
474
478 void
480
483 bool
485 NetClock::time_point const& now,
486 PeerPosition_t const& newProposal);
487
494 void
496
505 void
507
530 bool
532
533 // Close the open ledger and establish initial position.
534 void
536
537 // Adjust our positions to try to agree with other validators.
538 void
540
541 bool
543
544 // Create disputes between our position and the provided one.
545 void
547 TxSet_t const& o,
548 std::unique_ptr<std::stringstream> const& clog = {});
549
550 // Update our disputes given that this node has adopted a new position.
551 // Will call createDisputes as needed.
552 void
553 updateDisputes(NodeID_t const& node, TxSet_t const& other);
554
555 // Revoke our outstanding proposal, if any, and cease proposing
556 // until this round ends.
557 void
559
560 // The rounded or effective close time estimate from a proposer
563
564private:
565 Adaptor& adaptor_;
566
569 bool firstRound_ = true;
571
573
574 // How long the consensus convergence has taken, expressed as
575 // a percentage of the time that we expected it to take.
577
578 // How long has this round been open
580
582
585
586 // Time it took for the last consensus round to converge
588
589 //-------------------------------------------------------------------------
590 // Network time measurements of consensus progress
591
592 // The current network adjusted time. This is the network time the
593 // ledger would close if it closed now
596
597 //-------------------------------------------------------------------------
598 // Non-peer (self) consensus data
599
600 // Last validated ledger ID provided to consensus
601 typename Ledger_t::ID prevLedgerID_;
602 // Last validated ledger seen by consensus
604
605 // Transaction Sets, indexed by hash of transaction tree
607
610
611 // The number of calls to phaseEstablish where none of our peers
612 // have changed any votes on disputed transactions.
614
615 // The total number of times we have called phaseEstablish
617
618 //-------------------------------------------------------------------------
619 // Peer related consensus data
620
621 // Peer proposed positions for the current round
623
624 // Recently received peer positions, available when transitioning between
625 // ledgers or rounds
627
628 // The number of proposers who participated in the last consensus round
630
631 // nodes that have bowed out of this consensus process
633
634 // Journal for debugging
636};
637
// Constructor definition: initializes adaptor_, clock_ and j_ from the
// arguments and emits a debug-level log line.
// NOTE(review): listing line 639 (the qualified constructor name) is absent
// from this listing.
638template <class Adaptor>
640 clock_type const& clock,
641 Adaptor& adaptor,
642 beast::Journal journal)
643 : adaptor_(adaptor), clock_(clock), j_{journal}
644{
645 JLOG(j_.debug()) << "Creating consensus object";
646}
647
// Public entry point that begins a consensus round.
//
// Visible behavior:
//  - First round only: seeds prevRoundTime_ from the configured
//    ledgerIDLE_INTERVAL and prevCloseTime_ from prevLedger's close time.
//    Later rounds take prevCloseTime_ from rawCloseTimes_.self.
//  - Drops the stored recent positions of every node in nowUntrusted.
//  - If prevLedger.id() differs from prevLedgerID, asks the adaptor for the
//    ledger with that ID; when it is unavailable the round is started in
//    ConsensusMode::wrongLedger.
//  - Hands the remaining setup to startRoundInternal.
//
// NOTE(review): listing lines 650, 656 and 674 (qualified name, trailing
// parameter, and the initializer of startMode) are absent from this listing.
648template <class Adaptor>
649void
651 NetClock::time_point const& now,
652 typename Ledger_t::ID const& prevLedgerID,
653 Ledger_t prevLedger,
654 hash_set<NodeID_t> const& nowUntrusted,
655 bool proposing,
657{
658 if (firstRound_)
659 {
660 // take our initial view of closeTime_ from the seed ledger
661 prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
662 prevCloseTime_ = prevLedger.closeTime();
663 firstRound_ = false;
664 }
665 else
666 {
// Reuse the close time we recorded ourselves in the previous round.
667 prevCloseTime_ = rawCloseTimes_.self;
668 }
669
// Positions from nodes that are no longer trusted must not be replayed.
670 for (NodeID_t const& n : nowUntrusted)
671 recentPeerPositions_.erase(n);
672
673 ConsensusMode startMode =
675
676 // We were handed the wrong ledger
677 if (prevLedger.id() != prevLedgerID)
678 {
679 // try to acquire the correct one
680 if (auto newLedger = adaptor_.acquireLedger(prevLedgerID))
681 {
// prevLedger is a by-value parameter, so it can be replaced locally.
682 prevLedger = *newLedger;
683 }
684 else // Unable to acquire the correct ledger
685 {
686 startMode = ConsensusMode::wrongLedger;
// NOTE(review): this logs the member previousLedger_ (not yet updated
// for this round), not the prevLedger argument — confirm that is
// the intended value.
687 JLOG(j_.info())
688 << "Entering consensus with: " << previousLedger_.id();
689 JLOG(j_.info()) << "Correct LCL is: " << prevLedgerID;
690 }
691 }
692
693 startRoundInternal(now, prevLedgerID, prevLedger, startMode, clog);
694}
// startRoundInternal: resets all per-round state and enters the open phase.
//
// Visible behavior:
//  - Sets phase_ to ConsensusPhase::open and reports the mode change to the
//    adaptor through mode_.set().
//  - Records now, prevLedgerID and prevLedger, clears our result, the peer
//    positions, acquired transaction sets, raw close times and dead nodes.
//  - Derives closeResolution_ for the next ledger from the previous ledger's
//    resolution, its closeAgree() flag and its sequence + 1.
//  - Replays stored proposals; if more than half of the previous round's
//    proposers already have a position, runs timerEntry immediately.
//
// NOTE(review): listing lines 697 and 702 (qualified name and trailing
// parameter) are absent from this listing.
695template <class Adaptor>
696void
698 NetClock::time_point const& now,
699 typename Ledger_t::ID const& prevLedgerID,
700 Ledger_t const& prevLedger,
701 ConsensusMode mode,
703{
704 phase_ = ConsensusPhase::open;
705 JLOG(j_.debug()) << "transitioned to ConsensusPhase::open ";
706 CLOG(clog) << "startRoundInternal transitioned to ConsensusPhase::open, "
707 "previous ledgerID: "
708 << prevLedgerID << ", seq: " << prevLedger.seq() << ". ";
// mode_.set() invokes adaptor_.onModeChange(old, new) before storing.
709 mode_.set(mode, adaptor_);
710 now_ = now;
711 prevLedgerID_ = prevLedgerID;
712 previousLedger_ = prevLedger;
713 result_.reset();
714 convergePercent_ = 0;
715 closeTimeAvalancheState_ = ConsensusParms::init;
716 haveCloseTimeConsensus_ = false;
717 openTime_.reset(clock_.now());
718 currPeerPositions_.clear();
719 acquired_.clear();
720 rawCloseTimes_.peers.clear();
721 rawCloseTimes_.self = {};
722 deadNodes_.clear();
723
724 closeResolution_ = getNextLedgerTimeResolution(
725 previousLedger_.closeTimeResolution(),
726 previousLedger_.closeAgree(),
727 previousLedger_.seq() + typename Ledger_t::Seq{1});
728
// Re-apply stored peer positions that target the new previous ledger.
729 playbackProposals();
730 CLOG(clog) << "number of peer proposals,previous proposers: "
731 << currPeerPositions_.size() << ',' << prevProposers_ << ". ";
732 if (currPeerPositions_.size() > (prevProposers_ / 2))
733 {
734 // We may be falling behind, don't wait for the timer
735 // consider closing the ledger immediately
736 CLOG(clog) << "consider closing the ledger immediately. ";
737 timerEntry(now_, clog);
738 }
739}
740
// Entry point for a newly received peer position.
//
// The position is always appended to that peer's entry in
// recentPeerPositions_ (keeping at most 10 per peer, oldest dropped first) so
// it can be replayed later, and is then processed by peerProposalInternal.
// Returns whatever peerProposalInternal returns.
//
// NOTE(review): listing line 743 (the qualified function name) is absent from
// this listing.
741template <class Adaptor>
742bool
744 NetClock::time_point const& now,
745 PeerPosition_t const& newPeerPos)
746{
747 JLOG(j_.debug()) << "PROPOSAL " << newPeerPos.render();
748 auto const& peerID = newPeerPos.proposal().nodeID();
749
750 // Always need to store recent positions
751 {
752 auto& props = recentPeerPositions_[peerID];
753
// Bound the per-peer history: evict the oldest entry once 10 are held.
754 if (props.size() >= 10)
755 props.pop_front();
756
757 props.push_back(newPeerPos);
758 }
759 return peerProposalInternal(now, newPeerPos);
760}
761
// peerProposalInternal: applies one peer position to the current round.
//
// Returns false (position ignored) when:
//  - we are in the accepted phase,
//  - the proposal is built on a previous ledger other than prevLedgerID_,
//  - the sender is in deadNodes_, or
//  - we already hold a position from this peer with an equal or higher
//    proposeSeq().
// Returns true otherwise, including when the peer bows out.
//
// NOTE(review): listing line 764 (the qualified function name) is absent from
// this listing.
762template <class Adaptor>
763bool
765 NetClock::time_point const& now,
766 PeerPosition_t const& newPeerPos)
767{
768 // Nothing to do for now if we are currently working on a ledger
769 if (phase_ == ConsensusPhase::accepted)
770 return false;
771
772 now_ = now;
773
774 auto const& newPeerProp = newPeerPos.proposal();
775
776 if (newPeerProp.prevLedger() != prevLedgerID_)
777 {
778 JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger()
779 << " but we are on " << prevLedgerID_;
780 return false;
781 }
782
783 auto const& peerID = newPeerProp.nodeID();
784
785 if (deadNodes_.find(peerID) != deadNodes_.end())
786 {
787 JLOG(j_.info()) << "Position from dead node: " << peerID;
788 return false;
789 }
790
791 {
792 // update current position
793 auto peerPosIt = currPeerPositions_.find(peerID);
794
// Only strictly newer proposal sequences replace a stored position.
795 if (peerPosIt != currPeerPositions_.end())
796 {
797 if (newPeerProp.proposeSeq() <=
798 peerPosIt->second.proposal().proposeSeq())
799 {
800 return false;
801 }
802 }
803
// A bow-out withdraws the peer's votes from every dispute, removes its
// position and marks it dead so later proposals are rejected above.
804 if (newPeerProp.isBowOut())
805 {
806 JLOG(j_.info()) << "Peer " << peerID << " bows out";
807 if (result_)
808 {
809 for (auto& it : result_->disputes)
810 it.second.unVote(peerID);
811 }
812 if (peerPosIt != currPeerPositions_.end())
813 currPeerPositions_.erase(peerID);
814 deadNodes_.insert(peerID);
815
816 return true;
817 }
818
819 if (peerPosIt != currPeerPositions_.end())
820 peerPosIt->second = newPeerPos;
821 else
822 currPeerPositions_.emplace(peerID, newPeerPos);
823 }
824
// Only initial proposals contribute to the tally of raw peer close times.
825 if (newPeerProp.isInitial())
826 {
827 // Record the close time estimate
828 JLOG(j_.trace()) << "Peer reports close time as "
829 << newPeerProp.closeTime().time_since_epoch().count();
830 ++rawCloseTimes_.peers[newPeerProp.closeTime()];
831 }
832
833 JLOG(j_.trace()) << "Processing peer proposal " << newPeerProp.proposeSeq()
834 << "/" << newPeerProp.position();
835
836 {
837 auto const ait = acquired_.find(newPeerProp.position());
838 if (ait == acquired_.end())
839 {
840 // acquireTxSet will return the set if it is available, or
841 // spawn a request for it and return nullopt/nullptr. It will call
842 // gotTxSet once it arrives
843 if (auto set = adaptor_.acquireTxSet(newPeerProp.position()))
844 gotTxSet(now_, *set);
845 else
846 JLOG(j_.debug()) << "Don't have tx set for peer";
847 }
// We already hold the set; disputes only exist once we have a position.
848 else if (result_)
849 {
850 updateDisputes(newPeerProp.nodeID(), ait->second);
851 }
852 }
853
854 return true;
855}
856
857template <class Adaptor>
858void
860 NetClock::time_point const& now,
862{
863 CLOG(clog) << "Consensus<Adaptor>::timerEntry. ";
864 // Nothing to do if we are currently working on a ledger
865 if (phase_ == ConsensusPhase::accepted)
866 {
867 CLOG(clog) << "Nothing to do during accepted phase. ";
868 return;
869 }
870
871 now_ = now;
872 CLOG(clog) << "Set network adjusted time to " << to_string(now) << ". ";
873
874 // Check we are on the proper ledger (this may change phase_)
875 auto const phaseOrig = phase_;
876 CLOG(clog) << "Phase " << to_string(phaseOrig) << ". ";
877 checkLedger(clog);
878 if (phaseOrig != phase_)
879 {
880 CLOG(clog) << "Changed phase to << " << to_string(phase_) << ". ";
881 }
882
883 if (phase_ == ConsensusPhase::open)
884 phaseOpen(clog);
885 else if (phase_ == ConsensusPhase::establish)
886 phaseEstablish(clog);
887 CLOG(clog) << "timerEntry finishing in phase " << to_string(phase_) << ". ";
888}
889
// gotTxSet: records a transaction set that has become available and, if we
// already have a position, updates disputes against every peer currently
// proposing that set.
//
// Ignored in the accepted phase, and a no-op when the set's id is already in
// acquired_.
//
// NOTE(review): listing line 892 (the qualified function name) is absent from
// this listing.
890template <class Adaptor>
891void
893 NetClock::time_point const& now,
894 TxSet_t const& txSet)
895{
896 // Nothing to do if we've finished work on a ledger
897 if (phase_ == ConsensusPhase::accepted)
898 return;
899
900 now_ = now;
901
902 auto id = txSet.id();
903
904 // If we've already processed this transaction set since requesting
905 // it from the network, there is nothing to do now
906 if (!acquired_.emplace(id, txSet).second)
907 return;
908
909 if (!result_)
910 {
911 JLOG(j_.debug()) << "Not creating disputes: no position yet.";
912 }
913 else
914 {
915 // Our position is added to acquired_ as soon as we create it,
916 // so this txSet must differ
917 XRPL_ASSERT(
918 id != result_->position.position(),
919 "ripple::Consensus::gotTxSet : updated transaction set");
// Tracks whether at least one current peer position refers to this set.
920 bool any = false;
921 for (auto const& [nodeId, peerPos] : currPeerPositions_)
922 {
923 if (peerPos.proposal().position() == id)
924 {
925 updateDisputes(nodeId, txSet);
926 any = true;
927 }
928 }
929
930 if (!any)
931 {
932 JLOG(j_.warn())
933 << "By the time we got " << id << " no peers were proposing it";
934 }
935 }
936}
937
// Simulates a consensus round: closes the ledger, advances the round timer by
// consensusDelay (100ms when it holds no value), records the proposer count
// and round time, and reports the result through adaptor_.onForceAccept.
//
// NOTE(review): listing lines 940, 942 and 951 are absent from this listing,
// so the qualified name, the declaration of consensusDelay and one statement
// between the prevRoundTime_ assignment and the onForceAccept call are not
// visible here.
938template <class Adaptor>
939void
941 NetClock::time_point const& now,
943{
944 using namespace std::chrono_literals;
945 JLOG(j_.info()) << "Simulating consensus";
946 now_ = now;
// closeLedger is given a default-constructed (empty) log argument.
947 closeLedger({});
948 result_->roundTime.tick(consensusDelay.value_or(100ms));
949 result_->proposers = prevProposers_ = currPeerPositions_.size();
950 prevRoundTime_ = result_->roundTime.read();
952 adaptor_.onForceAccept(
953 *result_,
954 previousLedger_,
955 closeResolution_,
956 rawCloseTimes_,
957 mode_.get(),
958 getJson(true));
959 JLOG(j_.info()) << "Simulation complete";
960}
961
// getJson: builds a JSON description of the current consensus state.
//
// Always present: "proposing", "proposers", "synched", "phase".
// When not in wrongLedger mode: "ledger_seq" (previous seq + 1) and
// "close_granularity". When we have a position: "our_position".
// "disputes" is a count when full is false, and a map keyed by transaction id
// when full is true (only if disputes exist).
// With full == true, timing and convergence fields are added, plus
// "peer_positions", "acquired", "close_times" and "dead_nodes" whenever the
// corresponding container is non-empty.
//
// NOTE(review): listing lines 963-964, 969, 1005, 1016, 1026, 1036 and 1047
// are absent from this listing; they presumably hold the function name and
// the declarations of ret, ppj, acq, dsj, ctj and dnj — confirm against the
// original header.
962template <class Adaptor>
965{
966 using std::to_string;
967 using Int = Json::Value::Int;
968
970
971 ret["proposing"] = (mode_.get() == ConsensusMode::proposing);
972 ret["proposers"] = static_cast<int>(currPeerPositions_.size());
973
974 if (mode_.get() != ConsensusMode::wrongLedger)
975 {
976 ret["synched"] = true;
977 ret["ledger_seq"] =
978 static_cast<std::uint32_t>(previousLedger_.seq()) + 1;
979 ret["close_granularity"] = static_cast<Int>(closeResolution_.count());
980 }
981 else
982 ret["synched"] = false;
983
984 ret["phase"] = to_string(phase_);
985
// Brief form: only the number of disputes (the full form replaces this
// key with the per-transaction map below).
986 if (result_ && !result_->disputes.empty() && !full)
987 ret["disputes"] = static_cast<Int>(result_->disputes.size());
988
989 if (result_)
990 ret["our_position"] = result_->position.getJson();
991
992 if (full)
993 {
994 if (result_)
995 ret["current_ms"] =
996 static_cast<Int>(result_->roundTime.read().count());
997 ret["converge_percent"] = convergePercent_;
998 ret["close_resolution"] = static_cast<Int>(closeResolution_.count());
999 ret["have_time_consensus"] = haveCloseTimeConsensus_;
1000 ret["previous_proposers"] = static_cast<Int>(prevProposers_);
1001 ret["previous_mseconds"] = static_cast<Int>(prevRoundTime_.count());
1002
1003 if (!currPeerPositions_.empty())
1004 {
1006
1007 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1008 {
1009 ppj[to_string(nodeId)] = peerPos.getJson();
1010 }
1011 ret["peer_positions"] = std::move(ppj);
1012 }
1013
1014 if (!acquired_.empty())
1015 {
1017 for (auto const& at : acquired_)
1018 {
1019 acq.append(to_string(at.first));
1020 }
1021 ret["acquired"] = std::move(acq);
1022 }
1023
1024 if (result_ && !result_->disputes.empty())
1025 {
1027 for (auto const& [txId, dispute] : result_->disputes)
1028 {
1029 dsj[to_string(txId)] = dispute.getJson();
1030 }
1031 ret["disputes"] = std::move(dsj);
1032 }
1033
// Keys are the raw close-time tick counts; values are the tally of peers
// that reported each one.
1034 if (!rawCloseTimes_.peers.empty())
1035 {
1037 for (auto const& ct : rawCloseTimes_.peers)
1038 {
1039 ctj[std::to_string(ct.first.time_since_epoch().count())] =
1040 ct.second;
1041 }
1042 ret["close_times"] = std::move(ctj);
1043 }
1044
1045 if (!deadNodes_.empty())
1046 {
1048 for (auto const& dn : deadNodes_)
1049 {
1050 dnj.append(to_string(dn));
1051 }
1052 ret["dead_nodes"] = std::move(dnj);
1053 }
1054 }
1055
1056 return ret;
1057}
1058
1059// Handle a change in the prior ledger during a consensus round
1060template <class Adaptor>
1061void
1063 typename Ledger_t::ID const& lgrId,
1065{
1066 CLOG(clog) << "handleWrongLedger. ";
1067 XRPL_ASSERT(
1068 lgrId != prevLedgerID_ || previousLedger_.id() != lgrId,
1069 "ripple::Consensus::handleWrongLedger : have wrong ledger");
1070
1071 // Stop proposing because we are out of sync
1072 leaveConsensus(clog);
1073
1074 // First time switching to this ledger
1075 if (prevLedgerID_ != lgrId)
1076 {
1077 prevLedgerID_ = lgrId;
1078
1079 // Clear out state
1080 if (result_)
1081 {
1082 result_->disputes.clear();
1083 result_->compares.clear();
1084 }
1085
1086 currPeerPositions_.clear();
1087 rawCloseTimes_.peers.clear();
1088 deadNodes_.clear();
1089
1090 // Get back in sync, this will also recreate disputes
1091 playbackProposals();
1092 }
1093
1094 if (previousLedger_.id() == prevLedgerID_)
1095 {
1096 CLOG(clog) << "previousLedger_.id() == prevLeverID_ " << prevLedgerID_
1097 << ". ";
1098 return;
1099 }
1100
1101 // we need to switch the ledger we're working from
1102 if (auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
1103 {
1104 JLOG(j_.info()) << "Have the consensus ledger " << prevLedgerID_;
1105 CLOG(clog) << "Have the consensus ledger " << prevLedgerID_ << ". ";
1106 startRoundInternal(
1107 now_, lgrId, *newLedger, ConsensusMode::switchedLedger, clog);
1108 }
1109 else
1110 {
1111 CLOG(clog) << "Still on wrong ledger. ";
1112 mode_.set(ConsensusMode::wrongLedger, adaptor_);
1113 }
1114}
1115
// checkLedger: verifies that the round is built on the ledger the adaptor
// reports via getPrevLedger(). Calls handleWrongLedger when that ID differs
// from prevLedgerID_, or when the IDs match but previousLedger_ is not the
// ledger with that ID.
//
// NOTE(review): listing lines 1118 and 1129 are absent from this listing; the
// declaration of the stream `ss` used below is therefore not visible.
1116template <class Adaptor>
1117void
1119{
1120 CLOG(clog) << "checkLedger. ";
1121
1122 auto netLgr =
1123 adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
1124 CLOG(clog) << "network ledgerid " << netLgr << ", "
1125 << "previous ledger " << prevLedgerID_ << ". ";
1126
1127 if (netLgr != prevLedgerID_)
1128 {
// The same message goes to the journal (warn level) and to clog, followed
// by a full state dump on clog.
1130 ss << "View of consensus changed during " << to_string(phase_)
1131 << " mode=" << to_string(mode_.get()) << ", " << prevLedgerID_
1132 << " to " << netLgr << ", "
1133 << Json::Compact{previousLedger_.getJson()} << ". ";
1134 JLOG(j_.warn()) << ss.str();
1135 CLOG(clog) << ss.str();
1136 CLOG(clog) << "State on consensus change "
1137 << Json::Compact{getJson(true)} << ". ";
1138 handleWrongLedger(netLgr, clog);
1139 }
// Same ID as before, but we still do not hold that ledger: retry.
1140 else if (previousLedger_.id() != prevLedgerID_)
1141 {
1142 CLOG(clog) << "previousLedger_.id() != prevLedgerID_: "
1143 << previousLedger_.id() << ',' << to_string(prevLedgerID_)
1144 << ". ";
1145 handleWrongLedger(netLgr, clog);
1146 }
1147}
1148
// playbackProposals: re-applies every stored recent peer position whose
// previous ledger equals prevLedgerID_. Positions that peerProposalInternal
// accepts (returns true for) are also passed to adaptor_.share().
//
// NOTE(review): listing line 1151 (the qualified function name) is absent from
// this listing.
1149template <class Adaptor>
1150void
1152{
1153 for (auto const& it : recentPeerPositions_)
1154 {
1155 for (auto const& pos : it.second)
1156 {
1157 if (pos.proposal().prevLedger() == prevLedgerID_)
1158 {
1159 if (peerProposalInternal(now_, pos))
1160 adaptor_.share(pos);
1161 }
1162 }
1163 }
1164}
1165
// phaseOpen: open-phase handler. Gathers the inputs for the close decision
// (open transactions, proposer counts, time since the last close, how long
// the ledger has been open, the idle interval) and calls closeLedger when the
// decision below is positive.
//
// NOTE(review): listing lines 1168 and 1223 are absent from this listing, so
// the function signature and the head of the `if (...)` whose argument list
// starts at listing line 1224 are not visible here.
1166template <class Adaptor>
1167void
1169{
1170 CLOG(clog) << "phaseOpen. ";
1171 using namespace std::chrono;
1172
1173 // it is shortly before ledger close time
1174 bool anyTransactions = adaptor_.hasOpenTransactions();
1175 auto proposersClosed = currPeerPositions_.size();
1176 auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
1177
1178 openTime_.tick(clock_.now());
1179
1180 // This computes how long since last ledger's close time
1181 milliseconds sinceClose;
1182 {
1183 auto const mode = mode_.get();
1184 bool const closeAgree = previousLedger_.closeAgree();
1185 auto const prevCloseTime = previousLedger_.closeTime();
1186 auto const prevParentCloseTimePlus1 =
1187 previousLedger_.parentCloseTime() + 1s;
// The previous ledger's close time is trusted only when we are on the
// right ledger, it reports closeAgree(), and its close time is not
// exactly parent close time + 1s.
1188 bool const previousCloseCorrect =
1189 (mode != ConsensusMode::wrongLedger) && closeAgree &&
1190 (prevCloseTime != prevParentCloseTimePlus1);
1191
1192 auto const lastCloseTime = previousCloseCorrect
1193 ? prevCloseTime // use consensus timing
1194 : prevCloseTime_; // use the time we saw internally
1195
// Both branches yield a signed duration; the result is negative when
// lastCloseTime lies after now_.
1196 if (now_ >= lastCloseTime)
1197 sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
1198 else
1199 sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
1200 CLOG(clog) << "calculating how long since last ledger's close time "
1201 "based on mode : "
1202 << to_string(mode) << ", previous closeAgree: " << closeAgree
1203 << ", previous close time: " << to_string(prevCloseTime)
1204 << ", previous parent close time + 1s: "
1205 << to_string(prevParentCloseTimePlus1)
1206 << ", previous close time seen internally: "
1207 << to_string(prevCloseTime_)
1208 << ", last close time: " << to_string(lastCloseTime)
1209 << ", since close: " << sinceClose.count() << ". ";
1210 }
1211
// The idle interval is never shorter than twice the previous ledger's
// close time resolution.
1212 auto const idleInterval = std::max<milliseconds>(
1213 adaptor_.parms().ledgerIDLE_INTERVAL,
1214 2 * previousLedger_.closeTimeResolution());
1215 CLOG(clog) << "idle interval set to " << idleInterval.count()
1216 << "ms based on "
1217 << "ledgerIDLE_INTERVAL: "
1218 << adaptor_.parms().ledgerIDLE_INTERVAL.count()
1219 << ", previous ledger close time resolution: "
1220 << previousLedger_.closeTimeResolution().count() << "ms. ";
1221
1222 // Decide if we should close the ledger
1224 anyTransactions,
1225 prevProposers_,
1226 proposersClosed,
1227 proposersValidated,
1228 prevRoundTime_,
1229 sinceClose,
1230 openTime_.read(),
1231 idleInterval,
1232 adaptor_.parms(),
1233 j_,
1234 clog))
1235 {
1236 CLOG(clog) << "closing ledger. ";
1237 closeLedger(clog);
1238 }
1239}
1240
// shouldPause: decides whether consensus should hold off this tick.
//
// Returns false early when we are not ahead of the validated ledger, there
// are no laggards, there are no trusted validators, we are not a validator,
// we have not validated, or the round has already run longer than
// ledgerMAX_CONSENSUS. Otherwise the decision depends on a phase derived from
// how far ahead we are: phase 0 pauses only if laggards plus offline nodes
// exceed totalValidators - quorum, the last phase always pauses, and the
// phases in between require a growing share of non-lagging validators.
//
// Reads result_->roundTime, so a position must already exist.
//
// NOTE(review): listing lines 1243-1244 (qualified name and parameter list),
// 1280-1292 and 1295-1313 are absent from this listing.
// NOTE(review): this function streams to j_.debug() / j_.warn() directly,
// whereas the rest of the file wraps journal streams in JLOG(...) — confirm
// whether that is intentional.
1241template <class Adaptor>
1242bool
1245{
1246 CLOG(clog) << "shouldPause? ";
1247 auto const& parms = adaptor_.parms();
// Number of ledgers our working ledger is ahead of the validated one; the
// std::min keeps the subtraction from going below zero.
1248 std::uint32_t const ahead(
1249 previousLedger_.seq() -
1250 std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
1251 auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
1252 std::size_t const totalValidators = trustedKeys.size();
1253 std::size_t laggards =
1254 adaptor_.laggards(previousLedger_.seq(), trustedKeys);
// Read after the laggards() call on purpose: per the comment before the
// switch below, keys that remain afterwards count as offline.
// Presumably laggards() removes entries from trustedKeys — confirm against
// the adaptor.
1255 std::size_t const offline = trustedKeys.size();
1256
1257 std::stringstream vars;
1258 vars << " consensuslog (working seq: " << previousLedger_.seq() << ", "
1259 << "validated seq: " << adaptor_.getValidLedgerIndex() << ", "
1260 << "am validator: " << adaptor_.validator() << ", "
1261 << "have validated: " << adaptor_.haveValidated() << ", "
1262 << "roundTime: " << result_->roundTime.read().count() << ", "
1263 << "max consensus time: " << parms.ledgerMAX_CONSENSUS.count() << ", "
1264 << "validators: " << totalValidators << ", "
1265 << "laggards: " << laggards << ", "
1266 << "offline: " << offline << ", "
1267 << "quorum: " << quorum << ")";
1268
1269 if (!ahead || !laggards || !totalValidators || !adaptor_.validator() ||
1270 !adaptor_.haveValidated() ||
1271 result_->roundTime.read() > parms.ledgerMAX_CONSENSUS)
1272 {
1273 j_.debug() << "not pausing (early)" << vars.str();
1274 CLOG(clog) << "Not pausing (early). ";
1275 return false;
1276 }
1277
1278 bool willPause = false;
1279
1293 constexpr static std::size_t maxPausePhase = 4;
1294
// ahead >= 1 here (checked above), so phase cycles through
// 0..maxPausePhase as we get further ahead.
1314 std::size_t const phase = (ahead - 1) % (maxPausePhase + 1);
1315
1316 // validators that remain after the laggards() function are considered
1317 // offline, and should be considered as laggards for purposes of
1318 // evaluating whether the threshold for non-laggards has been reached.
1319 switch (phase)
1320 {
1321 case 0:
1322 // Laggards and offline shouldn't preclude consensus.
1323 if (laggards + offline > totalValidators - quorum)
1324 willPause = true;
1325 break;
1326 case maxPausePhase:
1327 // No tolerance.
1328 willPause = true;
1329 break;
1330 default:
1331 // Ensure that sufficient validators are known to be not lagging.
1332 // Their sufficiently most recent validation sequence was equal to
1333 // or greater than our own.
1334 //
1335 // The threshold is the amount required for quorum plus
1336 // the proportion of the remainder based on number of intermediate
1337 // phases between 0 and max.
1338 float const nonLaggards = totalValidators - (laggards + offline);
1339 float const quorumRatio =
1340 static_cast<float>(quorum) / totalValidators;
1341 float const allowedDissent = 1.0f - quorumRatio;
1342 float const phaseFactor = static_cast<float>(phase) / maxPausePhase;
1343
1344 if (nonLaggards / totalValidators <
1345 quorumRatio + (allowedDissent * phaseFactor))
1346 {
1347 willPause = true;
1348 }
1349 }
1350
1351 if (willPause)
1352 {
1353 j_.warn() << "pausing" << vars.str();
1354 CLOG(clog) << "pausing " << vars.str() << ". ";
1355 }
1356 else
1357 {
1358 j_.debug() << "not pausing" << vars.str();
1359 CLOG(clog) << "not pausing. ";
1360 }
1361 return willPause;
1362}
1363
// phaseEstablish: establish-phase handler.
//
// Each call bumps peerUnchangedCounter_ and establishCounter_, advances the
// round timer and recomputes convergePercent_. Once at least
// ledgerMIN_CONSENSUS has elapsed it updates our position, and when we should
// not pause, have consensus, and have close-time consensus, it records the
// round statistics, moves to ConsensusPhase::accepted and calls
// adaptor_.onAccept.
//
// NOTE(review): listing lines 1366-1367 (qualified name and parameter list)
// are absent from this listing.
1364template <class Adaptor>
1365void
1368{
1369 CLOG(clog) << "phaseEstablish. ";
1370 // can only establish consensus if we already took a stance
1371 XRPL_ASSERT(result_, "ripple::Consensus::phaseEstablish : result is set");
1372
1373 ++peerUnchangedCounter_;
1374 ++establishCounter_;
1375
1376 using namespace std::chrono;
1377 ConsensusParms const& parms = adaptor_.parms();
1378
1379 result_->roundTime.tick(clock_.now());
1380 result_->proposers = currPeerPositions_.size();
1381
// Elapsed round time as a percentage of the previous round's duration,
// with avMIN_CONSENSUS_TIME as a floor on the denominator.
1382 convergePercent_ = result_->roundTime.read() * 100 /
1383 std::max<milliseconds>(prevRoundTime_, parms.avMIN_CONSENSUS_TIME);
1384 CLOG(clog) << "convergePercent_ " << convergePercent_
1385 << " is based on round duration so far: "
1386 << result_->roundTime.read().count() << "ms, "
1387 << "previous round duration: " << prevRoundTime_.count()
1388 << "ms, "
1389 << "avMIN_CONSENSUS_TIME: " << parms.avMIN_CONSENSUS_TIME.count()
1390 << "ms. ";
1391
1392 // Give everyone a chance to take an initial position
1393 if (result_->roundTime.read() < parms.ledgerMIN_CONSENSUS)
1394 {
1395 CLOG(clog) << "ledgerMIN_CONSENSUS not reached: "
1396 << parms.ledgerMIN_CONSENSUS.count() << "ms. ";
1397 return;
1398 }
1399
1400 updateOurPositions(clog);
1401
1402 // Nothing to do if too many laggards or we don't have consensus.
1403 if (shouldPause(clog) || !haveConsensus(clog))
1404 return;
1405
1406 if (!haveCloseTimeConsensus_)
1407 {
1408 JLOG(j_.info()) << "We have TX consensus but not CT consensus";
1409 CLOG(clog) << "We have TX consensus but not CT consensus. ";
1410 return;
1411 }
1412
1413 JLOG(j_.info()) << "Converge cutoff (" << currPeerPositions_.size()
1414 << " participants)";
1415 CLOG(clog) << "Converge cutoff (" << currPeerPositions_.size()
1416 << " participants). Transitioned to ConsensusPhase::accepted. ";
1417 adaptor_.updateOperatingMode(currPeerPositions_.size());
// Remember this round's size and duration for the next round's timing.
1418 prevProposers_ = currPeerPositions_.size();
1419 prevRoundTime_ = result_->roundTime.read();
1420 phase_ = ConsensusPhase::accepted;
1421 JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted";
1422 adaptor_.onAccept(
1423 *result_,
1424 previousLedger_,
1425 closeResolution_,
1426 rawCloseTimes_,
1427 mode_.get(),
1428 getJson(true),
1429 adaptor_.validating());
1430}
1431
// closeLedger: closes the open ledger and takes our initial position.
//
// Records now_ as our own raw close time, resets the establish counters,
// stores the adaptor's onClose() result as result_, restarts the round timer,
// shares our transaction set if it was not already in acquired_, proposes our
// position when in proposing mode, and creates disputes against every peer
// position whose transaction set we already hold.
//
// NOTE(review): listing lines 1434 and 1439 are absent from this listing, so
// the function signature and the statement preceding the "transitioned to
// ConsensusPhase::establish" log line are not visible here.
1432template <class Adaptor>
1433void
1435{
1436 // We should not be closing if we already have a position
1437 XRPL_ASSERT(!result_, "ripple::Consensus::closeLedger : result is not set");
1438
1440 JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish";
1441 rawCloseTimes_.self = now_;
1442 peerUnchangedCounter_ = 0;
1443 establishCounter_ = 0;
1444
1445 result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
1446 result_->roundTime.reset(clock_.now());
1447 // Share the newly created transaction set if we haven't already
1448 // received it from a peer
1449 if (acquired_.emplace(result_->txns.id(), result_->txns).second)
1450 adaptor_.share(result_->txns);
1451
1452 auto const mode = mode_.get();
1453 CLOG(clog)
1454 << "closeLedger transitioned to ConsensusPhase::establish, mode: "
1455 << to_string(mode)
1456 << ", number of peer positions: " << currPeerPositions_.size() << ". ";
1457 if (mode == ConsensusMode::proposing)
1458 adaptor_.propose(result_->position);
1459
1460 // Create disputes with any peer positions we have transactions for
1461 for (auto const& pit : currPeerPositions_)
1462 {
1463 auto const& pos = pit.second.proposal().position();
1464 auto const it = acquired_.find(pos);
1465 if (it != acquired_.end())
1466 createDisputes(it->second, clog);
1467 }
1468}
1469
/** Number of participants that corresponds to a percentage threshold.

    Computes ((participants * percent) + (percent / 2)) / 100 using integer
    arithmetic. A computed value of 0 is reported as 1, so at least one
    participant is required whenever the formula would yield none.

    Example: 10 participants at 50 percent gives (500 + 25) / 100 = 5.

    @param participants The number of participants (i.e. validators).
    @param percent      The percentage threshold to meet.
    @return The computed participant count, or 1 if the computation gives 0.
*/
inline int
participantsNeeded(int participants, int percent)
{
    int result = ((participants * percent) + (percent / 2)) / 100;

    return (result == 0) ? 1 : result;
}
1489
1490template <class Adaptor>
1491void
1494{
1495 // We must have a position if we are updating it
1496 XRPL_ASSERT(
1497 result_, "ripple::Consensus::updateOurPositions : result is set");
1498 ConsensusParms const& parms = adaptor_.parms();
1499
1500 // Compute a cutoff time
1501 auto const peerCutoff = now_ - parms.proposeFRESHNESS;
1502 auto const ourCutoff = now_ - parms.proposeINTERVAL;
1503 CLOG(clog) << "updateOurPositions. peerCutoff " << to_string(peerCutoff)
1504 << ", ourCutoff " << to_string(ourCutoff) << ". ";
1505
1506 // Verify freshness of peer positions and compute close times
1508 {
1509 auto it = currPeerPositions_.begin();
1510 while (it != currPeerPositions_.end())
1511 {
1512 Proposal_t const& peerProp = it->second.proposal();
1513 if (peerProp.isStale(peerCutoff))
1514 {
1515 // peer's proposal is stale, so remove it
1516 NodeID_t const& peerID = peerProp.nodeID();
1517 JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
1518 for (auto& dt : result_->disputes)
1519 dt.second.unVote(peerID);
1520 it = currPeerPositions_.erase(it);
1521 }
1522 else
1523 {
1524 // proposal is still fresh
1525 ++closeTimeVotes[asCloseTime(peerProp.closeTime())];
1526 ++it;
1527 }
1528 }
1529 }
1530
1531 // This will stay unseated unless there are any changes
1532 std::optional<TxSet_t> ourNewSet;
1533
1534 // Update votes on disputed transactions
1535 {
1537 for (auto& [txId, dispute] : result_->disputes)
1538 {
1539 // Because the threshold for inclusion increases,
1540 // time can change our position on a dispute
1541 if (dispute.updateVote(
1542 convergePercent_,
1543 mode_.get() == ConsensusMode::proposing,
1544 parms))
1545 {
1546 if (!mutableSet)
1547 mutableSet.emplace(result_->txns);
1548
1549 if (dispute.getOurVote())
1550 {
1551 // now a yes
1552 mutableSet->insert(dispute.tx());
1553 }
1554 else
1555 {
1556 // now a no
1557 mutableSet->erase(txId);
1558 }
1559 }
1560 }
1561
1562 if (mutableSet)
1563 ourNewSet.emplace(std::move(*mutableSet));
1564 }
1565
1566 NetClock::time_point consensusCloseTime = {};
1567 haveCloseTimeConsensus_ = false;
1568
1569 if (currPeerPositions_.empty())
1570 {
1571 // no other times
1572 haveCloseTimeConsensus_ = true;
1573 consensusCloseTime = asCloseTime(result_->position.closeTime());
1574 }
1575 else
1576 {
1577 // We don't track rounds for close time, so just pass 0s
1578 auto const [neededWeight, newState] = getNeededWeight(
1579 parms, closeTimeAvalancheState_, convergePercent_, 0, 0);
1580 if (newState)
1581 closeTimeAvalancheState_ = *newState;
1582 CLOG(clog) << "neededWeight " << neededWeight << ". ";
1583
1584 int participants = currPeerPositions_.size();
1585 if (mode_.get() == ConsensusMode::proposing)
1586 {
1587 ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
1588 ++participants;
1589 }
1590
1591 // Threshold for non-zero vote
1592 int threshVote = participantsNeeded(participants, neededWeight);
1593
1594 // Threshold to declare consensus
1595 int const threshConsensus =
1596 participantsNeeded(participants, parms.avCT_CONSENSUS_PCT);
1597
1599 ss << "Proposers:" << currPeerPositions_.size()
1600 << " nw:" << neededWeight << " thrV:" << threshVote
1601 << " thrC:" << threshConsensus;
1602 JLOG(j_.info()) << ss.str();
1603 CLOG(clog) << ss.str();
1604
1605 for (auto const& [t, v] : closeTimeVotes)
1606 {
1607 JLOG(j_.debug())
1608 << "CCTime: seq "
1609 << static_cast<std::uint32_t>(previousLedger_.seq()) + 1 << ": "
1610 << t.time_since_epoch().count() << " has " << v << ", "
1611 << threshVote << " required";
1612
1613 if (v >= threshVote)
1614 {
1615 // A close time has enough votes for us to try to agree
1616 consensusCloseTime = t;
1617 threshVote = v;
1618
1619 if (threshVote >= threshConsensus)
1620 haveCloseTimeConsensus_ = true;
1621 }
1622 }
1623
1624 if (!haveCloseTimeConsensus_)
1625 {
1626 JLOG(j_.debug())
1627 << "No CT consensus:"
1628 << " Proposers:" << currPeerPositions_.size()
1629 << " Mode:" << to_string(mode_.get())
1630 << " Thresh:" << threshConsensus
1631 << " Pos:" << consensusCloseTime.time_since_epoch().count();
1632 CLOG(clog) << "No close time consensus. ";
1633 }
1634 }
1635
1636 if (!ourNewSet &&
1637 ((consensusCloseTime != asCloseTime(result_->position.closeTime())) ||
1638 result_->position.isStale(ourCutoff)))
1639 {
1640 // close time changed or our position is stale
1641 ourNewSet.emplace(result_->txns);
1642 }
1643
1644 if (ourNewSet)
1645 {
1646 auto newID = ourNewSet->id();
1647
1648 result_->txns = std::move(*ourNewSet);
1649
1651 ss << "Position change: CTime "
1652 << consensusCloseTime.time_since_epoch().count() << ", tx " << newID;
1653 JLOG(j_.info()) << ss.str();
1654 CLOG(clog) << ss.str();
1655
1656 result_->position.changePosition(newID, consensusCloseTime, now_);
1657
1658 // Share our new transaction set and update disputes
1659 // if we haven't already received it
1660 if (acquired_.emplace(newID, result_->txns).second)
1661 {
1662 if (!result_->position.isBowOut())
1663 adaptor_.share(result_->txns);
1664
1665 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1666 {
1667 Proposal_t const& p = peerPos.proposal();
1668 if (p.position() == newID)
1669 updateDisputes(nodeId, result_->txns);
1670 }
1671 }
1672
1673 // Share our new position if we are still participating this round
1674 if (!result_->position.isBowOut() &&
1675 (mode_.get() == ConsensusMode::proposing))
1676 adaptor_.propose(result_->position);
1677 }
1678}
1679
1680template <class Adaptor>
1681bool
1684{
1685 // Must have a stance if we are checking for consensus
1686 XRPL_ASSERT(result_, "ripple::Consensus::haveConsensus : has result");
1687
1688 // CHECKME: should possibly count unacquired TX sets as disagreeing
1689 int agree = 0, disagree = 0;
1690
1691 auto ourPosition = result_->position.position();
1692
1693 // Count number of agreements/disagreements with our position
1694 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1695 {
1696 Proposal_t const& peerProp = peerPos.proposal();
1697 if (peerProp.position() == ourPosition)
1698 {
1699 ++agree;
1700 }
1701 else
1702 {
1703 JLOG(j_.debug()) << "Proposal disagreement: Peer " << nodeId
1704 << " has " << peerProp.position();
1705 ++disagree;
1706 }
1707 }
1708 auto currentFinished =
1709 adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
1710
1711 JLOG(j_.debug()) << "Checking for TX consensus: agree=" << agree
1712 << ", disagree=" << disagree;
1713
1714 ConsensusParms const& parms = adaptor_.parms();
1715 // Stalling is BAD. It means that we have a consensus on the close time, so
1716 // peers are talking, but we have disputed transactions that peers are
1717 // unable or unwilling to come to agreement on one way or the other.
1718 bool const stalled = haveCloseTimeConsensus_ &&
1719 !result_->disputes.empty() &&
1720 std::ranges::all_of(result_->disputes,
1721 [this, &parms, &clog](auto const& dispute) {
1722 return dispute.second.stalled(
1723 parms,
1724 mode_.get() == ConsensusMode::proposing,
1725 peerUnchangedCounter_,
1726 j_,
1727 clog);
1728 });
1729 if (stalled)
1730 {
1732 ss << "Consensus detects as stalled with " << (agree + disagree) << "/"
1733 << prevProposers_ << " proposers, and " << result_->disputes.size()
1734 << " stalled disputed transactions.";
1735 JLOG(j_.error()) << ss.str();
1736 CLOG(clog) << ss.str();
1737 }
1738
1739 // Determine if we actually have consensus or not
1740 result_->state = checkConsensus(
1741 prevProposers_,
1742 agree + disagree,
1743 agree,
1744 currentFinished,
1745 prevRoundTime_,
1746 result_->roundTime.read(),
1747 stalled,
1748 parms,
1749 mode_.get() == ConsensusMode::proposing,
1750 j_,
1751 clog);
1752
1753 if (result_->state == ConsensusState::No)
1754 {
1755 CLOG(clog) << "No consensus. ";
1756 return false;
1757 }
1758
1759 // Consensus has taken far too long. Drop out of the round.
1760 if (result_->state == ConsensusState::Expired)
1761 {
1762 static auto const minimumCounter =
1763 parms.avalancheCutoffs.size() * parms.avMIN_ROUNDS;
1765 if (establishCounter_ < minimumCounter)
1766 {
1767 // If each round of phaseEstablish takes a very long time, we may
1768 // "expire" before we've given consensus enough time at each
1769 // avalanche level to actually come to a consensus. In that case,
1770 // keep trying. This should only happen if there are an extremely
1771 // large number of disputes such that each round takes an inordinate
1772 // amount of time.
1773
1774 ss << "Consensus time has expired in round " << establishCounter_
1775 << "; continue until round " << minimumCounter << ". "
1776 << Json::Compact{getJson(false)};
1777 JLOG(j_.error()) << ss.str();
1778 CLOG(clog) << ss.str() << ". ";
1779 return false;
1780 }
1781 ss << "Consensus expired. " << Json::Compact{getJson(true)};
1782 JLOG(j_.error()) << ss.str();
1783 CLOG(clog) << ss.str() << ". ";
1784 leaveConsensus(clog);
1785 }
1786 // There is consensus, but we need to track if the network moved on
1787 // without us.
1788 if (result_->state == ConsensusState::MovedOn)
1789 {
1790 JLOG(j_.error()) << "Unable to reach consensus";
1791 JLOG(j_.error()) << Json::Compact{getJson(true)};
1792 CLOG(clog) << "Unable to reach consensus "
1793 << Json::Compact{getJson(true)} << ". ";
1794 }
1795
1796 CLOG(clog) << "Consensus has been reached. ";
1797 return true;
1798}
1799
1800template <class Adaptor>
1801void
1804{
1805 if (mode_.get() == ConsensusMode::proposing)
1806 {
1807 if (result_ && !result_->position.isBowOut())
1808 {
1809 result_->position.bowOut(now_);
1810 adaptor_.propose(result_->position);
1811 }
1812
1813 mode_.set(ConsensusMode::observing, adaptor_);
1814 JLOG(j_.info()) << "Bowing out of consensus";
1815 CLOG(clog) << "Bowing out of consensus. ";
1816 }
1817}
1818
1819template <class Adaptor>
1820void
1822 TxSet_t const& o,
1824{
1825 // Cannot create disputes without our stance
1826 XRPL_ASSERT(result_, "ripple::Consensus::createDisputes : result is set");
1827
1828 // Only create disputes if this is a new set
1829 auto const emplaced = result_->compares.emplace(o.id()).second;
1830 CLOG(clog) << "createDisputes: new set? " << !emplaced << ". ";
1831 if (!emplaced)
1832 return;
1833
1834 // Nothing to dispute if we agree
1835 if (result_->txns.id() == o.id())
1836 {
1837 CLOG(clog) << "both sets are identical. ";
1838 return;
1839 }
1840
1841 CLOG(clog) << "comparing existing with new set: " << result_->txns.id()
1842 << ',' << o.id() << ". ";
1843 JLOG(j_.debug()) << "createDisputes " << result_->txns.id() << " to "
1844 << o.id();
1845
1846 auto differences = result_->txns.compare(o);
1847
1848 int dc = 0;
1849
1850 for (auto const& [txId, inThisSet] : differences)
1851 {
1852 ++dc;
1853 // create disputed transactions (from the ledger that has them)
1854 XRPL_ASSERT(
1855 (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
1856 (!inThisSet && !result_->txns.find(txId) && o.find(txId)),
1857 "ripple::Consensus::createDisputes : has disputed transactions");
1858
1859 Tx_t tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
1860 auto txID = tx.id();
1861
1862 if (result_->disputes.find(txID) != result_->disputes.end())
1863 continue;
1864
1865 JLOG(j_.debug()) << "Transaction " << txID << " is disputed";
1866
1867 typename Result::Dispute_t dtx{
1868 tx,
1869 result_->txns.exists(txID),
1870 std::max(prevProposers_, currPeerPositions_.size()),
1871 j_};
1872
1873 // Update all of the available peer's votes on the disputed transaction
1874 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1875 {
1876 Proposal_t const& peerProp = peerPos.proposal();
1877 auto const cit = acquired_.find(peerProp.position());
1878 if (cit != acquired_.end() &&
1879 dtx.setVote(nodeId, cit->second.exists(txID)))
1880 peerUnchangedCounter_ = 0;
1881 }
1882 adaptor_.share(dtx.tx());
1883
1884 result_->disputes.emplace(txID, std::move(dtx));
1885 }
1886 JLOG(j_.debug()) << dc << " differences found";
1887 CLOG(clog) << "disputes: " << dc << ". ";
1888}
1889
1890template <class Adaptor>
1891void
1893{
1894 // Cannot updateDisputes without our stance
1895 XRPL_ASSERT(result_, "ripple::Consensus::updateDisputes : result is set");
1896
1897 // Ensure we have created disputes against this set if we haven't seen
1898 // it before
1899 if (result_->compares.find(other.id()) == result_->compares.end())
1900 createDisputes(other);
1901
1902 for (auto& it : result_->disputes)
1903 {
1904 auto& d = it.second;
1905 if (d.setVote(node, other.exists(d.tx().id())))
1906 peerUnchangedCounter_ = 0;
1907 }
1908}
1909
1910template <class Adaptor>
1913{
1914 return roundCloseTime(raw, closeResolution_);
1915}
1916
1917} // namespace ripple
1918
1919#endif
T all_of(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:149
Value & append(Value const &value)
Append value to array at the end.
Json::Int Int
Definition json_value.h:157
A generic endpoint for log messages.
Definition Journal.h:60
Stream error() const
Definition Journal.h:346
Stream debug() const
Definition Journal.h:328
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
Stream warn() const
Definition Journal.h:340
NodeID_t const & nodeID() const
Identifying which peer took this position.
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
Position_t const & position() const
Get the proposed position.
bool isStale(NetClock::time_point cutoff) const
Get whether this position is stale relative to the provided cutoff.
Measures the duration of phases of consensus.
void set(ConsensusMode mode, Adaptor &a)
Definition Consensus.h:328
ConsensusMode get() const
Definition Consensus.h:322
Generic implementation of consensus algorithm.
Definition Consensus.h:298
hash_map< typename TxSet_t::ID, TxSet_t const > acquired_
Definition Consensus.h:606
void playbackProposals()
If we radically changed our consensus context for some reason, we need to replay recent proposals so ...
Definition Consensus.h:1151
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Call periodically to drive consensus forward.
Definition Consensus.h:859
ConsensusTimer openTime_
Definition Consensus.h:579
void startRoundInternal(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t const &prevLedger, ConsensusMode mode, std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:697
void phaseEstablish(std::unique_ptr< std::stringstream > const &clog)
Handle establish phase.
Definition Consensus.h:1366
typename Adaptor::PeerPosition_t PeerPosition_t
Definition Consensus.h:303
ConsensusPhase phase_
Definition Consensus.h:567
NetClock::time_point prevCloseTime_
Definition Consensus.h:595
clock_type const & clock_
Definition Consensus.h:572
void leaveConsensus(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1802
void updateDisputes(NodeID_t const &node, TxSet_t const &other)
Definition Consensus.h:1892
Ledger_t previousLedger_
Definition Consensus.h:603
typename Adaptor::TxSet_t TxSet_t
Definition Consensus.h:300
ConsensusParms::AvalancheState closeTimeAvalancheState_
Definition Consensus.h:583
std::size_t establishCounter_
Definition Consensus.h:616
void updateOurPositions(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1492
bool haveConsensus(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1682
Ledger_t::ID prevLedgerID() const
Get the previous ledger ID.
Definition Consensus.h:431
void handleWrongLedger(typename Ledger_t::ID const &lgrId, std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1062
void checkLedger(std::unique_ptr< std::stringstream > const &clog)
Check if our previous ledger matches the network's.
Definition Consensus.h:1118
hash_map< NodeID_t, std::deque< PeerPosition_t > > recentPeerPositions_
Definition Consensus.h:626
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Simulate the consensus process without any network traffic.
Definition Consensus.h:940
Json::Value getJson(bool full) const
Get the Json state of the consensus process.
Definition Consensus.h:964
typename TxSet_t::Tx Tx_t
Definition Consensus.h:302
void startRound(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t prevLedger, hash_set< NodeID_t > const &nowUntrusted, bool proposing, std::unique_ptr< std::stringstream > const &clog={})
Kick-off the next round of consensus.
Definition Consensus.h:650
Consensus(Consensus &&) noexcept=default
NetClock::time_point now_
Definition Consensus.h:594
std::size_t prevProposers_
Definition Consensus.h:629
NetClock::time_point asCloseTime(NetClock::time_point raw) const
Definition Consensus.h:1912
beast::Journal const j_
Definition Consensus.h:635
void createDisputes(TxSet_t const &o, std::unique_ptr< std::stringstream > const &clog={})
Definition Consensus.h:1821
void gotTxSet(NetClock::time_point const &now, TxSet_t const &txSet)
Process a transaction set acquired from the network.
Definition Consensus.h:892
Adaptor & adaptor_
Definition Consensus.h:565
typename Adaptor::Ledger_t Ledger_t
Definition Consensus.h:299
ConsensusPhase phase() const
Definition Consensus.h:437
std::size_t peerUnchangedCounter_
Definition Consensus.h:613
typename Adaptor::NodeID_t NodeID_t
Definition Consensus.h:301
void closeLedger(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1434
NetClock::duration closeResolution_
Definition Consensus.h:581
bool peerProposal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
A peer has proposed a new position, adjust our tracking.
Definition Consensus.h:743
bool peerProposalInternal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
Handle a replayed or a new peer proposal.
Definition Consensus.h:764
MonitoredMode mode_
Definition Consensus.h:568
hash_map< NodeID_t, PeerPosition_t > currPeerPositions_
Definition Consensus.h:622
bool shouldPause(std::unique_ptr< std::stringstream > const &clog) const
Evaluate whether pausing increases likelihood of validation.
Definition Consensus.h:1243
void phaseOpen(std::unique_ptr< std::stringstream > const &clog)
Handle pre-close phase.
Definition Consensus.h:1168
ConsensusCloseTimes rawCloseTimes_
Definition Consensus.h:609
std::chrono::milliseconds prevRoundTime_
Definition Consensus.h:587
std::optional< Result > result_
Definition Consensus.h:608
hash_set< NodeID_t > deadNodes_
Definition Consensus.h:632
Ledger_t::ID prevLedgerID_
Definition Consensus.h:601
bool haveCloseTimeConsensus_
Definition Consensus.h:570
A transaction discovered to be in dispute during consensus.
Definition DisputedTx.h:49
T emplace(T... args)
T is_same_v
T max(T... args)
T min(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:44
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:45
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
std::pair< std::size_t, std::optional< ConsensusParms::AvalancheState > > getNeededWeight(ConsensusParms const &p, ConsensusParms::AvalancheState currentState, int percentTime, std::size_t currentRounds, std::size_t minimumRounds)
ConsensusMode
Represents how a node currently participates in Consensus.
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
@ proposing
We are normal participant in consensus and propose our position.
@ switchedLedger
We switched ledgers since we started this consensus round but are now running on what we believe is t...
@ observing
We are observing peer positions, but not proposing our position.
ConsensusState checkConsensus(std::size_t prevProposers, std::size_t currentProposers, std::size_t currentAgree, std::size_t currentFinished, std::chrono::milliseconds previousAgreeTime, std::chrono::milliseconds currentAgreeTime, bool stalled, ConsensusParms const &parms, bool proposing, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determine whether the network reached consensus and whether we joined.
std::chrono::time_point< Clock, Duration > roundCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > closeResolution)
Calculates the close time for a ledger, given a close time resolution.
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
auto constexpr ledgerDefaultTimeResolution
Initial resolution of ledger close time.
ConsensusPhase
Phases of consensus for a single ledger round.
@ accepted
We have accepted a new last closed ledger and are waiting on a call to startRound to begin the next c...
@ open
We haven't closed our ledger yet, but others might have.
@ establish
Establishing consensus by exchanging proposals with our peers.
ConsensusState
Whether we have or don't have a consensus.
@ Expired
Consensus time limit has hard-expired.
@ MovedOn
The network has consensus without us.
@ No
We do not have consensus.
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:630
bool shouldCloseLedger(bool anyTransactions, std::size_t prevProposers, std::size_t proposersClosed, std::size_t proposersValidated, std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, std::chrono::milliseconds idleInterval, ConsensusParms const &parms, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determines whether the current ledger should close at this time.
Definition Consensus.cpp:27
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
int participantsNeeded(int participants, int percent)
How many of the participants must agree to reach a given threshold?
Definition Consensus.h:1483
std::chrono::duration< Rep, Period > getNextLedgerTimeResolution(std::chrono::duration< Rep, Period > previousResolution, bool previousAgree, Seq ledgerSeq)
Calculates the close time resolution for the specified ledger.
STL namespace.
T read(T... args)
T size(T... args)
T str(T... args)
Stores the set of initial close times.
Consensus algorithm parameters.
std::size_t const avCT_CONSENSUS_PCT
Percentage of nodes required to reach agreement on ledger close time.
std::chrono::seconds const proposeINTERVAL
How often we force generating a new proposal to keep ours fresh.
std::size_t const avMIN_ROUNDS
Number of rounds before certain actions can happen.
std::chrono::milliseconds const avMIN_CONSENSUS_TIME
The minimum amount of time to consider the previous round to have taken.
std::chrono::milliseconds const ledgerMIN_CONSENSUS
The number of seconds we wait minimum to ensure participation.
std::map< AvalancheState, AvalancheCutoff > const avalancheCutoffs
Map the consensus requirement avalanche state to the amount of time that must pass before moving to t...
std::chrono::seconds const proposeFRESHNESS
How long we consider a proposal fresh.
Encapsulates the result of consensus.
T time_since_epoch(T... args)
T to_string(T... args)
T value_or(T... args)