20#ifndef RIPPLE_CONSENSUS_CONSENSUS_H_INCLUDED 
   21#define RIPPLE_CONSENSUS_CONSENSUS_H_INCLUDED 
   23#include <xrpld/consensus/ConsensusParms.h> 
   24#include <xrpld/consensus/ConsensusProposal.h> 
   25#include <xrpld/consensus/ConsensusTypes.h> 
   26#include <xrpld/consensus/DisputedTx.h> 
   27#include <xrpld/consensus/LedgerTiming.h> 
   29#include <xrpl/basics/Log.h> 
   30#include <xrpl/basics/chrono.h> 
   31#include <xrpl/beast/utility/Journal.h> 
   32#include <xrpl/json/json_writer.h> 
   71    ConsensusParms 
const& parms,
 
  103    ConsensusParms 
const& parms,
 
  296template <
class Adaptor>
 
  302    using Tx_t = 
typename TxSet_t::Tx;
 
  306        typename Ledger_t::ID,
 
  307        typename TxSet_t::ID>;
 
  330            a.onModeChange(
mode_, mode);
 
 
 
  371        std::unique_ptr<
std::stringstream> const& clog = {});
 
  430    typename Ledger_t::ID
 
  464        typename Ledger_t::ID 
const& lgrId,
 
  638template <
class Adaptor>
 
  643    : adaptor_(adaptor), clock_(clock), j_{journal}
 
  645    JLOG(
j_.
debug()) << 
"Creating consensus object";
 
 
  648template <
class Adaptor>
 
  652    typename Ledger_t::ID 
const& prevLedgerID,
 
  661        prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
 
  662        prevCloseTime_ = prevLedger.closeTime();
 
  667        prevCloseTime_ = rawCloseTimes_.self;
 
  670    for (
NodeID_t const& n : nowUntrusted)
 
  671        recentPeerPositions_.erase(n);
 
  677    if (prevLedger.id() != prevLedgerID)
 
  680        if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID))
 
  682            prevLedger = *newLedger;
 
  688                << 
"Entering consensus with: " << previousLedger_.id();
 
  689            JLOG(j_.
info()) << 
"Correct LCL is: " << prevLedgerID;
 
  693    startRoundInternal(now, prevLedgerID, prevLedger, startMode, clog);
 
 
  695template <
class Adaptor>
 
  699    typename Ledger_t::ID 
const& prevLedgerID,
 
  705    JLOG(j_.
debug()) << 
"transitioned to ConsensusPhase::open ";
 
  706    CLOG(clog) << 
"startRoundInternal transitioned to ConsensusPhase::open, " 
  707                  "previous ledgerID: " 
  708               << prevLedgerID << 
", seq: " << prevLedger.seq() << 
". ";
 
  709    mode_.set(mode, adaptor_);
 
  711    prevLedgerID_ = prevLedgerID;
 
  712    previousLedger_ = prevLedger;
 
  714    convergePercent_ = 0;
 
  716    haveCloseTimeConsensus_ = 
false;
 
  717    openTime_.reset(clock_.now());
 
  718    currPeerPositions_.clear();
 
  720    rawCloseTimes_.peers.clear();
 
  721    rawCloseTimes_.self = {};
 
  725        previousLedger_.closeTimeResolution(),
 
  726        previousLedger_.closeAgree(),
 
  727        previousLedger_.seq() + 
typename Ledger_t::Seq{1});
 
  730    CLOG(clog) << 
"number of peer proposals,previous proposers: " 
  731               << currPeerPositions_.size() << 
',' << prevProposers_ << 
". ";
 
  732    if (currPeerPositions_.size() > (prevProposers_ / 2))
 
  736        CLOG(clog) << 
"consider closing the ledger immediately. ";
 
  737        timerEntry(now_, clog);
 
 
  741template <
class Adaptor>
 
  747    JLOG(j_.
debug()) << 
"PROPOSAL " << newPeerPos.render();
 
  748    auto const& peerID = newPeerPos.proposal().nodeID();
 
  752        auto& props = recentPeerPositions_[peerID];
 
  754        if (props.size() >= 10)
 
  757        props.push_back(newPeerPos);
 
  759    return peerProposalInternal(now, newPeerPos);
 
 
  762template <
class Adaptor>
 
  774    auto const& newPeerProp = newPeerPos.proposal();
 
  776    if (newPeerProp.prevLedger() != prevLedgerID_)
 
  778        JLOG(j_.
debug()) << 
"Got proposal for " << newPeerProp.prevLedger()
 
  779                         << 
