3#include <xrpld/consensus/ConsensusParms.h>
4#include <xrpld/consensus/ConsensusProposal.h>
5#include <xrpld/consensus/ConsensusTypes.h>
6#include <xrpld/consensus/DisputedTx.h>
8#include <xrpl/basics/Log.h>
9#include <xrpl/basics/chrono.h>
10#include <xrpl/beast/utility/Journal.h>
11#include <xrpl/json/json_writer.h>
12#include <xrpl/ledger/LedgerTiming.h>
51 ConsensusParms
const& parms,
83 ConsensusParms
const& parms,
276template <
class Adaptor>
282 using Tx_t =
typename TxSet_t::Tx;
307 a.onModeChange(
mode_, mode);
348 std::unique_ptr<
std::stringstream> const& clog = {});
405 typename Ledger_t::ID
439 typename Ledger_t::ID
const& lgrId,
608template <
class Adaptor>
610 : adaptor_(adaptor), clock_(clock), j_{journal}
612 JLOG(
j_.
debug()) <<
"Creating consensus object";
615template <
class Adaptor>
619 typename Ledger_t::ID
const& prevLedgerID,
628 prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
629 prevCloseTime_ = prevLedger.closeTime();
634 prevCloseTime_ = rawCloseTimes_.self;
637 for (
NodeID_t const& n : nowUntrusted)
638 recentPeerPositions_.erase(n);
643 if (prevLedger.id() != prevLedgerID)
646 if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID))
648 prevLedger = *newLedger;
653 JLOG(j_.
info()) <<
"Entering consensus with: " << previousLedger_.id();
654 JLOG(j_.
info()) <<
"Correct LCL is: " << prevLedgerID;
658 startRoundInternal(now, prevLedgerID, prevLedger, startMode, clog);
660template <
class Adaptor>
664 typename Ledger_t::ID
const& prevLedgerID,
670 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::open ";
671 CLOG(clog) <<
"startRoundInternal transitioned to ConsensusPhase::open, "
672 "previous ledgerID: "
673 << prevLedgerID <<
", seq: " << prevLedger.seq() <<
". ";
674 mode_.set(mode, adaptor_);
676 prevLedgerID_ = prevLedgerID;
677 previousLedger_ = prevLedger;
679 convergePercent_ = 0;
681 haveCloseTimeConsensus_ =
false;
682 openTime_.reset(clock_.now());
683 currPeerPositions_.clear();
685 rawCloseTimes_.peers.clear();
686 rawCloseTimes_.self = {};
690 previousLedger_.closeTimeResolution(),
691 previousLedger_.closeAgree(),
692 previousLedger_.seq() +
typename Ledger_t::Seq{1});
695 CLOG(clog) <<
"number of peer proposals,previous proposers: " << currPeerPositions_.size()
696 <<
',' << prevProposers_ <<
". ";
697 if (currPeerPositions_.size() > (prevProposers_ / 2))
701 CLOG(clog) <<
"consider closing the ledger immediately. ";
702 timerEntry(now_, clog);
706template <
class Adaptor>
710 JLOG(j_.
debug()) <<
"PROPOSAL " << newPeerPos.render();
711 auto const& peerID = newPeerPos.proposal().nodeID();
715 auto& props = recentPeerPositions_[peerID];
717 if (props.size() >= 10)
720 props.push_back(newPeerPos);
722 return peerProposalInternal(now, newPeerPos);
725template <
class Adaptor>
737 auto const& newPeerProp = newPeerPos.proposal();
739 if (newPeerProp.prevLedger() != prevLedgerID_)
741 JLOG(j_.
debug()) <<
"Got proposal for " << newPeerProp.prevLedger() <<
" but we are on "
746 auto const& peerID = newPeerProp.nodeID();
748 if (deadNodes_.find(peerID) != deadNodes_.end())
750 JLOG(j_.
info()) <<
"Position from dead node: " << peerID;
756 auto peerPosIt = currPeerPositions_.find(peerID);
758 if (peerPosIt != currPeerPositions_.end())
760 if (newPeerProp.proposeSeq() <= peerPosIt->second.proposal().proposeSeq())
766 if (newPeerProp.isBowOut())
768 JLOG(j_.
info()) <<
"Peer " << peerID <<
" bows out";
771 for (
auto& it : result_->disputes)
772 it.second.unVote(peerID);
774 if (peerPosIt != currPeerPositions_.end())
775 currPeerPositions_.erase(peerID);
776 deadNodes_.insert(peerID);
781 if (peerPosIt != currPeerPositions_.end())
782 peerPosIt->second = newPeerPos;
784 currPeerPositions_.emplace(peerID, newPeerPos);
787 if (newPeerProp.isInitial())
790 JLOG(j_.
trace()) <<
"Peer reports close time as "
791 << newPeerProp.closeTime().time_since_epoch().count();
792 ++rawCloseTimes_.peers[newPeerProp.closeTime()];
795 JLOG(j_.
trace()) <<
"Processing peer proposal " << newPeerProp.proposeSeq() <<
"/"
796 << newPeerProp.position();
799 auto const ait = acquired_.find(newPeerProp.position());
800 if (ait == acquired_.end())
805 if (
auto set = adaptor_.acquireTxSet(newPeerProp.position()))
806 gotTxSet(now_, *
set);
808 JLOG(j_.
debug()) <<
"Don't have tx set for peer";
812 updateDisputes(newPeerProp.nodeID(), ait->second);
819template <
class Adaptor>
825 CLOG(clog) <<
"Consensus<Adaptor>::timerEntry. ";
829 CLOG(clog) <<
"Nothing to do during accepted phase. ";
834 CLOG(clog) <<
"Set network adjusted time to " <<
to_string(now) <<
". ";
837 auto const phaseOrig = phase_;
838 CLOG(clog) <<
"Phase " <<
to_string(phaseOrig) <<
". ";
840 if (phaseOrig != phase_)
842 CLOG(clog) <<
"Changed phase to << " <<
to_string(phase_) <<
". ";
848 phaseEstablish(clog);
849 CLOG(clog) <<
"timerEntry finishing in phase " <<
to_string(phase_) <<
". ";
852template <
class Adaptor>
862 auto id = txSet.id();
866 if (!acquired_.emplace(
id, txSet).second)
871 JLOG(j_.
debug()) <<
"Not creating disputes: no position yet.";
878 id != result_->position.position(),
879 "xrpl::Consensus::gotTxSet : updated transaction set");
881 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
883 if (peerPos.proposal().position() == id)
885 updateDisputes(nodeId, txSet);
892 JLOG(j_.
warn()) <<
"By the time we got " <<
id <<
" no peers were proposing it";
897template <
class Adaptor>
903 using namespace std::chrono_literals;
904 JLOG(j_.
info()) <<
"Simulating consensus";
907 result_->roundTime.tick(consensusDelay.
value_or(100ms));
908 result_->proposers = prevProposers_ = currPeerPositions_.size();
909 prevRoundTime_ = result_->roundTime.read();
911 adaptor_.onForceAccept(
912 *result_, previousLedger_, closeResolution_, rawCloseTimes_, mode_.get(),
getJson(
true));
913 JLOG(j_.
info()) <<
"Simulation complete";
916template <
class Adaptor>
926 ret[
"proposers"] =
static_cast<int>(currPeerPositions_.size());
930 ret[
"synched"] =
true;
931 ret[
"ledger_seq"] =
static_cast<std::uint32_t>(previousLedger_.seq()) + 1;
932 ret[
"close_granularity"] =
static_cast<Int
>(closeResolution_.count());
935 ret[
"synched"] =
false;
939 if (result_ && !result_->disputes.empty() && !
full)
940 ret[
"disputes"] =
static_cast<Int
>(result_->disputes.size());
943 ret[
"our_position"] = result_->position.getJson();
948 ret[
"current_ms"] =
static_cast<Int
>(result_->roundTime.read().count());
949 ret[
"converge_percent"] = convergePercent_;
950 ret[
"close_resolution"] =
static_cast<Int
>(closeResolution_.count());
951 ret[
"have_time_consensus"] = haveCloseTimeConsensus_;
952 ret[
"previous_proposers"] =
static_cast<Int
>(prevProposers_);
953 ret[
"previous_mseconds"] =
static_cast<Int
>(prevRoundTime_.count());
955 if (!currPeerPositions_.empty())
959 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
961 ppj[
to_string(nodeId)] = peerPos.getJson();
963 ret[
"peer_positions"] = std::move(ppj);
966 if (!acquired_.empty())
969 for (
auto const& at : acquired_)
973 ret[
"acquired"] = std::move(acq);
976 if (result_ && !result_->disputes.empty())
979 for (
auto const& [txId, dispute] : result_->disputes)
981 dsj[
to_string(txId)] = dispute.getJson();
983 ret[
"disputes"] = std::move(dsj);
986 if (!rawCloseTimes_.peers.empty())
989 for (
auto const& ct : rawCloseTimes_.peers)
991 ctj[
std::to_string(ct.first.time_since_epoch().count())] = ct.second;
993 ret[
"close_times"] = std::move(ctj);
996 if (!deadNodes_.empty())
999 for (
auto const& dn : deadNodes_)
1003 ret[
"dead_nodes"] = std::move(dnj);
1011template <
class Adaptor>
1014 typename Ledger_t::ID
const& lgrId,
1017 CLOG(clog) <<
"handleWrongLedger. ";
1019 lgrId != prevLedgerID_ || previousLedger_.id() != lgrId,
1020 "xrpl::Consensus::handleWrongLedger : have wrong ledger");
1023 leaveConsensus(clog);
1026 if (prevLedgerID_ != lgrId)
1028 prevLedgerID_ = lgrId;
1033 result_->disputes.clear();
1034 result_->compares.clear();
1037 currPeerPositions_.clear();
1038 rawCloseTimes_.peers.clear();
1042 playbackProposals();
1045 if (previousLedger_.id() == prevLedgerID_)
1047 CLOG(clog) <<
"previousLedger_.id() == prevLeverID_ " << prevLedgerID_ <<
". ";
1052 if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
1054 JLOG(j_.
info()) <<
"Have the consensus ledger " << prevLedgerID_;
1055 CLOG(clog) <<
"Have the consensus ledger " << prevLedgerID_ <<
". ";
1060 CLOG(clog) <<
"Still on wrong ledger. ";
1065template <
class Adaptor>
1069 CLOG(clog) <<
"checkLedger. ";
1071 auto netLgr = adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
1072 CLOG(clog) <<
"network ledgerid " << netLgr <<
", "
1073 <<
"previous ledger " << prevLedgerID_ <<
". ";
1075 if (netLgr != prevLedgerID_)
1078 ss <<
"View of consensus changed during " <<
to_string(phase_)
1079 <<
" mode=" <<
to_string(mode_.get()) <<
", " << prevLedgerID_ <<
" to " << netLgr
1080 <<
", " <<
Json::Compact{previousLedger_.getJson()} <<
". ";
1082 CLOG(clog) << ss.
str();
1084 handleWrongLedger(netLgr, clog);
1086 else if (previousLedger_.id() != prevLedgerID_)
1088 CLOG(clog) <<
"previousLedger_.id() != prevLedgerID_: " << previousLedger_.id() <<
','
1090 handleWrongLedger(netLgr, clog);
1094template <
class Adaptor>
1098 for (
auto const& it : recentPeerPositions_)
1100 for (
auto const& pos : it.second)
1102 if (pos.proposal().prevLedger() == prevLedgerID_)
1104 if (peerProposalInternal(now_, pos))
1105 adaptor_.share(pos);
1111template <
class Adaptor>
1115 CLOG(clog) <<
"phaseOpen. ";
1119 bool const anyTransactions = adaptor_.hasOpenTransactions();
1120 auto proposersClosed = currPeerPositions_.size();
1121 auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
1123 openTime_.tick(clock_.now());
1128 auto const mode = mode_.get();
1129 bool const closeAgree = previousLedger_.closeAgree();
1130 auto const prevCloseTime = previousLedger_.closeTime();
1131 auto const prevParentCloseTimePlus1 = previousLedger_.parentCloseTime() + 1s;
1133 (prevCloseTime != prevParentCloseTimePlus1);
1135 auto const lastCloseTime = previousCloseCorrect
1139 if (now_ >= lastCloseTime)
1140 sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
1142 sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
1143 CLOG(
clog) <<
"calculating how long since last ledger's close time "
1145 <<
to_string(mode) <<
", previous closeAgree: " << closeAgree
1146 <<
", previous close time: " <<
to_string(prevCloseTime)
1147 <<
", previous parent close time + 1s: " <<
to_string(prevParentCloseTimePlus1)
1148 <<
", previous close time seen internally: " <<
to_string(prevCloseTime_)
1149 <<
", last close time: " <<
to_string(lastCloseTime)
1150 <<
", since close: " << sinceClose.
count() <<
". ";
1154 adaptor_.parms().ledgerIDLE_INTERVAL, 2 * previousLedger_.closeTimeResolution());
1155 CLOG(
clog) <<
"idle interval set to " << idleInterval.
count() <<
"ms based on "
1156 <<
"ledgerIDLE_INTERVAL: " << adaptor_.parms().ledgerIDLE_INTERVAL.count()
1157 <<
", previous ledger close time resolution: "
1158 << previousLedger_.closeTimeResolution().count() <<
"ms. ";
1174 CLOG(
clog) <<
"closing ledger. ";
1179template <
class Adaptor>
1183 CLOG(
clog) <<
"shouldPause? ";
1184 auto const& parms = adaptor_.parms();
1186 previousLedger_.seq() -
std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
1187 auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
1188 std::size_t const totalValidators = trustedKeys.size();
1189 std::size_t const laggards = adaptor_.laggards(previousLedger_.seq(), trustedKeys);
1193 vars <<
" consensuslog (working seq: " << previousLedger_.seq() <<
", "
1194 <<
"validated seq: " << adaptor_.getValidLedgerIndex() <<
", "
1195 <<
"am validator: " << adaptor_.validator() <<
", "
1196 <<
"have validated: " << adaptor_.haveValidated() <<
", "
1197 <<
"roundTime: " << result_->roundTime.
read().count() <<
", "
1198 <<
"max consensus time: " << parms.ledgerMAX_CONSENSUS.count() <<
", "
1199 <<
"validators: " << totalValidators <<
", "
1200 <<
"laggards: " << laggards <<
", "
1201 <<
"offline: " << offline <<
", "
1202 <<
"quorum: " << quorum <<
")";
1204 if (!ahead || !laggards || !totalValidators || !adaptor_.validator() ||
1205 !adaptor_.haveValidated() || result_->roundTime.read() > parms.ledgerMAX_CONSENSUS)
1207 j_.
debug() <<
"not pausing (early)" << vars.
str();
1208 CLOG(
clog) <<
"Not pausing (early). ";
1212 bool willPause =
false;
1248 std::size_t const phase = (ahead - 1) % (maxPausePhase + 1);
1257 if (laggards + offline > totalValidators - quorum)
1272 float const nonLaggards = totalValidators - (laggards + offline);
1273 float const quorumRatio =
static_cast<float>(quorum) / totalValidators;
1274 float const allowedDissent = 1.0f - quorumRatio;
1275 float const phaseFactor =
static_cast<float>(phase) / maxPausePhase;
1277 if (nonLaggards / totalValidators < quorumRatio + (allowedDissent * phaseFactor))
1285 j_.
warn() <<
"pausing" << vars.
str();
1286 CLOG(
clog) <<
"pausing " << vars.
str() <<
". ";
1290 j_.
debug() <<
"not pausing" << vars.
str();
1291 CLOG(
clog) <<
"not pausing. ";
1296template <
class Adaptor>
1300 CLOG(
clog) <<
"phaseEstablish. ";
1302 XRPL_ASSERT(result_,
"xrpl::Consensus::phaseEstablish : result is set");
1304 ++peerUnchangedCounter_;
1305 ++establishCounter_;
1310 result_->roundTime.tick(clock_.now());
1311 result_->proposers = currPeerPositions_.size();
1313 convergePercent_ = result_->roundTime.read() * 100 /
1315 CLOG(
clog) <<
"convergePercent_ " << convergePercent_
1316 <<
" is based on round duration so far: " << result_->roundTime.read().count()
1318 <<
"previous round duration: " << prevRoundTime_.count() <<
"ms, "
1329 updateOurPositions(
clog);
1332 if (shouldPause(
clog) || !haveConsensus(
clog))
1335 if (!haveCloseTimeConsensus_)
1337 JLOG(j_.
info()) <<
"We have TX consensus but not CT consensus";
1338 CLOG(
clog) <<
"We have TX consensus but not CT consensus. ";
1342 JLOG(j_.
info()) <<
"Converge cutoff (" << currPeerPositions_.size() <<
" participants)";
1343 CLOG(
clog) <<
"Converge cutoff (" << currPeerPositions_.size()
1344 <<
" participants). Transitioned to ConsensusPhase::accepted. ";
1345 adaptor_.updateOperatingMode(currPeerPositions_.size());
1346 prevProposers_ = currPeerPositions_.size();
1347 prevRoundTime_ = result_->roundTime.read();
1349 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::accepted";
1357 adaptor_.validating());
1360template <
class Adaptor>
1365 XRPL_ASSERT(!result_,
"xrpl::Consensus::closeLedger : result is not set");
1368 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::establish";
1369 rawCloseTimes_.self = now_;
1370 peerUnchangedCounter_ = 0;
1371 establishCounter_ = 0;
1373 result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
1374 result_->roundTime.reset(clock_.now());
1377 if (acquired_.emplace(result_->txns.id(), result_->txns).second)
1378 adaptor_.share(result_->txns);
1380 auto const mode = mode_.get();
1381 CLOG(
clog) <<
"closeLedger transitioned to ConsensusPhase::establish, mode: " <<
to_string(mode)
1382 <<
", number of peer positions: " << currPeerPositions_.
size() <<
". ";
1384 adaptor_.propose(result_->position);
1387 for (
auto const& pit : currPeerPositions_)
1389 auto const& pos = pit.second.proposal().position();
1390 auto const it = acquired_.find(pos);
1391 if (it != acquired_.end())
1392 createDisputes(it->second,
clog);
1411 int const result = ((participants * percent) + (percent / 2)) / 100;
1413 return (result == 0) ? 1 : result;
1416template <
class Adaptor>
1421 XRPL_ASSERT(result_,
"xrpl::Consensus::updateOurPositions : result is set");
1427 CLOG(
clog) <<
"updateOurPositions. peerCutoff " <<
to_string(peerCutoff) <<
", ourCutoff "
1433 auto it = currPeerPositions_.
begin();
1434 while (it != currPeerPositions_.end())
1436 Proposal_t const& peerProp = it->second.proposal();
1437 if (peerProp.
isStale(peerCutoff))
1441 JLOG(j_.
warn()) <<
"Removing stale proposal from " << peerID;
1442 for (
auto& dt : result_->disputes)
1443 dt.second.unVote(peerID);
1444 it = currPeerPositions_.erase(it);
1449 ++closeTimeVotes[asCloseTime(peerProp.
closeTime())];
1461 for (
auto& [txId, dispute] : result_->disputes)
1465 if (dispute.updateVote(
1469 mutableSet.
emplace(result_->txns);
1471 if (dispute.getOurVote())
1474 mutableSet->insert(dispute.tx());
1479 mutableSet->erase(txId);
1485 ourNewSet.
emplace(std::move(*mutableSet));
1489 haveCloseTimeConsensus_ =
false;
1491 if (currPeerPositions_.empty())
1494 haveCloseTimeConsensus_ =
true;
1495 consensusCloseTime = asCloseTime(result_->position.closeTime());
1500 auto const [neededWeight, newState] =
1501 getNeededWeight(parms, closeTimeAvalancheState_, convergePercent_, 0, 0);
1503 closeTimeAvalancheState_ = *newState;
1504 CLOG(
clog) <<
"neededWeight " << neededWeight <<
". ";
1506 int participants = currPeerPositions_.size();
1509 ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
1520 ss <<
"Proposers:" << currPeerPositions_.size() <<
" nw:" << neededWeight
1521 <<
" thrV:" << threshVote <<
" thrC:" << threshConsensus;
1525 for (
auto const& [t, v] : closeTimeVotes)
1527 JLOG(j_.
debug()) <<
"CCTime: seq "
1528 <<
static_cast<std::uint32_t>(previousLedger_.seq()) + 1 <<
": "
1529 << t.time_since_epoch().count() <<
" has " << v <<
", " << threshVote
1532 if (v >= threshVote)
1535 consensusCloseTime = t;
1538 if (threshVote >= threshConsensus)
1539 haveCloseTimeConsensus_ =
true;
1543 if (!haveCloseTimeConsensus_)
1545 JLOG(j_.
debug()) <<
"No CT consensus:"
1546 <<
" Proposers:" << currPeerPositions_.size()
1547 <<
" Mode:" <<
to_string(mode_.get()) <<
" Thresh:" << threshConsensus
1549 CLOG(
clog) <<
"No close time consensus. ";
1554 ((consensusCloseTime != asCloseTime(result_->position.closeTime())) ||
1555 result_->position.isStale(ourCutoff)))
1558 ourNewSet.
emplace(result_->txns);
1563 auto newID = ourNewSet->id();
1565 result_->txns = std::move(*ourNewSet);
1568 ss <<
"Position change: CTime " << consensusCloseTime.
time_since_epoch().count() <<
", tx "
1573 result_->position.changePosition(newID, consensusCloseTime, now_);
1577 if (acquired_.emplace(newID, result_->txns).second)
1579 if (!result_->position.isBowOut())
1580 adaptor_.share(result_->txns);
1582 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1586 updateDisputes(nodeId, result_->txns);
1592 adaptor_.propose(result_->position);
1596template <
class Adaptor>
1601 XRPL_ASSERT(result_,
"xrpl::Consensus::haveConsensus : has result");
1604 int agree = 0, disagree = 0;
1606 auto ourPosition = result_->position.position();
1609 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1611 Proposal_t const& peerProp = peerPos.proposal();
1612 if (peerProp.
position() == ourPosition)
1618 JLOG(j_.
debug()) <<
"Proposal disagreement: Peer " << nodeId <<
" has "
1623 auto currentFinished = adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
1625 JLOG(j_.
debug()) <<
"Checking for TX consensus: agree=" << agree <<
", disagree=" << disagree;
1631 bool const stalled =
1632 haveCloseTimeConsensus_ && !result_->disputes.empty() &&
1634 return dispute.second.stalled(
1635 parms, mode_.get() == ConsensusMode::proposing, peerUnchangedCounter_, j_, clog);
1640 ss <<
"Consensus detects as stalled with " << (agree + disagree) <<
"/" << prevProposers_
1641 <<
" proposers, and " << result_->disputes.size() <<
" stalled disputed transactions.";
1653 result_->roundTime.read(),
1662 CLOG(
clog) <<
"No consensus. ";
1671 if (establishCounter_ < minimumCounter)
1680 ss <<
"Consensus time has expired in round " << establishCounter_
1681 <<
"; continue until round " << minimumCounter <<
". "
1684 CLOG(
clog) << ss.
str() <<
". ";
1689 CLOG(
clog) << ss.
str() <<
". ";
1690 leaveConsensus(
clog);
1696 JLOG(j_.
error()) <<
"Unable to reach consensus";
1701 CLOG(
clog) <<
"Consensus has been reached. ";
1705template <
class Adaptor>
1711 if (result_ && !result_->position.isBowOut())
1713 result_->position.bowOut(now_);
1714 adaptor_.propose(result_->position);
1718 JLOG(j_.
info()) <<
"Bowing out of consensus";
1719 CLOG(
clog) <<
"Bowing out of consensus. ";
1723template <
class Adaptor>
1728 XRPL_ASSERT(result_,
"xrpl::Consensus::createDisputes : result is set");
1731 auto const emplaced = result_->compares.emplace(o.id()).second;
1732 CLOG(
clog) <<
"createDisputes: new set? " << !emplaced <<
". ";
1737 if (result_->txns.id() == o.id())
1739 CLOG(
clog) <<
"both sets are identical. ";
1743 CLOG(
clog) <<
"comparing existing with new set: " << result_->txns.id() <<
',' << o.id()
1745 JLOG(j_.
debug()) <<
"createDisputes " << result_->txns.id() <<
" to " << o.id();
1747 auto differences = result_->txns.compare(o);
1751 for (
auto const& [txId, inThisSet] : differences)
1756 (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
1757 (!inThisSet && !result_->txns.find(txId) && o.find(txId)),
1758 "xrpl::Consensus::createDisputes : has disputed transactions");
1760 Tx_t const tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
1761 auto txID = tx.id();
1763 if (result_->disputes.find(txID) != result_->disputes.end())
1766 JLOG(j_.
debug()) <<
"Transaction " << txID <<
" is disputed";
1770 result_->txns.exists(txID),
1771 std::max(prevProposers_, currPeerPositions_.size()),
1775 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1777 Proposal_t const& peerProp = peerPos.proposal();
1778 auto const cit = acquired_.find(peerProp.
position());
1779 if (cit != acquired_.end() && dtx.setVote(nodeId, cit->second.exists(txID)))
1780 peerUnchangedCounter_ = 0;
1782 adaptor_.share(dtx.tx());
1784 result_->disputes.emplace(txID, std::move(dtx));
1786 JLOG(j_.
debug()) << dc <<
" differences found";
1787 CLOG(
clog) <<
"disputes: " << dc <<
". ";
1790template <
class Adaptor>
1795 XRPL_ASSERT(result_,
"xrpl::Consensus::updateDisputes : result is set");
1799 if (result_->compares.find(other.id()) == result_->compares.end())
1800 createDisputes(other);
1802 for (
auto& it : result_->disputes)
1804 auto& d = it.second;
1805 if (d.setVote(node, other.exists(d.tx().id())))
1806 peerUnchangedCounter_ = 0;
1810template <
class Adaptor>
Decorator for streaming out compact json.
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
bool isStale(NetClock::time_point cutoff) const
Get whether this position is stale relative to the provided cutoff.
Position_t const & position() const
Get the proposed position.
NodeID_t const & nodeID() const
Identifying which peer took this position.
Measures the duration of phases of consensus.
ConsensusMode get() const
MonitoredMode(ConsensusMode m)
void set(ConsensusMode mode, Adaptor &a)
Generic implementation of consensus algorithm.
typename Adaptor::TxSet_t TxSet_t
bool peerProposalInternal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
Handle a replayed or a new peer proposal.
void updateDisputes(NodeID_t const &node, TxSet_t const &other)
typename Adaptor::Ledger_t Ledger_t
bool haveCloseTimeConsensus_
hash_map< NodeID_t, std::deque< PeerPosition_t > > recentPeerPositions_
Json::Value getJson(bool full) const
Get the Json state of the consensus process.
NetClock::duration closeResolution_
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Simulate the consensus process without any network traffic.
typename TxSet_t::Tx Tx_t
void gotTxSet(NetClock::time_point const &now, TxSet_t const &txSet)
Process a transaction set acquired from the network.
bool shouldPause(std::unique_ptr< std::stringstream > const &clog) const
Evaluate whether pausing increases likelihood of validation.
void phaseEstablish(std::unique_ptr< std::stringstream > const &clog)
Handle establish phase.
hash_set< NodeID_t > deadNodes_
std::size_t peerUnchangedCounter_
void leaveConsensus(std::unique_ptr< std::stringstream > const &clog)
NetClock::time_point prevCloseTime_
Ledger_t::ID prevLedgerID() const
Get the previous ledger ID.
hash_map< NodeID_t, PeerPosition_t > currPeerPositions_
std::chrono::milliseconds prevRoundTime_
typename Adaptor::PeerPosition_t PeerPosition_t
void startRound(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t prevLedger, hash_set< NodeID_t > const &nowUntrusted, bool proposing, std::unique_ptr< std::stringstream > const &clog={})
Kick-off the next round of consensus.
void handleWrongLedger(typename Ledger_t::ID const &lgrId, std::unique_ptr< std::stringstream > const &clog)
void checkLedger(std::unique_ptr< std::stringstream > const &clog)
Check if our previous ledger matches the network's.
Consensus(Consensus &&) noexcept=default
NetClock::time_point asCloseTime(NetClock::time_point raw) const
ConsensusParms::AvalancheState closeTimeAvalancheState_
ConsensusPhase phase() const
void playbackProposals()
If we radically changed our consensus context for some reason, we need to replay recent proposals so ...
Ledger_t::ID prevLedgerID_
hash_map< typename TxSet_t::ID, TxSet_t const > acquired_
void phaseOpen(std::unique_ptr< std::stringstream > const &clog)
Handle pre-close phase.
void closeLedger(std::unique_ptr< std::stringstream > const &clog)
std::size_t prevProposers_
void startRoundInternal(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t const &prevLedger, ConsensusMode mode, std::unique_ptr< std::stringstream > const &clog)
std::size_t establishCounter_
NetClock::time_point now_
clock_type const & clock_
void createDisputes(TxSet_t const &o, std::unique_ptr< std::stringstream > const &clog={})
bool peerProposal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
A peer has proposed a new position, adjust our tracking.
typename Adaptor::NodeID_t NodeID_t
std::optional< Result > result_
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Call periodically to drive consensus forward.
bool haveConsensus(std::unique_ptr< std::stringstream > const &clog)
void updateOurPositions(std::unique_ptr< std::stringstream > const &clog)
ConsensusCloseTimes rawCloseTimes_
A transaction discovered to be in dispute during consensus.
@ arrayValue
array value (ordered list)
@ objectValue
object value (collection of name/value pairs).
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
ConsensusMode
Represents how a node currently participates in Consensus.
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
@ proposing
We are a normal participant in consensus and propose our position.
@ switchedLedger
We switched ledgers since we started this consensus round but are now running on what we believe is t...
@ observing
We are observing peer positions, but not proposing our position.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
std::string to_string(base_uint< Bits, Tag > const &a)
auto constexpr ledgerDefaultTimeResolution
Initial resolution of ledger close time.
std::chrono::duration< Rep, Period > getNextLedgerTimeResolution(std::chrono::duration< Rep, Period > previousResolution, bool previousAgree, Seq ledgerSeq)
Calculates the close time resolution for the specified ledger.
ConsensusState checkConsensus(std::size_t prevProposers, std::size_t currentProposers, std::size_t currentAgree, std::size_t currentFinished, std::chrono::milliseconds previousAgreeTime, std::chrono::milliseconds currentAgreeTime, bool stalled, ConsensusParms const &parms, bool proposing, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determine whether the network reached consensus and whether we joined.
ConsensusState
Whether we have or don't have a consensus.
@ Expired
Consensus time limit has hard-expired.
@ MovedOn
The network has consensus without us.
@ No
We do not have consensus.
std::chrono::time_point< Clock, Duration > roundCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > closeResolution)
Calculates the close time for a ledger, given a close time resolution.
ConsensusPhase
Phases of consensus for a single ledger round.
@ accepted
We have accepted a new last closed ledger and are waiting on a call to startRound to begin the next c...
@ open
We haven't closed our ledger yet, but others might have.
@ establish
Establishing consensus by exchanging proposals with our peers.
int participantsNeeded(int participants, int percent)
How many of the participants must agree to reach a given threshold?
std::pair< std::size_t, std::optional< ConsensusParms::AvalancheState > > getNeededWeight(ConsensusParms const &p, ConsensusParms::AvalancheState currentState, int percentTime, std::size_t currentRounds, std::size_t minimumRounds)
bool shouldCloseLedger(bool anyTransactions, std::size_t prevProposers, std::size_t proposersClosed, std::size_t proposersValidated, std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, std::chrono::milliseconds idleInterval, ConsensusParms const &parms, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determines whether the current ledger should close at this time.
Stores the set of initial close times.
Consensus algorithm parameters.
std::size_t const avCT_CONSENSUS_PCT
Percentage of nodes required to reach agreement on ledger close time.
std::chrono::seconds const proposeFRESHNESS
How long we consider a proposal fresh.
std::chrono::milliseconds const ledgerMIN_CONSENSUS
The minimum amount of time we wait to ensure participation.
std::chrono::seconds const proposeINTERVAL
How often we force generating a new proposal to keep ours fresh.
std::size_t const avMIN_ROUNDS
Number of rounds before certain actions can happen.
std::map< AvalancheState, AvalancheCutoff > const avalancheCutoffs
Map the consensus requirement avalanche state to the amount of time that must pass before moving to t...
std::chrono::milliseconds const avMIN_CONSENSUS_TIME
The minimum amount of time to consider the previous round to have taken.
Encapsulates the result of consensus.
T time_since_epoch(T... args)