3#include <xrpld/consensus/ConsensusParms.h>
4#include <xrpld/consensus/ConsensusProposal.h>
5#include <xrpld/consensus/ConsensusTypes.h>
6#include <xrpld/consensus/DisputedTx.h>
7#include <xrpld/consensus/LedgerTiming.h>
9#include <xrpl/basics/Log.h>
10#include <xrpl/basics/chrono.h>
11#include <xrpl/beast/utility/Journal.h>
12#include <xrpl/json/json_writer.h>
51 ConsensusParms
const& parms,
83 ConsensusParms
const& parms,
276template <
class Adaptor>
282 using Tx_t =
typename TxSet_t::Tx;
307 a.onModeChange(
mode_, mode);
348 std::unique_ptr<
std::stringstream> const& clog = {});
401 typename Ledger_t::ID
602template <
class Adaptor>
604 : adaptor_(adaptor), clock_(clock), j_{journal}
606 JLOG(
j_.
debug()) <<
"Creating consensus object";
609template <
class Adaptor>
613 typename Ledger_t::ID
const& prevLedgerID,
622 prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
623 prevCloseTime_ = prevLedger.closeTime();
628 prevCloseTime_ = rawCloseTimes_.self;
631 for (
NodeID_t const& n : nowUntrusted)
632 recentPeerPositions_.erase(n);
637 if (prevLedger.id() != prevLedgerID)
640 if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID))
642 prevLedger = *newLedger;
647 JLOG(j_.
info()) <<
"Entering consensus with: " << previousLedger_.id();
648 JLOG(j_.
info()) <<
"Correct LCL is: " << prevLedgerID;
652 startRoundInternal(now, prevLedgerID, prevLedger, startMode, clog);
654template <
class Adaptor>
658 typename Ledger_t::ID
const& prevLedgerID,
664 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::open ";
665 CLOG(clog) <<
"startRoundInternal transitioned to ConsensusPhase::open, "
666 "previous ledgerID: "
667 << prevLedgerID <<
", seq: " << prevLedger.seq() <<
". ";
668 mode_.set(mode, adaptor_);
670 prevLedgerID_ = prevLedgerID;
671 previousLedger_ = prevLedger;
673 convergePercent_ = 0;
675 haveCloseTimeConsensus_ =
false;
676 openTime_.reset(clock_.now());
677 currPeerPositions_.clear();
679 rawCloseTimes_.peers.clear();
680 rawCloseTimes_.self = {};
684 previousLedger_.closeTimeResolution(),
685 previousLedger_.closeAgree(),
686 previousLedger_.seq() +
typename Ledger_t::Seq{1});
689 CLOG(clog) <<
"number of peer proposals,previous proposers: " << currPeerPositions_.size() <<
',' << prevProposers_
691 if (currPeerPositions_.size() > (prevProposers_ / 2))
695 CLOG(clog) <<
"consider closing the ledger immediately. ";
696 timerEntry(now_, clog);
700template <
class Adaptor>
704 JLOG(j_.
debug()) <<
"PROPOSAL " << newPeerPos.render();
705 auto const& peerID = newPeerPos.proposal().nodeID();
709 auto& props = recentPeerPositions_[peerID];
711 if (props.size() >= 10)
714 props.push_back(newPeerPos);
716 return peerProposalInternal(now, newPeerPos);
719template <
class Adaptor>
729 auto const& newPeerProp = newPeerPos.proposal();
731 if (newPeerProp.prevLedger() != prevLedgerID_)
733 JLOG(j_.
debug()) <<
"Got proposal for " << newPeerProp.prevLedger() <<
" but we are on " << prevLedgerID_;
737 auto const& peerID = newPeerProp.nodeID();
739 if (deadNodes_.find(peerID) != deadNodes_.end())
741 JLOG(j_.
info()) <<
"Position from dead node: " << peerID;
747 auto peerPosIt = currPeerPositions_.find(peerID);
749 if (peerPosIt != currPeerPositions_.end())
751 if (newPeerProp.proposeSeq() <= peerPosIt->second.proposal().proposeSeq())
757 if (newPeerProp.isBowOut())
759 JLOG(j_.
info()) <<
"Peer " << peerID <<
" bows out";
762 for (
auto& it : result_->disputes)
763 it.second.unVote(peerID);
765 if (peerPosIt != currPeerPositions_.end())
766 currPeerPositions_.erase(peerID);
767 deadNodes_.insert(peerID);
772 if (peerPosIt != currPeerPositions_.end())
773 peerPosIt->second = newPeerPos;
775 currPeerPositions_.emplace(peerID, newPeerPos);
778 if (newPeerProp.isInitial())
781 JLOG(j_.
trace()) <<
"Peer reports close time as " << newPeerProp.closeTime().time_since_epoch().count();
782 ++rawCloseTimes_.peers[newPeerProp.closeTime()];
785 JLOG(j_.
trace()) <<
"Processing peer proposal " << newPeerProp.proposeSeq() <<
"/" << newPeerProp.position();
788 auto const ait = acquired_.find(newPeerProp.position());
789 if (ait == acquired_.end())
794 if (
auto set = adaptor_.acquireTxSet(newPeerProp.position()))
795 gotTxSet(now_, *
set);
797 JLOG(j_.
debug()) <<
"Don't have tx set for peer";
801 updateDisputes(newPeerProp.nodeID(), ait->second);
808template <
class Adaptor>
812 CLOG(clog) <<
"Consensus<Adaptor>::timerEntry. ";
816 CLOG(clog) <<
"Nothing to do during accepted phase. ";
821 CLOG(clog) <<
"Set network adjusted time to " <<
to_string(now) <<
". ";
824 auto const phaseOrig = phase_;
825 CLOG(clog) <<
"Phase " <<
to_string(phaseOrig) <<
". ";
827 if (phaseOrig != phase_)
829 CLOG(clog) <<
"Changed phase to << " <<
to_string(phase_) <<
". ";
835 phaseEstablish(clog);
836 CLOG(clog) <<
"timerEntry finishing in phase " <<
to_string(phase_) <<
". ";
839template <
class Adaptor>
849 auto id = txSet.id();
853 if (!acquired_.emplace(
id, txSet).second)
858 JLOG(j_.
debug()) <<
"Not creating disputes: no position yet.";
864 XRPL_ASSERT(
id != result_->position.position(),
"xrpl::Consensus::gotTxSet : updated transaction set");
866 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
868 if (peerPos.proposal().position() == id)
870 updateDisputes(nodeId, txSet);
877 JLOG(j_.
warn()) <<
"By the time we got " <<
id <<
" no peers were proposing it";
882template <
class Adaptor>
886 using namespace std::chrono_literals;
887 JLOG(j_.
info()) <<
"Simulating consensus";
890 result_->roundTime.tick(consensusDelay.
value_or(100ms));
891 result_->proposers = prevProposers_ = currPeerPositions_.size();
892 prevRoundTime_ = result_->roundTime.read();
894 adaptor_.onForceAccept(*result_, previousLedger_, closeResolution_, rawCloseTimes_, mode_.get(),
getJson(
true));
895 JLOG(j_.
info()) <<
"Simulation complete";
898template <
class Adaptor>
908 ret[
"proposers"] =
static_cast<int>(currPeerPositions_.size());
912 ret[
"synched"] =
true;
913 ret[
"ledger_seq"] =
static_cast<std::uint32_t>(previousLedger_.seq()) + 1;
914 ret[
"close_granularity"] =
static_cast<Int
>(closeResolution_.count());
917 ret[
"synched"] =
false;
921 if (result_ && !result_->disputes.empty() && !
full)
922 ret[
"disputes"] =
static_cast<Int
>(result_->disputes.size());
925 ret[
"our_position"] = result_->position.getJson();
930 ret[
"current_ms"] =
static_cast<Int
>(result_->roundTime.read().count());
931 ret[
"converge_percent"] = convergePercent_;
932 ret[
"close_resolution"] =
static_cast<Int
>(closeResolution_.count());
933 ret[
"have_time_consensus"] = haveCloseTimeConsensus_;
934 ret[
"previous_proposers"] =
static_cast<Int
>(prevProposers_);
935 ret[
"previous_mseconds"] =
static_cast<Int
>(prevRoundTime_.count());
937 if (!currPeerPositions_.empty())
941 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
943 ppj[
to_string(nodeId)] = peerPos.getJson();
945 ret[
"peer_positions"] = std::move(ppj);
948 if (!acquired_.empty())
951 for (
auto const& at : acquired_)
955 ret[
"acquired"] = std::move(acq);
958 if (result_ && !result_->disputes.empty())
961 for (
auto const& [txId, dispute] : result_->disputes)
963 dsj[
to_string(txId)] = dispute.getJson();
965 ret[
"disputes"] = std::move(dsj);
968 if (!rawCloseTimes_.peers.empty())
971 for (
auto const& ct : rawCloseTimes_.peers)
973 ctj[
std::to_string(ct.first.time_since_epoch().count())] = ct.second;
975 ret[
"close_times"] = std::move(ctj);
978 if (!deadNodes_.empty())
981 for (
auto const& dn : deadNodes_)
985 ret[
"dead_nodes"] = std::move(dnj);
993template <
class Adaptor>
996 typename Ledger_t::ID
const& lgrId,
999 CLOG(clog) <<
"handleWrongLedger. ";
1001 lgrId != prevLedgerID_ || previousLedger_.id() != lgrId,
1002 "xrpl::Consensus::handleWrongLedger : have wrong ledger");
1005 leaveConsensus(clog);
1008 if (prevLedgerID_ != lgrId)
1010 prevLedgerID_ = lgrId;
1015 result_->disputes.clear();
1016 result_->compares.clear();
1019 currPeerPositions_.clear();
1020 rawCloseTimes_.peers.clear();
1024 playbackProposals();
1027 if (previousLedger_.id() == prevLedgerID_)
1029 CLOG(clog) <<
"previousLedger_.id() == prevLeverID_ " << prevLedgerID_ <<
". ";
1034 if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
1036 JLOG(j_.
info()) <<
"Have the consensus ledger " << prevLedgerID_;
1037 CLOG(clog) <<
"Have the consensus ledger " << prevLedgerID_ <<
". ";
1042 CLOG(clog) <<
"Still on wrong ledger. ";
1047template <
class Adaptor>
1051 CLOG(clog) <<
"checkLedger. ";
1053 auto netLgr = adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
1054 CLOG(clog) <<
"network ledgerid " << netLgr <<
", "
1055 <<
"previous ledger " << prevLedgerID_ <<
". ";
1057 if (netLgr != prevLedgerID_)
1060 ss <<
"View of consensus changed during " <<
to_string(phase_) <<
" mode=" <<
to_string(mode_.get()) <<
", "
1061 << prevLedgerID_ <<
" to " << netLgr <<
", " <<
Json::Compact{previousLedger_.getJson()} <<
". ";
1063 CLOG(clog) << ss.
str();
1065 handleWrongLedger(netLgr, clog);
1067 else if (previousLedger_.id() != prevLedgerID_)
1069 CLOG(clog) <<
"previousLedger_.id() != prevLedgerID_: " << previousLedger_.id() <<
','
1071 handleWrongLedger(netLgr, clog);
1075template <
class Adaptor>
1079 for (
auto const& it : recentPeerPositions_)
1081 for (
auto const& pos : it.second)
1083 if (pos.proposal().prevLedger() == prevLedgerID_)
1085 if (peerProposalInternal(now_, pos))
1086 adaptor_.share(pos);
1092template <
class Adaptor>
1096 CLOG(clog) <<
"phaseOpen. ";
1100 bool anyTransactions = adaptor_.hasOpenTransactions();
1101 auto proposersClosed = currPeerPositions_.size();
1102 auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
1104 openTime_.tick(clock_.now());
1109 auto const mode = mode_.get();
1110 bool const closeAgree = previousLedger_.closeAgree();
1111 auto const prevCloseTime = previousLedger_.closeTime();
1112 auto const prevParentCloseTimePlus1 = previousLedger_.parentCloseTime() + 1s;
1113 bool const previousCloseCorrect =
1116 auto const lastCloseTime = previousCloseCorrect ? prevCloseTime
1119 if (now_ >= lastCloseTime)
1120 sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
1122 sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
1123 CLOG(
clog) <<
"calculating how long since last ledger's close time "
1125 <<
to_string(mode) <<
", previous closeAgree: " << closeAgree
1126 <<
", previous close time: " <<
to_string(prevCloseTime)
1127 <<
", previous parent close time + 1s: " <<
to_string(prevParentCloseTimePlus1)
1128 <<
", previous close time seen internally: " <<
to_string(prevCloseTime_)
1129 <<
", last close time: " <<
to_string(lastCloseTime) <<
", since close: " << sinceClose.
count()
1133 auto const idleInterval =
1135 CLOG(
clog) <<
"idle interval set to " << idleInterval.
count() <<
"ms based on "
1136 <<
"ledgerIDLE_INTERVAL: " << adaptor_.parms().ledgerIDLE_INTERVAL.count()
1137 <<
", previous ledger close time resolution: " << previousLedger_.closeTimeResolution().count()
1154 CLOG(
clog) <<
"closing ledger. ";
1159template <
class Adaptor>
1163 CLOG(
clog) <<
"shouldPause? ";
1164 auto const& parms = adaptor_.parms();
1165 std::uint32_t const ahead(previousLedger_.seq() -
std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
1166 auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
1167 std::size_t const totalValidators = trustedKeys.size();
1168 std::size_t laggards = adaptor_.laggards(previousLedger_.seq(), trustedKeys);
1172 vars <<
" consensuslog (working seq: " << previousLedger_.seq() <<
", "
1173 <<
"validated seq: " << adaptor_.getValidLedgerIndex() <<
", "
1174 <<
"am validator: " << adaptor_.validator() <<
", "
1175 <<
"have validated: " << adaptor_.haveValidated() <<
", "
1176 <<
"roundTime: " << result_->roundTime.
read().count() <<
", "
1177 <<
"max consensus time: " << parms.ledgerMAX_CONSENSUS.count() <<
", "
1178 <<
"validators: " << totalValidators <<
", "
1179 <<
"laggards: " << laggards <<
", "
1180 <<
"offline: " << offline <<
", "
1181 <<
"quorum: " << quorum <<
")";
1183 if (!ahead || !laggards || !totalValidators || !adaptor_.validator() || !adaptor_.haveValidated() ||
1184 result_->roundTime.read() > parms.ledgerMAX_CONSENSUS)
1186 j_.
debug() <<
"not pausing (early)" << vars.
str();
1187 CLOG(
clog) <<
"Not pausing (early). ";
1191 bool willPause =
false;
1227 std::size_t const phase = (ahead - 1) % (maxPausePhase + 1);
1236 if (laggards + offline > totalValidators - quorum)
1251 float const nonLaggards = totalValidators - (laggards + offline);
1252 float const quorumRatio =
static_cast<float>(quorum) / totalValidators;
1253 float const allowedDissent = 1.0f - quorumRatio;
1254 float const phaseFactor =
static_cast<float>(phase) / maxPausePhase;
1256 if (nonLaggards / totalValidators < quorumRatio + (allowedDissent * phaseFactor))
1264 j_.
warn() <<
"pausing" << vars.
str();
1265 CLOG(
clog) <<
"pausing " << vars.
str() <<
". ";
1269 j_.
debug() <<
"not pausing" << vars.
str();
1270 CLOG(
clog) <<
"not pausing. ";
1275template <
class Adaptor>
1279 CLOG(
clog) <<
"phaseEstablish. ";
1281 XRPL_ASSERT(result_,
"xrpl::Consensus::phaseEstablish : result is set");
1283 ++peerUnchangedCounter_;
1284 ++establishCounter_;
1289 result_->roundTime.tick(clock_.now());
1290 result_->proposers = currPeerPositions_.size();
1294 CLOG(
clog) <<
"convergePercent_ " << convergePercent_
1295 <<
" is based on round duration so far: " << result_->roundTime.read().count() <<
"ms, "
1296 <<
"previous round duration: " << prevRoundTime_.count() <<
"ms, "
1306 updateOurPositions(
clog);
1309 if (shouldPause(
clog) || !haveConsensus(
clog))
1312 if (!haveCloseTimeConsensus_)
1314 JLOG(j_.
info()) <<
"We have TX consensus but not CT consensus";
1315 CLOG(
clog) <<
"We have TX consensus but not CT consensus. ";
1319 JLOG(j_.
info()) <<
"Converge cutoff (" << currPeerPositions_.size() <<
" participants)";
1320 CLOG(
clog) <<
"Converge cutoff (" << currPeerPositions_.size()
1321 <<
" participants). Transitioned to ConsensusPhase::accepted. ";
1322 adaptor_.updateOperatingMode(currPeerPositions_.size());
1323 prevProposers_ = currPeerPositions_.size();
1324 prevRoundTime_ = result_->roundTime.read();
1326 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::accepted";
1328 *result_, previousLedger_, closeResolution_, rawCloseTimes_, mode_.get(),
getJson(
true), adaptor_.validating());
1331template <
class Adaptor>
1336 XRPL_ASSERT(!result_,
"xrpl::Consensus::closeLedger : result is not set");
1339 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::establish";
1340 rawCloseTimes_.self = now_;
1341 peerUnchangedCounter_ = 0;
1342 establishCounter_ = 0;
1344 result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
1345 result_->roundTime.reset(clock_.now());
1348 if (acquired_.emplace(result_->txns.id(), result_->txns).second)
1349 adaptor_.share(result_->txns);
1351 auto const mode = mode_.get();
1352 CLOG(
clog) <<
"closeLedger transitioned to ConsensusPhase::establish, mode: " <<
to_string(mode)
1353 <<
", number of peer positions: " << currPeerPositions_.
size() <<
". ";
1355 adaptor_.propose(result_->position);
1358 for (
auto const& pit : currPeerPositions_)
1360 auto const& pos = pit.second.proposal().position();
1361 auto const it = acquired_.find(pos);
1362 if (it != acquired_.end())
1363 createDisputes(it->second,
clog);
1382 int result = ((participants * percent) + (percent / 2)) / 100;
1384 return (result == 0) ? 1 : result;
1387template <
class Adaptor>
1392 XRPL_ASSERT(result_,
"xrpl::Consensus::updateOurPositions : result is set");
1398 CLOG(
clog) <<
"updateOurPositions. peerCutoff " <<
to_string(peerCutoff) <<
", ourCutoff " <<
to_string(ourCutoff)
1404 auto it = currPeerPositions_.
begin();
1405 while (it != currPeerPositions_.end())
1407 Proposal_t const& peerProp = it->second.proposal();
1408 if (peerProp.
isStale(peerCutoff))
1412 JLOG(j_.
warn()) <<
"Removing stale proposal from " << peerID;
1413 for (
auto& dt : result_->disputes)
1414 dt.second.unVote(peerID);
1415 it = currPeerPositions_.erase(it);
1420 ++closeTimeVotes[asCloseTime(peerProp.
closeTime())];
1432 for (
auto& [txId, dispute] : result_->disputes)
1439 mutableSet.
emplace(result_->txns);
1441 if (dispute.getOurVote())
1444 mutableSet->insert(dispute.tx());
1449 mutableSet->erase(txId);
1455 ourNewSet.
emplace(std::move(*mutableSet));
1459 haveCloseTimeConsensus_ =
false;
1461 if (currPeerPositions_.empty())
1464 haveCloseTimeConsensus_ =
true;
1465 consensusCloseTime = asCloseTime(result_->position.closeTime());
1470 auto const [neededWeight, newState] =
getNeededWeight(parms, closeTimeAvalancheState_, convergePercent_, 0, 0);
1472 closeTimeAvalancheState_ = *newState;
1473 CLOG(
clog) <<
"neededWeight " << neededWeight <<
". ";
1475 int participants = currPeerPositions_.size();
1478 ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
1489 ss <<
"Proposers:" << currPeerPositions_.size() <<
" nw:" << neededWeight <<
" thrV:" << threshVote
1490 <<
" thrC:" << threshConsensus;
1494 for (
auto const& [t, v] : closeTimeVotes)
1496 JLOG(j_.
debug()) <<
"CCTime: seq " <<
static_cast<std::uint32_t>(previousLedger_.seq()) + 1 <<
": "
1497 << t.time_since_epoch().count() <<
" has " << v <<
", " << threshVote <<
" required";
1499 if (v >= threshVote)
1502 consensusCloseTime = t;
1505 if (threshVote >= threshConsensus)
1506 haveCloseTimeConsensus_ =
true;
1510 if (!haveCloseTimeConsensus_)
1512 JLOG(j_.
debug()) <<
"No CT consensus:"
1513 <<
" Proposers:" << currPeerPositions_.size() <<
" Mode:" <<
to_string(mode_.get())
1514 <<
" Thresh:" << threshConsensus
1516 CLOG(
clog) <<
"No close time consensus. ";
1521 ((consensusCloseTime != asCloseTime(result_->position.closeTime())) || result_->position.isStale(ourCutoff)))
1524 ourNewSet.
emplace(result_->txns);
1529 auto newID = ourNewSet->id();
1531 result_->txns = std::move(*ourNewSet);
1534 ss <<
"Position change: CTime " << consensusCloseTime.
time_since_epoch().count() <<
", tx " << newID;
1538 result_->position.changePosition(newID, consensusCloseTime, now_);
1542 if (acquired_.emplace(newID, result_->txns).second)
1544 if (!result_->position.isBowOut())
1545 adaptor_.share(result_->txns);
1547 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1551 updateDisputes(nodeId, result_->txns);
1557 adaptor_.propose(result_->position);
1561template <
class Adaptor>
1566 XRPL_ASSERT(result_,
"xrpl::Consensus::haveConsensus : has result");
1569 int agree = 0, disagree = 0;
1571 auto ourPosition = result_->position.position();
1574 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1576 Proposal_t const& peerProp = peerPos.proposal();
1577 if (peerProp.
position() == ourPosition)
1583 JLOG(j_.
debug()) <<
"Proposal disagreement: Peer " << nodeId <<
" has " << peerProp.
position();
1587 auto currentFinished = adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
1589 JLOG(j_.
debug()) <<
"Checking for TX consensus: agree=" << agree <<
", disagree=" << disagree;
1595 bool const stalled = haveCloseTimeConsensus_ && !result_->disputes.empty() &&
1597 return dispute.second.stalled(
1598 parms, mode_.get() == ConsensusMode::proposing, peerUnchangedCounter_, j_, clog);
1603 ss <<
"Consensus detects as stalled with " << (agree + disagree) <<
"/" << prevProposers_ <<
" proposers, and "
1604 << result_->disputes.size() <<
" stalled disputed transactions.";
1616 result_->roundTime.read(),
1625 CLOG(
clog) <<
"No consensus. ";
1634 if (establishCounter_ < minimumCounter)
1643 ss <<
"Consensus time has expired in round " << establishCounter_ <<
"; continue until round "
1646 CLOG(
clog) << ss.
str() <<
". ";
1651 CLOG(
clog) << ss.
str() <<
". ";
1652 leaveConsensus(
clog);
1658 JLOG(j_.
error()) <<
"Unable to reach consensus";
1663 CLOG(
clog) <<
"Consensus has been reached. ";
1667template <
class Adaptor>
1673 if (result_ && !result_->position.isBowOut())
1675 result_->position.bowOut(now_);
1676 adaptor_.propose(result_->position);
1680 JLOG(j_.
info()) <<
"Bowing out of consensus";
1681 CLOG(
clog) <<
"Bowing out of consensus. ";
1685template <
class Adaptor>
1690 XRPL_ASSERT(result_,
"xrpl::Consensus::createDisputes : result is set");
1693 auto const emplaced = result_->compares.emplace(o.id()).second;
1694 CLOG(
clog) <<
"createDisputes: new set? " << !emplaced <<
". ";
1699 if (result_->txns.id() == o.id())
1701 CLOG(
clog) <<
"both sets are identical. ";
1705 CLOG(
clog) <<
"comparing existing with new set: " << result_->txns.id() <<
',' << o.id() <<
". ";
1706 JLOG(j_.
debug()) <<
"createDisputes " << result_->txns.id() <<
" to " << o.id();
1708 auto differences = result_->txns.compare(o);
1712 for (
auto const& [txId, inThisSet] : differences)
1717 (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
1718 (!inThisSet && !result_->txns.find(txId) && o.find(txId)),
1719 "xrpl::Consensus::createDisputes : has disputed transactions");
1721 Tx_t tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
1722 auto txID = tx.id();
1724 if (result_->disputes.find(txID) != result_->disputes.end())
1727 JLOG(j_.
debug()) <<
"Transaction " << txID <<
" is disputed";
1730 tx, result_->txns.exists(txID),
std::max(prevProposers_, currPeerPositions_.size()), j_};
1733 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1735 Proposal_t const& peerProp = peerPos.proposal();
1736 auto const cit = acquired_.find(peerProp.
position());
1737 if (cit != acquired_.end() && dtx.setVote(nodeId, cit->second.exists(txID)))
1738 peerUnchangedCounter_ = 0;
1740 adaptor_.share(dtx.tx());
1742 result_->disputes.emplace(txID, std::move(dtx));
1744 JLOG(j_.
debug()) << dc <<
" differences found";
1745 CLOG(
clog) <<
"disputes: " << dc <<
". ";
1748template <
class Adaptor>
1753 XRPL_ASSERT(result_,
"xrpl::Consensus::updateDisputes : result is set");
1757 if (result_->compares.find(other.id()) == result_->compares.end())
1758 createDisputes(other);
1760 for (
auto& it : result_->disputes)
1762 auto& d = it.second;
1763 if (d.setVote(node, other.exists(d.tx().id())))
1764 peerUnchangedCounter_ = 0;
1768template <
class Adaptor>
Decorator for streaming out compact json.
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
bool isStale(NetClock::time_point cutoff) const
Get whether this position is stale relative to the provided cutoff.
Position_t const & position() const
Get the proposed position.
NodeID_t const & nodeID() const
Identifying which peer took this position.
Measures the duration of phases of consensus.
ConsensusMode get() const
MonitoredMode(ConsensusMode m)
void set(ConsensusMode mode, Adaptor &a)
Generic implementation of consensus algorithm.
typename Adaptor::TxSet_t TxSet_t
bool peerProposalInternal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
Handle a replayed or a new peer proposal.
void updateDisputes(NodeID_t const &node, TxSet_t const &other)
typename Adaptor::Ledger_t Ledger_t
bool haveCloseTimeConsensus_
hash_map< NodeID_t, std::deque< PeerPosition_t > > recentPeerPositions_
Json::Value getJson(bool full) const
Get the Json state of the consensus process.
NetClock::duration closeResolution_
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Simulate the consensus process without any network traffic.
typename TxSet_t::Tx Tx_t
void gotTxSet(NetClock::time_point const &now, TxSet_t const &txSet)
Process a transaction set acquired from the network.
bool shouldPause(std::unique_ptr< std::stringstream > const &clog) const
Evaluate whether pausing increases likelihood of validation.
void phaseEstablish(std::unique_ptr< std::stringstream > const &clog)
Handle establish phase.
hash_set< NodeID_t > deadNodes_
std::size_t peerUnchangedCounter_
void leaveConsensus(std::unique_ptr< std::stringstream > const &clog)
NetClock::time_point prevCloseTime_
Ledger_t::ID prevLedgerID() const
Get the previous ledger ID.
hash_map< NodeID_t, PeerPosition_t > currPeerPositions_
std::chrono::milliseconds prevRoundTime_
typename Adaptor::PeerPosition_t PeerPosition_t
void startRound(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t prevLedger, hash_set< NodeID_t > const &nowUntrusted, bool proposing, std::unique_ptr< std::stringstream > const &clog={})
Kick-off the next round of consensus.
void handleWrongLedger(typename Ledger_t::ID const &lgrId, std::unique_ptr< std::stringstream > const &clog)
void checkLedger(std::unique_ptr< std::stringstream > const &clog)
Check if our previous ledger matches the network's.
Consensus(Consensus &&) noexcept=default
NetClock::time_point asCloseTime(NetClock::time_point raw) const
ConsensusParms::AvalancheState closeTimeAvalancheState_
ConsensusPhase phase() const
void playbackProposals()
If we radically changed our consensus context for some reason, we need to replay recent proposals so ...
Ledger_t::ID prevLedgerID_
hash_map< typename TxSet_t::ID, TxSet_t const > acquired_
void phaseOpen(std::unique_ptr< std::stringstream > const &clog)
Handle pre-close phase.
void closeLedger(std::unique_ptr< std::stringstream > const &clog)
std::size_t prevProposers_
void startRoundInternal(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t const &prevLedger, ConsensusMode mode, std::unique_ptr< std::stringstream > const &clog)
std::size_t establishCounter_
NetClock::time_point now_
clock_type const & clock_
void createDisputes(TxSet_t const &o, std::unique_ptr< std::stringstream > const &clog={})
bool peerProposal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
A peer has proposed a new position, adjust our tracking.
typename Adaptor::NodeID_t NodeID_t
std::optional< Result > result_
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Call periodically to drive consensus forward.
bool haveConsensus(std::unique_ptr< std::stringstream > const &clog)
void updateOurPositions(std::unique_ptr< std::stringstream > const &clog)
ConsensusCloseTimes rawCloseTimes_
A transaction discovered to be in dispute during consensus.
@ arrayValue
array value (ordered list)
@ objectValue
object value (collection of name/value pairs).
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
ConsensusMode
Represents how a node currently participates in Consensus.
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
@ proposing
We are a normal participant in consensus and propose our position.
@ switchedLedger
We switched ledgers since we started this consensus round but are now running on what we believe is t...
@ observing
We are observing peer positions, but not proposing our position.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
std::string to_string(base_uint< Bits, Tag > const &a)
auto constexpr ledgerDefaultTimeResolution
Initial resolution of ledger close time.
std::chrono::duration< Rep, Period > getNextLedgerTimeResolution(std::chrono::duration< Rep, Period > previousResolution, bool previousAgree, Seq ledgerSeq)
Calculates the close time resolution for the specified ledger.
ConsensusState checkConsensus(std::size_t prevProposers, std::size_t currentProposers, std::size_t currentAgree, std::size_t currentFinished, std::chrono::milliseconds previousAgreeTime, std::chrono::milliseconds currentAgreeTime, bool stalled, ConsensusParms const &parms, bool proposing, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determine whether the network reached consensus and whether we joined.
ConsensusState
Whether we have or don't have a consensus.
@ Expired
Consensus time limit has hard-expired.
@ MovedOn
The network has consensus without us.
@ No
We do not have consensus.
std::chrono::time_point< Clock, Duration > roundCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > closeResolution)
Calculates the close time for a ledger, given a close time resolution.
ConsensusPhase
Phases of consensus for a single ledger round.
@ accepted
We have accepted a new last closed ledger and are waiting on a call to startRound to begin the next c...
@ open
We haven't closed our ledger yet, but others might have.
@ establish
Establishing consensus by exchanging proposals with our peers.
int participantsNeeded(int participants, int percent)
How many of the participants must agree to reach a given threshold?
std::pair< std::size_t, std::optional< ConsensusParms::AvalancheState > > getNeededWeight(ConsensusParms const &p, ConsensusParms::AvalancheState currentState, int percentTime, std::size_t currentRounds, std::size_t minimumRounds)
bool shouldCloseLedger(bool anyTransactions, std::size_t prevProposers, std::size_t proposersClosed, std::size_t proposersValidated, std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, std::chrono::milliseconds idleInterval, ConsensusParms const &parms, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determines whether the current ledger should close at this time.
Stores the set of initial close times.
Consensus algorithm parameters.
std::size_t const avCT_CONSENSUS_PCT
Percentage of nodes required to reach agreement on ledger close time.
std::chrono::seconds const proposeFRESHNESS
How long we consider a proposal fresh.
std::chrono::milliseconds const ledgerMIN_CONSENSUS
The number of seconds we wait minimum to ensure participation.
std::chrono::seconds const proposeINTERVAL
How often we force generating a new proposal to keep ours fresh.
std::size_t const avMIN_ROUNDS
Number of rounds before certain actions can happen.
std::map< AvalancheState, AvalancheCutoff > const avalancheCutoffs
Map the consensus requirement avalanche state to the amount of time that must pass before moving to t...
std::chrono::milliseconds const avMIN_CONSENSUS_TIME
The minimum amount of time to consider the previous round to have taken.
Encapsulates the result of consensus.
T time_since_epoch(T... args)