" but we are on " << prevLedgerID_;
 
  783    auto const& peerID = newPeerProp.nodeID();
 
  785    if (deadNodes_.find(peerID) != deadNodes_.end())
 
  787        JLOG(j_.
info()) << 
"Position from dead node: " << peerID;
 
  793        auto peerPosIt = currPeerPositions_.find(peerID);
 
  795        if (peerPosIt != currPeerPositions_.end())
 
  797            if (newPeerProp.proposeSeq() <=
 
  798                peerPosIt->second.proposal().proposeSeq())
 
  804        if (newPeerProp.isBowOut())
 
  806            JLOG(j_.
info()) << 
"Peer " << peerID << 
" bows out";
 
  809                for (
auto& it : result_->disputes)
 
  810                    it.second.unVote(peerID);
 
  812            if (peerPosIt != currPeerPositions_.end())
 
  813                currPeerPositions_.erase(peerID);
 
  814            deadNodes_.insert(peerID);
 
  819        if (peerPosIt != currPeerPositions_.end())
 
  820            peerPosIt->second = newPeerPos;
 
  822            currPeerPositions_.emplace(peerID, newPeerPos);
 
  825    if (newPeerProp.isInitial())
 
  828        JLOG(j_.
trace()) << 
"Peer reports close time as " 
  829                         << newPeerProp.closeTime().time_since_epoch().count();
 
  830        ++rawCloseTimes_.peers[newPeerProp.closeTime()];
 
  833    JLOG(j_.
trace()) << 
"Processing peer proposal " << newPeerProp.proposeSeq()
 
  834                     << 
"/" << newPeerProp.position();
 
  837        auto const ait = acquired_.find(newPeerProp.position());
 
  838        if (ait == acquired_.end())
 
  843            if (
auto set = adaptor_.acquireTxSet(newPeerProp.position()))
 
  844                gotTxSet(now_, *
set);
 
  846                JLOG(j_.
debug()) << 
"Don't have tx set for peer";
 
  850            updateDisputes(newPeerProp.nodeID(), ait->second);
 
 
  857template <
class Adaptor>
 
  863    CLOG(clog) << 
"Consensus<Adaptor>::timerEntry. ";
 
  867        CLOG(clog) << 
"Nothing to do during accepted phase. ";
 
  872    CLOG(clog) << 
"Set network adjusted time to " << 
to_string(now) << 
". ";
 
  875    auto const phaseOrig = phase_;
 
  876    CLOG(clog) << 
"Phase " << 
to_string(phaseOrig) << 
". ";
 
  878    if (phaseOrig != phase_)
 
  880        CLOG(clog) << 
"Changed phase to << " << 
to_string(phase_) << 
". ";
 
  886        phaseEstablish(clog);
 
  887    CLOG(clog) << 
"timerEntry finishing in phase " << 
to_string(phase_) << 
". ";
 
 
  890template <
class Adaptor>
 
  902    auto id = txSet.id();
 
  906    if (!acquired_.emplace(
id, txSet).second)
 
  911        JLOG(j_.
debug()) << 
"Not creating disputes: no position yet.";
 
  918            id != result_->position.position(),
 
  919            "ripple::Consensus::gotTxSet : updated transaction set");
 
  921        for (
auto const& [nodeId, peerPos] : currPeerPositions_)
 
  923            if (peerPos.proposal().position() == id)
 
  925                updateDisputes(nodeId, txSet);
 
  933                << 
"By the time we got " << 
id << 
" no peers were proposing it";
 
 
  938template <
