3#include <test/csf/CollectorRef.h>
4#include <test/csf/Scheduler.h>
5#include <test/csf/TrustGraph.h>
6#include <test/csf/Tx.h>
7#include <test/csf/Validation.h>
8#include <test/csf/events.h>
9#include <test/csf/ledgers.h>
11#include <xrpld/consensus/Consensus.h>
12#include <xrpld/consensus/Validations.h>
14#include <xrpl/beast/utility/WrappedSink.h>
15#include <xrpl/protocol/PublicKey.h>
17#include <boost/container/flat_map.hpp>
18#include <boost/container/flat_set.hpp>
26namespace bc = boost::container;
295 using namespace std::chrono_literals;
342 for (
auto const p :
trustGraph.trustedPeers(
this))
360 return net.connect(
this, &o, dur);
373 return net.disconnect(
this, &o);
385 return &(it->second);
389 if (
net.links(
this).empty())
400 using namespace std::chrono_literals;
402 for (
auto const link :
net.links(
this))
404 minDuration =
std::min(minDuration, link.data.delay);
407 net.send(
this, link.target, [to = link.target, from =
this, ledgerID]() {
408 if (auto it = to->ledgers.find(ledgerID); it != to->ledgers.end())
413 to->net.send(to, from, [from, ledger = it->second]() {
414 from->acquiringLedgers.erase(ledger.id());
415 from->ledgers.emplace(ledger.id(), ledger);
428 if (
auto it = txSets.find(setId); it != txSets.end())
430 return &(it->second);
434 if (net.
links(
this).empty())
438 auto aIt = acquiringTxSets.
find(setId);
439 if (aIt != acquiringTxSets.end())
441 if (scheduler.
now() < aIt->second)
445 using namespace std::chrono_literals;
447 for (
auto const link : net.
links(
this))
449 minDuration =
std::min(minDuration, link.data.delay);
451 net.
send(
this, link.target, [to = link.target, from =
this, setId]() {
452 if (auto it = to->txSets.find(setId); it != to->txSets.end())
457 to->net.send(to, from, [from, txSet = it->second]() {
458 from->acquiringTxSets.erase(txSet.id());
464 acquiringTxSets[setId] = scheduler.
now() + 2 * minDuration;
471 return !openTxs.empty();
492 TxSet{openTxs},
Proposal(prevLedger.
id(), Proposal::seqJoin, TxSet::calcID(openTxs), closeTime, now(),
id));
504 onAccept(result, prevLedger, closeResolution, rawCloseTimes, mode, std::move(consensusJson), validating());
515 bool const validating)
518 bool const proposing = mode == ConsensusMode::proposing;
519 bool const consensusFail = result.state == ConsensusState::MovedOn;
521 TxSet const acceptedTxs = injectTxs(prevLedger, result.txns);
522 Ledger const newLedger =
523 oracle.accept(prevLedger, acceptedTxs.txs(), closeResolution, result.position.closeTime());
524 ledgers[newLedger.id()] = newLedger;
526 issue(AcceptLedger{newLedger, lastClosedLedger});
529 lastClosedLedger = newLedger;
532 openTxs.begin(), openTxs.end(), [&](
Tx const& tx) { return acceptedTxs.exists(tx.id()); });
533 openTxs.erase(it, openTxs.end());
537 bool const isCompatible = newLedger.isAncestor(fullyValidatedLedger);
540 if (runAsValidator && isCompatible && !consensusFail && validations.
canValidateSeq(newLedger.seq()))
544 Validation v{newLedger.id(), newLedger.
seq(), now(), now(), key, id, isFull};
548 addTrustedValidation(v);
551 checkFullyValidated(newLedger);
559 if (completedLedgers <= targetLedgers)
571 return fullyValidatedLedger.seq();
581 Ledger::ID const netLgr = validations.getPreferred(ledger, earliestAllowedSeq());
583 if (netLgr != ledgerID)
601 return consensusParms;
642 checkFullyValidated(*lgr);
651 if (ledger.
seq() <= fullyValidatedLedger.seq())
654 std::size_t const count = validations.numTrustedForLedger(ledger.
id());
655 std::size_t const numTrustedPeers = trustGraph.graph().outDegree(
this);
657 if (count >= quorum && ledger.
isAncestor(fullyValidatedLedger))
660 fullyValidatedLedger = ledger;
703 for (
auto const link : net.links(
this))
705 if (link.target->id != from && link.target->id != bm.
origin)
709 if (link.target->router.lastObservedSeq[bm.
origin] < bm.
seq)
712 net.send(
this, link.target, [to = link.target, bm,
id = this->id] { to->receive(bm, id); });
727 schedule(delays.onReceive(bm.
msg), [
this, bm, from] {
741 return p.
prevLedger() == lastClosedLedger.id();
746 if (
std::find(dest.begin(), dest.end(), p) != dest.end())
752 return consensus.peerProposal(now(),
Position{p});
760 consensus.gotTxSet(now(), txs);
769 TxSetType const& lastClosedTxs = lastClosedLedger.txs();
770 if (lastClosedTxs.find(tx) != lastClosedTxs.end())
774 return openTxs.insert(tx).second;
785 return addTrustedValidation(v);
791 return fullyValidatedLedger.seq() >
Ledger::Seq{0};
797 return earliestAllowedSeq();
804 for (
auto const p : trustGraph.trustedPeers(
this))
806 return {quorum, keys};
812 return validations.laggards(
seq, trusted);
818 return runAsValidator;
850 consensus.timerEntry(now());
852 if (completedLedgers < targetLedgers)
853 scheduler.in(parms().ledgerGRANULARITY, [
this]() { timerEntry(); });
863 Ledger::ID bestLCL = validations.getPreferred(lastClosedLedger, earliestAllowedSeq());
865 bestLCL = lastClosedLedger.id();
871 consensus.startRound(now(), bestLCL, lastClosedLedger, nowUntrusted, runAsValidator, {});
880 validations.expire(j);
881 scheduler.in(parms().ledgerGRANULARITY, [&]() { timerEntry(); });
893 using namespace std::chrono_literals;
895 duration_cast<NetClock::duration>(scheduler.now().time_since_epoch() + 86400s + clockSkew));
901 return consensus.prevLedgerID();
925 auto const it = txInjections.
find(prevLedger.
seq());
927 if (it == txInjections.
end())
930 res.insert(it->second);
Decorator for streaming out compact json.
A generic endpoint for log messages.
Wraps a Journal::Sink to prefix its output with a string.
LedgerID_t const & prevLedger() const
Get the prior accepted ledger this position is based on.
Json::Value getJson() const
Get JSON representation for debugging.
NodeID_t const & nodeID() const
Identifies which peer took this position.
std::chrono::milliseconds read() const
Generic implementation of consensus algorithm.
std::chrono::time_point< NetClock > time_point
Maintains current and recent ledger validations.
std::size_t getNodesAfter(Ledger const &ledger, ID const &ledgerID)
Count the number of current trusted validators working on a ledger after the specified one.
bool canValidateSeq(Seq const s)
Return whether the local node can issue a validation for the given sequence number.
std::size_t numTrustedForLedger(ID const &ledgerID)
Count the number of trusted full validations for the given ledger.
Peer to peer network simulator.
auto links(Peer const &from)
Return the range of active links.
void send(Peer const &from, Peer const &to, Function &&f)
Send a message to a peer.
A container of CollectorRefs.
void on(PeerID node, SimTime when, E const &e)
Oracle maintaining unique ledgers for a simulation.
A ledger is a set of observed transactions and a sequence number identifying the ledger.
bool isAncestor(Ledger const &ancestor) const
Determine whether ancestor is really an ancestor of this ledger.
Basic wrapper of a proposed position taken by a peer.
std::string render() const
Position(Proposal const &p)
Proposal const & proposal() const
Json::Value getJson() const
Generic Validations adaptor that simply ignores recently stale validations.
NetClock::time_point now() const
std::optional< Ledger > acquire(Ledger::ID const &lId)
Simulated discrete-event scheduler.
cancel_token in(duration const &delay, Function &&f)
Schedule an event after a specified duration passes.
time_point now() const
Return the current network time.
TxSet is a set of transactions to consider including in the ledger.
Tx const * find(Tx::ID const &txId) const
TxSetType const & txs() const
beast::uhash<>::result_type ID
Validation of a specific ledger by a specific Peer.
void setSeen(NetClock::time_point seen)
PeerID const & nodeID() const
Ledger::ID ledgerID() const
typename SimClock::duration SimDuration
std::string to_string(TxSetType const &txs)
boost::container::flat_set< Tx > TxSetType
std::pair< PeerID, std::uint32_t > PeerKey
The current key of a peer.
tagged_integer< std::uint32_t, PeerIDTag > PeerID
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
ConsensusMode
Represents how a node currently participates in Consensus.
@ proposing
We are a normal participant in consensus and propose our position.
ValStatus
Status of validation we received.
@ stale
Not current, or older than the current validation from this node.
boost::outcome_v2::result< T, std::error_code > Result
Stores the set of initial close times.
Consensus algorithm parameters.
Encapsulates the result of consensus.
Timing parameters to control validation staleness and expiration.
Peer closed the open ledger.
Peer fully validated a new ledger.
Simulated delays in internal peer processing.
std::chrono::milliseconds recvValidation
Delay in processing validations from remote peers.
SimDuration onReceive(Validation const &) const
std::chrono::milliseconds ledgerAccept
Delay from consensus calling doAccept to accepting and issuing the validation. TODO: This should be a functi...
SimDuration onReceive(M const &) const
bc::flat_map< PeerID, std::size_t > lastObservedSeq
A single peer in the simulation.
Result onClose(Ledger const &prevLedger, NetClock::time_point closeTime, ConsensusMode mode)
std::chrono::seconds clockSkew
Skew of time relative to the common scheduler clock.
void propose(Proposal const &pos)
Ledger::ID prevLedgerID() const
TxSetType openTxs
openTxs that haven't been closed in a ledger yet
void updateOperatingMode(std::size_t const positions) const
void receive(BroadcastMesg< M > const &bm, PeerID from)
ConsensusParms const & parms() const
Ledger::Seq getValidLedgerIndex() const
bc::flat_map< TxSet::ID, TxSet > txSets
TxSet associated with a TxSet::ID.
BasicNetwork< Peer * > & net
Handle to network for sending messages.
std::size_t prevProposers
int completedLedgers
The number of ledgers this peer has completed.
hash_set< NodeKey_t > trustedKeys
hash_map< Ledger::ID, Ledger > ledgers
Ledgers this node has closed or loaded from the network.
bool haveValidated() const
Peer(PeerID i, Scheduler &s, LedgerOracle &o, BasicNetwork< Peer * > &n, TrustGraph< Peer * > &tg, CollectorRefs &c, beast::Journal jIn)
Constructor.
bool runAsValidator
Whether to simulate running as validator or a tracking node.
hash_map< Ledger::Seq, Tx > txInjections
TxSet const * acquireTxSet(TxSet::ID const &setId)
ConsensusParms consensusParms
CollectorRefs & collectors
The collectors to report events to.
bool trusts(PeerID const &oId)
void onAccept(Result const &result, Ledger const &prevLedger, NetClock::duration const &closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson, bool const validating)
TxSet injectTxs(Ledger prevLedger, TxSet const &src)
Inject non-consensus Tx.
Ledger lastClosedLedger
The last ledger closed by this node.
LedgerOracle & oracle
The oracle that manages unique ledgers.
std::pair< std::size_t, hash_set< NodeKey_t > > getQuorumKeys()
Ledger const * acquireLedger(Ledger::ID const &ledgerID)
bc::flat_map< TxSet::ID, SimTime > acquiringTxSets
Ledger::ID getPrevLedger(Ledger::ID const &ledgerID, Ledger const &ledger, ConsensusMode mode)
Consensus< Peer > consensus
Generic consensus.
bool connect(Peer &o, SimDuration dur)
Create network connection.
void onForceAccept(Result const &result, Ledger const &prevLedger, NetClock::duration const &closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson)
bool handle(Validation const &v)
void send(BroadcastMesg< M > const &bm, PeerID from)
void issue(E const &event)
bool handle(TxSet const &txs)
void checkFullyValidated(Ledger const &ledger)
Check if a new ledger can be deemed fully validated.
bc::flat_map< Ledger::ID, SimTime > acquiringLedgers
void onModeChange(ConsensusMode, ConsensusMode)
NetClock::time_point now() const
TrustGraph< Peer * > & trustGraph
Handle to Trust graph of network.
std::size_t laggards(Ledger::Seq const seq, hash_set< NodeKey_t > &trusted)
bool handle(Proposal const &p)
void timerEntry()
Heartbeat timer call.
bc::flat_map< Ledger::ID, std::vector< Proposal > > peerPositions
Map from Ledger::ID to vector of Positions with that ledger as the prior ledger.
Validations< ValAdaptor > validations
Validations from trusted nodes.
bool addTrustedValidation(Validation v)
Add a trusted validation and return true if it is worth forwarding.
bool handle(Tx const &tx)
PeerKey key
Current signing key.
void schedule(std::chrono::nanoseconds when, T &&what)
Schedule the provided callback to run after the duration `when`; if `when` is 0, call it immediately.
Scheduler & scheduler
Scheduler of events.
ProcessingDelays delays
Simulated delays to use for internal processing.
void share(Position const &p)
Ledger fullyValidatedLedger
The most recent ledger that has been fully validated by the network from the perspective of this Peer...
bool hasOpenTransactions() const
beast::WrappedSink sink
Logging support that prefixes messages with the peer ID.
int targetLedgers
The number of ledgers this peer should complete before it stops running.
std::size_t proposersFinished(Ledger const &prevLedger, Ledger::ID const &prevLedgerID)
std::size_t proposersValidated(Ledger::ID const &prevLedger)
bool disconnect(Peer &o)
Remove a network connection.
Ledger::Seq earliestAllowedSeq() const
std::chrono::milliseconds prevRoundTime
void submit(Tx const &tx)
A value received from another peer as part of flooding.
A value relayed to another peer as part of flooding.
A value to be flooded to all other peers starting from this peer.
Peer starts a new consensus round.
A transaction submitted to a peer.
Peer detected a wrong prior ledger during consensus.
Set the sequence number on a JTx.