1#ifndef XRPL_TEST_CSF_PEER_H_INCLUDED
2#define XRPL_TEST_CSF_PEER_H_INCLUDED
4#include <test/csf/CollectorRef.h>
5#include <test/csf/Scheduler.h>
6#include <test/csf/TrustGraph.h>
7#include <test/csf/Tx.h>
8#include <test/csf/Validation.h>
9#include <test/csf/events.h>
10#include <test/csf/ledgers.h>
12#include <xrpld/consensus/Consensus.h>
13#include <xrpld/consensus/Validations.h>
15#include <xrpl/beast/utility/WrappedSink.h>
16#include <xrpl/protocol/PublicKey.h>
18#include <boost/container/flat_map.hpp>
19#include <boost/container/flat_set.hpp>
27namespace bc = boost::container;
296 using namespace std::chrono_literals;
343 for (
auto const p :
trustGraph.trustedPeers(
this))
361 return net.connect(
this, &o, dur);
374 return net.disconnect(
this, &o);
386 return &(it->second);
390 if (
net.links(
this).empty())
401 using namespace std::chrono_literals;
403 for (
auto const link :
net.links(
this))
405 minDuration =
std::min(minDuration, link.data.delay);
409 this, link.target, [to = link.target, from =
this, ledgerID]() {
410 if (auto it = to->ledgers.find(ledgerID);
411 it != to->ledgers.end())
416 to->net.send(to, from, [from, ledger = it->second]() {
417 from->acquiringLedgers.erase(ledger.id());
418 from->ledgers.emplace(ledger.id(), ledger);
431 if (
auto it = txSets.find(setId); it != txSets.end())
433 return &(it->second);
437 if (net.
links(
this).empty())
441 auto aIt = acquiringTxSets.
find(setId);
442 if (aIt != acquiringTxSets.end())
444 if (scheduler.
now() < aIt->second)
448 using namespace std::chrono_literals;
450 for (
auto const link : net.
links(
this))
452 minDuration =
std::min(minDuration, link.data.delay);
455 this, link.target, [to = link.target, from =
this, setId]() {
456 if (auto it = to->txSets.find(setId);
457 it != to->txSets.end())
462 to->net.send(to, from, [from, txSet = it->second]() {
463 from->acquiringTxSets.erase(txSet.id());
469 acquiringTxSets[setId] = scheduler.
now() + 2 * minDuration;
476 return !openTxs.empty();
504 TxSet::calcID(openTxs),
525 std::move(consensusJson),
537 bool const validating)
540 bool const proposing = mode == ConsensusMode::proposing;
541 bool const consensusFail = result.state == ConsensusState::MovedOn;
543 TxSet const acceptedTxs = injectTxs(prevLedger, result.txns);
544 Ledger const newLedger = oracle.accept(
548 result.position.closeTime());
549 ledgers[newLedger.id()] = newLedger;
551 issue(AcceptLedger{newLedger, lastClosedLedger});
554 lastClosedLedger = newLedger;
557 openTxs.begin(), openTxs.end(), [&](
Tx const& tx) {
558 return acceptedTxs.exists(tx.id());
560 openTxs.erase(it, openTxs.end());
564 bool const isCompatible =
565 newLedger.isAncestor(fullyValidatedLedger);
568 if (runAsValidator && isCompatible && !consensusFail &&
584 addTrustedValidation(v);
587 checkFullyValidated(newLedger);
595 if (completedLedgers <= targetLedgers)
607 return fullyValidatedLedger.seq();
621 validations.getPreferred(ledger, earliestAllowedSeq());
623 if (netLgr != ledgerID)
641 return consensusParms;
682 checkFullyValidated(*lgr);
691 if (ledger.
seq() <= fullyValidatedLedger.seq())
694 std::size_t const count = validations.numTrustedForLedger(ledger.
id());
695 std::size_t const numTrustedPeers = trustGraph.graph().outDegree(
this);
697 if (count >= quorum && ledger.
isAncestor(fullyValidatedLedger))
700 fullyValidatedLedger = ledger;
743 for (
auto const link : net.links(
this))
745 if (link.target->id != from && link.target->id != bm.
origin)
749 if (link.target->router.lastObservedSeq[bm.
origin] < bm.
seq)
755 [to = link.target, bm,
id = this->id] {
772 schedule(delays.onReceive(bm.
mesg), [
this, bm, from] {
786 return p.
prevLedger() == lastClosedLedger.id();
791 if (
std::find(dest.begin(), dest.end(), p) != dest.end())
797 return consensus.peerProposal(now(),
Position{p});
803 bool const inserted =
806 consensus.gotTxSet(now(), txs);
815 TxSetType const& lastClosedTxs = lastClosedLedger.txs();
816 if (lastClosedTxs.find(tx) != lastClosedTxs.end())
820 return openTxs.insert(tx).second;
831 return addTrustedValidation(v);
837 return fullyValidatedLedger.seq() >
Ledger::Seq{0};
843 return earliestAllowedSeq();
850 for (
auto const p : trustGraph.trustedPeers(
this))
852 return {quorum, keys};
858 return validations.laggards(
seq, trusted);
864 return runAsValidator;
896 consensus.timerEntry(now());
898 if (completedLedgers < targetLedgers)
899 scheduler.in(parms().ledgerGRANULARITY, [
this]() { timerEntry(); });
910 validations.getPreferred(lastClosedLedger, earliestAllowedSeq());
912 bestLCL = lastClosedLedger.id();
918 consensus.startRound(
919 now(), bestLCL, lastClosedLedger, nowUntrusted, runAsValidator, {});
928 validations.expire(j);
929 scheduler.in(parms().ledgerGRANULARITY, [&]() { timerEntry(); });
941 using namespace std::chrono_literals;
943 scheduler.now().time_since_epoch() + 86400s + clockSkew));
949 return consensus.prevLedgerID();
973 auto const it = txInjections.
find(prevLedger.
seq());
975 if (it == txInjections.
end())
978 res.insert(it->second);
Decorator for streaming out compact json.
A generic endpoint for log messages.
Wraps a Journal::Sink to prefix its output with a string.
LedgerID_t const & prevLedger() const
Get the prior accepted ledger this position is based on.
NodeID_t const & nodeID() const
Identifying which peer took this position.
Json::Value getJson() const
Get JSON representation for debugging.
std::chrono::milliseconds read() const
Generic implementation of consensus algorithm.
std::chrono::time_point< NetClock > time_point
Maintains current and recent ledger validations.
std::size_t numTrustedForLedger(ID const &ledgerID)
Count the number of trusted full validations for the given ledger.
std::size_t getNodesAfter(Ledger const &ledger, ID const &ledgerID)
Count the number of current trusted validators working on a ledger after the specified one.
bool canValidateSeq(Seq const s)
Return whether the local node can issue a validation for the given sequence number.
Peer to peer network simulator.
void send(Peer const &from, Peer const &to, Function &&f)
Send a message to a peer.
auto links(Peer const &from)
Return the range of active links.
A container of CollectorRefs.
void on(PeerID node, SimTime when, E const &e)
Oracle maintaining unique ledgers for a simulation.
A ledger is a set of observed transactions and a sequence number identifying the ledger.
bool isAncestor(Ledger const &ancestor) const
Determine whether ancestor is really an ancestor of this ledger.
Basic wrapper of a proposed position taken by a peer.
Position(Proposal const &p)
Json::Value getJson() const
Proposal const & proposal() const
std::string render() const
Generic Validations adaptor that simply ignores recently stale validations.
std::optional< Ledger > acquire(Ledger::ID const &lId)
NetClock::time_point now() const
Simulated discrete-event scheduler.
time_point now() const
Return the current network time.
cancel_token in(duration const &delay, Function &&f)
Schedule an event after a specified duration passes.
TxSet is a set of transactions to consider including in the ledger.
TxSetType const & txs() const
beast::uhash<>::result_type ID
Tx const * find(Tx::ID const &txId) const
Validation of a specific ledger by a specific Peer.
Ledger::ID ledgerID() const
void setSeen(NetClock::time_point seen)
PeerID const & nodeID() const
std::pair< PeerID, std::uint32_t > PeerKey
The current key of a peer.
tagged_integer< std::uint32_t, PeerIDTag > PeerID
typename SimClock::duration SimDuration
boost::container::flat_set< Tx > TxSetType
std::string to_string(TxSetType const &txs)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
ConsensusMode
Represents how a node currently participates in Consensus.
@ proposing
We are normal participant in consensus and propose our position.
boost::outcome_v2::result< T, std::error_code > Result
ValStatus
Status of validation we received.
@ stale
Not current or was older than current from this node.
Stores the set of initial close times.
Consensus algorithm parameters.
Encapsulates the result of consensus.
Timing parameters to control validation staleness and expiration.
Peer closed the open ledger.
Peer fully validated a new ledger.
Simulated delays in internal peer processing.
SimDuration onReceive(M const &) const
std::chrono::milliseconds ledgerAccept
Delay in consensus calling doAccept to accepting and issuing validation TODO: This should be a functi...
std::chrono::milliseconds recvValidation
Delay in processing validations from remote peers.
SimDuration onReceive(Validation const &) const
bc::flat_map< PeerID, std::size_t > lastObservedSeq
A single peer in the simulation.
bool handle(TxSet const &txs)
bc::flat_map< Ledger::ID, std::vector< Proposal > > peerPositions
Map from Ledger::ID to vector of Positions with that ledger as the prior ledger.
void propose(Proposal const &pos)
bool addTrustedValidation(Validation v)
Add a trusted validation and return true if it is worth forwarding.
bool hasOpenTransactions() const
bool handle(Validation const &v)
Ledger const * acquireLedger(Ledger::ID const &ledgerID)
Consensus< Peer > consensus
Generic consensus.
void receive(BroadcastMesg< M > const &bm, PeerID from)
BasicNetwork< Peer * > & net
Handle to network for sending messages.
void onAccept(Result const &result, Ledger const &prevLedger, NetClock::duration const &closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson, bool const validating)
void checkFullyValidated(Ledger const &ledger)
Check if a new ledger can be deemed fully validated.
hash_set< NodeKey_t > trustedKeys
std::pair< std::size_t, hash_set< NodeKey_t > > getQuorumKeys()
hash_map< Ledger::ID, Ledger > ledgers
Ledgers this node has closed or loaded from the network.
Ledger::Seq earliestAllowedSeq() const
TrustGraph< Peer * > & trustGraph
Handle to Trust graph of network.
void send(BroadcastMesg< M > const &bm, PeerID from)
LedgerOracle & oracle
The oracle that manages unique ledgers.
TxSetType openTxs
openTxs that haven't been closed in a ledger yet
bool connect(Peer &o, SimDuration dur)
Create network connection.
void timerEntry()
Heartbeat timer call.
Ledger lastClosedLedger
The last ledger closed by this node.
std::size_t laggards(Ledger::Seq const seq, hash_set< NodeKey_t > &trusted)
PeerKey key
Current signing key.
hash_map< Ledger::Seq, Tx > txInjections
CollectorRefs & collectors
The collectors to report events to.
Result onClose(Ledger const &prevLedger, NetClock::time_point closeTime, ConsensusMode mode)
ConsensusParms const & parms() const
void issue(E const &event)
void updateOperatingMode(std::size_t const positions) const
bc::flat_map< Ledger::ID, SimTime > acquiringLedgers
void onForceAccept(Result const &result, Ledger const &prevLedger, NetClock::duration const &closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson)
ProcessingDelays delays
Simulated delays to use for internal processing.
std::size_t proposersFinished(Ledger const &prevLedger, Ledger::ID const &prevLedgerID)
Ledger fullyValidatedLedger
The most recent ledger that has been fully validated by the network from the perspective of this Peer...
bc::flat_map< TxSet::ID, TxSet > txSets
TxSet associated with a TxSet::ID.
Peer(PeerID i, Scheduler &s, LedgerOracle &o, BasicNetwork< Peer * > &n, TrustGraph< Peer * > &tg, CollectorRefs &c, beast::Journal jIn)
Constructor.
TxSet const * acquireTxSet(TxSet::ID const &setId)
bc::flat_map< TxSet::ID, SimTime > acquiringTxSets
bool trusts(PeerID const &oId)
Ledger::ID prevLedgerID() const
Scheduler & scheduler
Scheduler of events.
int targetLedgers
The number of ledgers this peer should complete before stopping to run.
ConsensusParms consensusParms
beast::WrappedSink sink
Logging support that prefixes messages with the peer ID.
std::size_t prevProposers
bool haveValidated() const
bool disconnect(Peer &o)
Remove a network connection.
std::chrono::seconds clockSkew
Skew of time relative to the common scheduler clock.
std::size_t proposersValidated(Ledger::ID const &prevLedger)
Ledger::ID getPrevLedger(Ledger::ID const &ledgerID, Ledger const &ledger, ConsensusMode mode)
bool handle(Proposal const &p)
void onModeChange(ConsensusMode, ConsensusMode)
int completedLedgers
The number of ledgers this peer has completed.
Validations< ValAdaptor > validations
Validations from trusted nodes.
TxSet injectTxs(Ledger prevLedger, TxSet const &src)
Inject non-consensus Tx.
bool runAsValidator
Whether to simulate running as validator or a tracking node.
bool handle(Tx const &tx)
void schedule(std::chrono::nanoseconds when, T &&what)
Schedule the provided callback in when duration, but if when is 0, call immediately.
void share(Position const &p)
std::chrono::milliseconds prevRoundTime
Ledger::Seq getValidLedgerIndex() const
void submit(Tx const &tx)
NetClock::time_point now() const
A value received from another peer as part of flooding.
A value relayed to another peer as part of flooding.
A value to be flooded to all other peers starting from this peer.
Peer starts a new consensus round.
A transaction submitted to a peer.
Peer detected a wrong prior ledger during consensus.
Set the sequence number on a JTx.