class Adaptor>
 
  944    using namespace std::chrono_literals;
 
  945    JLOG(j_.
info()) << 
"Simulating consensus";
 
  948    result_->roundTime.tick(consensusDelay.
value_or(100ms));
 
  949    result_->proposers = prevProposers_ = currPeerPositions_.size();
 
  950    prevRoundTime_ = result_->roundTime.read();
 
  952    adaptor_.onForceAccept(
 
  959    JLOG(j_.
info()) << 
"Simulation complete";
 
 
  962template <
class Adaptor>
 
  972    ret[
"proposers"] = 
static_cast<int>(currPeerPositions_.size());
 
  976        ret[
"synched"] = 
true;
 
  979        ret[
"close_granularity"] = 
static_cast<Int
>(closeResolution_.count());
 
  982        ret[
"synched"] = 
false;
 
  986    if (result_ && !result_->disputes.empty() && !full)
 
  987        ret[
"disputes"] = 
static_cast<Int
>(result_->disputes.size());
 
  990        ret[
"our_position"] = result_->position.getJson();
 
  996                static_cast<Int
>(result_->roundTime.read().count());
 
  997        ret[
"converge_percent"] = convergePercent_;
 
  998        ret[
"close_resolution"] = 
static_cast<Int
>(closeResolution_.count());
 
  999        ret[
"have_time_consensus"] = haveCloseTimeConsensus_;
 
 1000        ret[
"previous_proposers"] = 
static_cast<Int
>(prevProposers_);
 
 1001        ret[
"previous_mseconds"] = 
static_cast<Int
>(prevRoundTime_.count());
 
 1003        if (!currPeerPositions_.empty())
 
 1007            for (
auto const& [nodeId, peerPos] : currPeerPositions_)
 
 1009                ppj[
to_string(nodeId)] = peerPos.getJson();
 
 1011            ret[
"peer_positions"] = std::move(ppj);
 
 1014        if (!acquired_.empty())
 
 1017            for (
auto const& at : acquired_)
 
 1021            ret[
"acquired"] = std::move(acq);
 
 1024        if (result_ && !result_->disputes.empty())
 
 1027            for (
auto const& [txId, dispute] : result_->disputes)
 
 1029                dsj[
to_string(txId)] = dispute.getJson();
 
 1031            ret[
"disputes"] = std::move(dsj);
 
 1034        if (!rawCloseTimes_.peers.empty())
 
 1037            for (
auto const& ct : rawCloseTimes_.peers)
 
 1042            ret[
"close_times"] = std::move(ctj);
 
 1045        if (!deadNodes_.empty())
 
 1048            for (
auto const& dn : deadNodes_)
 
 1052            ret[
"dead_nodes"] = std::move(dnj);
 
 
 1060template <
class Adaptor>
 
 1063    typename Ledger_t::ID 
const& lgrId,
 
 1066    CLOG(clog) << 
"handleWrongLedger. ";
 
 1068        lgrId != prevLedgerID_ || previousLedger_.id() != lgrId,
 
 1069        "ripple::Consensus::handleWrongLedger : have wrong ledger");
 
 1072    leaveConsensus(clog);
 
 1075    if (prevLedgerID_ != lgrId)
 
 1077        prevLedgerID_ = lgrId;
 
 1082            result_->disputes.clear();
 
 1083            result_->compares.clear();
 
 1086        currPeerPositions_.clear();
 
 1087        rawCloseTimes_.peers.clear();
 
 1091        playbackProposals();
 
 1094    if (previousLedger_.id() == prevLedgerID_)
 
 1096        CLOG(clog) << 
"previousLedger_.id() == prevLeverID_ " << prevLedgerID_
 
 1102    if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
 
 1104        JLOG(j_.
info()) << 
"Have the consensus ledger " << prevLedgerID_;
 
 1105        CLOG(clog) << 
"Have the consensus ledger " << prevLedgerID_ << 
". ";
 
 1111        CLOG(clog) << 
"Still on wrong ledger. ";
 
 
 1116template <
class Adaptor>
 
 1120    CLOG(clog) << 
"checkLedger. ";
 
 1123        adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
 
 1124    CLOG(clog) << 
"network ledgerid " << netLgr << 
",  " 
 1125               << 
"previous ledger " << prevLedgerID_ << 
". ";
 
 1127    if (netLgr != prevLedgerID_)
 
 1130        ss << 
"View of consensus changed during " << 
to_string(phase_)
 
 1131           << 
" mode=" << 
to_string(mode_.get()) << 
", " << prevLedgerID_
 
 1132           << 
" to " << netLgr << 
", " 
 1135        CLOG(clog) << ss.
str();
 
 1136        CLOG(clog) << 
"State on consensus change " 
 1138        handleWrongLedger(netLgr, clog);
 
 1140    else if (previousLedger_.id() != prevLedgerID_)
 
 1142        CLOG(clog) << 
"previousLedger_.id() != prevLedgerID_: " 
 1143                   << previousLedger_.id() << 
',' << 
to_string(prevLedgerID_)
 
 1145        handleWrongLedger(netLgr, clog);
 
 
 1149template <
class Adaptor>
 
 1153    for (
auto const& it : recentPeerPositions_)
 
 1155        for (
auto const& pos : it.second)
 
 1157            if (pos.proposal().prevLedger() == prevLedgerID_)
 
 1159                if (peerProposalInternal(now_, pos))
 
 1160                    adaptor_.share(pos);
 
 
 1166template <
class Adaptor>
 
 1170    CLOG(clog) << 
"phaseOpen. ";
 
 1174    bool anyTransactions = adaptor_.hasOpenTransactions();
 
 1175    auto proposersClosed = currPeerPositions_.size();
 
 1176    auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
 
 1178    openTime_.tick(clock_.now());
 
 1183        auto const mode = mode_.get();
 
 1184        bool const closeAgree = previousLedger_.closeAgree();
 
 1185        auto const prevCloseTime = previousLedger_.closeTime();
 
 1186        auto const prevParentCloseTimePlus1 =
 
 1187            previousLedger_.parentCloseTime() + 1s;
 
 1188        bool const previousCloseCorrect =
 
 1190            (prevCloseTime != prevParentCloseTimePlus1);
 
 1192        auto const lastCloseTime = previousCloseCorrect
 
 1196        if (now_ >= lastCloseTime)
 
 1197            sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
 
 1199            sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
 
 1200        CLOG(
clog) << 
"calculating how long since last ledger's close time " 
 1202                   << 
