1#include <test/jtx/Env.h>
2#include <test/jtx/envconfig.h>
4#include <xrpld/app/main/Application.h>
5#include <xrpld/overlay/Message.h>
6#include <xrpld/overlay/Peer.h>
7#include <xrpld/overlay/ReduceRelayCommon.h>
8#include <xrpld/overlay/Slot.h>
9#include <xrpld/overlay/Squelch.h>
10#include <xrpld/overlay/detail/Handshake.h>
12#include <xrpl/basics/base_uint.h>
13#include <xrpl/basics/random.h>
14#include <xrpl/beast/net/IPAddress.h>
15#include <xrpl/beast/net/IPEndpoint.h>
16#include <xrpl/beast/unit_test/suite.h>
17#include <xrpl/beast/utility/Journal.h>
18#include <xrpl/json/json_value.h>
19#include <xrpl/protocol/KeyType.h>
20#include <xrpl/protocol/PublicKey.h>
21#include <xrpl/protocol/SecretKey.h>
23#include <boost/asio/ip/address.hpp>
51using namespace std::chrono;
254 protocol::MessageType type = protocol::mtVALIDATION) = 0;
283 auto sp =
peer_.lock();
286 peer->onMessage(m, f);
301 auto p =
peer_.lock();
308 auto p =
peer_.lock();
328 protocol::TMValidation v;
329 v.set_validation(
"validation");
382 for (
auto id : peers)
384 assert(
links_.contains(
id));
433 auto it =
links_.find(
id);
434 assert(it !=
links_.end());
435 it->second->up(
true);
441 auto it =
links_.find(
id);
442 assert(it !=
links_.end());
443 it->second->up(
false);
545 return res ? *res : 0;
554 protocol::MessageType type = protocol::mtVALIDATION)
override
564 slots_.deletePeer(
id,
true);
598 auto it =
peers_.find(
id);
599 assert(it !=
peers_.end());
623 for (
auto& [
id, _] :
peers_)
650 return selected.contains(peer);
657 assert(!selected.empty());
658 return *selected.begin();
740 auto id =
overlay_.deleteLastPeer();
796 squelch.clear_validatorpubkey();
823 bool resetClock =
true)
834 for (
int m = 0; m < nMessages; ++m)
865 for (
auto& [_, v] : peers)
892 for (
auto& [k, v] : peers)
893 std::cout << k <<
":" << (int)std::get<reduce_relay::PeerState>(v) <<
" ";
905 bool const res =
static_cast<bool>(
duration);
910 auto sp = peerPtr.
lock();
950 bool squelched =
false;
964 str <<
" selected: ";
965 for (
auto s : selected)
969 std::cout << (double)reduce_relay::epoch<milliseconds>(now).count() / 1000.
970 <<
" random, squelched, validator: " <<
validator.id()
975 countingState ==
false &&
976 selected.size() ==
env_.app().config().vpReduceRelaySquelchMaxSelectedPeers);
985 events[event].validator =
validator.id();
987 events[event].peer = link.
peerId();
989 events[event].time = now;
993 events[event].isSelected =
1011 bool const allCounting =
network_.allCounting(event.peer);
1014 if (event.isSelected)
1015 sendSquelch(v, peerPtr, {});
1016 event.handled =
true;
1023 bool const handled = (!
event.isSelected && !
event.handled) ||
1024 (event.isSelected && (event.handled || allCounting));
1025 BEAST_EXPECT(handled);
1027 event.isSelected =
false;
1028 event.handledCnt += handled;
1029 event.handled =
false;
1030 network_.onDisconnectPeer(event.peer);
1045 bool mustHandle =
false;
1046 if (event.state ==
State::On && BEAST_EXPECT(event.key))
1048 event.isSelected =
network_.overlay().isSelected(*event.key, event.peer);
1049 auto peers =
network_.overlay().getPeers(*event.key);
1051 std::get<3>(peers[event.peer]);
1052 mustHandle =
event.isSelected &&
1056 peers.contains(event.peer);
1059 event.handled =
true;
1060 if (mustHandle && v == event.key)
1067 (!event.handled && !mustHandle);
1068 BEAST_EXPECT(handled);
1075 BEAST_EXPECT(handled);
1077 event.isSelected =
false;
1078 event.handledCnt += handled;
1079 event.handled =
false;
1080 network_.enableLink(event.validator, event.peer,
true);
1087 BEAST_EXPECT(down.handledCnt >= down.cnt - 1);
1089 BEAST_EXPECT(disconnected.cnt == disconnected.handledCnt);
1092 std::cout <<
"link down count: " << down.cnt <<
"/" << down.handledCnt
1093 <<
" peer disconnect count: " << disconnected.cnt <<
"/"
1094 << disconnected.handledCnt;
1102 BEAST_EXPECT(countingState == isCountingState);
1103 return countingState == isCountingState;
1130 doTest(
"Peer Unsquelched Too Soon",
log, [
this](
bool log) {
1165 kMaxPeers -
env_.app().config().vpReduceRelaySquelchMaxSelectedPeers);
1174 BEAST_EXPECT(selected.size() ==
env_.app().config().vpReduceRelaySquelchMaxSelectedPeers);
1175 BEAST_EXPECT(n == 1);
1178 return n == 1 && res;
1188 bool resetClock =
true)
1190 bool squelched =
false;
1197 BEAST_EXPECT(
false);
1205 return !squelched && res;
1226 doTest(
"Selected Peer Disconnects",
log, [
this](
bool log) {
1235 kMaxPeers -
env_.app().config().vpReduceRelaySquelchMaxSelectedPeers);
1245 doTest(
"Selected Peer Stops Relaying",
log, [
this](
bool log) {
1250 network_.overlay().deleteIdlePeers(
1255 kMaxPeers -
env_.app().config().vpReduceRelaySquelchMaxSelectedPeers);
1265 doTest(
"Squelched Peer Disconnects",
log, [
this](
bool log) {
1270 return std::get<reduce_relay::PeerState>(it.second) ==
1273 assert(it != peers.end());
1277 BEAST_EXPECT(unsquelched == 0);
1285 doTest(
"Test Config - squelch enabled (legacy)",
log, [&](
bool log) {
1297 doTest(
"Test Config - squelch disabled (legacy)",
log, [&](
bool log) {
1310 toLoad = R
"xrpldConfig(
1318 doTest(
"Test Config - squelch enabled",
log, [&](
bool log) {
1323vp_base_squelch_enable=1
1330 doTest(
"Test Config - squelch disabled",
log, [&](
bool log) {
1335vp_base_squelch_enable=0
1342 doTest(
"Test Config - legacy and new",
log, [&](
bool log) {
1347vp_base_squelch_enable=0
1352 auto const expectedError =
1353 "Invalid reduce_relay"
1354 " cannot specify both vp_base_squelch_enable and vp_enable "
1356 "vp_enable was deprecated and replaced by "
1357 "vp_base_squelch_enable";
1368 BEAST_EXPECT(error == expectedError);
1371 doTest(
"Test Config - max selected peers",
log, [&](
bool log) {
1383 toLoad = R"xrpldConfig(
1385vp_base_squelch_max_selected_peers=6
1393 toLoad = R"xrpldConfig(
1395vp_base_squelch_max_selected_peers=2
1399 auto const expectedError =
1400 "Invalid reduce_relay"
1401 " vp_base_squelch_max_selected_peers must be "
1402 "greater than or equal to 3";
1412 BEAST_EXPECT(error == expectedError);
1417 testBaseSquelchReady(
bool log)
1422 env_.app().config().vpReduceRelayBaseSquelchEnable = baseSquelchEnabled;
1427 BEAST_EXPECT(!createSlots(
false).baseSquelchReady());
1431 BEAST_EXPECT(!createSlots(
true).baseSquelchReady());
1436 BEAST_EXPECT(createSlots(
true).baseSquelchReady());
1440 BEAST_EXPECT(!createSlots(
false).baseSquelchReady());
1445 testInternalHashRouter(
bool log)
1447 doTest(
"Duplicate Message",
log, [&](
bool log) {
1451 for (
int i = 0; i < nMessages; i++)
1454 network_.overlay().updateSlotAndSquelch(
1461 BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
1463 uint256 const key(nMessages - 1);
1464 network_.overlay().updateSlotAndSquelch(
1468 BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
1471 network_.overlay().updateSlotAndSquelch(
1475 BEAST_EXPECT(std::get<1>(peers[0]) == nMessages);
1481 Handler() =
default;
1488 unsquelch(PublicKey
const&, Peer::id_t)
const override
1491 mutable int maxDuration{0};
1495 testRandomSquelch(
bool l)
1497 doTest(
"Random Squelch", l, [&](
bool l) {
1501 auto run = [&](
int npeers) {
1502 handler.maxDuration = 0;
1503 reduce_relay::Slots<ManualClock> slots(env_.app(), handler, env_.app().config());
1510 for (
int peer = 0; peer < npeers; peer++)
1514 std::uint64_t
const mid = (m * 1000) + peer;
1516 slots.updateSlotAndSquelch(
1517 message,
validator, peer, protocol::MessageType::mtVALIDATION);
1524 using namespace reduce_relay;
1529 handler.maxDuration >= kMinUnsquelchExpire.count() &&
1530 handler.maxDuration <= kMaxUnsquelchExpireDefault.count());
1533 handler.maxDuration >= kMinUnsquelchExpire.count() &&
1534 handler.maxDuration <= kMaxUnsquelchExpireDefault.count());
1543 handler.maxDuration >= kMinUnsquelchExpire.count() &&
1544 handler.maxDuration <= kMaxUnsquelchExpirePeers.count());
1546 if (handler.maxDuration <= kMaxUnsquelchExpireDefault.count())
1548 log <<
makeReason(
"warning: squelch duration is low", __FILE__, __LINE__)
1555 handler.maxDuration >= kMinUnsquelchExpire.count() &&
1556 handler.maxDuration <= kMaxUnsquelchExpirePeers.count());
1557 if (handler.maxDuration <= kMaxUnsquelchExpireDefault.count())
1559 log <<
makeReason(
"warning: squelch duration is low", __FILE__, __LINE__)
1567 testHandshake(
bool log)
1569 doTest(
"Handshake", log, [&](
bool log) {
1570 auto setEnv = [&](
bool enable) {
1572 std::stringstream str;
1573 str <<
"[reduce_relay]\n"
1574 <<
"vp_enable=" << enable <<
"\n"
1575 <<
"[compression]\n"
1577 c.loadFromString(str.
str());
1578 env_.app().config().vpReduceRelayBaseSquelchEnable =
1579 c.vpReduceRelayBaseSquelchEnable;
1581 env_.app().config().compression = c.compression;
1583 auto handshake = [&](
int outboundEnable,
int inboundEnable) {
1586 setEnv(outboundEnable);
1589 env_.app().config().compression,
1591 env_.app().config().txReduceRelayEnable,
1592 env_.app().config().vpReduceRelayBaseSquelchEnable);
1594 httpRequest.version(request.version());
1595 httpRequest.base() = request.base();
1598 auto const peerEnabled = inboundEnable && outboundEnable;
1601 auto const inboundEnabled =
1603 BEAST_EXPECT(!(peerEnabled ^ inboundEnabled));
1605 setEnv(inboundEnable);
1607 true, httpRequest, addr, addr,
uint256{1}, 1, {1, 0}, env_.app());
1610 auto const outboundEnabled =
1612 BEAST_EXPECT(!(peerEnabled ^ outboundEnabled));
1627 cfg->vpReduceRelayBaseSquelchEnable =
true;
1628 cfg->vpReduceRelaySquelchMaxSelectedPeers = 6;
1631 , network_(env_.app())
1638 bool const log =
false;
1640 testInitialRound(log);
1641 testPeerUnsquelchedTooSoon(log);
1642 testPeerUnsquelched(log);
1644 testSquelchedPeerDisconnects(log);
1645 testSelectedPeerDisconnects(log);
1646 testSelectedPeerStopsRelaying(log);
1647 testInternalHashRouter(log);
1648 testRandomSquelch(log);
1650 testBaseSquelchReady(log);
1665 bool const log =
false;
T back_inserter(T... args)
A version-independent IP address and port combination.
A generic endpoint for log messages.
LogOs< char > log
Logging output stream.
TestcaseT testcase
Memberspace for declaring test cases.
std::size_t vpReduceRelaySquelchMaxSelectedPeers
bool vpReduceRelayBaseSquelchEnable
void loadFromString(std::string const &fileContents)
Load the config from the contents of the string.
Represents a peer connection in the overlay.
std::uint32_t id_t
Uniquely identifies a peer.
std::uint8_t const * data() const noexcept
static std::size_t size() noexcept
Service registry for dependency injection.
An immutable linear range of bytes.
Slot is associated with a specific validator via validator's public key.
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
Maintains squelching of relaying messages from validators.
Simulate link from a validator to a peer directly connected to the server.
std::pair< milliseconds, milliseconds > Latency
Link(Validator &validator, PeerSPtr peer, Latency latency={milliseconds(5), milliseconds(15)})
void send(MessageSPtr const &m, SquelchCB f)
static void randAdvance(milliseconds min, milliseconds max)
static duration randDuration(milliseconds min, milliseconds max)
static bool const is_steady
static void advance(duration d) noexcept
std::chrono::time_point< ManualClock > time_point
static void reset() noexcept
static time_point now() noexcept
std::chrono::duration< std::uint32_t, period > duration
bool allCounting(Peer::id_t peer)
Check if there are peers to unsquelch - peer is in Selected state in any of the slots and there are p...
static void forRand(std::uint32_t min, std::uint32_t max, std::function< void(std::uint32_t)> f)
bool isSelected(Peer::id_t id)
Is peer in Selected state in any of the slots.
void enableLink(std::uint16_t validatorId, Peer::id_t peer, bool enable)
void propagate(LinkIterCB link, std::uint16_t nValidators=kMaxValidators, std::uint32_t nMessages=kMaxMessages, bool purge=true, bool resetClock=true)
std::vector< Validator > validators_
Network(Application &app)
Validator & validator(std::uint16_t v)
void onDisconnectPeer(Peer::id_t peer)
bool isSelected(PublicKey const &validator, Peer::id_t peer)
std::uint16_t getNumPeers() const
std::optional< Peer::id_t > deleteLastPeer()
std::unordered_map< id_t, std::tuple< reduce_relay::PeerState, std::uint16_t, std::uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
std::set< id_t > getSelected(PublicKey const &validator)
PeerSPtr addPeer(bool useCache=true)
void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t squelchDuration) const override
Squelch handler.
void deletePeer(Peer::id_t id, bool useCache=true)
ServiceRegistry & registry_
std::unordered_map< Peer::id_t, PeerSPtr > Peers
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
void deleteIdlePeers(UnsquelchCB f) override
void deletePeer(id_t id, UnsquelchCB f) override
id_t getSelectedPeer(PublicKey const &validator)
std::uint16_t inState(PublicKey const &validator, reduce_relay::PeerState state)
OverlaySim(Application &app)
bool isCountingState(PublicKey const &validator)
~OverlaySim() override=default
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, Peer::id_t id, SquelchCB f, protocol::MessageType type=protocol::mtVALIDATION) override
reduce_relay::Slots< ManualClock > slots_
Simulate server's OverlayImpl.
virtual void deleteIdlePeers(UnsquelchCB)=0
virtual void deletePeer(Peer::id_t, UnsquelchCB)=0
virtual ~Overlay()=default
virtual void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, Peer::id_t id, SquelchCB f, protocol::MessageType type=protocol::mtVALIDATION)=0
void send(std::shared_ptr< Message > const &m) override
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
void addTxQueue(uint256 const &) override
Aggregate transaction's hash.
virtual void onMessage(protocol::TMSquelch const &squelch)=0
bool compressionEnabled() const override
virtual void onMessage(MessageSPtr const &m, SquelchCB f)=0
void sendTxQueue() override
Send aggregated transactions' hashes.
void removeTxQueue(uint256 const &) override
Remove hash from the transactions' hashes queue.
json::Value json() override
std::optional< std::size_t > publisherListSequence(PublicKey const &) const override
bool supportsFeature(ProtocolFeature f) const override
bool hasTxSet(uint256 const &hash) const override
void send(protocol::TMSquelch const &squelch)
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
~PeerPartial() override=default
bool cluster() const override
Returns true if this connection is a member of the cluster.
uint256 const & getClosedLedgerHash() const override
void cycleStatus() override
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
void charge(Resource::Charge const &fee, std::string const &context={}) override
Adjust this peer's load balance based on the type of load imposed.
beast::IP::Endpoint getRemoteAddress() const override
bool isHighLatency() const override
PublicKey const & getNodePublic() const override
void setPublisherListSequence(PublicKey const &, std::size_t const) override
bool txReduceRelayEnabled() const override
int getScore(bool) const override
~PeerSim() override=default
void onMessage(protocol::TMSquelch const &squelch) override
Remote Peer (Directly connected Peer).
reduce_relay::Squelch< ManualClock > squelch_
std::string const & fingerprint() const override
void onMessage(MessageSPtr const &m, SquelchCB f) override
Local Peer (PeerImp).
PeerSim(Overlay &overlay, beast::Journal journal)
void addPeer(PeerSPtr peer)
Validator & operator=(Validator &&)=default
void send(std::vector< Peer::id_t > peers, SquelchCB f)
Send to specific peers.
void send(SquelchCB f)
Send to all peers.
void forLinks(LinkIterCB f, bool simulateSlow=false)
void linkDown(Peer::id_t id)
Validator(Validator const &)=default
void linkUp(Peer::id_t id)
void deletePeer(Peer::id_t id)
void forLinks(std::vector< Peer::id_t > peers, LinkIterCB f)
Validator & operator=(Validator const &)=default
std::unordered_map< Peer::id_t, LinkSPtr > Links
Validator(Validator &&)=default
void testRandom(bool log)
void run() override
Runs the suite.
void printPeers(std::string const &msg, std::uint16_t validator=0)
void doTest(std::string const &msg, bool log, std::function< void(bool)> f)
bool propagateNoSquelch(bool log, std::uint16_t nMessages, bool countingState, bool purge=true, bool resetClock=true)
Send fewer message so that squelch event is not generated.
bool propagateAndSquelch(bool log, bool purge=true, bool resetClock=true)
Propagate enough messages to generate one squelch event.
static Peer::id_t sendSquelch(PublicKey const &validator, PeerWPtr const &peerPtr, std::optional< std::uint32_t > duration)
Send squelch (if duration is set) or unsquelch (if duration not set).
void testConfig(bool log)
void random(bool log)
Randomly brings the link between a validator and a peer down.
void testSquelchedPeerDisconnects(bool log)
Squelched peer disconnects.
void testPeerUnsquelchedTooSoon(bool log)
Receiving message from squelched peer too soon should not change the slot's state to Counting.
reduce_relay::Slot< ManualClock > Slot
void testInitialRound(bool log)
Initial counting round: three peers receive message "faster" then others.
void testSelectedPeerStopsRelaying(bool log)
Selected peer stops relaying.
void testSelectedPeerDisconnects(bool log)
Selected peer disconnects.
bool checkCounting(PublicKey const &validator, bool isCountingState)
void testPeerUnsquelched(bool log)
Receiving message from squelched peer should change the slot's state to Counting.
void testNewPeer(bool log)
Receiving a message from new peer should change the slot's state to Counting.
boost::asio::ip::address Address
static std::string makeReason(String const &reason, char const *file, int line)
static constexpr uint16_t kMaxMessageThreshold
static constexpr auto kWaitOnBootup
static constexpr auto kIdled
std::unique_ptr< Config > envconfig()
creates and initializes a default configuration for jtx::Env
std::unique_ptr< Config > validator(std::unique_ptr< Config >, std::string const &)
adjust configuration with params needed to be a validator
static constexpr std::uint32_t kMaxMessages
std::function< void(Link &, MessageSPtr)> LinkIterCB
BEAST_DEFINE_TESTSUITE(AMMClawback, app, xrpl)
std::shared_ptr< Peer > PeerSPtr
static constexpr std::uint32_t kMaxPeers
BEAST_DEFINE_TESTSUITE_MANUAL(AMMCalc, app, xrpl)
std::shared_ptr< Message > MessageSPtr
static constexpr std::uint32_t kMaxValidators
std::shared_ptr< Link > LinkSPtr
std::weak_ptr< Peer > PeerWPtr
std::function< void(PublicKey const &, PeerWPtr const &)> UnsquelchCB
std::function< void(PublicKey const &, PeerWPtr const &, std::uint32_t)> SquelchCB
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::pair< PublicKey, SecretKey > randomKeyPair(KeyType type)
Create a key pair using secure random numbers.
PublicKey derivePublicKey(KeyType type, SecretKey const &sk)
Derive the public key from a secret key.
int run(int argc, char **argv)
T get(Section const §ion, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
std::enable_if_t< std::is_integral_v< Integral >, Integral > randInt()
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address publicIp, beast::IP::Address remoteIp, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
SecretKey randomSecretKey()
Create a secret key using secure random numbers.
bool peerFeatureEnabled(Headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
constexpr Number squelch(Number const &x, Number const &limit) noexcept
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
static constexpr char kFeatureVprr[]
T dynamic_pointer_cast(T... args)
time_point< ManualClock > time
std::optional< PublicKey > key