1#ifndef XRPL_TEST_CSF_PEER_H_INCLUDED
2#define XRPL_TEST_CSF_PEER_H_INCLUDED
4#include <test/csf/CollectorRef.h>
5#include <test/csf/Scheduler.h>
6#include <test/csf/TrustGraph.h>
7#include <test/csf/Tx.h>
8#include <test/csf/Validation.h>
9#include <test/csf/events.h>
10#include <test/csf/ledgers.h>
12#include <xrpld/consensus/Consensus.h>
13#include <xrpld/consensus/Validations.h>
15#include <xrpl/beast/utility/WrappedSink.h>
16#include <xrpl/protocol/PublicKey.h>
18#include <boost/container/flat_map.hpp>
19#include <boost/container/flat_set.hpp>
27namespace bc = boost::container;
296 using namespace std::chrono_literals;
343 for (
auto const p :
trustGraph.trustedPeers(
this))
361 return net.connect(
this, &o, dur);
374 return net.disconnect(
this, &o);
386 return &(it->second);
390 if (
net.links(
this).empty())
401 using namespace std::chrono_literals;
403 for (
auto const link :
net.links(
this))
405 minDuration =
std::min(minDuration, link.data.delay);
409 this, link.target, [to = link.target, from =
this, ledgerID]() {
410 if (auto it = to->ledgers.find(ledgerID);
411 it != to->ledgers.end())
416 to->net.send(to, from, [from, ledger = it->second]() {
417 from->acquiringLedgers.erase(ledger.id());
418 from->ledgers.emplace(ledger.id(), ledger);
431 if (
auto it = txSets.find(setId); it != txSets.end())
433 return &(it->second);
437 if (net.
links(
this).empty())
441 auto aIt = acquiringTxSets.
find(setId);
442 if (aIt != acquiringTxSets.end())
444 if (scheduler.
now() < aIt->second)
448 using namespace std::chrono_literals;
450 for (
auto const link : net.
links(
this))
452 minDuration =
std::min(minDuration, link.data.delay);
455 this, link.target, [to = link.target, from =
this, setId]() {
456 if (auto it = to->txSets.find(setId);
457 it != to->txSets.end())
462 to->net.send(to, from, [from, txSet = it->second]() {
463 from->acquiringTxSets.erase(txSet.id());
469 acquiringTxSets[setId] = scheduler.
now() + 2 * minDuration;
476 return !openTxs.empty();
504 TxSet::calcID(openTxs),
525 std::move(consensusJson),
537 bool const validating)
540 bool const proposing = mode == ConsensusMode::proposing;
541 bool const consensusFail = result.state == ConsensusState::MovedOn;
543 TxSet const acceptedTxs = injectTxs(prevLedger, result.txns);
544 Ledger const newLedger = oracle.accept(
548 result.position.closeTime());
549 ledgers[newLedger.id()] = newLedger;
551 issue(AcceptLedger{newLedger, lastClosedLedger});
554 lastClosedLedger = newLedger;
557 openTxs.begin(), openTxs.end(), [&](
Tx const& tx) {
558 return acceptedTxs.exists(tx.id());
560 openTxs.erase(it, openTxs.end());
564 bool const isCompatible =
565 newLedger.isAncestor(fullyValidatedLedger);
568 if (runAsValidator && isCompatible && !consensusFail &&
584 addTrustedValidation(v);
587 checkFullyValidated(newLedger);
595 if (completedLedgers <= targetLedgers)
607 return fullyValidatedLedger.seq();
621 validations.getPreferred(ledger, earliestAllowedSeq());
623 if (netLgr != ledgerID)
641 return consensusParms;
682 checkFullyValidated(*lgr);
691 if (ledger.
seq() <= fullyValidatedLedger.seq())
694 std::size_t const count = validations.numTrustedForLedger(ledger.
id());
695 std::size_t const numTrustedPeers = trustGraph.graph().outDegree(
this);
697 if (count >= quorum && ledger.
isAncestor(fullyValidatedLedger))
700 fullyValidatedLedger = ledger;
743 for (
auto const link : net.links(
this))
745 if (link.target->id != from && link.target->id != bm.
origin)
749 if (link.target->router.lastObservedSeq[bm.
origin] < bm.
seq)
755 [to = link.target, bm,
id = this->id] {
772 schedule(delays.onReceive(bm.
msg), [
this, bm, from] {
786 return p.
prevLedger() == lastClosedLedger.id();
791 if (
std::find(dest.begin(), dest.end(), p) != dest.end())
797 return consensus.peerProposal(now(),
Position{p});
803 bool const inserted =
806 consensus.gotTxSet(now(), txs);
815 TxSetType const& lastClosedTxs = lastClosedLedger.txs();
816 if (lastClosedTxs.find(tx) != lastClosedTxs.end())
820 return openTxs.insert(tx).second;
831 return addTrustedValidation(v);
837 return fullyValidatedLedger.seq() >
Ledger::Seq{0};
843 return earliestAllowedSeq();
850 for (
auto const p : trustGraph.trustedPeers(
this))
852 return {quorum, keys};
858 return validations.laggards(
seq, trusted);
864 return runAsValidator;
896 consensus.timerEntry(now());
898 if (completedLedgers < targetLedgers)
899 scheduler.in(parms().ledgerGRANULARITY, [
this]() { timerEntry(); });
910 validations.getPreferred(lastClosedLedger, earliestAllowedSeq());
912 bestLCL = lastClosedLedger.id();
918 consensus.startRound(
919 now(), bestLCL, lastClosedLedger, nowUntrusted, runAsValidator, {});
928 validations.expire(j);
929 scheduler.in(parms().ledgerGRANULARITY, [&]() { timerEntry(); });
941 using namespace std::chrono_literals;
943 scheduler.now().time_since_epoch() + 86400s + clockSkew));
949 return consensus.prevLedgerID();
973 auto const it = txInjections.
find(prevLedger.
seq());
975 if (it == txInjections.
end())
978 res.insert(it->second);
Decorator for streaming out compact json.
A generic endpoint for log messages.
Wraps a Journal::Sink to prefix its output with a string.
LedgerID_t const & prevLedger() const
Get the prior accepted ledger this position is based on.
Json::Value getJson() const
Get JSON representation for debugging.
NodeID_t const & nodeID() const
Identifying which peer took this position.
std::chrono::milliseconds read() const
Generic implementation of consensus algorithm.
std::chrono::time_point< NetClock > time_point
Maintains current and recent ledger validations.
std::size_t getNodesAfter(Ledger const &ledger, ID const &ledgerID)
Count the number of current trusted validators working on a ledger after the specified one.
bool canValidateSeq(Seq const s)
Return whether the local node can issue a validation for the given sequence number.
std::size_t numTrustedForLedger(ID const &ledgerID)
Count the number of trusted full validations for the given ledger.
Peer to peer network simulator.
auto links(Peer const &from)
Return the range of active links.
void send(Peer const &from, Peer const &to, Function &&f)
Send a message to a peer.
A container of CollectorRefs.
void on(PeerID node, SimTime when, E const &e)
Oracle maintaining unique ledgers for a simulation.
A ledger is a set of observed transactions and a sequence number identifying the ledger.
bool isAncestor(Ledger const &ancestor) const
Determine whether ancestor is really an ancestor of this ledger.
Basic wrapper of a proposed position taken by a peer.
std::string render() const
Position(Proposal const &p)
Proposal const & proposal() const
Json::Value getJson() const
Generic Validations adaptor that simply ignores recently stale validations.
NetClock::time_point now() const
std::optional< Ledger > acquire(Ledger::ID const &lId)
Simulated discrete-event scheduler.
cancel_token in(duration const &delay, Function &&f)
Schedule an event after a specified duration passes.
time_point now() const
Return the current network time.
TxSet is a set of transactions to consider including in the ledger.
Tx const * find(Tx::ID const &txId) const
TxSetType const & txs() const
beast::uhash<>::result_type ID
Validation of a specific ledger by a specific Peer.
void setSeen(NetClock::time_point seen)
PeerID const & nodeID() const
Ledger::ID ledgerID() const
typename SimClock::duration SimDuration
std::string to_string(TxSetType const &txs)
boost::container::flat_set< Tx > TxSetType
std::pair< PeerID, std::uint32_t > PeerKey
The current key of a peer.
tagged_integer< std::uint32_t, PeerIDTag > PeerID
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
ConsensusMode
Represents how a node currently participates in Consensus.
@ proposing
We are normal participant in consensus and propose our position.
ValStatus
Status of validation we received.
@ stale
Not current or was older than current from this node.
boost::outcome_v2::result< T, std::error_code > Result
Stores the set of initial close times.
Consensus algorithm parameters.
Encapsulates the result of consensus.
Timing parameters to control validation staleness and expiration.
Peer closed the open ledger.
Peer fully validated a new ledger.
Simulated delays in internal peer processing.
std::chrono::milliseconds recvValidation
Delay in processing validations from remote peers.
SimDuration onReceive(Validation const &) const
std::chrono::milliseconds ledgerAccept
Delay in consensus calling doAccept to accepting and issuing validation TODO: This should be a functi...
SimDuration onReceive(M const &) const
bc::flat_map< PeerID, std::size_t > lastObservedSeq
A single peer in the simulation.
Result onClose(Ledger const &prevLedger, NetClock::time_point closeTime, ConsensusMode mode)
std::chrono::seconds clockSkew
Skew of time relative to the common scheduler clock.
void propose(Proposal const &pos)
Ledger::ID prevLedgerID() const
TxSetType openTxs
openTxs that haven't been closed in a ledger yet
void updateOperatingMode(std::size_t const positions) const
void receive(BroadcastMesg< M > const &bm, PeerID from)
ConsensusParms const & parms() const
Ledger::Seq getValidLedgerIndex() const
bc::flat_map< TxSet::ID, TxSet > txSets
TxSet associated with a TxSet::ID.
BasicNetwork< Peer * > & net
Handle to network for sending messages.
std::size_t prevProposers
int completedLedgers
The number of ledgers this peer has completed.
hash_set< NodeKey_t > trustedKeys
hash_map< Ledger::ID, Ledger > ledgers
Ledgers this node has closed or loaded from the network.
bool haveValidated() const
Peer(PeerID i, Scheduler &s, LedgerOracle &o, BasicNetwork< Peer * > &n, TrustGraph< Peer * > &tg, CollectorRefs &c, beast::Journal jIn)
Constructor.
bool runAsValidator
Whether to simulate running as validator or a tracking node.
hash_map< Ledger::Seq, Tx > txInjections
TxSet const * acquireTxSet(TxSet::ID const &setId)
ConsensusParms consensusParms
CollectorRefs & collectors
The collectors to report events to.
bool trusts(PeerID const &oId)
void onAccept(Result const &result, Ledger const &prevLedger, NetClock::duration const &closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson, bool const validating)
TxSet injectTxs(Ledger prevLedger, TxSet const &src)
Inject non-consensus Tx.
Ledger lastClosedLedger
The last ledger closed by this node.
LedgerOracle & oracle
The oracle that manages unique ledgers.
std::pair< std::size_t, hash_set< NodeKey_t > > getQuorumKeys()
Ledger const * acquireLedger(Ledger::ID const &ledgerID)
bc::flat_map< TxSet::ID, SimTime > acquiringTxSets
Ledger::ID getPrevLedger(Ledger::ID const &ledgerID, Ledger const &ledger, ConsensusMode mode)
Consensus< Peer > consensus
Generic consensus.
bool connect(Peer &o, SimDuration dur)
Create network connection.
void onForceAccept(Result const &result, Ledger const &prevLedger, NetClock::duration const &closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson)
bool handle(Validation const &v)
void send(BroadcastMesg< M > const &bm, PeerID from)
void issue(E const &event)
bool handle(TxSet const &txs)
void checkFullyValidated(Ledger const &ledger)
Check if a new ledger can be deemed fully validated.
bc::flat_map< Ledger::ID, SimTime > acquiringLedgers
void onModeChange(ConsensusMode, ConsensusMode)
NetClock::time_point now() const
TrustGraph< Peer * > & trustGraph
Handle to Trust graph of network.
std::size_t laggards(Ledger::Seq const seq, hash_set< NodeKey_t > &trusted)
bool handle(Proposal const &p)
void timerEntry()
Heartbeat timer call.
bc::flat_map< Ledger::ID, std::vector< Proposal > > peerPositions
Map from Ledger::ID to vector of Positions with that ledger as the prior ledger.
Validations< ValAdaptor > validations
Validations from trusted nodes.
bool addTrustedValidation(Validation v)
Add a trusted validation and return true if it is worth forwarding.
bool handle(Tx const &tx)
PeerKey key
Current signing key.
void schedule(std::chrono::nanoseconds when, T &&what)
Schedule the provided callback in when duration, but if when is 0, call immediately.
Scheduler & scheduler
Scheduler of events.
ProcessingDelays delays
Simulated delays to use for internal processing.
void share(Position const &p)
Ledger fullyValidatedLedger
The most recent ledger that has been fully validated by the network from the perspective of this Peer...
bool hasOpenTransactions() const
beast::WrappedSink sink
Logging support that prefixes messages with the peer ID.
int targetLedgers
The number of ledgers this peer should complete before stopping to run.
std::size_t proposersFinished(Ledger const &prevLedger, Ledger::ID const &prevLedgerID)
std::size_t proposersValidated(Ledger::ID const &prevLedger)
bool disconnect(Peer &o)
Remove a network connection.
Ledger::Seq earliestAllowedSeq() const
std::chrono::milliseconds prevRoundTime
void submit(Tx const &tx)
A value received from another peer as part of flooding.
A value relayed to another peer as part of flooding.
A value to be flooded to all other peers starting from this peer.
Peer starts a new consensus round.
A transaction submitted to a peer.
Peer detected a wrong prior ledger during consensus.
Set the sequence number on a JTx.