to_string(mode) << 
", previous closeAgree: " << closeAgree
 
 1203                   << 
", previous close time: " << 
to_string(prevCloseTime)
 
 1204                   << 
", previous parent close time + 1s: " 
 1206                   << 
", previous close time seen internally: " 
 1208                   << 
", last close time: " << 
to_string(lastCloseTime)
 
 1209                   << 
", since close: " << sinceClose.
count() << 
". ";
 
 1213        adaptor_.parms().ledgerIDLE_INTERVAL,
 
 1214        2 * previousLedger_.closeTimeResolution());
 
 1215    CLOG(
clog) << 
"idle interval set to " << idleInterval.
count()
 
 1217               << 
"ledgerIDLE_INTERVAL: " 
 1218               << adaptor_.parms().ledgerIDLE_INTERVAL.count()
 
 1219               << 
", previous ledger close time resolution: " 
 1220               << previousLedger_.closeTimeResolution().count() << 
"ms. ";
 
 1236        CLOG(
clog) << 
"closing ledger. ";
 
 
 1241template <
class Adaptor>
 
 1246    CLOG(
clog) << 
"shouldPause? ";
 
 1247    auto const& parms = adaptor_.parms();
 
 1249        previousLedger_.seq() -
 
 1250        std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
 
 1251    auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
 
 1252    std::size_t const totalValidators = trustedKeys.size();
 
 1254        adaptor_.laggards(previousLedger_.seq(), trustedKeys);
 
 1258    vars << 
" consensuslog (working seq: " << previousLedger_.seq() << 
", " 
 1259         << 
"validated seq: " << adaptor_.getValidLedgerIndex() << 
", " 
 1260         << 
"am validator: " << adaptor_.validator() << 
", " 
 1261         << 
"have validated: " << adaptor_.haveValidated() << 
", " 
 1262         << 
"roundTime: " << result_->roundTime.
read().count() << 
", " 
 1263         << 
"max consensus time: " << parms.ledgerMAX_CONSENSUS.count() << 
", " 
 1264         << 
"validators: " << totalValidators << 
", " 
 1265         << 
"laggards: " << laggards << 
", " 
 1266         << 
"offline: " << offline << 
", " 
 1267         << 
"quorum: " << quorum << 
")";
 
 1269    if (!ahead || !laggards || !totalValidators || !adaptor_.validator() ||
 
 1270        !adaptor_.haveValidated() ||
 
 1271        result_->roundTime.read() > parms.ledgerMAX_CONSENSUS)
 
 1273        j_.
debug() << 
"not pausing (early)" << vars.
str();
 
 1274        CLOG(
clog) << 
"Not pausing (early). ";
 
 1278    bool willPause = 
false;
 
 1314    std::size_t const phase = (ahead - 1) % (maxPausePhase + 1);
 
 1323            if (laggards + offline > totalValidators - quorum)
 
 1338            float const nonLaggards = totalValidators - (laggards + offline);
 
 1339            float const quorumRatio =
 
 1340                static_cast<float>(quorum) / totalValidators;
 
 1341            float const allowedDissent = 1.0f - quorumRatio;
 
 1342            float const phaseFactor = 
static_cast<float>(phase) / maxPausePhase;
 
 1344            if (nonLaggards / totalValidators <
 
 1345                quorumRatio + (allowedDissent * phaseFactor))
 
 1353        j_.
warn() << 
"pausing" << vars.
str();
 
 1354        CLOG(
clog) << 
"pausing " << vars.
str() << 
". ";
 
 1358        j_.
debug() << 
"not pausing" << vars.
str();
 
 1359        CLOG(
clog) << 
"not pausing. ";
 
 
 1364template <
class Adaptor>
 
 1369    CLOG(
clog) << 
"phaseEstablish. ";
 
 1371    XRPL_ASSERT(result_, 
"ripple::Consensus::phaseEstablish : result is set");
 
 1373    ++peerUnchangedCounter_;
 
 1374    ++establishCounter_;
 
 1379    result_->roundTime.tick(clock_.now());
 
 1380    result_->proposers = currPeerPositions_.size();
 
 1382    convergePercent_ = result_->roundTime.read() * 100 /
 
 1384    CLOG(
clog) << 
"convergePercent_ " << convergePercent_
 
 1385               << 
" is based on round duration so far: " 
 1386               << result_->roundTime.read().count() << 
"ms, " 
 1387               << 
