2#include <test/jtx/Env.h>
4#include <xrpld/overlay/Message.h>
5#include <xrpld/overlay/Peer.h>
6#include <xrpld/overlay/Slot.h>
7#include <xrpld/overlay/Squelch.h>
8#include <xrpld/overlay/detail/Handshake.h>
10#include <xrpl/basics/random.h>
11#include <xrpl/beast/unit_test.h>
12#include <xrpl/protocol/SecretKey.h>
13#include <xrpl/protocol/messages.h>
15#include <boost/thread.hpp>
232 protocol::MessageType type = protocol::mtVALIDATION) = 0;
267 peer->onMessage(m, f);
309 protocol::TMValidation v;
310 v.set_validation(
"validation");
363 for (
auto id : peers)
418 it->second->up(
true);
426 it->second->up(
false);
528 return res ? *res : 0;
537 protocol::MessageType type = protocol::mtVALIDATION)
override
547 slots_.deletePeer(
id,
true);
606 for (
auto& [
id, _] :
peers_)
633 return selected.contains(peer);
640 assert(!selected.empty());
641 return *selected.begin();
780 squelch.clear_validatorpubkey();
791 auto size = max - min;
807 bool resetClock =
true)
818 for (
int m = 0; m < nMessages; ++m)
849 for (
auto& [_, v] : peers)
876 for (
auto& [k, v] : peers)
889 bool const res =
static_cast<bool>(
duration);
894 auto sp = peerPtr.
lock();
933 bool squelched =
false;
947 str <<
" selected: ";
948 for (
auto s : selected)
953 <<
" random, squelched, validator: " <<
validator.id()
958 countingState ==
false &&
968 events[event].cnt_++;
969 events[event].validator_ =
validator.id();
971 events[event].peer_ = link.
peerId();
973 events[event].time_ = now;
977 events[event].isSelected_ =
998 if (event.isSelected_)
999 sendSquelch(v, peerPtr, {});
1000 event.handled_ =
true;
1007 bool const handled = (!
event.isSelected_ && !
event.handled_) ||
1008 (event.isSelected_ && (event.handled_ || allCounting));
1009 BEAST_EXPECT(handled);
1011 event.isSelected_ =
false;
1012 event.handledCnt_ += handled;
1013 event.handled_ =
false;
1029 bool mustHandle =
false;
1030 if (event.state_ ==
State::On && BEAST_EXPECT(event.key_))
1034 auto d = reduce_relay::epoch<milliseconds>(now).count() -
1036 mustHandle =
event.isSelected_ &&
1040 peers.contains(event.peer_);
1043 event.handled_ =
true;
1044 if (mustHandle && v == event.key_)
1046 event.state_ = State::WaitReset;
1047 sendSquelch(validator, ptr, {});
1050 bool const handled = (
event.handled_ &&
event.state_ ==
State::WaitReset) ||
1051 (!event.handled_ && !mustHandle);
1052 BEAST_EXPECT(handled);
1059 BEAST_EXPECT(handled);
1061 event.isSelected_ =
false;
1062 event.handledCnt_ += handled;
1063 event.handled_ =
false;
1068 auto&
down = events[EventType::LinkDown];
1069 auto& disconnected = events[EventType::PeerDisconnected];
1071 BEAST_EXPECT(
down.handledCnt_ >=
down.cnt_ - 1);
1073 BEAST_EXPECT(disconnected.cnt_ == disconnected.handledCnt_);
1077 <<
" peer disconnect count: " << disconnected.cnt_ <<
"/"
1078 << disconnected.handledCnt_;
1085 auto countingState = network_.overlay().isCountingState(
validator);
1086 BEAST_EXPECT(countingState == isCountingState);
1087 return countingState == isCountingState;
1105 doTest(
"Initial Round", log, [
this](
bool log) { BEAST_EXPECT(propagateAndSquelch(log)); });
1114 doTest(
"Peer Unsquelched Too Soon", log, [
this](
bool log) {
1115 BEAST_EXPECT(propagateNoSquelch(log, 1,
false,
false,
false));
1125 ManualClock::advance(
seconds(601));
1126 doTest(
"Peer Unsquelched", log, [
this](
bool log) {
1127 BEAST_EXPECT(propagateNoSquelch(log, 2,
true,
true,
false));
1143 sendSquelch(key, peerPtr,
duration);
1149 MAX_PEERS - env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
1157 auto selected = network_.overlay().getSelected(network_.validator(0));
1159 selected.size() == env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
1160 BEAST_EXPECT(n == 1);
1161 auto res = checkCounting(network_.validator(0),
false);
1163 return n == 1 && res;
1173 bool resetClock =
true)
1175 bool squelched =
false;
1182 BEAST_EXPECT(
false);
1189 auto res = checkCounting(network_.validator(0), countingState);
1190 return !squelched && res;
1199 doTest(
"New Peer", log, [
this](
bool log) {
1200 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1202 BEAST_EXPECT(propagateNoSquelch(log, 1,
true,
false,
false));
1211 doTest(
"Selected Peer Disconnects", log, [
this](
bool log) {
1212 ManualClock::advance(
seconds(601));
1213 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1214 auto id = network_.overlay().getSelectedPeer(network_.validator(0));
1216 network_.overlay().deletePeer(
1220 MAX_PEERS - env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
1221 BEAST_EXPECT(checkCounting(network_.validator(0),
true));
1230 doTest(
"Selected Peer Stops Relaying", log, [
this](
bool log) {
1231 ManualClock::advance(
seconds(601));
1232 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1235 network_.overlay().deleteIdlePeers(
1237 auto peers = network_.overlay().getPeers(network_.validator(0));
1240 MAX_PEERS - env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
1241 BEAST_EXPECT(checkCounting(network_.validator(0),
true));
1250 doTest(
"Squelched Peer Disconnects", log, [
this](
bool log) {
1251 ManualClock::advance(
seconds(601));
1252 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1253 auto peers = network_.overlay().getPeers(network_.validator(0));
1254 auto it =
std::find_if(peers.begin(), peers.end(), [&](
auto it) {
1255 return std::get<reduce_relay::PeerState>(it.second) ==
1256 reduce_relay::PeerState::Squelched;
1258 assert(it != peers.end());
1260 network_.overlay().deletePeer(
1262 BEAST_EXPECT(unsquelched == 0);
1263 BEAST_EXPECT(checkCounting(network_.validator(0),
false));
1270 doTest(
"Test Config - squelch enabled (legacy)", log, [&](
bool log) {
1282 doTest(
"Test Config - squelch disabled (legacy)", log, [&](
bool log) {
1295 toLoad = R
"rippleConfig(
1303 doTest(
"Test Config - squelch enabled", log, [&](
bool log) {
1308vp_base_squelch_enable=1
1315 doTest(
"Test Config - squelch disabled", log, [&](
bool log) {
1320vp_base_squelch_enable=0
1327 doTest(
"Test Config - legacy and new", log, [&](
bool log) {
1332vp_base_squelch_enable=0
1337 auto const expectedError =
1338 "Invalid reduce_relay"
1339 " cannot specify both vp_base_squelch_enable and vp_enable "
1341 "vp_enable was deprecated and replaced by "
1342 "vp_base_squelch_enable";
1353 BEAST_EXPECT(error == expectedError);
1356 doTest(
"Test Config - max selected peers", log, [&](
bool log) {
1368 toLoad = R"rippleConfig(
1370vp_base_squelch_max_selected_peers=6
1378 toLoad = R"rippleConfig(
1380vp_base_squelch_max_selected_peers=2
1384 auto const expectedError =
1385 "Invalid reduce_relay"
1386 " vp_base_squelch_max_selected_peers must be "
1387 "greater than or equal to 3";
1397 BEAST_EXPECT(error == expectedError);
1404 doTest(
"BaseSquelchReady", log, [&](
bool log) {
1405 ManualClock::reset();
1407 env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = baseSquelchEnabled;
1409 env_.app(), network_.overlay(), env_.app().config());
1412 BEAST_EXPECT(!createSlots(
false).baseSquelchReady());
1416 BEAST_EXPECT(!createSlots(
true).baseSquelchReady());
1421 BEAST_EXPECT(createSlots(
true).baseSquelchReady());
1425 BEAST_EXPECT(!createSlots(
false).baseSquelchReady());
1432 doTest(
"Duplicate Message", log, [&](
bool log) {
1436 for (
int i = 0; i < nMessages; i++)
1439 network_.overlay().updateSlotAndSquelch(
1443 auto peers = network_.overlay().getPeers(network_.validator(0));
1446 BEAST_EXPECT(
std::get<1>(peers[0]) == (nMessages - 1));
1448 uint256 const key(nMessages - 1);
1449 network_.overlay().updateSlotAndSquelch(
1452 peers = network_.overlay().getPeers(network_.validator(0));
1453 BEAST_EXPECT(
std::get<1>(peers[0]) == (nMessages - 1));
1456 network_.overlay().updateSlotAndSquelch(
1458 peers = network_.overlay().getPeers(network_.validator(0));
1478 mutable int maxDuration_{0};
1484 doTest(
"Random Squelch", l, [&](
bool l) {
1488 auto run = [&](
int npeers) {
1497 for (
int peer = 0; peer < npeers; peer++)
1504 message,
validator, peer, protocol::MessageType::mtVALIDATION);
1508 ManualClock::advance(
hours(1));
1511 using namespace reduce_relay;
1516 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1517 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
1520 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1521 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
1530 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1531 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
1533 if (handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
1535 log << make_reason(
"warning: squelch duration is low", __FILE__, __LINE__)
1542 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1543 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
1544 if (handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
1546 log << make_reason(
"warning: squelch duration is low", __FILE__, __LINE__)
1556 doTest(
"Handshake", log, [&](
bool log) {
1557 auto setEnv = [&](
bool enable) {
1560 str <<
"[reduce_relay]\n"
1561 <<
"vp_enable=" << enable <<
"\n"
1562 <<
"[compression]\n"
1565 env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
1570 auto handshake = [&](
int outboundEnable,
int inboundEnable) {
1573 setEnv(outboundEnable);
1576 env_.app().config().COMPRESSION,
1578 env_.app().config().TX_REDUCE_RELAY_ENABLE,
1579 env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
1581 http_request.version(request.version());
1582 http_request.base() = request.base();
1585 auto const peerEnabled = inboundEnable && outboundEnable;
1588 auto const inboundEnabled =
1590 BEAST_EXPECT(!(peerEnabled ^ inboundEnabled));
1592 setEnv(inboundEnable);
1594 true, http_request, addr, addr,
uint256{1}, 1, {1, 0}, env_.app());
1597 auto const outboundEnabled =
1599 BEAST_EXPECT(!(peerEnabled ^ outboundEnabled));
1614 cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
true;
1615 cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6;
1618 , network_(env_.app())
1625 bool const log =
false;
1627 testInitialRound(log);
1628 testPeerUnsquelchedTooSoon(log);
1629 testPeerUnsquelched(log);
1631 testSquelchedPeerDisconnects(log);
1632 testSelectedPeerDisconnects(log);
1633 testSelectedPeerStopsRelaying(log);
1634 testInternalHashRouter(log);
1635 testRandomSquelch(log);
1637 testBaseSquelchReady(log);
1646 doTest(
"Random Test", log, [&](
bool log) { random(log); });
1652 bool const log =
false;
1657BEAST_DEFINE_TESTSUITE(reduce_relay, overlay,
xrpl);
1658BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, overlay,
xrpl);
T back_inserter(T... args)
A version-independent IP address and port combination.
A generic endpoint for log messages.
log_os< char > log
Logging output stream.
virtual Config & config()=0
void loadFromString(std::string const &fileContents)
Load the config from the contents of the string.
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS
Represents a peer connection in the overlay.
std::uint32_t id_t
Uniquely identifies a peer.
std::uint8_t const * data() const noexcept
std::size_t size() const noexcept
Service registry for dependency injection.
virtual beast::Journal getJournal(std::string const &name)=0
An immutable linear range of bytes.
Slot is associated with a specific validator via validator's public key.
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator, with a noop callback.
Maintains squelching of relaying messages from validators.
Simulate link from a validator to a peer directly connected to the server.
Link(Validator &validator, PeerSPtr peer, Latency const &latency={milliseconds(5), milliseconds(15)})
void send(MessageSPtr const &m, SquelchCB f)
std::chrono::duration< std::uint32_t, period > duration
static void randAdvance(milliseconds min, milliseconds max)
static duration randDuration(milliseconds min, milliseconds max)
static bool const is_steady
static void advance(duration d) noexcept
static void reset() noexcept
static time_point now() noexcept
std::chrono::time_point< ManualClock > time_point
bool allCounting(Peer::id_t peer)
Check if there are peers to unsquelch - peer is in Selected state in any of the slots and there are p...
bool isSelected(Peer::id_t id)
Is peer in Selected state in any of the slots.
void enableLink(std::uint16_t validatorId, Peer::id_t peer, bool enable)
std::vector< Validator > validators_
Network(Application &app)
static void for_rand(std::uint32_t min, std::uint32_t max, std::function< void(std::uint32_t)> f)
Validator & validator(std::uint16_t v)
void onDisconnectPeer(Peer::id_t peer)
void propagate(LinkIterCB link, std::uint16_t nValidators=MAX_VALIDATORS, std::uint32_t nMessages=MAX_MESSAGES, bool purge=true, bool resetClock=true)
bool isSelected(PublicKey const &validator, Peer::id_t peer)
std::uint16_t getNumPeers() const
std::optional< Peer::id_t > deleteLastPeer()
std::unordered_map< id_t, std::tuple< reduce_relay::PeerState, std::uint16_t, std::uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
std::set< id_t > getSelected(PublicKey const &validator)
PeerSPtr addPeer(bool useCache=true)
void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t squelchDuration) const override
Squelch handler.
void deletePeer(Peer::id_t id, bool useCache=true)
ServiceRegistry & registry_
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
void deleteIdlePeers(UnsquelchCB f) override
void deletePeer(id_t id, UnsquelchCB f) override
id_t getSelectedPeer(PublicKey const &validator)
std::uint16_t inState(PublicKey const &validator, reduce_relay::PeerState state)
OverlaySim(Application &app)
bool isCountingState(PublicKey const &validator)
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, Peer::id_t id, SquelchCB f, protocol::MessageType type=protocol::mtVALIDATION) override
reduce_relay::Slots< ManualClock > slots_
Simulate server's OverlayImpl.
virtual void deleteIdlePeers(UnsquelchCB)=0
virtual void deletePeer(Peer::id_t, UnsquelchCB)=0
virtual ~Overlay()=default
virtual void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, Peer::id_t id, SquelchCB f, protocol::MessageType type=protocol::mtVALIDATION)=0
Simulate two entities - peer directly connected to the server (via squelch in PeerSim) and PeerImp (v...
void send(std::shared_ptr< Message > const &m) override
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
void addTxQueue(uint256 const &) override
Aggregate transaction's hash.
virtual void onMessage(protocol::TMSquelch const &squelch)=0
bool compressionEnabled() const override
virtual void onMessage(MessageSPtr const &m, SquelchCB f)=0
void sendTxQueue() override
Send aggregated transactions' hashes.
void removeTxQueue(uint256 const &) override
Remove hash from the transactions' hashes queue.
std::optional< std::size_t > publisherListSequence(PublicKey const &) const override
bool supportsFeature(ProtocolFeature f) const override
Json::Value json() override
bool hasTxSet(uint256 const &hash) const override
void send(protocol::TMSquelch const &squelch)
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
bool cluster() const override
Returns true if this connection is a member of the cluster.
uint256 const & getClosedLedgerHash() const override
void cycleStatus() override
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
void charge(Resource::Charge const &fee, std::string const &context={}) override
Adjust this peer's load balance based on the type of load imposed.
beast::IP::Endpoint getRemoteAddress() const override
bool isHighLatency() const override
PublicKey const & getNodePublic() const override
void setPublisherListSequence(PublicKey const &, std::size_t const) override
bool txReduceRelayEnabled() const override
int getScore(bool) const override
reduce_relay::Squelch< ManualClock > squelch_
std::string const & fingerprint() const override
void onMessage(MessageSPtr const &m, SquelchCB f) override
Local Peer (PeerImp)
virtual void onMessage(protocol::TMSquelch const &squelch) override
Remote Peer (Directly connected Peer)
PeerSim(Overlay &overlay, beast::Journal journal)
void addPeer(PeerSPtr peer)
Validator & operator=(Validator &&)=default
void send(std::vector< Peer::id_t > peers, SquelchCB f)
Send to specific peers.
void for_links(LinkIterCB f, bool simulateSlow=false)
void send(SquelchCB f)
Send to all peers.
void linkDown(Peer::id_t id)
Validator(Validator const &)=default
void linkUp(Peer::id_t id)
void deletePeer(Peer::id_t id)
Validator & operator=(Validator const &)=default
Validator(Validator &&)=default
static std::uint16_t sid_
void for_links(std::vector< Peer::id_t > peers, LinkIterCB f)
A transaction testing environment.
void testRandom(bool log)
void run() override
Runs the suite.
void testRandomSquelch(bool l)
void printPeers(std::string const &msg, std::uint16_t validator=0)
void doTest(std::string const &msg, bool log, std::function< void(bool)> f)
bool propagateNoSquelch(bool log, std::uint16_t nMessages, bool countingState, bool purge=true, bool resetClock=true)
Send fewer messages so that a squelch event is not generated.
bool propagateAndSquelch(bool log, bool purge=true, bool resetClock=true)
Propagate enough messages to generate one squelch event.
static Peer::id_t sendSquelch(PublicKey const &validator, PeerWPtr const &peerPtr, std::optional< std::uint32_t > duration)
Send squelch (if duration is set) or unsquelch (if duration is not set).
void testConfig(bool log)
void random(bool log)
Randomly brings the link between a validator and a peer down.
void testSquelchedPeerDisconnects(bool log)
Squelched peer disconnects.
void testBaseSquelchReady(bool log)
void testPeerUnsquelchedTooSoon(bool log)
Receiving message from squelched peer too soon should not change the slot's state to Counting.
void testInternalHashRouter(bool log)
void testInitialRound(bool log)
Initial counting round: three peers receive messages "faster" than the others.
void testSelectedPeerStopsRelaying(bool log)
Selected peer stops relaying.
void testSelectedPeerDisconnects(bool log)
Selected peer disconnects.
void run() override
Runs the suite.
bool checkCounting(PublicKey const &validator, bool isCountingState)
void testHandshake(bool log)
void testPeerUnsquelched(bool log)
Receiving message from squelched peer should change the slot's state to Counting.
void testNewPeer(bool log)
Receiving a message from new peer should change the slot's state to Counting.
boost::asio::ip::address Address
static constexpr auto IDLED
static constexpr auto WAIT_ON_BOOTUP
static constexpr uint16_t MAX_MESSAGE_THRESHOLD
std::unique_ptr< Config > envconfig()
creates and initializes a default configuration for jtx::Env
std::unique_ptr< Config > validator(std::unique_ptr< Config >, std::string const &)
adjust configuration with params needed to be a validator
std::shared_ptr< Message > MessageSPtr
static constexpr std::uint32_t MAX_PEERS
static constexpr std::uint32_t MAX_VALIDATORS
static constexpr std::uint32_t MAX_MESSAGES
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::pair< PublicKey, SecretKey > randomKeyPair(KeyType type)
Create a key pair using secure random numbers.
PublicKey derivePublicKey(KeyType type, SecretKey const &sk)
Derive the public key from a secret key.
int run(int argc, char **argv)
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
std::enable_if_t< std::is_integral< Integral >::value, Integral > rand_int()
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
SecretKey randomSecretKey()
Create a secret key using secure random numbers.
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
constexpr Number squelch(Number const &x, Number const &limit) noexcept
static constexpr char FEATURE_VPRR[]
Set the sequence number on a JTx.
time_point< ManualClock > time_
std::optional< PublicKey > key_
std::uint32_t handledCnt_
void squelch(PublicKey const &, Peer::id_t, std::uint32_t duration) const override
Squelch handler.
void unsquelch(PublicKey const &, Peer::id_t) const override
Unsquelch handler.