"previous round duration: " << prevRoundTime_.count()
 
 1395        CLOG(
clog) << 
"ledgerMIN_CONSENSUS not reached: " 
 1400    updateOurPositions(
clog);
 
 1403    if (shouldPause(
clog) || !haveConsensus(
clog))
 
 1406    if (!haveCloseTimeConsensus_)
 
 1408        JLOG(j_.
info()) << 
"We have TX consensus but not CT consensus";
 
 1409        CLOG(
clog) << 
"We have TX consensus but not CT consensus. ";
 
 1413    JLOG(j_.
info()) << 
"Converge cutoff (" << currPeerPositions_.size()
 
 1414                    << 
" participants)";
 
 1415    CLOG(
clog) << 
"Converge cutoff (" << currPeerPositions_.size()
 
 1416               << 
" participants). Transitioned to ConsensusPhase::accepted. ";
 
 1417    adaptor_.updateOperatingMode(currPeerPositions_.size());
 
 1418    prevProposers_ = currPeerPositions_.size();
 
 1419    prevRoundTime_ = result_->roundTime.read();
 
 1421    JLOG(j_.
debug()) << 
"transitioned to ConsensusPhase::accepted";
 
 1429        adaptor_.validating());
 
 
 1432template <
class Adaptor>
 
 1437    XRPL_ASSERT(!result_, 
"ripple::Consensus::closeLedger : result is not set");
 
 1440    JLOG(j_.
debug()) << 
"transitioned to ConsensusPhase::establish";
 
 1441    rawCloseTimes_.self = now_;
 
 1442    peerUnchangedCounter_ = 0;
 
 1443    establishCounter_ = 0;
 
 1445    result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
 
 1446    result_->roundTime.reset(clock_.now());
 
 1449    if (acquired_.emplace(result_->txns.id(), result_->txns).second)
 
 1450        adaptor_.share(result_->txns);
 
 1452    auto const mode = mode_.get();
 
 1454        << 
"closeLedger transitioned to ConsensusPhase::establish, mode: " 
 1456        << 
", number of peer positions: " << currPeerPositions_.
size() << 
". ";
 
 1458        adaptor_.propose(result_->position);
 
 1461    for (
auto const& pit : currPeerPositions_)
 
 1463        auto const& pos = pit.second.proposal().position();
 
 1464        auto const it = acquired_.find(pos);
 
 1465        if (it != acquired_.end())
 
 1466            createDisputes(it->second, 
clog);
 
 
 1485    int result = ((participants * percent) + (percent / 2)) / 100;
 
 1487    return (result == 0) ? 1 : result;
 
 
 1490template <
class Adaptor>
 
 1497        result_, 
"ripple::Consensus::updateOurPositions : result is set");
 
 1503    CLOG(
clog) << 
"updateOurPositions. peerCutoff " << 
to_string(peerCutoff)
 
 1504               << 
", ourCutoff " << 
to_string(ourCutoff) << 
". ";
 
 1509        auto it = currPeerPositions_.
begin();
 
 1510        while (it != currPeerPositions_.end())
 
 1512            Proposal_t const& peerProp = it->second.proposal();
 
 1513            if (peerProp.
isStale(peerCutoff))
 
 1517                JLOG(j_.
warn()) << 
"Removing stale proposal from " << peerID;
 
 1518                for (
auto& dt : result_->disputes)
 
 1519                    dt.second.unVote(peerID);
 
 1520                it = currPeerPositions_.erase(it);
 
 1525                ++closeTimeVotes[asCloseTime(peerProp.
closeTime())];
 
 1537        for (
auto& [txId, dispute] : result_->disputes)
 
 1541            if (dispute.updateVote(
 
 1547                    mutableSet.
emplace(result_->txns);
 
 1549                if (dispute.getOurVote())
 
 1552                    mutableSet->insert(dispute.tx());
 
 1557                    mutableSet->erase(txId);
 
 1563            ourNewSet.
emplace(std::move(*mutableSet));
 
 1567    haveCloseTimeConsensus_ = 
false;
 
 1569    if (currPeerPositions_.empty())
 
 1572        haveCloseTimeConsensus_ = 
true;
 
 1573        consensusCloseTime = asCloseTime(result_->position.closeTime());
 
 1579            parms, closeTimeAvalancheState_, convergePercent_, 0, 0);
 
 1581            closeTimeAvalancheState_ = *newState;
 
 1582        CLOG(
clog) << 
"neededWeight " << neededWeight << 
". ";
 
 1584        int participants = currPeerPositions_.size();
 
 1587            ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
 
 1595        int const threshConsensus =
 
 1599        ss << 
"Proposers:" << currPeerPositions_.size()
 
 1600           << 
" nw:" << neededWeight << 
" thrV:" << threshVote
 
 1601           << 
" thrC:" << threshConsensus;
 
 1605        for (
auto const& [t, v] : closeTimeVotes)
 
 1609                << 
static_cast<std::uint32_t>(previousLedger_.seq()) + 1 << 
": " 
 1610                << t.time_since_epoch().count() << 
" has " << v << 
", " 
 1611                << threshVote << 
" required";
 
 1613            if (v >= threshVote)
 
 1616                consensusCloseTime = t;
 
 1619                if (threshVote >= threshConsensus)
 
 1620                    haveCloseTimeConsensus_ = 
true;
 
 1624        if (!haveCloseTimeConsensus_)
 
 1627                << 
"No CT consensus:" 
 1628                << 
" Proposers:" << currPeerPositions_.size()
 
 1630                << 
" Thresh:" << threshConsensus
 
 1632            CLOG(
clog) << 
"No close time consensus. ";
 
 1637        ((consensusCloseTime != asCloseTime(result_->position.closeTime())) ||
 
 1638         result_->position.isStale(ourCutoff)))
 
 1641        ourNewSet.
emplace(result_->txns);
 
 1646        auto newID = ourNewSet->id();
 
 1648        result_->txns = std::move(*ourNewSet);
 
 1651        ss << 
"Position change: CTime " 
 1656        result_->position.changePosition(newID, consensusCloseTime, now_);
 
 1660        if (acquired_.emplace(newID, result_->txns).second)
 
 1662            if (!result_->position.isBowOut())
 
 1663                adaptor_.share(result_->txns);
 
 1665            for (
auto const& [nodeId, peerPos] : currPeerPositions_)
 
 1669                    updateDisputes(nodeId, result_->txns);
 
 1674        if (!result_->position.isBowOut() &&
 
 1676            adaptor_.propose(result_->position);
 
 
 1680template <
class Adaptor>
 
 1686    XRPL_ASSERT(result_, 
"ripple::Consensus::haveConsensus : has result");
 
 1689    int agree = 0, disagree = 0;
 
 1691    auto ourPosition = result_->position.position();
 
 1694    for (
auto const& [nodeId, peerPos] : currPeerPositions_)
 
 1696        Proposal_t const& peerProp = peerPos.proposal();
 
 1697        if (peerProp.
position() == ourPosition)
 
 1703            JLOG(j_.
debug()) << 
"Proposal disagreement: Peer " << nodeId
 
 1708    auto currentFinished =
 
 1709        adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
 
 1711    JLOG(j_.
debug()) << 
"Checking for TX consensus: agree=" << agree
 
 1712                     << 
", disagree=" << disagree;
 
 1718    bool const stalled = haveCloseTimeConsensus_ &&
 
 1719        !result_->disputes.empty() &&
 
 1721                            [
this, &parms, &
clog](
auto const& dispute) {
 
 1722                                return dispute.second.stalled(
 
 1724                                    mode_.get() == ConsensusMode::proposing,
 
 1725                                    peerUnchangedCounter_,
 
 1732        ss << 
"Consensus detects as stalled with " << (agree + disagree) << 
"/" 
 1733           << prevProposers_ << 
" proposers, and " << result_->disputes.size()
 
 1734           << 
" stalled disputed transactions.";
 
 1746        result_->roundTime.read(),
 
 1755        CLOG(
clog) << 
"No consensus. ";
 
 1762        static auto const minimumCounter =
 
 1765        if (establishCounter_ < minimumCounter)
 
 1774            ss << 
"Consensus time has expired in round " << establishCounter_
 
 1775               << 
"; continue until round " << minimumCounter << 
". " 
 1778            CLOG(
clog) << ss.
str() << 
". ";
 
 1783        CLOG(
clog) << ss.
str() << 
". ";
 
 1784        leaveConsensus(
clog);
 
 1790        JLOG(j_.
error()) << 
"Unable to reach consensus";
 
 1792        CLOG(
clog) << 
"Unable to reach consensus " 
 1796    CLOG(
clog) << 
"Consensus has been reached. ";
 
 
 1800template <
class Adaptor>
 
 1807        if (result_ && !result_->position.isBowOut())
 
 1809            result_->position.bowOut(now_);
 
 1810            adaptor_.propose(result_->position);
 
 1814        JLOG(j_.
info()) << 
"Bowing out of consensus";
 
 1815        CLOG(
clog) << 
"Bowing out of consensus. ";
 
 
 1819template <
class Adaptor>
 
 1826    XRPL_ASSERT(result_, 
"ripple::Consensus::createDisputes : result is set");
 
 1829    auto const emplaced = result_->compares.emplace(o.id()).second;
 
 1830    CLOG(
clog) << 
"createDisputes: new set? " << !emplaced << 
". ";
 
 1835    if (result_->txns.id() == o.id())
 
 1837        CLOG(
clog) << 
"both sets are identical. ";
 
 1841    CLOG(
clog) << 
"comparing existing with new set: " << result_->txns.id()
 
 1842               << 
',' << o.id() << 
". ";
 
 1843    JLOG(j_.
debug()) << 
"createDisputes " << result_->txns.id() << 
" to " 
 1846    auto differences = result_->txns.compare(o);
 
 1850    for (
auto const& [txId, inThisSet] : differences)
 
 1855            (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
 
 1856                (!inThisSet && !result_->txns.find(txId) && o.find(txId)),
 
 1857            "ripple::Consensus::createDisputes : has disputed transactions");
 
 1859        Tx_t tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
 
 1860        auto txID = tx.id();
 
 1862        if (result_->disputes.find(txID) != result_->disputes.end())
 
 1865        JLOG(j_.
debug()) << 
"Transaction " << txID << 
" is disputed";
 
 1869            result_->txns.exists(txID),
 
 1870            std::max(prevProposers_, currPeerPositions_.size()),
 
 1874        for (
auto const& [nodeId, peerPos] : currPeerPositions_)
 
 1876            Proposal_t const& peerProp = peerPos.proposal();
 
 1877            auto const cit = acquired_.find(peerProp.
position());
 
 1878            if (cit != acquired_.end() &&
 
 1879                dtx.setVote(nodeId, cit->second.exists(txID)))
 
 1880                peerUnchangedCounter_ = 0;
 
 1882        adaptor_.share(dtx.tx());
 
 1884        result_->disputes.emplace(txID, std::move(dtx));
 
 1886    JLOG(j_.
debug()) << dc << 
" differences found";
 
 1887    CLOG(
clog) << 
"disputes: " << dc << 
". ";
 
 
 1890template <
class Adaptor>
 
 1895    XRPL_ASSERT(result_, 
"ripple::Consensus::updateDisputes : result is set");
 
 1899    if (result_->compares.find(other.id()) == result_->compares.end())
 
 1900        createDisputes(other);
 
 1902    for (
auto& it : result_->disputes)
 
 1904        auto& d = it.second;
 
 1905        if (d.setVote(node, other.exists(d.tx().id())))
 
 1906            peerUnchangedCounter_ = 0;
 
 
 1910template <
class Adaptor>
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
Decorator for streaming out compact json.
 
Value & append(Value const &value)
Append value to array at the end.
 
A generic endpoint for log messages.
 
Stream trace() const
Severity stream access functions.
 
NodeID_t const & nodeID() const
Identifying which peer took this position.
 
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
 
Position_t const & position() const
Get the proposed position.
 
bool isStale(NetClock::time_point cutoff) const
Get whether this position is stale relative to the provided cutoff.
 
Measures the duration of phases of consensus.
 
void set(ConsensusMode mode, Adaptor &a)
 
MonitoredMode(ConsensusMode m)
 
ConsensusMode get() const
 
Generic implementation of consensus algorithm.
 
hash_map< typename TxSet_t::ID, TxSet_t const  > acquired_
 
void playbackProposals()
If we radically changed our consensus context for some reason, we need to replay recent proposals so ...
 
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Call periodically to drive consensus forward.
 
void startRoundInternal(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t const &prevLedger, ConsensusMode mode, std::unique_ptr< std::stringstream > const &clog)
 
void phaseEstablish(std::unique_ptr< std::stringstream > const &clog)
Handle establish phase.
 
typename Adaptor::PeerPosition_t PeerPosition_t
 
NetClock::time_point prevCloseTime_
 
clock_type const  & clock_
 
void leaveConsensus(std::unique_ptr< std::stringstream > const &clog)
 
void updateDisputes(NodeID_t const &node, TxSet_t const &other)
 
typename Adaptor::TxSet_t TxSet_t
 
ConsensusParms::AvalancheState closeTimeAvalancheState_
 
std::size_t establishCounter_
 
void updateOurPositions(std::unique_ptr< std::stringstream > const &clog)
 
bool haveConsensus(std::unique_ptr< std::stringstream > const &clog)
 
Ledger_t::ID prevLedgerID() const
Get the previous ledger ID.
 
void handleWrongLedger(typename Ledger_t::ID const &lgrId, std::unique_ptr< std::stringstream > const &clog)
 
void checkLedger(std::unique_ptr< std::stringstream > const &clog)
Check if our previous ledger matches the network's.
 
hash_map< NodeID_t, std::deque< PeerPosition_t > > recentPeerPositions_
 
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Simulate the consensus process without any network traffic.
 
Json::Value getJson(bool full) const
Get the Json state of the consensus process.
 
typename TxSet_t::Tx Tx_t
 
void startRound(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t prevLedger, hash_set< NodeID_t > const &nowUntrusted, bool proposing, std::unique_ptr< std::stringstream > const &clog={})
Kick-off the next round of consensus.
 
Consensus(Consensus &&) noexcept=default
 
NetClock::time_point now_
 
std::size_t prevProposers_
 
NetClock::time_point asCloseTime(NetClock::time_point raw) const
 
void createDisputes(TxSet_t const &o, std::unique_ptr< std::stringstream > const &clog={})
 
void gotTxSet(NetClock::time_point const &now, TxSet_t const &txSet)
Process a transaction set acquired from the network.
 
typename Adaptor::Ledger_t Ledger_t
 
ConsensusPhase phase() const
 
std::size_t peerUnchangedCounter_
 
typename Adaptor::NodeID_t NodeID_t
 
void closeLedger(std::unique_ptr< std::stringstream > const &clog)
 
NetClock::duration closeResolution_
 
bool peerProposal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
A peer has proposed a new position, adjust our tracking.
 
bool peerProposalInternal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
Handle a replayed or a new peer proposal.
 
hash_map< NodeID_t, PeerPosition_t > currPeerPositions_
 
bool shouldPause(std::unique_ptr< std::stringstream > const &clog) const
Evaluate whether pausing increases likelihood of validation.
 
void phaseOpen(std::unique_ptr< std::stringstream > const &clog)
Handle pre-close phase.
 
ConsensusCloseTimes rawCloseTimes_
 
std::chrono::milliseconds prevRoundTime_
 
std::optional< Result > result_
 
hash_set< NodeID_t > deadNodes_
 
Ledger_t::ID prevLedgerID_
 
bool haveCloseTimeConsensus_
 
A transaction discovered to be in dispute during consensus.
 
@ arrayValue
array value (ordered list)
 
@ objectValue
object value (collection of name/value pairs).
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
std::pair< std::size_t, std::optional< ConsensusParms::AvalancheState > > getNeededWeight(ConsensusParms const &p, ConsensusParms::AvalancheState currentState, int percentTime, std::size_t currentRounds, std::size_t minimumRounds)
 
ConsensusMode
Represents how a node currently participates in Consensus.
 
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
 
@ proposing
We are normal participant in consensus and propose our position.
 
@ switchedLedger
We switched ledgers since we started this consensus round but are now running on what we believe is t...
 
@ observing
We are observing peer positions, but not proposing our position.
 
ConsensusState checkConsensus(std::size_t prevProposers, std::size_t currentProposers, std::size_t currentAgree, std::size_t currentFinished, std::chrono::milliseconds previousAgreeTime, std::chrono::milliseconds currentAgreeTime, bool stalled, ConsensusParms const &parms, bool proposing, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determine whether the network reached consensus and whether we joined.
 
std::chrono::time_point< Clock, Duration > roundCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > closeResolution)
Calculates the close time for a ledger, given a close time resolution.
 
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
 
auto constexpr ledgerDefaultTimeResolution
Initial resolution of ledger close time.
 
ConsensusPhase
Phases of consensus for a single ledger round.
 
@ accepted
We have accepted a new last closed ledger and are waiting on a call to startRound to begin the next c...
 
@ open
We haven't closed our ledger yet, but others might have.
 
@ establish
Establishing consensus by exchanging proposals with our peers.
 
ConsensusState
Whether we have or don't have a consensus.
 
@ Expired
Consensus time limit has hard-expired.
 
@ MovedOn
The network has consensus without us.
 
@ No
We do not have consensus.
 
std::string to_string(base_uint< Bits, Tag > const &a)
 
bool shouldCloseLedger(bool anyTransactions, std::size_t prevProposers, std::size_t proposersClosed, std::size_t proposersValidated, std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, std::chrono::milliseconds idleInterval, ConsensusParms const &parms, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determines whether the current ledger should close at this time.
 
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
 
int participantsNeeded(int participants, int percent)
How many of the participants must agree to reach a given threshold?
 
std::chrono::duration< Rep, Period > getNextLedgerTimeResolution(std::chrono::duration< Rep, Period > previousResolution, bool previousAgree, Seq ledgerSeq)
Calculates the close time resolution for the specified ledger.
 
Stores the set of initial close times.
 
Consensus algorithm parameters.
 
std::size_t const avCT_CONSENSUS_PCT
Percentage of nodes required to reach agreement on ledger close time.
 
std::chrono::seconds const proposeINTERVAL
How often we force generating a new proposal to keep ours fresh.
 
std::size_t const avMIN_ROUNDS
Number of rounds before certain actions can happen.
 
std::chrono::milliseconds const avMIN_CONSENSUS_TIME
The minimum amount of time to consider the previous round to have taken.
 
std::chrono::milliseconds const ledgerMIN_CONSENSUS
The number of seconds we wait minimum to ensure participation.
 
std::map< AvalancheState, AvalancheCutoff > const avalancheCutoffs
Map the consensus requirement avalanche state to the amount of time that must pass before moving to t...
 
std::chrono::seconds const proposeFRESHNESS
How long we consider a proposal fresh.
 
Encapsulates the result of consensus.
 
T time_since_epoch(T... args)