xrpld
Loading...
Searching...
No Matches
PeerImp.cpp
1#include <xrpld/overlay/detail/PeerImp.h>
2
3#include <xrpld/app/consensus/RCLCxPeerPos.h>
4#include <xrpld/app/consensus/RCLValidations.h>
5#include <xrpld/app/ledger/InboundLedgers.h>
6#include <xrpld/app/ledger/InboundTransactions.h>
7#include <xrpld/app/ledger/LedgerMaster.h>
8#include <xrpld/app/ledger/TransactionMaster.h>
9#include <xrpld/app/misc/Transaction.h>
10#include <xrpld/app/misc/ValidatorList.h>
11#include <xrpld/consensus/Validations.h>
12#include <xrpld/overlay/Cluster.h>
13#include <xrpld/overlay/ClusterNode.h>
14#include <xrpld/overlay/Peer.h>
15#include <xrpld/overlay/ReduceRelayCommon.h>
16#include <xrpld/overlay/detail/Handshake.h>
17#include <xrpld/overlay/detail/OverlayImpl.h>
18#include <xrpld/overlay/detail/ProtocolMessage.h>
19#include <xrpld/overlay/detail/ProtocolVersion.h>
20#include <xrpld/overlay/detail/TrafficCount.h>
21#include <xrpld/overlay/detail/Tuning.h>
22#include <xrpld/peerfinder/PeerfinderManager.h>
23#include <xrpld/peerfinder/Slot.h>
24
25#include <xrpl/basics/Blob.h>
26#include <xrpl/basics/Log.h>
27#include <xrpl/basics/SHAMapHash.h>
28#include <xrpl/basics/Slice.h>
29#include <xrpl/basics/ToString.h>
30#include <xrpl/basics/UptimeClock.h>
31#include <xrpl/basics/base64.h>
32#include <xrpl/basics/base_uint.h>
33#include <xrpl/basics/chrono.h>
34#include <xrpl/basics/random.h>
35#include <xrpl/basics/safe_cast.h>
36#include <xrpl/basics/strHex.h>
37#include <xrpl/beast/utility/Journal.h>
38#include <xrpl/beast/utility/Zero.h>
39#include <xrpl/beast/utility/instrumentation.h>
40#include <xrpl/core/HashRouter.h>
41#include <xrpl/core/Job.h>
42#include <xrpl/core/PerfLog.h>
43#include <xrpl/json/json_forwards.h>
44#include <xrpl/json/json_value.h>
45#include <xrpl/ledger/Ledger.h>
46#include <xrpl/protocol/KeyType.h>
47#include <xrpl/protocol/LedgerHeader.h>
48#include <xrpl/protocol/Protocol.h>
49#include <xrpl/protocol/PublicKey.h>
50#include <xrpl/protocol/SField.h>
51#include <xrpl/protocol/STTx.h>
52#include <xrpl/protocol/Serializer.h>
53#include <xrpl/protocol/TxFlags.h>
54#include <xrpl/protocol/digest.h>
55#include <xrpl/protocol/jss.h>
56#include <xrpl/protocol/tokens.h>
57#include <xrpl/resource/Charge.h>
58#include <xrpl/resource/Consumer.h>
59#include <xrpl/resource/Disposition.h>
60#include <xrpl/resource/Fees.h>
61#include <xrpl/resource/Gossip.h>
62#include <xrpl/server/LoadFeeTrack.h>
63#include <xrpl/server/NetworkOPs.h>
64#include <xrpl/shamap/SHAMapNodeID.h>
65#include <xrpl/tx/apply.h>
66
67#include <boost/algorithm/string/predicate.hpp>
68#include <boost/asio/bind_executor.hpp>
69#include <boost/asio/buffer.hpp>
70#include <boost/asio/completion_condition.hpp>
71#include <boost/asio/dispatch.hpp>
72#include <boost/asio/error.hpp>
73#include <boost/asio/strand.hpp>
74#include <boost/asio/write.hpp>
75#include <boost/beast/core/multi_buffer.hpp>
76#include <boost/beast/core/ostream.hpp>
77#include <boost/system/system_error.hpp>
78
79#include <google/protobuf/message.h>
80
81#include <xrpl.pb.h>
82
83#include <algorithm>
84#include <atomic>
85#include <chrono>
86#include <cstddef>
87#include <cstdint>
88#include <exception>
89#include <functional>
90#include <map>
91#include <memory>
92#include <mutex>
93#include <numeric>
94#include <optional>
95#include <shared_mutex>
96#include <sstream>
97#include <string>
98#include <string_view>
99#include <tuple>
100#include <utility>
101#include <vector>
102
103using namespace std::chrono_literals;
104
105namespace xrpl {
106
107namespace {
// Latency threshold for a peer connection. It is not referenced in the
// visible part of this file -- presumably the cut-off above which a peer is
// treated as "high latency" (TODO confirm against its users).
109constexpr std::chrono::milliseconds kPeerHighLatency{300};
110
// Period of the per-peer timer: the timer is re-armed with this interval and
// each expiry runs PeerImp::onTimer (health checks followed by a PING).
112constexpr std::chrono::seconds kPeerTimerInterval{60};
113
114} // namespace
115
116// TODO: Remove this exclusion once unit tests are added after the hotfix
117// release.
118
120 Application& app,
121 id_t id,
123 http_request_type&& request,
124 PublicKey const& publicKey,
126 Resource::Consumer consumer,
128 OverlayImpl& overlay)
129 : Child(overlay)
130 , app_(app)
131 , id_(id)
 // The fingerprint combines the remote endpoint, the node public key and the
 // peer id.
132 , fingerprint_(getFingerprint(slot->remoteEndpoint(), publicKey, to_string(id)))
 // Two log sinks sharing the same prefix: "Peer" and "Protocol".
134 , sink_(app_.getJournal("Peer"), prefix_)
135 , pSink_(app_.getJournal("Protocol"), prefix_)
136 , journal_(sink_)
138 , streamPtr_(std::move(streamPtr))
139 , socket_(streamPtr_->next_layer().socket())
 // The strand and the timer both run on the socket's executor.
141 , strand_(boost::asio::make_strand(socket_.get_executor()))
142 , timer_(waitable_timer{socket_.get_executor()})
143 , remoteAddress_(slot->remoteEndpoint())
144 , overlay_(overlay)
 // This constructor always marks the connection as inbound.
145 , inbound_(true)
146 , protocol_(std::move(protocol))
 // Tracking, ping and creation timestamps all start at construction time.
148 , trackingTime_(clock_type::now())
149 , publicKey_(publicKey)
150 , lastPingTime_(clock_type::now())
151 , creationTime_(clock_type::now())
152 , squelch_(app_.getJournal("Squelch"))
153 , usage_(consumer)
 // Start from the trivial per-message fee with an empty context.
154 , fee_{.fee = Resource::kFeeTrivialPeer, .context = ""}
155 , slot_(slot)
156 , request_(std::move(request))
160 ? Compressed::On
161 : Compressed::Off)
 // Optional features are enabled from the handshake headers combined with
 // the local configuration (see peerFeatureEnabled).
163 peerFeatureEnabled(headers_, kFeatureTxrr, app_.config().txReduceRelayEnable))
165 peerFeatureEnabled(headers_, kFeatureLedgerReplay, app_.config().ledgerReplay))
166 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
167{
 // Record the negotiated feature set once, at info level.
168 JLOG(journal_.info())
169 << "compression enabled " << (compressionEnabled_ == Compressed::On)
170 << " vp reduce-relay base squelch enabled "
171 << peerFeatureEnabled(headers_, kFeatureVprr, app_.config().vpReduceRelayBaseSquelchEnable)
172 << " tx reduce-relay enabled " << txReduceRelayEnabled_;
173}
174
// Teardown (presumably the destructor -- TODO confirm): unregisters this peer
// from the overlay and the peer finder, and logs when a cluster member goes
// away.
176{
 // Evaluate cluster membership before unregistering; the value is only used
 // for the log line at the end.
177 bool const inCluster{cluster()};
178
179 overlay_.deletePeer(id_);
180 overlay_.onPeerDeactivate(id_);
181 overlay_.peerFinder().onClosed(slot_);
182 overlay_.remove(slot_);
183
184 if (inCluster)
185 {
186 JLOG(journal_.warn()) << name() << " left cluster";
187 }
188}
189
190// Helper function to check for valid uint256 values in protobuf buffers
// Only the length is checked: the string must be exactly uint256::size()
// bytes long. The contents are not inspected.
191static bool
193{
194 return pBuffStr.size() == uint256::size();
195}
196
// Starts the peer on its strand: parses the optional "Closed-Ledger" and
// "Previous-Ledger" handshake headers, stores the parsed hashes under
// recentLock_, then continues with doAccept() for inbound connections or
// doProtocolStart() otherwise.
197void
199{
 // The lambda holds a shared_ptr to this object, keeping it alive until the
 // dispatched work has run.
200 dispatch(strand_, [self = shared_from_this()]() {
 // A ledger hash is accepted either as hex or as base64 that decodes to
 // exactly uint256::size() bytes; anything else yields nullopt.
201 auto parseLedgerHash = [](std::string_view value) -> std::optional<uint256> {
202 if (uint256 ret; ret.parseHex(value))
203 return ret;
204
205 if (auto const s = base64Decode(value); s.size() == uint256::size())
206 return uint256::fromRaw(s);
207
208 return std::nullopt;
209 };
210
212 std::optional<uint256> previous;
213
 // NOTE(review): none of the fail() calls below is followed by a return,
 // so execution continues through to doAccept()/doProtocolStart() after a
 // malformed header. Confirm those paths are safe once the connection has
 // been failed.
214 if (auto const iter = self->headers_.find("Closed-Ledger"); iter != self->headers_.end())
215 {
216 closed = parseLedgerHash(iter->value());
217
218 if (!closed)
219 self->fail("Malformed handshake data (1)");
220 }
221
222 if (auto const iter = self->headers_.find("Previous-Ledger"); iter != self->headers_.end())
223 {
224 previous = parseLedgerHash(iter->value());
225
226 if (!previous)
227 self->fail("Malformed handshake data (2)");
228 }
229
 // A previous-ledger hash without a closed-ledger hash is rejected.
230 if (previous && !closed)
231 self->fail("Malformed handshake data (3)");
232
233 {
234 std::scoped_lock const sl(self->recentLock_);
235 if (closed)
236 self->closedLedgerHash_ = *closed;
237 if (previous)
238 self->previousLedgerHash_ = *previous;
239 }
240
241 if (self->inbound_)
242 {
243 self->doAccept();
244 }
245 else
246 {
247 self->doProtocolStart();
248 }
249
250 // Anything else that needs to be done with the connection should be
251 // done in doProtocolStart
252 });
253}
254
// Requests shutdown of the connection: close() is run on the strand, and
// nothing happens if the socket is already closed.
255void
257{
258 dispatch(strand_, [self = shared_from_this()]() {
259 if (!self->socket_.is_open())
260 return;
261
262 self->close();
263 });
264}
265
266//------------------------------------------------------------------------------
267
// Queues message m for transmission on the strand. The message is dropped
// when the peer is closing, detaching or already closed, or when its
// validator is squelched. Traffic counters are updated, and a write is
// started only if the queue was empty beforehand.
268void
270{
271 dispatch(strand_, [self = shared_from_this(), m]() {
272 if (self->gracefulClose_)
273 return;
274 if (self->detaching_)
275 return;
276 if (!self->socket_.is_open())
277 return;
278
 // If the message carries a validator key and expireSquelch() returns
 // false for it, the message is not sent; only its size is reported.
279 auto validator = m->getValidatorKey();
280 if (validator && !self->squelch_.expireSquelch(*validator))
281 {
282 self->overlay_.reportOutboundTraffic(
284 static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
285 return;
286 }
287
288 // report categorized outgoing traffic
289 self->overlay_.reportOutboundTraffic(
290 safeCast<TrafficCount::Category>(m->getCategory()),
291 static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
292
293 // report total outgoing traffic
294 self->overlay_.reportOutboundTraffic(
296 static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
297
 // Queue length before this message is pushed.
298 auto sendqSize = self->sendQueue_.size();
299
300 if (sendqSize < Tuning::kTargetSendQueue)
301 {
302 // To detect a peer that does not read from their
303 // side of the connection, we expect a peer to have
304 // a small sendq periodically
305 self->largeSendq_ = 0;
306 }
307 else if (
308 auto sink = self->journal_.debug();
309 sink && (sendqSize % Tuning::kSendQueueLogFreq) == 0)
310 {
 // Log the queue depth only at multiples of kSendQueueLogFreq.
311 std::string const n = self->name();
312 sink << n << " sendq: " << sendqSize;
313 }
314
315 self->sendQueue_.push(m);
316
 // A non-empty queue means a write is already in flight;
 // onWriteMessage starts the next write when it completes.
317 if (sendqSize != 0)
318 return;
319
320 boost::asio::async_write(
321 self->stream_,
322 boost::asio::buffer(self->sendQueue_.front()->getBuffer(self->compressionEnabled_)),
323 bind_executor(
324 self->strand_,
325 std::bind(
326 &PeerImp::onWriteMessage, self, std::placeholders::_1, std::placeholders::_2)));
327 });
328}
329
// Flushes the queued transaction hashes: if txQueue_ is non-empty, packs all
// hashes into one TMHaveTransactions message, clears the queue and sends the
// message as mtHAVE_TRANSACTIONS. Runs on the strand.
330void
332{
333 dispatch(strand_, [self = shared_from_this()]() {
334 if (!self->txQueue_.empty())
335 {
336 protocol::TMHaveTransactions ht;
338 self->txQueue_, [&](auto const& hash) { ht.add_hashes(hash.data(), hash.size()); });
339 JLOG(self->pJournal_.trace()) << "sendTxQueue " << self->txQueue_.size();
340 self->txQueue_.clear();
341 self->send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
342 }
343 });
344}
345
// Adds a transaction hash to txQueue_ on the strand. When the queue has
// reached reduce_relay::kMaxTxQueueSize, a flush via sendTxQueue() is
// requested first.
346void
348{
349 dispatch(strand_, [self = shared_from_this(), hash]() {
350 if (self->txQueue_.size() == reduce_relay::kMaxTxQueueSize)
351 {
352 JLOG(self->pJournal_.warn()) << "addTxQueue exceeds the cap";
353 self->sendTxQueue();
354 }
355
356 self->txQueue_.insert(hash);
357 JLOG(self->pJournal_.trace()) << "addTxQueue " << self->txQueue_.size();
358 });
359}
360
// Removes a transaction hash from txQueue_ on the strand and traces the
// number of entries erased.
361void
363{
364 dispatch(strand_, [self = shared_from_this(), hash]() {
365 auto removed = self->txQueue_.erase(hash);
366 JLOG(self->pJournal_.trace()) << "removeTxQueue " << removed;
367 });
368}
369
// Applies a resource fee to this peer's consumer on the strand. If the charge
// yields Disposition::Drop and the consumer asks for a disconnect, the peer
// is failed exactly once with "charge: Resources".
370void
372{
373 dispatch(strand_, [self = shared_from_this(), fee, context]() {
374 if ((self->usage_.charge(fee, context) == Resource::Disposition::Drop) &&
375 self->usage_.disconnect(self->pJournal_))
376 {
377 // Idempotent: only the first worker to observe Drop counts the
378 // metric and posts fail(). Without the guard, several queued
379 // workers can all see Drop before fail() lands on the strand,
380 // overcounting peerDisconnectsCharges_ and posting duplicate
381 // shutdowns. fail(std::string const&) self-posts to strand_
382 // when invoked off-strand.
383 bool expected = false;
384 if (self->chargeDisconnectFired_.compare_exchange_strong(
385 expected, true, std::memory_order_acq_rel))
386 {
387 self->overlay_.incPeerDisconnectCharges();
388 self->fail("charge: Resources");
389 }
390 }
391 });
392}
393
394//------------------------------------------------------------------------------
395
// Returns true only when the handshake carried a "Crawl" header whose value
// equals "public" (case-insensitive); false when the header is absent.
396bool
398{
399 auto const iter = headers_.find("Crawl");
400 if (iter == headers_.end())
401 return false;
402 return boost::iequals(iter->value(), "public");
403}
404
// Returns whether this peer's public key is a member of the local cluster.
405bool
407{
408 return static_cast<bool>(app_.getCluster().member(publicKey_));
409}
410
413{
 // The remote side identifies itself in "User-Agent" when it connected to us
 // (inbound) and in "Server" otherwise.
414 if (inbound_)
415 return headers_["User-Agent"];
416 return headers_["Server"];
417}
418
421{
 // Builds the JSON description of this peer: identity, address, cluster
 // info, version, protocol, latency, uptime, ledger range, tracking state,
 // last closed ledger, last reported status and traffic metrics.
423
424 ret[jss::public_key] = toBase58(TokenType::NodePublic, publicKey_);
425 ret[jss::address] = remoteAddress_.toString();
426
 // Several fields are emitted only when they apply.
427 if (inbound_)
428 ret[jss::inbound] = true;
429
430 if (cluster())
431 {
432 ret[jss::cluster] = true;
433
434 if (auto const n = name(); !n.empty())
435 {
436 // Could move here if json::Value supported moving from a string
437 ret[jss::name] = n;
438 }
439 }
440
441 if (auto const d = domain(); !d.empty())
442 ret[jss::server_domain] = std::string{d};
443
444 if (auto const nid = headers_["Network-ID"]; !nid.empty())
445 ret[jss::network_id] = std::string{nid};
446
447 ret[jss::load] = usage_.balance();
448
449 if (auto const version = getVersion(); !version.empty())
450 ret[jss::version] = std::string{version};
451
452 ret[jss::protocol] = to_string(protocol_);
453
454 {
 // Latency is reported only once a measurement exists.
456 if (latency_)
457 ret[jss::latency] = static_cast<json::UInt>(latency_->count());
458 }
459
460 ret[jss::uptime] =
462
463 std::uint32_t minSeq = 0, maxSeq = 0;
464 ledgerRange(minSeq, maxSeq);
465
 // A range of 0 - 0 is treated as "no ledgers" and omitted.
466 if ((minSeq != 0) || (maxSeq != 0))
467 ret[jss::complete_ledgers] = std::to_string(minSeq) + " - " + std::to_string(maxSeq);
468
 // Only two tracking states produce a "track" field; the third case writes
 // nothing. The case labels are not visible in this listing.
469 switch (tracking_.load())
470 {
472 ret[jss::track] = "diverged";
473 break;
474
476 ret[jss::track] = "unknown";
477 break;
478
480 // Nothing to do here
481 break;
482 }
483
 // Copy the shared state into locals and work on the copies below.
484 uint256 closedLedgerHash;
485 protocol::TMStatusChange lastStatus;
486 {
488 closedLedgerHash = closedLedgerHash_;
489 lastStatus = lastStatus_;
490 }
491
492 if (closedLedgerHash != beast::kZero)
493 ret[jss::ledger] = to_string(closedLedgerHash);
494
495 if (lastStatus.has_newstatus())
496 {
497 switch (lastStatus.newstatus())
498 {
499 case protocol::nsCONNECTING:
500 ret[jss::status] = "connecting";
501 break;
502
503 case protocol::nsCONNECTED:
504 ret[jss::status] = "connected";
505 break;
506
507 case protocol::nsMONITORING:
508 ret[jss::status] = "monitoring";
509 break;
510
511 case protocol::nsVALIDATING:
512 ret[jss::status] = "validating";
513 break;
514
515 case protocol::nsSHUTTING:
516 ret[jss::status] = "shutting";
517 break;
518
519 default:
 // An unrecognised status is logged and no "status" field is set.
520 JLOG(pJournal_.warn()) << "Unknown status: " << lastStatus.newstatus();
521 }
522 }
523
 // Metrics are emitted as strings (std::to_string).
524 ret[jss::metrics] = json::Value(json::ValueType::Object);
525 ret[jss::metrics][jss::total_bytes_recv] = std::to_string(metrics_.recv.totalBytes());
526 ret[jss::metrics][jss::total_bytes_sent] = std::to_string(metrics_.sent.totalBytes());
527 ret[jss::metrics][jss::avg_bps_recv] = std::to_string(metrics_.recv.averageBytes());
528 ret[jss::metrics][jss::avg_bps_sent] = std::to_string(metrics_.sent.averageBytes());
529
530 return ret;
531}
532
// Reports whether the peer supports feature f, decided purely from the
// negotiated protocol version: one feature needs at least 2.1, another at
// least 2.2. The case labels are not visible in this listing. Falls through
// to false otherwise.
533bool
535{
536 switch (f)
537 {
539 return protocol_ >= makeProtocol(2, 1);
541 return protocol_ >= makeProtocol(2, 2);
544 }
545 return false;
546}
547
548//------------------------------------------------------------------------------
549
// Returns true when seq is non-zero, lies within [minLedger_, maxLedger_] and
// the peer is Converged. A second path also returns true; its guarding
// condition is not visible in this listing (TODO confirm). Otherwise false.
550bool
552{
553 {
555 if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
556 (tracking_.load() == Tracking::Converged))
557 return true;
559 return true;
560 }
561 return false;
562}
563
// Copies the ledger sequence range held for this peer into the output
// parameters: minSeq receives minLedger_, maxSeq receives maxLedger_.
564void
566{
568
569 minSeq = minLedger_;
570 maxSeq = maxLedger_;
571}
572
// Returns true when hash is present in recentTxSets_ (linear search).
573bool
574PeerImp::hasTxSet(uint256 const& hash) const
575{
577 return std::ranges::find(recentTxSets_, hash) != recentTxSets_.end();
578}
579
// Resets the peer's closed-ledger hash to zero. Statements between the
// comment below and the zero() call are not visible in this listing.
580void
582{
583 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
584 // guarded by recentLock_.
587 closedLedgerHash_.zero();
588}
589
// Returns true when the peer is not Diverged and [uMin, uMax] lies entirely
// within [minLedger_, maxLedger_].
590bool
592{
594 return (tracking_ != Tracking::Diverged) && (uMin >= minLedger_) && (uMax <= maxLedger_);
595}
596
597//------------------------------------------------------------------------------
598
// Closes the connection immediately. Must be called on the strand. Does
// nothing if the socket is already closed; otherwise cancels the timer,
// closes the socket and counts the disconnect.
599void
601{
602 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::PeerImp::close : strand in this thread");
603 if (!socket_.is_open())
604 return;
605
606 detaching_ = true; // DEPRECATED
607
608 cancelTimer();
 // Use the error_code overload; the result is intentionally ignored.
609 error_code ec;
610 socket_.close(ec); // NOLINT(bugprone-unused-return-value)
611
612 overlay_.incPeerDisconnect();
 // Inbound closes are logged at debug level, outbound closes at info level.
613 JLOG((inbound_ ? journal_.debug() : journal_.info())) << "close: Closed";
614}
615
// Fails the connection with a textual reason. The work is dispatched to the
// strand; the reason is logged as a warning only while the socket is still
// open, then close() is called.
616void
618{
619 dispatch(strand_, [self = shared_from_this(), reason]() {
 // Check the severity first so name() is only evaluated when the warning
 // will actually be written.
620 if (self->journal_.active(beast::Severity::Warning) && self->socket_.is_open())
621 {
622 std::string const n = self->name();
623 JLOG(self->journal_.warn()) << n << " failed: " << reason;
624 }
625 self->close();
626 });
627}
628
// Fails the connection because an operation reported an error code. Must be
// called on the strand. Logs "<name>: <error message>" as a warning and
// closes the connection; does nothing if the socket is already closed.
629void
631{
632 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::PeerImp::fail : strand in this thread");
633 if (!socket_.is_open())
634 return;
635
636 JLOG(journal_.warn()) << name << ": " << ec.message();
637
638 close();
639}
640
// Begins an orderly shutdown. Must be called on the strand with an open
// socket and at most once. If messages are still queued, the shutdown is
// deferred: onWriteMessage starts it once the queue drains. Otherwise the
// timer is armed and the stream shutdown starts immediately.
641void
643{
644 XRPL_ASSERT(
645 strand_.running_in_this_thread(), "xrpl::PeerImp::gracefulClose : strand in this thread");
646 XRPL_ASSERT(socket_.is_open(), "xrpl::PeerImp::gracefulClose : socket is open");
647 XRPL_ASSERT(!gracefulClose_, "xrpl::PeerImp::gracefulClose : socket is not closing");
648 gracefulClose_ = true;
649 if (!sendQueue_.empty())
650 return;
651 setTimer();
652 stream_.async_shutdown(bind_executor(
653 strand_, std::bind(&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
654}
655
// Arms timer_ to fire after kPeerTimerInterval and schedules onTimer on the
// strand. If setting the expiry throws, the error is logged and no wait is
// started.
656void
658{
659 try
660 {
661 timer_.expires_after(kPeerTimerInterval);
662 }
663 catch (boost::system::system_error const& e)
664 {
665 JLOG(journal_.error()) << "setTimer: " << e.code();
666 return;
667 }
668 timer_.async_wait(bind_executor(
669 strand_, std::bind(&PeerImp::onTimer, shared_from_this(), std::placeholders::_1)));
670}
671
672// convenience for ignoring the error code
// Cancels timer_ and deliberately swallows any system_error the cancel
// throws.
673void
675{
676 try
677 {
678 timer_.cancel();
679 }
680 catch (boost::system::system_error const&) // NOLINT(bugprone-empty-catch)
681 {
682 // ignored
683 }
684}
685
686//------------------------------------------------------------------------------
687
690{
 // Produces the log prefix "[<fingerprint>] " (with a trailing space).
692 ss << "[" << fingerprint << "] ";
693 return ss.str();
694}
695
// Timer completion handler. Performs the periodic health checks in order:
// send-queue check, usefulness of an outbound peer that is not converged, and
// an outstanding PING. If all pass, it sends a new PING and re-arms the
// timer.
696void
698{
699 if (!socket_.is_open())
700 return;
701
702 if (ec)
703 {
 // A cancelled wait is the normal result of cancelTimer().
704 if (ec == boost::asio::error::operation_aborted)
705 return;
706
707 // This should never happen
708 JLOG(journal_.error()) << "onTimer: " << ec.message();
709 close();
710 return;
711 }
712
 // NOTE(review): the condition guarding this block is not visible in this
 // listing; it presumably tests largeSendq_ -- confirm.
714 {
715 fail("Large send queue");
716 return;
717 }
718
 // Only outbound peers that are not Converged are checked for staleness.
719 if (auto const t = tracking_.load(); !inbound_ && t != Tracking::Converged)
720 {
721 clock_type::duration duration;
722
723 {
725 duration = clock_type::now() - trackingTime_;
726 }
727
 // Diverged and Unknown peers each have their own configured time limit.
728 if ((t == Tracking::Diverged && (duration > app_.config().maxDivergedTime)) ||
729 (t == Tracking::Unknown && (duration > app_.config().maxUnknownTime)))
730 {
731 overlay_.peerFinder().onFailure(slot_);
732 fail("Not useful");
733 return;
734 }
735 }
736
737 // Already waiting for PONG
738 if (lastPingSeq_)
739 {
740 fail("Ping Timeout");
741 return;
742 }
743
746
 // Send a PING carrying the current lastPingSeq_ value. The statements that
 // set it are not visible in this listing.
747 protocol::TMPing message;
748 message.set_type(protocol::TMPing::ptPING);
749 message.set_seq(*lastPingSeq_);
750
751 send(std::make_shared<Message>(message, protocol::mtPING));
752
753 setTimer();
754}
755
// Completion handler for the stream shutdown: cancels the timer, logs
// unexpected errors at debug level and closes the connection.
756void
758{
759 cancelTimer();
760
761 if (ec)
762 {
763 // - eof: the stream was cleanly closed
764 // - operation_aborted: an expired timer (slow shutdown)
765 // - stream_truncated: the tcp connection closed (no handshake) it could
766 // occur if a peer does not perform a graceful disconnect
767 // - broken_pipe: the peer is gone
 // Of the cases listed above, only eof and operation_aborted are
 // suppressed here, along with errors whose message mentions
 // "application data after close notify".
768 bool const shouldLog =
769 (ec != boost::asio::error::eof && ec != boost::asio::error::operation_aborted &&
770 !ec.message().contains("application data after close notify"));
771
772 if (shouldLog)
773 {
774 JLOG(journal_.debug()) << "onShutdown: " << ec.message();
775 }
776 }
777
778 close();
779}
780
781//------------------------------------------------------------------------------
// Completes an inbound handshake: recomputes the shared value, records the
// cluster name if the peer is a cluster member, activates the peer in the
// overlay and writes the HTTP response. The completion handler either
// continues or fails the connection.
782void
784{
785 XRPL_ASSERT(readBuffer_.size() == 0, "xrpl::PeerImp::doAccept : empty read buffer");
786
787 auto const sharedValue = makeSharedValue(*streamPtr_, journal_);
788
789 // This shouldn't fail since we already computed
790 // the shared value successfully in OverlayImpl
791 if (!sharedValue)
792 {
793 fail("makeSharedValue: Unexpected failure");
794 return;
795 }
796
797 JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
798
799 if (auto member = app_.getCluster().member(publicKey_))
800 {
801 {
 // name_ is written under an exclusive lock on nameMutex_.
802 std::unique_lock const lock{nameMutex_};
803 name_ = *member;
804 }
805 JLOG(journal_.info()) << "Cluster name: " << *member;
806 }
807
808 overlay_.activate(shared_from_this());
809
810 // XXX Set timer: connection is in grace period to be useful.
811 // XXX Set timer: connection idle (idle may vary depending on connection
812 // type.)
813
815
 // Serialize the handshake response into writeBuffer. Its declaration is
 // not visible in this listing.
816 boost::beast::ostream(*writeBuffer) << makeResponse(
817 !overlay_.peerFinder().config().peerPrivate,
818 request_,
819 overlay_.setup().publicIp,
820 remoteAddress_.address(),
821 *sharedValue,
822 overlay_.setup().networkID,
823 protocol_,
824 app_);
825
826 // Write the whole buffer and only start protocol when that's done.
827 boost::asio::async_write(
828 stream_,
829 writeBuffer->data(),
830 boost::asio::transfer_all(),
831 bind_executor(
832 strand_,
 // The lambda captures writeBuffer and self, so both outlive the write.
833 [this, writeBuffer, self = shared_from_this()](
834 error_code ec, std::size_t bytesTransferred) {
835 if (!socket_.is_open())
836 return;
837 if (ec)
838 {
839 if (ec == boost::asio::error::operation_aborted)
840 return;
841
842 fail("onWriteResponse", ec);
843 return;
844 }
845
 // Success path: the whole response was written. The statement that runs
 // before this return is not visible in this listing (presumably the
 // protocol start -- TODO confirm).
846 if (writeBuffer->size() == bytesTransferred)
847 {
849 return;
850 }
851 fail("Failed to write header");
852 return;
853 }));
854}
855
858{
 // Return name_ while holding a shared (reader) lock on nameMutex_.
859 std::shared_lock const readLock{nameMutex_};
860 return name_;
861}
862
865{
 // The domain is whatever the peer sent in its "Server-Domain" header.
866 return headers_["Server-Domain"];
867}
868
869//------------------------------------------------------------------------------
870
871// Protocol logic
872
// Starts the peer protocol: sends the loaded validator lists and the
// overlay's manifests message (if any) to the peer, then arms the timer.
873void
875{
877
878 // Send all the validator lists that have been loaded
 // NOTE(review): the condition guarding this block is not visible in this
 // listing.
880 {
881 app_.getValidators().forEachAvailable(
882 [&](std::string const& manifest,
883 std::uint32_t version,
885 PublicKey const& pubKey,
886 std::size_t maxSequence,
887 uint256 const& hash) {
889 *this,
890 0,
891 pubKey,
892 maxSequence,
893 version,
894 manifest,
895 blobInfos,
896 app_.getHashRouter(),
897 pJournal_);
898
899 // Don't send it next time.
900 app_.getHashRouter().addSuppressionPeer(hash, id_);
901 });
902 }
903
904 if (auto m = overlay_.getManifestsMessage())
905 send(m);
906
907 setTimer();
908}
909
910// Called repeatedly with protocol message data
// Read completion handler: commits the received bytes to readBuffer_, feeds
// the buffer to invokeProtocolMessage until no further message can be
// consumed, then issues the next read.
911void
913{
914 if (!socket_.is_open())
915 return;
916
917 if (ec)
918 {
919 if (ec == boost::asio::error::operation_aborted)
920 return;
921
922 if (ec == boost::asio::error::eof)
923 {
924 JLOG(journal_.info()) << "EOF";
926 return;
927 }
928
929 fail("onReadMessage", ec);
930 return;
931 }
932
933 if (auto stream = journal_.trace())
934 {
935 stream << "onReadMessage: "
936 << (bytesTransferred > 0 ? to_string(bytesTransferred) + " bytes" : "");
937 }
938
939 metrics_.recv.addMessage(bytesTransferred);
940
941 readBuffer_.commit(bytesTransferred);
942
 // hint is passed by name to invokeProtocolMessage on every iteration.
943 auto hint = Tuning::kReadBufferBytes;
944
945 while (readBuffer_.size() > 0)
946 {
947 std::size_t bytesConsumed = 0;
948
949 using namespace std::chrono_literals;
 // Process one message. The call is timed, with a 350ms threshold passed to
 // the logging helper.
950 std::tie(bytesConsumed, ec) = perf::measureDurationAndLog(
951 [&]() { return invokeProtocolMessage(readBuffer_.data(), *this, hint); },
952 "invokeProtocolMessage",
953 350ms,
954 journal_);
955
956 if (ec)
957 {
958 fail("onReadMessage", ec);
959 return;
960 }
961
 // A message handler may have closed the connection or started a graceful
 // close; stop reading in either case.
962 if (!socket_.is_open())
963 return;
964
965 if (gracefulClose_)
966 return;
967
 // Nothing consumed: leave the loop and read more data.
968 if (bytesConsumed == 0)
969 break;
970 readBuffer_.consume(bytesConsumed);
971 }
972
973 // Timeout on writes only
974 stream_.async_read_some(
976 bind_executor(
977 strand_,
978 std::bind(
981 std::placeholders::_1,
982 std::placeholders::_2)));
983}
984
// Write completion handler: records the bytes sent, pops the completed
// message and starts the next write if the queue is not empty. When the queue
// is drained and a graceful close is pending, it begins the stream shutdown.
985void
987{
988 if (!socket_.is_open())
989 return;
990
991 if (ec)
992 {
993 if (ec == boost::asio::error::operation_aborted)
994 return;
995
996 fail("onWriteMessage", ec);
997 return;
998 }
999 if (auto stream = journal_.trace())
1000 {
1001 stream << "onWriteMessage: "
1002 << (bytesTransferred > 0 ? to_string(bytesTransferred) + " bytes" : "");
1003 }
1004
1005 metrics_.sent.addMessage(bytesTransferred);
1006
 // The message at the front of the queue is the one that just finished.
1007 XRPL_ASSERT(!sendQueue_.empty(), "xrpl::PeerImp::onWriteMessage : non-empty send buffer");
1008 sendQueue_.pop();
1009 if (!sendQueue_.empty())
1010 {
1011 // Timeout on writes only
1012 boost::asio::async_write(
1013 stream_,
1014 boost::asio::buffer(sendQueue_.front()->getBuffer(compressionEnabled_)),
1015 bind_executor(
1016 strand_,
1017 std::bind(
1020 std::placeholders::_1,
1021 std::placeholders::_2)));
1022 return;
1023 }
1024
 // Queue drained: complete a graceful close that was deferred while messages
 // were still queued.
1025 if (gracefulClose_)
1026 {
1027 stream_.async_shutdown(bind_executor(
1028 strand_, std::bind(&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
1029 return;
1030 }
1031}
1032
1033//------------------------------------------------------------------------------
1034//
1035// ProtocolHandler
1036//
1037//------------------------------------------------------------------------------
1038
// Handler stub; currently does nothing.
1039void
1041{
1042 // TODO
1043}
1044
// Called at the start of message processing: creates a load event named after
// the message type, resets the per-message fee to the trivial peer fee,
// records inbound traffic and, for transaction-related messages, updates the
// tx metrics.
1045void
1047 std::uint16_t type,
1049 std::size_t size,
1050 std::size_t uncompressedSize,
1051 bool isCompressed)
1052{
1053 auto const name = protocolMessageName(type);
1054 loadEvent_ = app_.getJobQueue().makeLoadEvent(JtPeer, name);
1055 fee_ = {.fee = Resource::kFeeTrivialPeer, .context = name};
1056
1057 auto const category =
1058 TrafficCount::categorize(*m, static_cast<protocol::MessageType>(type), true);
1059
1060 // report total incoming traffic
1061 overlay_.reportInboundTraffic(TrafficCount::Category::Total, static_cast<int>(size));
1062
1063 // increase the traffic received for a specific category
1064 overlay_.reportInboundTraffic(category, static_cast<int>(size));
1065
1066 using namespace protocol;
 // Tx metrics are collected only when tx reduce-relay is enabled for this
 // peer or metrics collection is turned on in the config. Some of the
 // type/category alternatives are not visible in this listing.
1067 if ((type == MessageType::mtTRANSACTION || type == MessageType::mtHAVE_TRANSACTIONS ||
1068 type == MessageType::mtTRANSACTIONS ||
1069 // GET_OBJECTS
1071 // GET_LEDGER
1074 // LEDGER_DATA
1076 category == TrafficCount::Category::GlTscGet) &&
1077 (txReduceRelayEnabled() || app_.config().txReduceRelayMetrics))
1078 {
1079 overlay_.addTxMetrics(static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
1080 }
1081 JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " " << uncompressedSize
1082 << " " << isCompressed;
1083}
1084
1085void
1091
// Handles a manifests message: an empty list is charged as useless data and
// ignored, a list of more than 100 entries raises the fee, and the manifests
// are handed to the overlay on a job-queue job.
1092void
1094{
1095 auto const s = m->list_size();
1096
1097 if (s == 0)
1098 {
1099 fee_.update(Resource::kFeeUselessData, "empty");
1100 return;
1101 }
1102
1103 if (s > 100)
1104 fee_.update(Resource::kFeeModerateBurdenPeer, "oversize");
1105
 // `that` holds a shared_ptr to this peer, which keeps it alive until the
 // job runs.
1106 app_.getJobQueue().addJob(JtManifest, "RcvManifests", [this, that = shared_from_this(), m]() {
1107 overlay_.onManifests(m, that);
1108 });
1109}
1110
// Handles ping traffic. A PING is answered with a PONG built from the same
// message. A PONG whose sequence matches the outstanding ping clears
// lastPingSeq_ and updates the smoothed latency estimate.
1111void
1113{
1114 if (m->type() == protocol::TMPing::ptPING)
1115 {
1116 // We have received a ping request, reply with a pong
1117 fee_.update(Resource::kFeeModerateBurdenPeer, "ping request");
1118 m->set_type(protocol::TMPing::ptPONG);
1119 send(std::make_shared<Message>(*m, protocol::mtPING));
1120 return;
1121 }
1122
1123 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1124 {
1125 // Only reset the ping sequence if we actually received a
1126 // PONG with the correct cookie. That way, any peers which
1127 // respond with incorrect cookies will eventually time out.
1128 if (m->seq() == lastPingSeq_)
1129 {
1130 lastPingSeq_.reset();
1131
1132 // Update latency estimate
1133 auto const rtt =
1134 std::chrono::round<std::chrono::milliseconds>(clock_type::now() - lastPingTime_);
1135
1136 std::scoped_lock const sl(recentLock_);
1137
 // Weighted average: 7/8 previous estimate, 1/8 new sample. The first
 // sample is taken as-is.
1138 if (latency_)
1139 {
1140 latency_ = (*latency_ * 7 + rtt) / 8;
1141 }
1142 else
1143 {
1144 latency_ = rtt;
1145 }
1146 }
1147
1148 return;
1149 }
1150}
1151
// Handles a cluster status message. Peers outside the cluster are charged and
// ignored. Otherwise the reported node status is stored, any load-source
// gossip is imported, and the cluster fee is recomputed as the median of
// recently reported load fees.
1152void
1154{
1155 // VFALCO NOTE I think we should drop the peer immediately
1156 if (!cluster())
1157 {
1158 fee_.update(Resource::kFeeUselessData, "unknown cluster");
1159 return;
1160 }
1161
1162 for (int i = 0; i < m->clusternodes().size(); ++i)
1163 {
1164 protocol::TMClusterNode const& node = m->clusternodes(i);
1165
1167 if (node.has_nodename())
1168 name = node.nodename();
1169
1170 auto const publicKey = parseBase58<PublicKey>(TokenType::NodePublic, node.publickey());
1171
1172 // NIKB NOTE We should drop the peer immediately if
1173 // they send us a public key we can't parse
1174 if (publicKey)
1175 {
1176 auto const reportTime = NetClock::time_point{NetClock::duration{node.reporttime()}};
1177
1178 app_.getCluster().update(*publicKey, name, node.nodeload(), reportTime);
1179 }
1180 }
1181
1182 int const loadSources = m->loadsources().size();
1183 if (loadSources != 0)
1184 {
1185 Resource::Gossip gossip;
1186 gossip.items.reserve(loadSources);
1187 for (int i = 0; i < m->loadsources().size(); ++i)
1188 {
1189 protocol::TMLoadSource const& node = m->loadsources(i);
1191 item.address = beast::IP::Endpoint::fromString(node.name());
1192 item.balance = node.cost();
 // Items whose address equals a default-constructed Endpoint are skipped.
1193 if (item.address != beast::IP::Endpoint())
1194 gossip.items.push_back(item);
1195 }
1196 overlay_.resourceManager().importConsumers(name(), gossip);
1197 }
1198
1199 // Calculate the cluster fee:
 // Only nodes that reported within the last 90 seconds are considered.
1200 auto const thresh = app_.getTimeKeeper().now() - 90s;
1201 std::uint32_t clusterFee = 0;
1202
1204 fees.reserve(app_.getCluster().size());
1205
1206 app_.getCluster().forEach([&fees, thresh](ClusterNode const& status) {
1207 if (status.getReportTime() >= thresh)
1208 fees.push_back(status.getLoadFee());
1209 });
1210
 // Take the median (the element at index size/2) via nth_element; the fee
 // stays 0 when no recent reports exist.
1211 if (!fees.empty())
1212 {
1213 auto const index = fees.size() / 2;
1214 std::nth_element(fees.begin(), fees.begin() + index, fees.end());
1215 clusterFee = fees[index];
1216 }
1217
1218 app_.getFeeTrack().setClusterFee(clusterFee);
1219}
1220
// Handles an endpoints message. Only version 2 messages from a Converged peer
// are accepted. Oversized lists are charged and dropped. Each endpoint is
// parsed, malformed ones are counted and charged, and the valid ones are
// passed to the peer finder.
1221void
1223{
1224 // Don't allow endpoints from peers that are not known to be tracking
1225 // (converged) or are not using a version of the message that we support:
1226 if (tracking_.load() != Tracking::Converged || m->version() != 2)
1227 return;
1228
1229 // The number is arbitrary and doesn't have any real significance or
1230 // implication for the protocol.
1231 if (m->endpoints_v2().size() >= 1024)
1232 {
1233 fee_.update(Resource::kFeeUselessData, "endpoints too large");
1234 return;
1235 }
1236
1238 endpoints.reserve(m->endpoints_v2().size());
1239
1240 auto malformed = 0;
1241 for (auto const& tm : m->endpoints_v2())
1242 {
1243 auto result = beast::IP::Endpoint::fromStringChecked(tm.endpoint());
1244
1245 if (!result)
1246 {
1247 JLOG(pJournal_.error())
1248 << "failed to parse incoming endpoint: {" << tm.endpoint() << "}";
1249 malformed++;
1250 continue;
1251 }
1252
1253 // If hops == 0, this Endpoint describes the peer we are connected
1254 // to -- in that case, we take the remote address seen on the
1255 // socket and store that in the IP::Endpoint. If this is the first
1256 // time, then we'll verify that their listener can receive incoming
1257 // by performing a connectivity test. if hops > 0, then we just
1258 // take the address/port we were given
1259 if (tm.hops() == 0)
1260 result = remoteAddress_.atPort(result->port());
1261
1262 endpoints.emplace_back(*result, tm.hops());
1263 }
1264
1265 // Charge the peer for each malformed endpoint. As there still may be
1266 // multiple valid endpoints we don't return early.
1267 if (malformed > 0)
1268 {
1269 fee_.update(
1270 Resource::kFeeInvalidData * malformed,
1271 std::to_string(malformed) + " malformed endpoints");
1272 }
1273
1274 if (!endpoints.empty())
1275 overlay_.peerFinder().onEndpoints(slot_, endpoints);
1276}
1277
1278void
1283
// PeerImp::handleTransaction (name taken from the assertion text below).
//
// Screens a transaction message received from this peer and, when it passes
// every check, queues a job that calls checkTransaction().
//
// Visible parameters:
//   eraseTxQueue, batch - must differ from each other (asserted on entry).
// The message `m` supplies the serialized transaction via rawtransaction().
//
// Returns early, in this order, when: the peer's tracking state is Diverged;
// the server still needs a network ledger; the transaction carries
// tfInnerBatchTxn (peer is charged); or the HashRouter declines to process
// the ID again (peer is charged if the entry is flagged BAD).
// Any std::exception raised inside the try block is logged, not rethrown.
//
// NOTE(review): this listing has lost several original lines (the first part
// of the signature, the declaration of `flags`, the arguments of
// reportInboundTraffic and part of the addJob call) -- confirm against the
// real source before changing code here.
1284void
1287 bool eraseTxQueue,
1288 bool batch)
1289{
1290 XRPL_ASSERT(eraseTxQueue != batch, ("xrpl::PeerImp::handleTransaction : valid inputs"));
1291 if (tracking_.load() == Tracking::Diverged)
1292 return;
1293
1294 if (app_.getOPs().isNeedNetworkLedger())
1295 {
1296 // If we've never been in synch, there's nothing we can do
1297 // with a transaction
1298 JLOG(pJournal_.debug()) << "Ignoring incoming transaction: Need network ledger";
1299 return;
1300 }
1301
1302 SerialIter sit(makeSlice(m->rawtransaction()));
1303
1304 try
1305 {
1306 auto stx = std::make_shared<STTx const>(sit);
1307 uint256 const txID = stx->getTransactionID();
1308
1309 // Charge strongly for attempting to relay a txn with tfInnerBatchTxn
1310 // LCOV_EXCL_START
1311 /*
1312 There is no need to check whether the featureBatch amendment is
1313 enabled.
1314
1315 * If the `tfInnerBatchTxn` flag is set, and the amendment is
1316 enabled, then it's an invalid transaction because inner batch
1317 transactions should not be relayed.
1318 * If the `tfInnerBatchTxn` flag is set, and the amendment is *not*
1319 enabled, then the transaction is malformed because it's using an
1320 "unknown" flag. There's no need to waste the resources to send it
1321 to the transaction engine.
1322
1323 We don't normally check transaction validity at this level, but
1324 since we _need_ to check it when the amendment is enabled, we may as
1325 well drop it if the flag is set regardless.
1326 */
1327 if (stx->isFlag(tfInnerBatchTxn))
1328 {
1329 JLOG(pJournal_.warn()) << "Ignoring Network relayed Tx containing "
1330 "tfInnerBatchTxn (handleTransaction).";
1331 fee_.update(Resource::kFeeModerateBurdenPeer, "inner batch txn");
1332 return;
1333 }
1334 // LCOV_EXCL_STOP
1335
     // NOTE(review): the declaration of `flags` (used below) is missing from
     // this listing.
1337 static constexpr std::chrono::seconds kTxInterval = 10s;
1338
1339 if (!app_.getHashRouter().shouldProcess(txID, id_, flags, kTxInterval))
1340 {
1341 // we have seen this transaction recently
1342 if (any(flags & HashRouterFlags::BAD))
1343 {
1344 fee_.update(Resource::kFeeUselessData, "known bad");
1345 JLOG(pJournal_.debug()) << "Ignoring known bad tx " << txID;
1346 }
1347
1348 // Erase only if the server has seen this tx. If the server has not
1349 // seen this tx then the tx could not have been queued for this peer.
1350 else if (eraseTxQueue && txReduceRelayEnabled())
1351 {
1352 removeTxQueue(txID);
1353 }
1354
1355 overlay_.reportInboundTraffic(
1357
1358 return;
1359 }
1360
1361 JLOG(pJournal_.debug()) << "Got tx " << txID;
1362
     // Signature checking stays on unless the peer is a cluster member and
     // no validation public key is configured locally (see below).
1363 bool checkSignature = true;
1364 if (cluster())
1365 {
1366 if (!m->has_deferred() || !m->deferred())
1367 {
1368 // Skip local checks if a server we trust
1369 // put the transaction in its open ledger
1370 flags |= HashRouterFlags::TRUSTED;
1371 }
1372
1373 // for non-validator nodes only -- localPublicKey is set for
1374 // validators only
1375 if (!app_.getValidationPublicKey())
1376 {
1377 // For now, be paranoid and have each validator
1378 // check each transaction, regardless of source
1379 checkSignature = false;
1380 }
1381 }
1382
     // The transaction is only queued when the validated ledger is at most
     // 4 minutes old and the transaction job count is within the configured
     // maximum; otherwise it is dropped with a log entry.
1383 if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min)
1384 {
1385 JLOG(pJournal_.trace()) << "No new transactions until synchronized";
1386 }
1387 else if (app_.getJobQueue().getJobCount(JtTransaction) > app_.config().maxTransactions)
1388 {
1389 overlay_.incJqTransOverflow();
1390 JLOG(pJournal_.info()) << "Transaction queue is full";
1391 }
1392 else
1393 {
1394 app_.getJobQueue().addJob(
1396 "RcvCheckTx",
1398 flags,
1399 checkSignature,
1400 batch,
1401 stx]() {
     // The job does nothing if the peer has been destroyed in the meantime.
1402 if (auto peer = weak.lock())
1403 peer->checkTransaction(flags, checkSignature, stx, batch);
1404 });
1405 }
1406 }
1407 catch (std::exception const& ex)
1408 {
1409 JLOG(pJournal_.warn()) << "Transaction invalid: " << strHex(m->rawtransaction())
1410 << ". Exception: " << ex.what();
1411 }
1412}
1413
// Message handler for a ledger data request (logged as "TMGetLedger").
//
// Validates every field of the request `m`; the first failed check charges
// the peer Resource::kFeeInvalidData through the local badData helper, logs a
// warning and returns. A request that passes all checks is handed to
// processLedgerRequest() on the job queue (JtLedgerReq).
//
// NOTE(review): the signature line and the declaration of `weak` are missing
// from this listing -- confirm against the real source.
1414void
1416{
     // Charges the peer and logs the reason for rejecting the request.
1417 auto badData = [&](std::string const& msg) {
1418 fee_.update(Resource::kFeeInvalidData, "get_ledger " + msg);
1419 JLOG(pJournal_.warn()) << "TMGetLedger: " << msg;
1420 };
1421 auto const itype{m->itype()};
1422
1423 // Verify ledger info type
1424 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1425 {
1426 badData("Invalid ledger info type");
1427 return;
1428 }
1429
     // The ledger type is optional in the message; nullopt means "not set".
1430 auto const ltype = [&m]() -> std::optional<::protocol::TMLedgerType> {
1431 if (m->has_ltype())
1432 return m->ltype();
1433 return std::nullopt;
1434 }();
1435
     // A candidate transaction set must be identified by hash. Any other
     // request must name a ledger by hash or sequence, or ask for ltCLOSED.
1436 if (itype == protocol::liTS_CANDIDATE)
1437 {
1438 if (!m->has_ledgerhash())
1439 {
1440 badData("Invalid TX candidate set, missing TX set hash");
1441 return;
1442 }
1443 }
1444 else if (
1445 !m->has_ledgerhash() && !m->has_ledgerseq() && (!ltype || *ltype != protocol::ltCLOSED))
1446 {
1447 badData("Invalid request");
1448 return;
1449 }
1450
1451 // Verify ledger type
1452 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1453 {
1454 badData("Invalid ledger type");
1455 return;
1456 }
1457
1458 // Verify ledger hash
1459 if (m->has_ledgerhash() && !stringIsUInt256Sized(m->ledgerhash()))
1460 {
1461 badData("Invalid ledger hash");
1462 return;
1463 }
1464
1465 // Verify ledger sequence
1466 if (m->has_ledgerseq())
1467 {
1468 auto const ledgerSeq{m->ledgerseq()};
1469
1470 // Check if within a reasonable range
     // Only enforced while the validated ledger is at most 10 seconds old;
     // the sequence may then be no more than 10 past the valid ledger index.
1471 using namespace std::chrono_literals;
1472 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1473 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1474 {
1475 badData("Invalid ledger sequence " + std::to_string(ledgerSeq));
1476 return;
1477 }
1478 }
1479
1480 // Verify ledger node IDs
     // Every info type other than liBASE needs at least one node ID, and
     // each one must deserialize.
1481 if (itype != protocol::liBASE)
1482 {
1483 if (m->nodeids_size() <= 0)
1484 {
1485 badData("Invalid ledger node IDs");
1486 return;
1487 }
1488
1489 for (auto const& nodeId : m->nodeids())
1490 {
1491 if (deserializeSHAMapNodeID(nodeId) == std::nullopt)
1492 {
1493 badData("Invalid SHAMap node ID");
1494 return;
1495 }
1496 }
1497 }
1498
1499 // Verify query type
1500 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1501 {
1502 badData("Invalid query type");
1503 return;
1504 }
1505
1506 // Verify query depth
     // A depth is rejected when it exceeds Tuning::kMaxQueryDepth or when it
     // accompanies a liBASE request.
1507 if (m->has_querydepth())
1508 {
1509 if (m->querydepth() > Tuning::kMaxQueryDepth || itype == protocol::liBASE)
1510 {
1511 badData("Invalid query depth");
1512 return;
1513 }
1514 }
1515
1516 // Queue a job to process the request
     // NOTE(review): the line declaring `weak` is missing from this listing.
1518 app_.getJobQueue().addJob(JtLedgerReq, "RcvGetLedger", [weak, m]() {
1519 if (auto peer = weak.lock())
1520 peer->processLedgerRequest(m);
1521 });
1522}
1523
// Message handler for a proof path request (traced as "TMProofPathRequest").
//
// Charges the peer Resource::kFeeModerateBurdenPeer for the request, then
// queues a JtReplayReq job that asks ledgerReplayMsgHandler_ to build the
// reply. An error reply charges the peer (kFeeMalformedRequest for
// reBAD_REQUEST, kFeeRequestNoReply otherwise); a successful reply is sent
// back as mtPROOF_PATH_RESPONSE.
//
// NOTE(review): the signature, the condition guarding the first block and
// the declaration of `weak` are missing from this listing. The fee text
// ("... disabled") suggests the guard tests whether the feature is turned
// off -- TODO confirm against the real source.
1524void
1526{
1527 JLOG(pJournal_.trace()) << "onMessage, TMProofPathRequest";
1529 {
1530 fee_.update(Resource::kFeeMalformedRequest, "proof_path_request disabled");
1531 return;
1532 }
1533
1534 fee_.update(Resource::kFeeModerateBurdenPeer, "received a proof path request");
1536 app_.getJobQueue().addJob(JtReplayReq, "RcvProofPReq", [weak, m]() {
     // The job does nothing if the peer has been destroyed in the meantime.
1537 if (auto peer = weak.lock())
1538 {
1539 auto reply = peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1540 if (reply.has_error())
1541 {
1542 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1543 {
1544 peer->charge(Resource::kFeeMalformedRequest, "proof_path_request");
1545 }
1546 else
1547 {
1548 peer->charge(Resource::kFeeRequestNoReply, "proof_path_request");
1549 }
1550 }
1551 else
1552 {
1553 peer->send(std::make_shared<Message>(reply, protocol::mtPROOF_PATH_RESPONSE));
1554 }
1555 }
1556 });
1557}
1558
// Message handler for a proof path response.
//
// Passes the message to ledgerReplayMsgHandler_.processProofPathResponse()
// and charges the peer Resource::kFeeInvalidData when that call reports
// failure.
//
// NOTE(review): the signature and the condition guarding the first block are
// missing from this listing; the fee text suggests the guard rejects the
// message when the feature is disabled -- TODO confirm.
1559void
1561{
1563 {
1564 fee_.update(Resource::kFeeMalformedRequest, "proof_path_response disabled");
1565 return;
1566 }
1567
1568 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1569 {
1570 fee_.update(Resource::kFeeInvalidData, "proof_path_response");
1571 }
1572}
1573
// Message handler for a replay delta request (traced as
// "TMReplayDeltaRequest").
//
// Queues a JtReplayReq job that asks ledgerReplayMsgHandler_ to build the
// reply. An error reply charges the peer (kFeeMalformedRequest for
// reBAD_REQUEST, kFeeRequestNoReply otherwise); a successful reply is sent
// back as mtREPLAY_DELTA_RESPONSE.
//
// NOTE(review): the signature, the condition guarding the first block and
// the lines just before addJob (including the declaration of `weak`) are
// missing from this listing -- confirm against the real source.
1574void
1576{
1577 JLOG(pJournal_.trace()) << "onMessage, TMReplayDeltaRequest";
1579 {
1580 fee_.update(Resource::kFeeMalformedRequest, "replay_delta_request disabled");
1581 return;
1582 }
1583
1586 app_.getJobQueue().addJob(JtReplayReq, "RcvReplDReq", [weak, m]() {
     // The job does nothing if the peer has been destroyed in the meantime.
1587 if (auto peer = weak.lock())
1588 {
1589 auto reply = peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1590 if (reply.has_error())
1591 {
1592 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1593 {
1594 peer->charge(Resource::kFeeMalformedRequest, "replay_delta_request");
1595 }
1596 else
1597 {
1598 peer->charge(Resource::kFeeRequestNoReply, "replay_delta_request");
1599 }
1600 }
1601 else
1602 {
1603 peer->send(std::make_shared<Message>(reply, protocol::mtREPLAY_DELTA_RESPONSE));
1604 }
1605 }
1606 });
1607}
1608
// Message handler for a replay delta response.
//
// Passes the message to ledgerReplayMsgHandler_.processReplayDeltaResponse()
// and charges the peer Resource::kFeeInvalidData when that call reports
// failure.
//
// NOTE(review): the signature and the condition guarding the first block are
// missing from this listing; the fee text suggests the guard rejects the
// message when the feature is disabled -- TODO confirm.
1609void
1611{
1613 {
1614 fee_.update(Resource::kFeeMalformedRequest, "replay_delta_response disabled");
1615 return;
1616 }
1617
1618 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1619 {
1620 fee_.update(Resource::kFeeInvalidData, "replay_delta_response");
1621 }
1622}
1623
// Message handler for ledger / transaction-set data (logged as
// "TMLedgerData").
//
// Validates the message; the first failed check charges the peer
// Resource::kFeeInvalidData through the local badData helper and returns.
// A valid message is then handled in one of three ways:
//   * it carries a request cookie: relay it to the peer with that short ID
//     (cookie cleared first), or log that it could not be routed;
//   * it is candidate transaction set data: queue a JtTxnData job that hands
//     it to InboundTransactions::gotData();
//   * otherwise: hand it to InboundLedgers::gotLedgerData() directly.
//
// NOTE(review): the signature line and the declaration of `weak` are missing
// from this listing -- confirm against the real source.
1624void
1626{
     // Charges the peer and logs the reason for rejecting the message.
1627 auto badData = [&](std::string const& msg) {
1628 fee_.update(Resource::kFeeInvalidData, msg);
1629 JLOG(pJournal_.warn()) << "TMLedgerData: " << msg;
1630 };
1631
1632 // Verify ledger hash
1633 if (!stringIsUInt256Sized(m->ledgerhash()))
1634 {
1635 badData("Invalid ledger hash");
1636 return;
1637 }
1638
1639 // Verify ledger sequence
1640 {
1641 auto const ledgerSeq{m->ledgerseq()};
     // Candidate transaction set data must carry sequence 0.
1642 if (m->type() == protocol::liTS_CANDIDATE)
1643 {
1644 if (ledgerSeq != 0)
1645 {
1646 badData("Invalid ledger sequence " + std::to_string(ledgerSeq));
1647 return;
1648 }
1649 }
1650 else
1651 {
1652 // Check if within a reasonable range
     // Only enforced while the validated ledger is at most 10 seconds old.
1653 using namespace std::chrono_literals;
1654 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1655 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1656 {
1657 badData("Invalid ledger sequence " + std::to_string(ledgerSeq));
1658 return;
1659 }
1660 }
1661 }
1662
1663 // Verify ledger info type
1664 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1665 {
1666 badData("Invalid ledger info type");
1667 return;
1668 }
1669
1670 // Verify reply error
1671 if (m->has_error() &&
1672 (m->error() < protocol::reNO_LEDGER || m->error() > protocol::reBAD_REQUEST))
1673 {
1674 badData("Invalid reply error");
1675 return;
1676 }
1677
1678 // Verify ledger nodes.
     // The node count must be between 1 and Tuning::kHardMaxReplyNodes.
1679 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::kHardMaxReplyNodes)
1680 {
1681 badData("Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
1682 return;
1683 }
1684
1685 // If there is a request cookie, attempt to relay the message
1686 if (m->has_requestcookie())
1687 {
1688 if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1689 {
1690 m->clear_requestcookie();
1691 peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1692 }
1693 else
1694 {
1695 JLOG(pJournal_.info()) << "Unable to route TX/ledger data reply";
1696 }
1697 return;
1698 }
1699
1700 uint256 const ledgerHash = uint256::fromRaw(m->ledgerhash());
1701
1702 // Otherwise check if received data for a candidate transaction set
1703 if (m->type() == protocol::liTS_CANDIDATE)
1704 {
     // NOTE(review): the line declaring `weak` is missing from this listing.
1706 app_.getJobQueue().addJob(JtTxnData, "RcvPeerData", [weak, ledgerHash, m]() {
1707 if (auto peer = weak.lock())
1708 {
1709 peer->app_.getInboundTransactions().gotData(ledgerHash, peer, m);
1710 }
1711 });
1712 return;
1713 }
1714
1715 // Consume the message
1716 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1717}
1718
// Message handler for a consensus proposal (TMProposeSet).
//
// Steps visible in this listing:
//   1. Reject (and charge) proposals whose signature is not 64..72 bytes
//      long, whose node public key is not secp256k1, or whose hashes are not
//      256 bits wide.
//   2. Look up whether the signing key is trusted; untrusted proposals are
//      reported as traffic and dropped outright when
//      relayUntrustedProposals is -1.
//   3. Register the proposal with the HashRouter; duplicates are reported,
//      optionally fed to updateSlotAndSquelch(), and dropped.
//   4. Drop untrusted proposals from a diverged peer, or from a non-cluster
//      peer while the local fee track reports load.
//   5. Build an RCLCxPeerPos and queue checkPropose() on the job queue
//      (JtProposalT for trusted keys, JtProposalUt otherwise).
//
// NOTE(review): the signature, the arguments of both reportInboundTraffic
// calls, one line inside the RCLCxPeerPos construction and the declaration of
// `weak` are missing from this listing -- confirm against the real source.
1719void
1721{
1722 protocol::TMProposeSet const& set = *m;
1723
1724 auto const sig = makeSlice(set.signature());
1725
1726 // Preliminary check for the validity of the signature: A DER encoded
1727 // signature can't be longer than 72 bytes.
     // The clamp comparison is true exactly when the size lies outside
     // the range 64..72.
1728 if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
1729 (publicKeyType(makeSlice(set.nodepubkey())) != KeyType::Secp256k1))
1730 {
1731 JLOG(pJournal_.warn()) << "Proposal: malformed";
1732 fee_.update(Resource::kFeeInvalidSignature, " signature can't be longer than 72 bytes");
1733 return;
1734 }
1735
1736 if (!stringIsUInt256Sized(set.currenttxhash()) || !stringIsUInt256Sized(set.previousledger()))
1737 {
1738 JLOG(pJournal_.warn()) << "Proposal: malformed";
1739 fee_.update(Resource::kFeeMalformedRequest, "bad hashes");
1740 return;
1741 }
1742
1743 // RH TODO: when isTrusted = false we should probably also cache a key
1744 // suppression for 30 seconds to avoid doing a relatively expensive lookup
1745 // every time a spam packet is received
1746 PublicKey const publicKey{makeSlice(set.nodepubkey())};
1747 auto const isTrusted = app_.getValidators().trusted(publicKey);
1748
1749 // If the operator has specified that untrusted proposals be dropped then
1750 // this happens here I.e. before further wasting CPU verifying the signature
1751 // of an untrusted key
1752 if (!isTrusted)
1753 {
1754 // report untrusted proposal messages
1755 overlay_.reportInboundTraffic(
1757
1758 if (app_.config().relayUntrustedProposals == -1)
1759 return;
1760 }
1761
1762 uint256 const proposeHash = uint256::fromRaw(set.currenttxhash());
1763 uint256 const prevLedger = uint256::fromRaw(set.previousledger());
1764
1765 NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
1766
     // Identifier under which this proposal is tracked by the HashRouter.
1767 uint256 const suppression = proposalUniqueId(
1768 proposeHash, prevLedger, set.proposeseq(), closeTime, publicKey.slice(), sig);
1769
1770 if (auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
1771 !added)
1772 {
1773 // Count unique messages (Slots has its own 'HashRouter'), which a peer
1774 // receives within IDLED seconds since the message has been relayed.
1775 if (relayed && (stopwatch().now() - *relayed) < reduce_relay::kIdled)
1776 overlay_.updateSlotAndSquelch(suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1777
1778 // report duplicate proposal messages
1779 overlay_.reportInboundTraffic(
1781
1782 JLOG(pJournal_.trace()) << "Proposal: duplicate";
1783
1784 return;
1785 }
1786
1787 if (!isTrusted)
1788 {
1789 if (tracking_.load() == Tracking::Diverged)
1790 {
1791 JLOG(pJournal_.debug()) << "Proposal: Dropping untrusted (peer divergence)";
1792 return;
1793 }
1794
1795 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1796 {
1797 JLOG(pJournal_.debug()) << "Proposal: Dropping untrusted (load)";
1798 return;
1799 }
1800 }
1801
1802 JLOG(pJournal_.trace()) << "Proposal: " << (isTrusted ? "trusted" : "untrusted");
1803
     // NOTE(review): one line of this construction (between `suppression`
     // and `prevLedger`) is missing from this listing.
1804 auto proposal = RCLCxPeerPos(
1805 publicKey,
1806 sig,
1807 suppression,
1809 prevLedger,
1810 set.proposeseq(),
1811 proposeHash,
1812 closeTime,
1813 app_.getTimeKeeper().closeTime(),
1814 calcNodeID(app_.getValidatorManifests().getMasterKey(publicKey))});
1815
1817 app_.getJobQueue().addJob(
1818 isTrusted ? JtProposalT : JtProposalUt, "checkPropose", [weak, isTrusted, m, proposal]() {
1819 if (auto peer = weak.lock())
1820 peer->checkPropose(isTrusted, m, proposal);
1821 });
1822}
1823
// Message handler for a peer status change (logged as "Status: Change").
//
// Updates the state this object keeps about the remote peer and publishes
// the change:
//   * fills in networktime on the message from the local time keeper when
//     the peer did not send one;
//   * stores the message in lastStatus_;
//   * on neLOST_SYNC, clears both remembered ledger hashes and returns;
//   * otherwise records the peer's closed and previous ledger hashes (or
//     zeroes them when absent or malformed) and its ledger range;
//   * re-evaluates tracking when a ledger sequence is present and the
//     validated ledger is less than 2 minutes old;
//   * passes a callback to pubPeerStatus() that renders the message as JSON.
//
// All accesses to lastStatus_, the ledger hashes and minLedger_/maxLedger_
// in this function are made while holding recentLock_.
//
// NOTE(review): the signature, one line after previousLedgerHash_ is
// assigned, and the declaration of `j` in the JSON callback are missing from
// this listing -- confirm against the real source.
1824void
1826{
1827 JLOG(pJournal_.trace()) << "Status: Change";
1828
1829 if (!m->has_networktime())
1830 m->set_networktime(app_.getTimeKeeper().now().time_since_epoch().count());
1831
1832 {
1833 std::scoped_lock const sl(recentLock_);
1834 if (!lastStatus_.has_newstatus() || m->has_newstatus())
1835 {
1836 lastStatus_ = *m;
1837 }
1838 else
1839 {
1840 // preserve old status
     // NOTE(review): after the copy, the saved status is written to *m
     // and not to lastStatus_ -- confirm that this is intended.
1841 protocol::NodeStatus const status = lastStatus_.newstatus();
1842 lastStatus_ = *m;
1843 m->set_newstatus(status);
1844 }
1845 }
1846
1847 if (m->newevent() == protocol::neLOST_SYNC)
1848 {
1849 bool outOfSync{false};
1850 {
1851 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
1852 // guarded by recentLock_.
1853 std::scoped_lock const sl(recentLock_);
1854 if (!closedLedgerHash_.isZero())
1855 {
1856 outOfSync = true;
1857 closedLedgerHash_.zero();
1858 }
1859 previousLedgerHash_.zero();
1860 }
     // Logged after the lock has been released.
1861 if (outOfSync)
1862 {
1863 JLOG(pJournal_.debug()) << "Status: Out of sync";
1864 }
1865 return;
1866 }
1867
1868 {
1869 uint256 closedLedgerHash{};
1870 bool const peerChangedLedgers{m->has_ledgerhash() && stringIsUInt256Sized(m->ledgerhash())};
1871
1872 {
1873 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
1874 // guarded by recentLock_.
1875 std::scoped_lock const sl(recentLock_);
1876 if (peerChangedLedgers)
1877 {
1878 closedLedgerHash_ = m->ledgerhash();
     // Local copy so the hash can be logged after the lock is released.
1879 closedLedgerHash = closedLedgerHash_;
1880 addLedger(closedLedgerHash, sl);
1881 }
1882 else
1883 {
1884 closedLedgerHash_.zero();
1885 }
1886
1887 if (m->has_ledgerhashprevious() && stringIsUInt256Sized(m->ledgerhashprevious()))
1888 {
1889 previousLedgerHash_ = m->ledgerhashprevious();
1891 }
1892 else
1893 {
1894 previousLedgerHash_.zero();
1895 }
1896 }
1897 if (peerChangedLedgers)
1898 {
1899 JLOG(pJournal_.debug()) << "LCL is " << closedLedgerHash;
1900 }
1901 else
1902 {
1903 JLOG(pJournal_.debug()) << "Status: No ledger";
1904 }
1905 }
1906
1907 if (m->has_firstseq() && m->has_lastseq())
1908 {
1909 std::scoped_lock const sl(recentLock_);
1910
1911 minLedger_ = m->firstseq();
1912 maxLedger_ = m->lastseq();
1913
     // An inverted range or a zero bound resets both ends to 0.
1914 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1915 minLedger_ = maxLedger_ = 0;
1916 }
1917
1918 if (m->has_ledgerseq() && app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1919 {
1920 checkTracking(m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1921 }
1922
     // The callback captures `this` and the message; it builds the JSON
     // object `j` from the fields that are present in the message.
1923 app_.getOPs().pubPeerStatus([m, this]() -> json::Value {
1925
1926 if (m->has_newstatus())
1927 {
1928 switch (m->newstatus())
1929 {
1930 case protocol::nsCONNECTING:
1931 j[jss::status] = "CONNECTING";
1932 break;
1933 case protocol::nsCONNECTED:
1934 j[jss::status] = "CONNECTED";
1935 break;
1936 case protocol::nsMONITORING:
1937 j[jss::status] = "MONITORING";
1938 break;
1939 case protocol::nsVALIDATING:
1940 j[jss::status] = "VALIDATING";
1941 break;
1942 case protocol::nsSHUTTING:
1943 j[jss::status] = "SHUTTING";
1944 break;
1945 }
1946 }
1947
1948 if (m->has_newevent())
1949 {
1950 switch (m->newevent())
1951 {
1952 case protocol::neCLOSING_LEDGER:
1953 j[jss::action] = "CLOSING_LEDGER";
1954 break;
1955 case protocol::neACCEPTED_LEDGER:
1956 j[jss::action] = "ACCEPTED_LEDGER";
1957 break;
1958 case protocol::neSWITCHED_LEDGER:
1959 j[jss::action] = "SWITCHED_LEDGER";
1960 break;
1961 case protocol::neLOST_SYNC:
1962 j[jss::action] = "LOST_SYNC";
1963 break;
1964 }
1965 }
1966
1967 if (m->has_ledgerseq())
1968 {
1969 j[jss::ledger_index] = m->ledgerseq();
1970 }
1971
1972 if (m->has_ledgerhash())
1973 {
     // Reports the hash stored on the peer object (read under the
     // lock), not the raw bytes carried by the message.
1974 uint256 closedLedgerHash{};
1975 {
1976 std::scoped_lock const sl(recentLock_);
1977 closedLedgerHash = closedLedgerHash_;
1978 }
1979 j[jss::ledger_hash] = to_string(closedLedgerHash);
1980 }
1981
1982 if (m->has_networktime())
1983 {
1984 j[jss::date] = json::UInt(m->networktime());
1985 }
1986
1987 if (m->has_firstseq() && m->has_lastseq())
1988 {
1989 j[jss::ledger_index_min] = json::UInt(m->firstseq());
1990 j[jss::ledger_index_max] = json::UInt(m->lastseq());
1991 }
1992
1993 return j;
1994 });
1995}
1996
// Compares the highest ledger sequence recorded for this peer (maxLedger_)
// with `validationSeq` by delegating to the two-argument checkTracking().
// Nothing happens while maxLedger_ is still 0.
//
// NOTE(review): the signature line is missing from this listing; presumably
// this is the single-argument checkTracking overload taking `validationSeq`
// -- confirm against the real source.
1997void
1999{
2000 std::uint32_t serverSeq = 0;
2001 {
2002 // Extract the sequence number of the highest
2003 // ledger this peer has
     // maxLedger_ is read under recentLock_; the comparison below runs
     // after the lock has been released.
2004 std::scoped_lock const sl(recentLock_);
2005
2006 serverSeq = maxLedger_;
2007 }
2008 if (serverSeq != 0)
2009 {
2010 // Compare the peer's ledger sequence to the
2011 // sequence of a recently-validated ledger
2012 checkTracking(serverSeq, validationSeq);
2013 }
2014}
2015
// Re-evaluates the peer's tracking state from the absolute distance between
// two ledger sequences, `seq1` and `seq2`.
//
// NOTE(review): most of this function is missing from the listing: the
// signature, the condition and body of the first block, and the statements
// executed under the lock in the second block. Only the "diverged" test is
// fully visible -- confirm everything else against the real source.
2016void
2018{
     // Order-independent difference; cannot wrap because max >= min.
2019 std::uint32_t const diff = std::max(seq1, seq2) - std::min(seq1, seq2);
2020
2022 {
2023 // The peer's ledger sequence is close to the validation's
2025 }
2026
     // Only entered when the peer is not already marked Diverged.
2027 if ((diff > Tuning::kDivergedLedgerLimit) && (tracking_.load() != Tracking::Diverged))
2028 {
2029 // The peer's ledger sequence is way off the validation's
2030 std::scoped_lock const sl(recentLock_);
2031
2034 }
2035}
2036
// Message handler for a "have transaction set" announcement.
//
// Rejects (and charges kFeeMalformedRequest for) a hash that is not 256 bits
// wide. For status tsHAVE the hash is appended to recentTxSets_ unless it is
// already there, in which case the peer is charged kFeeUselessData. Any
// other status is ignored.
//
// NOTE(review): the signature line is missing from this listing -- confirm
// the message type against the real source.
2037void
2039{
2040 if (!stringIsUInt256Sized(m->hash()))
2041 {
2042 fee_.update(Resource::kFeeMalformedRequest, "bad hash");
2043 return;
2044 }
2045
2046 uint256 const hash = uint256::fromRaw(m->hash());
2047
2048 if (m->status() == protocol::tsHAVE)
2049 {
     // recentTxSets_ is searched and modified under recentLock_.
2050 std::scoped_lock const sl(recentLock_);
2051
2052 if (std::ranges::find(recentTxSets_, hash) != recentTxSets_.end())
2053 {
2054 fee_.update(Resource::kFeeUselessData, "duplicate (tsHAVE)");
2055 return;
2056 }
2057
2058 recentTxSets_.push_back(hash);
2059 }
2060}
2061
// PeerImp::onValidatorListMessage (name taken from the assertion texts
// below).
//
// Shared worker for validator list messages received from this peer.
//
// Parameters:
//   messageType - label used in log output (callers in this file pass
//                 "ValidatorList" or "ValidatorListCollection").
//   manifest    - manifest string taken from the message.
//   version     - list format version taken from the message.
//   blobs       - parsed list blobs; an empty vector is treated as a
//                 malformed message and charged kFeeHeavyBurdenPeer.
//
// Flow: reject empty input; hash (manifest, blobs, version) and drop
// duplicates already known to the HashRouter (charged kFeeUselessData);
// apply and broadcast the lists through the ValidatorList; then
//   * act on the best disposition,
//   * charge the peer according to the worst disposition,
//   * log one line per disposition that occurred.
//
// Fix applied here: three warning messages ("stale", "unsupported version",
// "invalid") were missing the space after the count and printed e.g.
// "Ignored 3stale ..."; they now match the sibling messages.
//
// NOTE(review): this listing has lost the first signature line and every
// `case` label of the three switch statements (plus a few statements next to
// them). Those gaps are left exactly as found -- confirm against the real
// source before relying on the switch structure shown here.
2062void
2064 std::string const& messageType,
2065 std::string const& manifest,
2066 std::uint32_t version,
2067 std::vector<ValidatorBlobInfo> const& blobs)
2068{
2069 // If there are no blobs, the message is malformed (possibly because of
2070 // ValidatorList class rules), so charge accordingly and skip processing.
2071 if (blobs.empty())
2072 {
2073 JLOG(pJournal_.warn()) << "Ignored malformed " << messageType;
2074 // This shouldn't ever happen with a well-behaved peer
2075 fee_.update(Resource::kFeeHeavyBurdenPeer, "no blobs");
2076 return;
2077 }
2078
     // Hash over the whole message content, used for duplicate suppression.
2079 auto const hash = sha512Half(manifest, blobs, version);
2080
2081 JLOG(pJournal_.debug()) << "Received " << messageType;
2082
2083 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2084 {
2085 JLOG(pJournal_.debug()) << messageType << ": received duplicate " << messageType;
2086 // Charging this fee here won't hurt the peer in the normal
2087 // course of operation (ie. refresh every 5 minutes), but
2088 // will add up if the peer is misbehaving.
2089 fee_.update(Resource::kFeeUselessData, "duplicate");
2090 return;
2091 }
2092
2093 auto const applyResult = app_.getValidators().applyListsAndBroadcast(
2094 manifest,
2095 version,
2096 blobs,
2097 remoteAddress_.toString(),
2098 hash,
2099 app_.getOverlay(),
2100 app_.getHashRouter(),
2101 app_.getOPs());
2102
2103 JLOG(pJournal_.debug()) << "Processed " << messageType << " version " << version << " from "
2104 << (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
2105 : "unknown or invalid publisher")
2106 << " with best result " << to_string(applyResult.bestDisposition());
2107
2108 // Act based on the best result
2109 switch (applyResult.bestDisposition())
2110 {
2111 // New list
2113 // Newest list is expired, and that needs to be broadcast, too
2115 // Future list
2118
2119 XRPL_ASSERT(
2120 applyResult.publisherKey,
2121 "xrpl::PeerImp::onValidatorListMessage : publisher key is "
2122 "set");
2123 // NOLINTNEXTLINE(bugprone-unchecked-optional-access) assert above
2124 auto const& pubKey = *applyResult.publisherKey;
2125#ifndef NDEBUG
2126 if (auto const iter = publisherListSequences_.find(pubKey);
2127 iter != publisherListSequences_.end())
2128 {
2129 XRPL_ASSERT(
2130 iter->second < applyResult.sequence,
2131 "xrpl::PeerImp::onValidatorListMessage : lower sequence");
2132 }
2133#endif
     // Remember the newest sequence received from this publisher.
2134 publisherListSequences_[pubKey] = applyResult.sequence;
2135 }
2136 break;
2139#ifndef NDEBUG
2140 {
2142 XRPL_ASSERT(
2143 applyResult.sequence && applyResult.publisherKey,
2144 "xrpl::PeerImp::onValidatorListMessage : nonzero sequence "
2145 "and set publisher key");
2146 XRPL_ASSERT(
2147 publisherListSequences_[*applyResult.publisherKey] <= applyResult.sequence,
2148 "xrpl::PeerImp::onValidatorListMessage : maximum sequence");
2149 }
2150#endif // !NDEBUG
2151
2152 break;
2157 break;
2158 // LCOV_EXCL_START
2159 default:
2160 UNREACHABLE(
2161 "xrpl::PeerImp::onValidatorListMessage : invalid best list "
2162 "disposition");
2163 // LCOV_EXCL_STOP
2164 }
2165
2166 // Charge based on the worst result
2167 switch (applyResult.worstDisposition())
2168 {
2172 // No charges for good data
2173 break;
2176 // Charging this fee here won't hurt the peer in the normal
2177 // course of operation (ie. refresh every 5 minutes), but
2178 // will add up if the peer is misbehaving.
2179 fee_.update(Resource::kFeeUselessData, " duplicate (same_sequence or known_sequence)");
2180 break;
2182 // There are very few good reasons for a peer to send an
2183 // old list, particularly more than once.
2184 fee_.update(Resource::kFeeInvalidData, "expired");
2185 break;
2187 // Charging this fee here won't hurt the peer in the normal
2188 // course of operation (ie. refresh every 5 minutes), but
2189 // will add up if the peer is misbehaving.
2190 fee_.update(Resource::kFeeUselessData, "untrusted");
2191 break;
2193 // This shouldn't ever happen with a well-behaved peer
2194 fee_.update(Resource::kFeeInvalidSignature, "invalid list disposition");
2195 break;
2197 // During a version transition, this may be legitimate.
2198 // If it happens frequently, that's probably bad.
2199 fee_.update(Resource::kFeeInvalidData, "version");
2200 break;
2201 // LCOV_EXCL_START
2202 default:
2203 UNREACHABLE(
2204 "xrpl::PeerImp::onValidatorListMessage : invalid worst list "
2205 "disposition");
2206 // LCOV_EXCL_STOP
2207 }
2208
2209 // Log based on all the results.
2210 for (auto const& [disp, count] : applyResult.dispositions)
2211 {
2212 switch (disp)
2213 {
2214 // New list
2216 JLOG(pJournal_.debug()) << "Applied " << count << " new " << messageType;
2217 break;
2218 // Newest list is expired, and that needs to be broadcast, too
2220 JLOG(pJournal_.debug()) << "Applied " << count << " expired " << messageType;
2221 break;
2222 // Future list
2224 JLOG(pJournal_.debug()) << "Processed " << count << " future " << messageType;
2225 break;
2227 JLOG(pJournal_.warn())
2228 << "Ignored " << count << " " << messageType << "(s) with current sequence";
2229 break;
2231 JLOG(pJournal_.warn())
2232 << "Ignored " << count << " " << messageType << "(s) with future sequence";
2233 break;
2235 JLOG(pJournal_.warn()) << "Ignored " << count << " stale " << messageType;
2236 break;
2238 JLOG(pJournal_.warn()) << "Ignored " << count << " untrusted " << messageType;
2239 break;
2241 JLOG(pJournal_.warn())
2242 << "Ignored " << count << " unsupported version " << messageType;
2243 break;
2245 JLOG(pJournal_.warn()) << "Ignored " << count << " invalid " << messageType;
2246 break;
2247 // LCOV_EXCL_START
2248 default:
2249 UNREACHABLE(
2250 "xrpl::PeerImp::onValidatorListMessage : invalid list "
2251 "disposition");
2252 // LCOV_EXCL_STOP
2253 }
2254 }
2255}
2256
// Message handler for a single validator list (logged as "ValidatorList").
//
// Forwards the manifest, version and parsed blobs of `m` with the label
// "ValidatorList". Any std::exception raised on the way is logged and
// charged as kFeeInvalidData, using the exception text as the reason.
//
// NOTE(review): the signature, the condition guarding the "unsupported peer"
// block and the start of the forwarding call are missing from this listing.
// The log text suggests the guard tests the peer's protocol version, and the
// argument list matches onValidatorListMessage -- TODO confirm both.
2257void
2259{
2260 try
2261 {
2263 {
2264 JLOG(pJournal_.debug()) << "ValidatorList: received validator list from peer using "
2265 << "protocol version " << to_string(protocol_)
2266 << " which shouldn't support this feature.";
2267 fee_.update(Resource::kFeeUselessData, "unsupported peer");
2268 return;
2269 }
2271 "ValidatorList", m->manifest(), m->version(), ValidatorList::parseBlobs(*m));
2272 }
2273 catch (std::exception const& e)
2274 {
2275 JLOG(pJournal_.warn()) << "ValidatorList: Exception, " << e.what();
2276 using namespace std::string_literals;
2277 fee_.update(Resource::kFeeInvalidData, e.what());
2278 }
2279}
2280
// Message handler for a validator list collection (logged as
// "ValidatorListCollection").
//
// Rejects collections whose version is below 2 (charged kFeeInvalidData),
// then forwards the manifest, version and parsed blobs of `m` with the label
// "ValidatorListCollection". Any std::exception raised on the way is logged
// and charged as kFeeInvalidData, using the exception text as the reason.
//
// NOTE(review): the signature, the condition guarding the "unsupported peer"
// block and the start of the forwarding call are missing from this listing.
// The log text suggests the guard tests the peer's protocol version, and the
// argument list matches onValidatorListMessage -- TODO confirm both.
2281void
2283{
2284 try
2285 {
2287 {
2288 JLOG(pJournal_.debug()) << "ValidatorListCollection: received validator list from peer "
2289 << "using protocol version " << to_string(protocol_)
2290 << " which shouldn't support this feature.";
2291 fee_.update(Resource::kFeeUselessData, "unsupported peer");
2292 return;
2293 }
2294 if (m->version() < 2)
2295 {
2296 JLOG(pJournal_.debug())
2297 << "ValidatorListCollection: received invalid validator list "
2298 "version "
2299 << m->version() << " from peer using protocol version " << to_string(protocol_);
2300 fee_.update(Resource::kFeeInvalidData, "wrong version");
2301 return;
2302 }
2304 "ValidatorListCollection", m->manifest(), m->version(), ValidatorList::parseBlobs(*m));
2305 }
2306 catch (std::exception const& e)
2307 {
2308 JLOG(pJournal_.warn()) << "ValidatorListCollection: Exception, " << e.what();
2309 using namespace std::string_literals;
2310 fee_.update(Resource::kFeeInvalidData, e.what());
2311 }
2312}
2313
// Message handler for a validation (TMValidation).
//
// Steps visible in this listing:
//   1. Reject (and charge) payloads shorter than 50 bytes.
//   2. Deserialize the validation and stamp it with the current close time
//      as its "seen" time.
//   3. Drop (and charge) validations that isCurrent() rejects.
//   4. Look up whether the signer is trusted; untrusted validations are
//      reported as traffic and dropped outright when
//      relayUntrustedValidations is -1.
//   5. Register the payload hash with the HashRouter; duplicates are
//      reported, optionally fed to updateSlotAndSquelch(), and dropped.
//   6. Queue checkValidation() on the job queue (JtValidationT for trusted
//      signers, JtValidationUt otherwise), unless the validation is
//      untrusted and either the peer is diverged or the local fee track
//      reports load.
// Any std::exception raised inside the try block is logged.
//
// NOTE(review): the signature, the declaration and construction of `val`,
// the arguments of both reportInboundTraffic calls, the declaration of
// `weak` and the last statement of the catch block are missing from this
// listing -- confirm against the real source.
2314void
2316{
2317 if (m->validation().size() < 50)
2318 {
2319 JLOG(pJournal_.warn()) << "Validation: Too small";
2320 fee_.update(Resource::kFeeMalformedRequest, "too small");
2321 return;
2322 }
2323
2324 try
2325 {
     // Used as the validation's "seen" time below. The isCurrent() call
     // further down reads the time keeper again instead of reusing it.
2326 auto const closeTime = app_.getTimeKeeper().closeTime();
2327
2329 {
2330 SerialIter sit(makeSlice(m->validation()));
2332 std::ref(sit),
2333 [this](PublicKey const& pk) {
2334 return calcNodeID(app_.getValidatorManifests().getMasterKey(pk));
2335 },
2336 false);
2337 val->setSeen(closeTime);
2338 }
2339
2340 if (!isCurrent(
2341 app_.getValidations().parms(),
2342 app_.getTimeKeeper().closeTime(),
2343 val->getSignTime(),
2344 val->getSeenTime()))
2345 {
2346 JLOG(pJournal_.trace()) << "Validation: Not current";
2347 fee_.update(Resource::kFeeUselessData, "not current");
2348 return;
2349 }
2350
2351 // RH TODO: when isTrusted = false we should probably also cache a key
2352 // suppression for 30 seconds to avoid doing a relatively expensive
2353 // lookup every time a spam packet is received
2354 auto const isTrusted = app_.getValidators().trusted(val->getSignerPublic());
2355
2356 // If the operator has specified that untrusted validations be
2357 // dropped then this happens here I.e. before further wasting CPU
2358 // verifying the signature of an untrusted key
2359 if (!isTrusted)
2360 {
2361 // increase untrusted validations received
2362 overlay_.reportInboundTraffic(
2364
2365 if (app_.config().relayUntrustedValidations == -1)
2366 return;
2367 }
2368
     // Hash of the raw payload, used for duplicate suppression.
2369 auto key = sha512Half(makeSlice(m->validation()));
2370
2371 auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2372
2373 if (!added)
2374 {
2375 // Count unique messages (Slots has its own 'HashRouter'), which a
2376 // peer receives within IDLED seconds since the message has been
2377 // relayed.
2378 if (relayed && (stopwatch().now() - *relayed) < reduce_relay::kIdled)
2379 {
2380 overlay_.updateSlotAndSquelch(
2381 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2382 }
2383
2384 // increase duplicate validations received
2385 overlay_.reportInboundTraffic(
2387
2388 JLOG(pJournal_.trace()) << "Validation: duplicate";
2389 return;
2390 }
2391
2392 if (!isTrusted && (tracking_.load() == Tracking::Diverged))
2393 {
2394 JLOG(pJournal_.debug()) << "Dropping untrusted validation from diverged peer";
2395 }
2396 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2397 {
2398 std::string const name = isTrusted ? "ChkTrust" : "ChkUntrust";
2399
2401 app_.getJobQueue().addJob(
2402 isTrusted ? JtValidationT : JtValidationUt, name, [weak, val, m, key]() {
2403 if (auto peer = weak.lock())
2404 peer->checkValidation(val, key, m);
2405 });
2406 }
2407 else
2408 {
2409 JLOG(pJournal_.debug()) << "Dropping untrusted validation for load";
2410 }
2411 }
2412 catch (std::exception const& e)
2413 {
2414 JLOG(pJournal_.warn()) << "Exception processing validation: " << e.what();
2415 using namespace std::string_literals;
2417 }
2418}
2419
2420void
2422{
2423 protocol::TMGetObjectByHash const& packet = *m;
2424
2425 JLOG(pJournal_.trace()) << "received TMGetObjectByHash " << packet.type() << " "
2426 << packet.objects_size();
2427
2428 if (packet.query())
2429 {
2430 // this is a query
2431 if (sendQueue_.size() >= Tuning::kDropSendQueue)
2432 {
2433 JLOG(pJournal_.debug()) << "GetObject: Large send queue";
2434 return;
2435 }
2436
2437 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2438 {
2439 doFetchPack(m);
2440 return;
2441 }
2442
2443 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2444 {
2445 if (!txReduceRelayEnabled())
2446 {
2447 JLOG(pJournal_.error()) << "TMGetObjectByHash: tx reduce-relay is disabled";
2448 fee_.update(Resource::kFeeMalformedRequest, "disabled");
2449 return;
2450 }
2451
2453 app_.getJobQueue().addJob(JtRequestedTxn, "DoTxs", [weak, m]() {
2454 if (auto peer = weak.lock())
2455 peer->doTransactions(m);
2456 });
2457 return;
2458 }
2459
2460 if (packet.has_ledgerhash())
2461 {
2462 if (!stringIsUInt256Sized(packet.ledgerhash()))
2463 {
2464 JLOG(pJournal_.debug()) << "GetObj: malformed ledgerhash from peer " << id_;
2465 fee_.update(Resource::kFeeMalformedRequest, "get object ledger hash");
2466 return;
2467 }
2468 }
2469 // Reject oversized requests before touching the NodeStore.
2470 // The legitimate upper bound (InboundLedger::getNeededHashes())
2471 // is 8 hashes; anything beyond kHardMaxReplyNodes is non-conforming.
2472 if (packet.objects_size() > Tuning::kHardMaxReplyNodes)
2473 {
2474 JLOG(pJournal_.warn())
2475 << "GetObj: oversized request from peer " << id_ << " (" << packet.objects_size()
2476 << " > " << Tuning::kHardMaxReplyNodes << ")";
2477 fee_.update(Resource::kFeeInvalidData, "oversized get object request");
2478 return;
2479 }
2480
2481 // Dispatch heavy synchronous NodeStore lookups off the peer's
2482 // I/O strand and onto the bounded job queue, mirroring the pattern
2483 // used by processLedgerRequest.
2485 bool const queued = app_.getJobQueue().addJob(JtLedgerReq, "RcvGetObjByHash", [weak, m]() {
2486 auto peer = weak.lock();
2487 if (!peer)
2488 return;
2489 try
2490 {
2491 peer->processGetObjectByHash(m);
2492 }
2493 catch (std::exception const& e)
2494 {
2495 // Surface backend failures (NodeStore I/O, allocation)
2496 // back through the resource model so a misbehaving peer
2497 // is still accountable rather than silently dropped.
2498 JLOG(peer->pJournal_.warn()) << "GetObj: handler threw: " << e.what();
2499 peer->charge(Resource::kFeeRequestNoReply, "get object handler exception");
2500 }
2501 });
2502 if (!queued)
2503 {
2504 // The JobQueue is no longer accepting new work (typically
2505 // because it is shutting down / has been joined).
2506 JLOG(pJournal_.warn()) << "GetObj: job queue refused request from peer " << id_;
2507 return;
2508 }
2509
2510 // Admission-time charge: a peer that floods enqueues would
2511 // otherwise be billed only the trivial onMessageEnd fee per
2512 // message until the JobQueue catches up, re-creating an
2513 // uncharged DoS window. Charge the base burden up-front (after
2514 // a successful enqueue); the per-lookup differential is added
2515 // in the worker.
2516 fee_.update(Resource::kFeeModerateBurdenPeer, "received a get object by hash request");
2517 }
2518 else
2519 {
2520 // this is a reply
2521 std::uint32_t pLSeq = 0;
2522 bool pLDo = true;
2523 bool progress = false;
2524
2525 for (int i = 0; i < packet.objects_size(); ++i)
2526 {
2527 protocol::TMIndexedObject const& obj = packet.objects(i);
2528
2529 if (obj.has_hash() && stringIsUInt256Sized(obj.hash()))
2530 {
2531 if (obj.has_ledgerseq())
2532 {
2533 if (obj.ledgerseq() != pLSeq)
2534 {
2535 if (pLDo && (pLSeq != 0))
2536 {
2537 JLOG(pJournal_.debug()) << "GetObj: Full fetch pack for " << pLSeq;
2538 }
2539 pLSeq = obj.ledgerseq();
2540 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2541
2542 if (!pLDo)
2543 {
2544 JLOG(pJournal_.debug()) << "GetObj: Late fetch pack for " << pLSeq;
2545 }
2546 else
2547 {
2548 progress = true;
2549 }
2550 }
2551 }
2552
2553 if (pLDo)
2554 {
2555 uint256 const hash = uint256::fromRaw(obj.hash());
2556
2557 app_.getLedgerMaster().addFetchPack(
2558 hash, std::make_shared<Blob>(obj.data().begin(), obj.data().end()));
2559 }
2560 }
2561 }
2562
2563 if (pLDo && (pLSeq != 0))
2564 {
2565 JLOG(pJournal_.debug()) << "GetObj: Partial fetch pack for " << pLSeq;
2566 }
2567 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2568 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2569 }
2570}
2571
// Answers a TMGetObjectByHash query: looks each requested hash up in the
// NodeStore, copies the objects that were found into a reply, charges the
// peer a fee based on the request size and hit count, and sends the reply.
// NOTE(review): the signature line is elided from this listing; presumably
// this is PeerImp::processGetObjectByHash taking the shared_ptr `m`.
2572void
2574{
2575 protocol::TMGetObjectByHash const& packet = *m;
2576
2577 protocol::TMGetObjectByHash reply;
// The reply is marked as a non-query and echoes the request's object type.
2578 reply.set_query(false);
2579 reply.set_type(packet.type());
2580
2581 if (packet.has_ledgerhash())
2582 {
2583 reply.set_ledgerhash(packet.ledgerhash());
2584 }
2585
2586 // Defense in depth: caller (onMessage) already validates cheap
2587 // structural properties of the request before dispatching here:
2588 // - objects_size() <= kHardMaxReplyNodes (oversize gate)
2589 // - if has_ledgerhash() then ledgerhash is uint256-sized
2590 // The iteration cap below mirrors the oversize gate so this method
2591 // remains safe if invoked directly by tests or future callers, and
2592 // a peer cannot drive unbounded NodeStore lookups by sending
2593 // non-existent hashes.
2594 int const requested = packet.objects_size();
2595 int const iterLimit = std::min(requested, Tuning::kHardMaxReplyNodes);
2596
2597 for (int i = 0; i < iterLimit; ++i)
2598 {
2599 auto const& obj = packet.objects(i);
// Entries without a hash, or with a hash of the wrong size, are skipped
// silently; they simply produce no reply object.
2600 if (!obj.has_hash() || !stringIsUInt256Sized(obj.hash()))
2601 continue;
2602
2603 uint256 const hash = uint256::fromRaw(obj.hash());
2604 // VFALCO TODO Move this someplace more sensible so we don't
2605 // need to inject the NodeStore interfaces.
// A ledger sequence of 0 is passed when the request did not supply one.
2606 std::uint32_t const seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2607 auto const nodeObject = app_.getNodeStore().fetchNodeObject(hash, seq);
// Objects that are not in the NodeStore are omitted from the reply.
2608 if (!nodeObject)
2609 continue;
2610
2611 protocol::TMIndexedObject& newObj = *reply.add_objects();
2612 newObj.set_hash(hash.begin(), hash.size());
2613 auto const& data = nodeObject->getData();
2614 newObj.set_data(data.data(), data.size());
// The request's nodeid is echoed back in the reply's `index` field.
2615 if (obj.has_nodeid())
2616 newObj.set_index(obj.nodeid());
2617 if (obj.has_ledgerseq())
2618 newObj.set_ledgerseq(obj.ledgerseq());
2619 }
2620
2621 // Apply work-proportional charge. `charge()` posts the disconnect
2622 // step (if any) back to strand_, so it is safe to call from this
2623 // JobQueue worker thread.
2624 charge(
2625 // Bill on `requested` rather than on the number of lookups actually
2626 // performed (std::min(requested, Tuning::kHardMaxReplyNodes)):
2627 // charging by request size is what discourages oversized requests.
2628 computeGetObjectByHashFee(requested, reply.objects_size()),
2629 "processed get object by hash request");
2630
2631 JLOG(pJournal_.trace()) << "GetObj: " << reply.objects_size() << " of " << requested;
// The reply is sent even when no requested object was found.
2632 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2633}
2634
// Handler for an incoming TMHaveTransactions message (per the log text; the
// signature line is elided from this listing). Rejects the message with a
// malformed-request charge when tx reduce-relay is disabled; otherwise
// queues a JtMissingTxn job that calls handleHaveTransactions(m).
2635void
2637{
2638 if (!txReduceRelayEnabled())
2639 {
2640 JLOG(pJournal_.error()) << "TMHaveTransactions: tx reduce-relay is disabled";
2641 fee_.update(Resource::kFeeMalformedRequest, "disabled");
2642 return;
2643 }
2644
// NOTE(review): the declaration of `weak` (listing line 2645) is elided here;
// it is locked inside the job, so the job is a no-op if the peer is gone.
2646 app_.getJobQueue().addJob(JtMissingTxn, "HandleHaveTxs", [weak, m]() {
2647 if (auto peer = weak.lock())
2648 peer->handleHaveTransactions(m);
2649 });
2650}
2651
// Processes the hashes advertised in a TMHaveTransactions message
// (presumably PeerImp::handleHaveTransactions; the signature line is elided
// from this listing). Hashes not found in the transaction master cache are
// collected into a TMGetObjectByHash query of type otTRANSACTIONS, which is
// sent back to the peer if it is non-empty. Hashes that are cached are
// removed from this peer's tx queue instead.
2652void
2654{
2655 protocol::TMGetObjectByHash tmBH;
2656 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2657 tmBH.set_query(true);
2658
2659 JLOG(pJournal_.trace()) << "received TMHaveTransactions " << m->hashes_size();
2660
2661 for (std::uint32_t i = 0; i < m->hashes_size(); i++)
2662 {
// A single wrong-sized hash aborts the whole message: the peer is charged
// and no request is sent, even for hashes already examined.
2663 if (!stringIsUInt256Sized(m->hashes(i)))
2664 {
2665 JLOG(pJournal_.error()) << "TMHaveTransactions with invalid hash size";
2666 fee_.update(Resource::kFeeMalformedRequest, "hash size");
2667 return;
2668 }
2669
2670 uint256 hash = uint256::fromRaw(m->hashes(i));
2671
2672 auto txn = app_.getMasterTransaction().fetchFromCache(hash);
2673
2674 JLOG(pJournal_.trace()) << "checking transaction " << (bool)txn;
2675
2676 if (!txn)
2677 {
2678 JLOG(pJournal_.debug()) << "adding transaction to request";
2679
2680 auto obj = tmBH.add_objects();
2681 obj->set_hash(hash.data(), hash.size());
2682 }
2683 else
2684 {
2685 // Erase only if a peer has seen this tx. If the peer has not
2686 // seen this tx then the tx could not have been queued for this
2687 // peer.
2688 removeTxQueue(hash);
2689 }
2690 }
2691
2692 JLOG(pJournal_.trace()) << "transaction request object is " << tmBH.objects_size();
2693
2694 if (tmBH.objects_size() > 0)
2695 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2696}
2697
// Handler for an incoming TMTransactions batch (per the log text; the
// signature line is elided from this listing). Rejects the message with a
// malformed-request charge when tx reduce-relay is disabled; otherwise
// records the batch size in the overlay's tx metrics and processes each
// contained transaction in turn.
2698void
2700{
2701 if (!txReduceRelayEnabled())
2702 {
2703 JLOG(pJournal_.error()) << "TMTransactions: tx reduce-relay is disabled";
2704 fee_.update(Resource::kFeeMalformedRequest, "disabled");
2705 return;
2706 }
2707
2708 JLOG(pJournal_.trace()) << "received TMTransactions " << m->transactions_size();
2709
2710 overlay_.addTxMetrics(m->transactions_size());
2711
2712 for (std::uint32_t i = 0; i < m->transactions_size(); ++i)
2713 {
// NOTE(review): the start of this call (listing lines 2714-2715) is elided.
// The visible arguments pass element i with a no-op deleter, so the wrapper
// apparently does not own the element, which remains owned by `m`; the
// meaning of the trailing false/true flags cannot be confirmed from here.
2716 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2717 false,
2718 true);
2719 }
2720}
2721
// Handler for an incoming TMSquelch message (per the log text; the signature
// line is elided from this listing). All work is dispatched onto strand_.
// The message must carry a well-formed validator public key; a squelch that
// names this node's own validation key is ignored. Otherwise the squelch for
// that validator is removed or added, depending on m->squelch().
2722void
2724{
// `self` keeps the peer alive until the dispatched handler has run.
2725 dispatch(strand_, [self = shared_from_this(), m]() {
2726 if (!m->has_validatorpubkey())
2727 {
2728 self->fee_.update(Resource::kFeeInvalidData, "squelch no pubkey");
2729 return;
2730 }
2731 auto validator = m->validatorpubkey();
2732 auto const slice{makeSlice(validator)};
// publicKeyType() yields an empty result for bytes that are not a
// recognised public key; such messages are charged and dropped.
2733 if (!publicKeyType(slice))
2734 {
2735 self->fee_.update(Resource::kFeeInvalidData, "squelch bad pubkey");
2736 return;
2737 }
2738 PublicKey const key(slice);
2739
2740 // Ignore the squelch for validator's own messages.
2741 if (key == self->app_.getValidationPublicKey())
2742 {
2743 JLOG(self->pJournal_.debug())
2744 << "onMessage: TMSquelch discarding validator's squelch " << slice;
2745 return;
2746 }
2747
// A missing squelchduration field is treated as a duration of 0 seconds.
2748 std::uint32_t const duration = m->has_squelchduration() ? m->squelchduration() : 0;
2749 if (!m->squelch())
2750 {
2751 self->squelch_.removeSquelch(key);
2752 }
// addSquelch() returning false is billed as an invalid duration.
2753 else if (!self->squelch_.addSquelch(key, std::chrono::seconds{duration}))
2754 {
2755 self->fee_.update(Resource::kFeeInvalidData, "squelch duration");
2756 }
2757
2758 JLOG(self->pJournal_.debug())
2759 << "onMessage: TMSquelch " << slice << " " << self->id() << " " << duration;
2760 });
2761}
2762
2763//--------------------------------------------------------------------------
2764
// Appends `hash` to recentLedgers_. The caller must already hold
// recentLock_; the lock object is taken by reference purely as proof.
2765void
2766PeerImp::addLedger(uint256 const& hash, std::scoped_lock<std::mutex> const& lockedRecentLock)
2767{
2768 // lockedRecentLock is passed as a reminder that recentLock_ must be
2769 // locked by the caller.
2770 (void)lockedRecentLock;
2771
// NOTE(review): the condition guarding this early return (listing line 2772)
// is elided; presumably it skips hashes already in recentLedgers_ - confirm.
2773 return;
2774
2775 recentLedgers_.push_back(hash);
2776}
2777
// Handles a fetch-pack request (presumably PeerImp::doFetchPack; the
// signature line is elided from this listing). The request is dropped when
// the server is busy, and charged as malformed when the ledger hash has the
// wrong size. Otherwise a JtPack job is queued that asks LedgerMaster to
// build the fetch pack.
2778void
2780{
2781 // VFALCO TODO Invert this dependency using an observer and shared state
2782 // object. Don't queue fetch pack jobs if we're under load or we already
2783 // have some queued.
// "Busy" means any of: local load is flagged, the validated ledger is older
// than 40 seconds, or more than 10 JtPack jobs are already queued. A busy
// drop is not charged to the peer.
2784 if (app_.getFeeTrack().isLoadedLocal() ||
2785 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2786 (app_.getJobQueue().getJobCount(JtPack) > 10))
2787 {
2788 JLOG(pJournal_.info()) << "Too busy to make fetch pack";
2789 return;
2790 }
2791
2792 if (!stringIsUInt256Sized(packet->ledgerhash()))
2793 {
2794 JLOG(pJournal_.warn()) << "FetchPack hash size malformed";
2795 fee_.update(Resource::kFeeMalformedRequest, "hash size");
2796 return;
2797 }
2798
// NOTE(review): listing lines 2799 and 2803 are elided; the declaration of
// `weak`, captured by the job below, is presumably among them.
2800
2801 uint256 const hash = uint256::fromRaw(packet->ledgerhash());
2802
// Despite its name, `elapsed` holds the time at which the job was queued.
2804 auto elapsed = UptimeClock::now();
// The job captures a pointer to app_ and the `weak` handle rather than
// `this`.
2805 auto const pap = &app_;
2806 app_.getJobQueue().addJob(JtPack, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2807 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2808 });
2809}
2810
// Answers a TMGetObjectByHash query for transactions (presumably
// PeerImp::doTransactions; the signature line is elided from this listing).
// Every requested hash must be well-formed and present in the transaction
// master cache. The first failure charges the peer and aborts with no reply.
// On success the serialized transactions are sent as one mtTRANSACTIONS
// message.
2811void
2813{
2814 protocol::TMTransactions reply;
2815
2816 JLOG(pJournal_.trace()) << "received TMGetObjectByHash requesting tx "
2817 << packet->objects_size();
2818
// Requests for more hashes than reduce_relay::kMaxTxQueueSize are rejected
// outright.
2819 if (packet->objects_size() > reduce_relay::kMaxTxQueueSize)
2820 {
2821 JLOG(pJournal_.error()) << "doTransactions, invalid number of hashes";
2822 fee_.update(Resource::kFeeMalformedRequest, "too big");
2823 return;
2824 }
2825
2826 for (std::uint32_t i = 0; i < packet->objects_size(); ++i)
2827 {
2828 auto const& obj = packet->objects(i);
2829
2830 if (!stringIsUInt256Sized(obj.hash()))
2831 {
2832 fee_.update(Resource::kFeeMalformedRequest, "hash size");
2833 return;
2834 }
2835
2836 uint256 hash = uint256::fromRaw(obj.hash());
2837
2838 auto txn = app_.getMasterTransaction().fetchFromCache(hash);
2839
// Asking for a transaction that is not cached is billed as a malformed
// request, and anything already added to `reply` is discarded.
2840 if (!txn)
2841 {
2842 JLOG(pJournal_.error())
2843 << "doTransactions, transaction not found " << Slice(hash.data(), hash.size());
2844 fee_.update(Resource::kFeeMalformedRequest, "tx not found");
2845 return;
2846 }
2847
2848 Serializer s;
2849 auto tx = reply.add_transactions();
2850 auto sttx = txn->getSTransaction();
2851 sttx->add(s);
2852 tx->set_rawtransaction(s.data(), s.size());
// INCLUDED transactions are reported as tsCURRENT, everything else as tsNEW.
2853 tx->set_status(
2854 txn->getStatus() == TransStatus::INCLUDED ? protocol::tsCURRENT : protocol::tsNEW);
2855 tx->set_receivetimestamp(app_.getTimeKeeper().now().time_since_epoch().count());
2856 tx->set_deferred(txn->getSubmitResult().queued);
2857 }
2858
2859 if (reply.transactions_size() > 0)
2860 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
2861}
2862
// Vets a transaction received from a peer and, if it passes, submits it to
// NetworkOPs (PeerImp::checkTransaction, per the assertion text below; the
// line carrying the function name is elided from this listing).
//
//   flags          - HashRouter flags; only TRUSTED is consulted here.
//   checkSignature - when true, run checkValidity() and reject on failure;
//                    when false, force the cached validity to Valid.
//   stx            - the transaction to check.
//   batch          - affects log wording and suppresses the pseudo-tx charge.
//
// Order of checks: inner-batch flag, expiry against the valid ledger index,
// pseudo-transaction handling, signature/validity, Transaction construction.
// Any std::exception marks the transaction BAD and charges the peer.
2863void
2865 HashRouterFlags flags,
2866 bool checkSignature,
2867 std::shared_ptr<STTx const> const& stx,
2868 bool batch)
2869{
2870 // VFALCO TODO Rewrite to not use exceptions
2871 try
2872 {
2873 // charge strongly for relaying batch txns
2874 // LCOV_EXCL_START
2875 /*
2876 There is no need to check whether the featureBatch amendment is
2877 enabled.
2878
2879 * If the `tfInnerBatchTxn` flag is set, and the amendment is
2880 enabled, then it's an invalid transaction because inner batch
2881 transactions should not be relayed.
2882 * If the `tfInnerBatchTxn` flag is set, and the amendment is *not*
2883 enabled, then the transaction is malformed because it's using an
2884 "unknown" flag. There's no need to waste the resources to send it
2885 to the transaction engine.
2886
2887 We don't normally check transaction validity at this level, but
2888 since we _need_ to check it when the amendment is enabled, we may as
2889 well drop it if the flag is set regardless.
2890 */
2891 if (stx->isFlag(tfInnerBatchTxn))
2892 {
2893 JLOG(pJournal_.warn()) << "Ignoring Network relayed Tx containing "
2894 "tfInnerBatchTxn (checkSignature).";
2895 charge(Resource::kFeeModerateBurdenPeer, "inner batch txn");
2896 return;
2897 }
2898 // LCOV_EXCL_STOP
2899
2900 // Expired?
2901 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2902 (stx->getFieldU32(sfLastLedgerSequence) < app_.getLedgerMaster().getValidLedgerIndex()))
2903 {
// NOTE(review): the second literal below has no leading space, so the log
// line reads "<id>as BAD because ...".
2904 JLOG(pJournal_.info()) << "Marking transaction " << stx->getTransactionID()
2905 << "as BAD because it's expired";
2906 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2907 charge(Resource::kFeeUselessData, "expired tx");
2908 return;
2909 }
2910
2911 if (isPseudoTx(*stx))
2912 {
2913 // Don't do anything with pseudo transactions except put them in the
2914 // TransactionMaster cache
2915 std::string reason;
2916 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2917 XRPL_ASSERT(
2918 tx->getStatus() == TransStatus::NEW,
2919 "xrpl::PeerImp::checkTransaction Transaction created "
2920 "correctly");
// If the status is not NEW, control falls through to the ordinary checks
// below.
2921 if (tx->getStatus() == TransStatus::NEW)
2922 {
2923 JLOG(pJournal_.debug()) << "Processing " << (batch ? "batch" : "unsolicited")
2924 << " pseudo-transaction tx " << tx->getID();
2925
2926 app_.getMasterTransaction().canonicalize(&tx);
2927 // Tell the overlay about it, but don't relay it.
2928 auto const toSkip = app_.getHashRouter().shouldRelay(tx->getID());
2929 if (toSkip)
2930 {
2931 JLOG(pJournal_.debug())
2932 << "Passing skipped pseudo pseudo-transaction tx " << tx->getID();
2933 app_.getOverlay().relay(tx->getID(), {}, *toSkip);
2934 }
// Pseudo-transactions that did not arrive as part of a batch are charged as
// useless data.
2935 if (!batch)
2936 {
2937 JLOG(pJournal_.debug()) << "Charging for pseudo-transaction tx " << tx->getID();
2938 charge(Resource::kFeeUselessData, "pseudo tx");
2939 }
2940
2941 return;
2942 }
2943 }
2944
2945 if (checkSignature)
2946 {
2947 // Check the signature before handing off to the job queue.
// NOTE(review): the condition of this if-with-initializer (listing line
// 2950) is elided; the body below is the rejection path.
2948 if (auto [valid, validReason] = checkValidity(
2949 app_.getHashRouter(), *stx, app_.getLedgerMaster().getValidatedRules());
2951 {
2952 if (!validReason.empty())
2953 {
2954 JLOG(pJournal_.debug()) << "Exception checking transaction: " << validReason;
2955 }
2956
2957 // Probably not necessary to set HashRouterFlags::BAD, but
2958 // doesn't hurt.
2959 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2960 charge(Resource::kFeeInvalidSignature, "check transaction signature failure");
2961 return;
2962 }
2963 }
2964 else
2965 {
// The caller asked to skip the check, so record the transaction as Valid.
2966 forceValidity(app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
2967 }
2968
2969 std::string reason;
2970 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2971
2972 if (tx->getStatus() == TransStatus::INVALID)
2973 {
2974 if (!reason.empty())
2975 {
2976 JLOG(pJournal_.debug()) << "Exception checking transaction: " << reason;
2977 }
2978 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2979 charge(Resource::kFeeInvalidSignature, "tx (impossible)");
2980 return;
2981 }
2982
2983 bool const trusted = any(flags & HashRouterFlags::TRUSTED);
2984 app_.getOPs().processTransaction(tx, trusted, false, NetworkOPs::FailHard::No);
2985 }
2986 catch (std::exception const& ex)
2987 {
2988 JLOG(pJournal_.warn()) << "Exception in " << __func__ << ": " << ex.what();
2989 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2990 using namespace std::string_literals;
2991 charge(Resource::kFeeInvalidData, "tx "s + ex.what());
2992 }
2993}
2994
2995// Called from our JobQueue
// Verifies a proposal received from this peer and relays it when appropriate
// (PeerImp::checkPropose, per the assertion text below; the name line and
// the `packet` parameter line are elided from this listing).
//
//   isTrusted - whether the proposal comes from a trusted validator.
//   peerPos   - the parsed proposal; supplies checkSign(), suppressionID()
//               and publicKey().
2996void
2998 bool isTrusted,
3000 RCLCxPeerPos peerPos)
3001{
3002 JLOG(pJournal_.trace()) << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal";
3003
3004 XRPL_ASSERT(packet, "xrpl::PeerImp::checkPropose : non-null packet");
3005
// The signature check is skipped for peers in our cluster.
3006 if (!cluster() && !peerPos.checkSign())
3007 {
3008 std::string const desc{"Proposal fails sig check"};
3009 JLOG(pJournal_.warn()) << desc;
// NOTE(review): listing line 3010 is elided here; it presumably charges the
// peer using `desc` - confirm.
3011 return;
3012 }
3013
3014 bool relay = false;
3015
// Trusted proposals are relayed when NetworkOPs says so; untrusted ones only
// when the config enables relaying them or the peer is in our cluster.
3016 if (isTrusted)
3017 {
3018 relay = app_.getOPs().processTrustedProposal(peerPos);
3019 }
3020 else
3021 {
3022 relay = app_.config().relayUntrustedProposals == 1 || cluster();
3023 }
3024
3025 if (relay)
3026 {
3027 // haveMessage contains peers, which are suppressed; i.e. the peers
3028 // are the source of the message, consequently the message should
3029 // not be relayed to these peers. But the message must be counted
3030 // as part of the squelch logic.
3031 auto haveMessage =
3032 app_.getOverlay().relay(*packet, peerPos.suppressionID(), peerPos.publicKey());
3033 if (!haveMessage.empty())
3034 {
3035 overlay_.updateSlotAndSquelch(
3036 peerPos.suppressionID(),
3037 peerPos.publicKey(),
3038 std::move(haveMessage),
3039 protocol::mtPROPOSE_LEDGER);
3040 }
3041 }
3042}
3043
// Processes a validation forwarded by this peer and relays it when
// appropriate. NOTE(review): the name line and the `val` / `packet`
// parameter lines are elided from this listing; presumably this is
// PeerImp::checkValidation.
//
//   key - suppression key passed to the overlay when relaying.
3044void
3047 uint256 const& key,
3049{
3050 if (!val->isValid())
3051 {
3052 std::string const desc{"Validation forwarded by peer is invalid"};
3053 JLOG(pJournal_.debug()) << desc;
// NOTE(review): listing line 3054 is elided here; it presumably charges the
// peer using `desc` - confirm.
3055 return;
3056 }
3057
3058 // FIXME it should be safe to remove this try/catch. Investigate codepaths.
3059 try
3060 {
// Relay when NetworkOPs accepts the validation or the peer is in our
// cluster. recvValidation() is evaluated first, so it always runs.
3061 if (app_.getOPs().recvValidation(val, std::to_string(id())) || cluster())
3062 {
3063 // haveMessage contains peers, which are suppressed; i.e. the peers
3064 // are the source of the message, consequently the message should
3065 // not be relayed to these peers. But the message must be counted
3066 // as part of the squelch logic.
3067 auto haveMessage = overlay_.relay(*packet, key, val->getSignerPublic());
3068 if (!haveMessage.empty())
3069 {
3070 overlay_.updateSlotAndSquelch(
3071 key, val->getSignerPublic(), std::move(haveMessage), protocol::mtVALIDATION);
3072 }
3073 }
3074 }
3075 catch (std::exception const& ex)
3076 {
3077 JLOG(pJournal_.trace()) << "Exception processing validation: " << ex.what();
3078 using namespace std::string_literals;
3079 charge(Resource::kFeeMalformedRequest, "validation "s + ex.what());
3080 }
3081}
3082
3083// Returns the single best-scoring peer that can help us get
3084// the TX tree with the specified root hash, or an empty result if none can.
3085//
// `skip` is excluded from consideration. Candidates are ranked by
// getScore(true), which includes a random component.
// NOTE(review): the return-type line and the declaration of `ret` (listing
// lines 3086 and 3089) are elided from this listing.
3087getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
3088{
3090 int retScore = 0;
3091
3092 ov.forEach([&](std::shared_ptr<PeerImp>&& p) {
3093 if (p->hasTxSet(rootHash) && p.get() != skip)
3094 {
3095 auto score = p->getScore(true);
// The first candidate is always taken, even if its score is not positive.
3096 if (!ret || (score > retScore))
3097 {
3098 ret = std::move(p);
3099 retScore = score;
3100 }
3101 }
3102 });
3103
3104 return ret;
3105}
3106
3107// Returns the best-scoring peer that reports having the given ledger. The
3108// score has a random component and also reflects peer latency.
3109//
// `skip` is excluded from consideration; the result is empty when no peer
// qualifies.
// NOTE(review): the return-type and name lines and the declaration of `ret`
// are elided from this listing; presumably this is getPeerWithLedger.
3112 OverlayImpl& ov,
3113 uint256 const& ledgerHash,
3114 LedgerIndex ledger,
3115 PeerImp const* skip)
3116{
3118 int retScore = 0;
3119
3120 ov.forEach([&](std::shared_ptr<PeerImp>&& p) {
3121 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3122 {
3123 auto score = p->getScore(true);
// The first candidate is always taken, even if its score is not positive.
3124 if (!ret || (score > retScore))
3125 {
3126 ret = std::move(p);
3127 retScore = score;
3128 }
3129 }
3130 });
3131
3132 return ret;
3133}
3134
// Sends the "base" data for `ledger` to the peer (presumably
// PeerImp::sendLedgerBase; the name line is elided from this listing).
// Appends up to three nodes to `ledgerData`, in this order: the serialized
// ledger header, the account-state root node, and the transaction root node.
// The result is sent as an mtLEDGER_DATA message.
//
//   ledger     - the ledger whose header and root nodes are serialized.
//   ledgerData - reply under construction; the nodes are appended to it.
3135void
3137 std::shared_ptr<Ledger const> const& ledger,
3138 protocol::TMLedgerData& ledgerData)
3139{
3140 JLOG(pJournal_.trace()) << "sendLedgerBase: Base data";
3141
3142 Serializer s(sizeof(LedgerHeader));
3143 addRaw(ledger->header(), s);
3144 ledgerData.add_nodes()->set_nodedata(s.getDataPtr(), s.getLength());
3145
3146 auto const& stateMap{ledger->stateMap()};
// The tx root is only considered when the state root was also added.
3147 if (stateMap.getHash() != beast::kZero)
3148 {
3149 // Return account state root node if possible
3150 Serializer root(768);
3151
3152 stateMap.serializeRoot(root);
3153 ledgerData.add_nodes()->set_nodedata(root.getDataPtr(), root.getLength());
3154
3155 if (ledger->header().txHash != beast::kZero)
3156 {
3157 auto const& txMap{ledger->txMap()};
3158 if (txMap.getHash() != beast::kZero)
3159 {
3160 // Return TX root node if possible
// The `root` serializer is cleared and reused for the tx root.
3161 root.erase();
3162 txMap.serializeRoot(root);
3163 ledgerData.add_nodes()->set_nodedata(root.getDataPtr(), root.getLength());
3164 }
3165 }
3166 }
3167
3168 auto message{std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3169 send(message);
3170}
3171
// Resolves the ledger named by a get-ledger request `m` (presumably
// PeerImp::getLedger; the signature lines and the declaration of `ledger`
// are elided from this listing). Lookup is by hash, else by sequence, else
// the closed ledger when ltype is ltCLOSED. A request by hash that cannot be
// served locally may be relayed to another peer, in which case the returned
// ledger is empty. The result is also empty when nothing is found or the
// sequence checks fail.
3174{
3175 JLOG(pJournal_.trace()) << "getLedger: Ledger";
3176
3178
3179 if (m->has_ledgerhash())
3180 {
3181 // Attempt to find ledger by hash
3182 uint256 const ledgerHash = uint256::fromRaw(m->ledgerhash());
3183 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3184 if (!ledger)
3185 {
3186 JLOG(pJournal_.trace()) << "getLedger: Don't have ledger with hash " << ledgerHash;
3187
// Only requests that carry a query type and no request cookie are relayed.
3188 if (m->has_querytype() && !m->has_requestcookie())
3189 {
3190 // Attempt to relay the request to a peer
3191 if (auto const peer = getPeerWithLedger(
3192 overlay_, ledgerHash, m->has_ledgerseq() ? m->ledgerseq() : 0, this))
3193 {
// The relayed copy is stamped with this peer's id as the request cookie.
3194 m->set_requestcookie(id());
3195 peer->send(std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3196 JLOG(pJournal_.debug()) << "getLedger: Request relayed to peer";
// `ledger` is still empty here, so the caller sends no reply of its own.
3197 return ledger;
3198 }
3199
3200 JLOG(pJournal_.trace()) << "getLedger: Failed to find peer to relay request";
3201 }
3202 }
3203 }
3204 else if (m->has_ledgerseq())
3205 {
3206 // Attempt to find ledger by sequence
3207 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3208 {
3209 JLOG(pJournal_.debug()) << "getLedger: Early ledger sequence request";
3210 }
3211 else
3212 {
3213 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3214 if (!ledger)
3215 {
3216 JLOG(pJournal_.debug())
3217 << "getLedger: Don't have ledger with sequence " << m->ledgerseq();
3218 }
3219 }
3220 }
3221 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3222 {
3223 ledger = app_.getLedgerMaster().getClosedLedger();
3224 }
3225
3226 if (ledger)
3227 {
3228 // Validate retrieved ledger sequence
3229 auto const ledgerSeq{ledger->header().seq};
3230 if (m->has_ledgerseq())
3231 {
// A ledger whose sequence differs from the one requested is discarded.
3232 if (ledgerSeq != m->ledgerseq())
3233 {
3234 // Do not resource charge a peer responding to a relay
3235 if (!m->has_requestcookie())
3236 charge(Resource::kFeeMalformedRequest, "get_ledger ledgerSeq");
3237
3238 ledger.reset();
3239 JLOG(pJournal_.warn()) << "getLedger: Invalid ledger sequence " << ledgerSeq;
3240 }
3241 }
// With no requested sequence, ledgers older than the earliest fetchable
// sequence are still refused.
3242 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3243 {
3244 ledger.reset();
3245 JLOG(pJournal_.debug()) << "getLedger: Early ledger sequence request " << ledgerSeq;
3246 }
3247 }
3248 else
3249 {
3250 JLOG(pJournal_.debug()) << "getLedger: Unable to find ledger";
3251 }
3252
3253 return ledger;
3254}
3255
// Resolves the transaction set named by request `m` (presumably
// PeerImp::getTxSet; the signature lines are elided from this listing).
// Returns the SHAMap from InboundTransactions, or an empty pointer when the
// set is not held locally. In that case the request may be relayed to
// another peer that reports having the set.
// NOTE(review): m->ledgerhash() is passed to fromRaw() without a size check
// in this function; presumably the caller validates it - confirm.
3258{
3259 JLOG(pJournal_.trace()) << "getTxSet: TX set";
3260
3261 uint256 const txSetHash = uint256::fromRaw(m->ledgerhash());
3262 std::shared_ptr<SHAMap> shaMap{app_.getInboundTransactions().getSet(txSetHash, false)};
3263 if (!shaMap)
3264 {
// Only requests that carry a query type and no request cookie are relayed.
3265 if (m->has_querytype() && !m->has_requestcookie())
3266 {
3267 // Attempt to relay the request to a peer
3268 if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
3269 {
// The relayed copy is stamped with this peer's id as the request cookie.
3270 m->set_requestcookie(id());
3271 peer->send(std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3272 JLOG(pJournal_.debug()) << "getTxSet: Request relayed";
3273 }
3274 else
3275 {
3276 JLOG(pJournal_.debug()) << "getTxSet: Failed to find relay peer";
3277 }
3278 }
3279 else
3280 {
3281 JLOG(pJournal_.debug()) << "getTxSet: Failed to find TX set";
3282 }
3283 }
3284
3285 return shaMap;
3286}
3287
// Serves a get-ledger request `m` (PeerImp::processLedgerRequest, per the
// log text; the signature line and the declarations of `ledger`,
// `sharedMap` and `data` are elided from this listing).
//
// Steps:
//   1. Charge the peer, unless the request is a relayed one.
//   2. Resolve the SHAMap to read: a candidate tx set, or the tx/state map
//      of a ledger. liBASE requests are answered by sendLedgerBase().
//   3. For each requested node id, collect that node and its descendants to
//      `queryDepth` via getNodeFat() and append them to the reply.
//   4. Send the reply if it contains at least one node.
3288void
3290{
3291 // Do not resource charge a peer responding to a relay
3292 if (!m->has_requestcookie())
3293 charge(Resource::kFeeModerateBurdenPeer, "received a get ledger request");
3294
3297 SHAMap const* map{nullptr};
3298 protocol::TMLedgerData ledgerData;
3299 bool fatLeaves{true};
3300 auto const itype{m->itype()};
3301
3302 if (itype == protocol::liTS_CANDIDATE)
3303 {
// `sharedMap` keeps the tx set alive while `map` points into it.
3304 if (sharedMap = getTxSet(m); !sharedMap)
3305 return;
3306 map = sharedMap.get();
3307
3308 // Fill out the reply
3309 ledgerData.set_ledgerseq(0);
3310 ledgerData.set_ledgerhash(m->ledgerhash());
3311 ledgerData.set_type(protocol::liTS_CANDIDATE);
3312 if (m->has_requestcookie())
3313 ledgerData.set_requestcookie(m->requestcookie());
3314
3315 // We'll already have most transactions
3316 fatLeaves = false;
3317 }
3318 else
3319 {
// Ledger requests are dropped without a reply when the send queue is
// already large, or when the server is loaded and the peer is not in our
// cluster.
3320 if (sendQueue_.size() >= Tuning::kDropSendQueue)
3321 {
3322 JLOG(pJournal_.debug()) << "processLedgerRequest: Large send queue";
3323 return;
3324 }
3325 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3326 {
3327 JLOG(pJournal_.debug()) << "processLedgerRequest: Too busy";
3328 return;
3329 }
3330
3331 if (ledger = getLedger(m); !ledger)
3332 return;
3333
3334 // Fill out the reply
3335 auto const ledgerHash{ledger->header().hash};
3336 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3337 ledgerData.set_ledgerseq(ledger->header().seq);
3338 ledgerData.set_type(itype);
3339 if (m->has_requestcookie())
3340 ledgerData.set_requestcookie(m->requestcookie());
3341
3342 switch (itype)
3343 {
3344 case protocol::liBASE:
// Base requests are answered in full by sendLedgerBase().
3345 sendLedgerBase(ledger, ledgerData);
3346 return;
3347
3348 case protocol::liTX_NODE:
3349 map = &ledger->txMap();
3350 JLOG(pJournal_.trace())
3351 << "processLedgerRequest: TX map hash " << to_string(map->getHash());
3352 break;
3353
3354 case protocol::liAS_NODE:
3355 map = &ledger->stateMap();
3356 JLOG(pJournal_.trace())
3357 << "processLedgerRequest: Account state map hash " << to_string(map->getHash());
3358 break;
3359
3360 default:
3361 // This case should not be possible here
3362 JLOG(pJournal_.error()) << "processLedgerRequest: Invalid ledger info type";
3363 return;
3364 }
3365 }
3366
3367 if (map == nullptr)
3368 {
3369 JLOG(pJournal_.warn()) << "processLedgerRequest: Unable to find map";
3370 return;
3371 }
3372
3373 // Add requested node data to reply
3374 if (m->nodeids_size() > 0)
3375 {
// When the request does not specify a depth, high-latency peers get depth 2
// and all others depth 1.
3376 std::uint32_t const defaultDepth = isHighLatency() ? 2 : 1;
3377 auto const queryDepth{m->has_querydepth() ? m->querydepth() : defaultDepth};
3378
3380
// The outer loop stops starting new node ids once the reply reaches the soft
// cap; the inner loop enforces the hard cap.
3381 for (int i = 0;
3382 i < m->nodeids_size() && ledgerData.nodes_size() < Tuning::kSoftMaxReplyNodes;
3383 ++i)
3384 {
3385 auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
3386
// `data` is a scratch buffer reused for every requested node id.
3387 data.clear();
3388 data.reserve(Tuning::kSoftMaxReplyNodes);
3389
3390 try
3391 {
3392 // NOLINTNEXTLINE(bugprone-unchecked-optional-access) nodeids checked in onGetLedger
3393 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3394 {
3395 JLOG(pJournal_.trace())
3396 << "processLedgerRequest: getNodeFat got " << data.size() << " nodes";
3397
3398 for (auto const& d : data)
3399 {
3400 if (ledgerData.nodes_size() >= Tuning::kHardMaxReplyNodes)
3401 break;
3402 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3403 node->set_nodeid(d.first.getRawString());
3404 node->set_nodedata(d.second.data(), d.second.size());
3405 }
3406 }
3407 else
3408 {
3409 JLOG(pJournal_.warn()) << "processLedgerRequest: getNodeFat returns false";
3410 }
3411 }
3412 catch (std::exception const& e)
3413 {
// An exception affects only this node id: it is logged with a readable name
// for the request type, and the loop moves on to the next id.
3414 std::string info;
3415 switch (itype)
3416 {
3417 case protocol::liBASE:
3418 // This case should not be possible here
3419 info = "Ledger base";
3420 break;
3421
3422 case protocol::liTX_NODE:
3423 info = "TX node";
3424 break;
3425
3426 case protocol::liAS_NODE:
3427 info = "AS node";
3428 break;
3429
3430 case protocol::liTS_CANDIDATE:
3431 info = "TS candidate";
3432 break;
3433
3434 default:
3435 info = "Invalid";
3436 break;
3437 }
3438
3439 if (!m->has_ledgerhash())
3440 info += ", no hash specified";
3441
3442 JLOG(pJournal_.warn())
3443 << "processLedgerRequest: getNodeFat with nodeId " << *shaMapNodeId
3444 << " and ledger info type " << info << " throws exception: " << e.what();
3445 }
3446 }
3447
3448 JLOG(pJournal_.info()) << "processLedgerRequest: Got request for " << m->nodeids_size()
3449 << " nodes at depth " << queryDepth << ", return "
3450 << ledgerData.nodes_size() << " nodes";
3451 }
3452
// An empty reply is never sent.
3453 if (ledgerData.nodes_size() == 0)
3454 return;
3455
3456 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3457}
3458
3459// Differential pricing helper. Returns only the *dynamic* component
3460// of the per-message charge — the base `kFeeModerateBurdenPeer` is
3461// applied at admission time in `onMessage(TMGetObjectByHash)` so a
3462// high traffic client pays for the message regardless of when (or
3463// whether) the worker runs.
3464//
3465// Dynamic charge model:
3466//
3467// billable = max(0, requested - kFreeObjectsPerRequest)
3468// missed = max(0, requested - found)
3469// billableMisses = min(missed, billable) // misses billed first
3470// billableHits = billable - billableMisses
3471// sizeBand = (requested > kBandMediumMax) ? kCostBandLarge
3472// : (requested > kBandSmallMax) ? kCostBandMedium
3473// : kCostBandSmall
3474// dynamic = billableHits * kCostPerLookupHit
3475// + billableMisses * kCostPerLookupMiss
3476// + sizeBand
3477//
3478// Misses are billed first against the billable budget because a node store
3479// seek dominates a cache hit and because invalid hashes are ~100% miss by construction.
//
//   requested - number of objects the peer asked for.
//   found     - number of objects that were located and returned.
//
// Returns a Resource::Charge labelled "GetObject differential".
// NOTE(review): the return-type line (listing line 3480) is elided from this
// listing.
3481PeerImp::computeGetObjectByHashFee(int const requested, int const found)
3482{
3483 int const billable = std::max(0, requested - static_cast<int>(Tuning::kFreeObjectsPerRequest));
3484 // Clamp `missed` so a future caller passing found > requested cannot
3485 // produce a negative value that flips the hits/misses split.
3486 int const missed = std::max(0, requested - found);
3487 int const billableMisses = std::min(missed, billable);
3488 int const billableHits = billable - billableMisses;
3489
// The size band is a flat surcharge chosen from the request size alone.
3490 int sizeBand = Tuning::kCostBandSmall;
3491 if (requested > Tuning::kBandMediumMax)
3492 {
3493 sizeBand = Tuning::kCostBandLarge;
3494 }
3495 else if (requested > Tuning::kBandSmallMax)
3496 {
3497 sizeBand = Tuning::kCostBandMedium;
3498 }
3499
3500 int const dynamic = (billableHits * Tuning::kCostPerLookupHit) +
3501 (billableMisses * Tuning::kCostPerLookupMiss) + sizeBand;
3502
3503 return Resource::Charge(dynamic, "GetObject differential");
3504}
3505
// Computes a score used to pick the best peer to query; higher is better.
// The score is a random base in the range given to randInt(), plus a bonus
// when the peer is believed to have the item, minus a latency penalty.
//
//   haveItem - true when the peer is believed to have the requested item.
3506int
3507PeerImp::getScore(bool haveItem) const
3508{
3509 // Random component of score, used to break ties and avoid
3510 // overloading the "best" peer
3511 static int const kSpRandomMax = 9999;
3512
3513 // Score for being very likely to have the thing we are
3514 // looking for; should be roughly kSpRandomMax
3515 static int const kSpHaveItem = 10000;
3516
3517 // Score reduction for each millisecond of latency; should
3518 // be roughly kSpRandomMax divided by the maximum reasonable
3519 // latency
3520 static int const kSpLatency = 30;
3521
3522 // Penalty for unknown latency; should be roughly kSpRandomMax
3523 static int const kSpNoLatency = 8000;
3524
3525 int score = randInt(kSpRandomMax);
3526
3527 if (haveItem)
3528 score += kSpHaveItem;
3529
// latency_ is copied while holding recentLock_, so the arithmetic below runs
// on a snapshot outside the lock.
// NOTE(review): the declaration of `latency` (listing line 3530) is elided
// from this listing.
3531 {
3532 std::scoped_lock const sl(recentLock_);
3533 latency = latency_;
3534 }
3535
3536 if (latency)
3537 {
3538 score -= latency->count() * kSpLatency;
3539 }
3540 else
3541 {
3542 score -= kSpNoLatency;
3543 }
3544
3545 return score;
3546}
3547
// Reports whether this peer's measured latency is at least kPeerHighLatency
// (presumably PeerImp::isHighLatency; the signature line is elided from this
// listing). latency_ is read under recentLock_.
// NOTE(review): latency_ is a std::optional, so an unset latency presumably
// compares as "not high" - confirm against kPeerHighLatency's type.
3548bool
3550{
3551 std::scoped_lock const sl(recentLock_);
3552 return latency_ >= kPeerHighLatency;
3553}
3554
// Records `bytes` of traffic in the metrics counters (addMessage; the
// signature line is elided from this listing). Both the lifetime total and
// the current interval's accumulator are updated under an exclusive lock on
// mutex_. Once at least one whole second has elapsed since intervalStart_,
// the interval's per-second average is pushed into rollingAvg_ and the
// accumulator is reset.
3555void
3557{
3558 using namespace std::chrono_literals;
3559 std::unique_lock const lock{mutex_};
3560
3561 totalBytes_ += bytes;
3562 accumBytes_ += bytes;
3563 auto const timeElapsed = clock_type::now() - intervalStart_;
3564 auto const timeElapsedInSecs = std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3565
3566 if (timeElapsedInSecs >= 1s)
3567 {
// The divisor is at least 1 here because of the check above.
3568 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3569 rollingAvg_.push_back(avgBytes);
3570
// The 0ull seed makes the sum accumulate as unsigned long long.
3571 auto const totalBytes = std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
// NOTE(review): listing lines 3572 and 3574 are elided here; presumably they
// update rollingAvgBytes_ from `totalBytes` and restart intervalStart_ -
// confirm.
3573
3575 accumBytes_ = 0;
3576 }
3577}
3578
// Returns the current rolling-average byte rate, read under a shared lock on
// mutex_ (averageBytes; the signature lines are elided from this listing).
3581{
3582 std::shared_lock const lock{mutex_};
3583 return rollingAvgBytes_;
3584}
3585
// Returns the lifetime total of bytes recorded by addMessage(), read under a
// shared lock on mutex_ (totalBytes; the signature lines are elided from
// this listing).
3588{
3589 std::shared_lock const lock{mutex_};
3590 return totalBytes_;
3591}
3592
3593} // namespace xrpl
T accumulate(T... args)
T begin(T... args)
T bind(T... args)
T clamp(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:17
static std::optional< Endpoint > fromStringChecked(std::string const &s)
Create an Endpoint from a string.
static Endpoint fromString(std::string const &s)
Represents a JSON value.
Definition json_value.h:130
static BaseUInt fromRaw(Container const &c)
Definition base_uint.h:294
pointer data()
Definition base_uint.h:106
iterator begin()
Definition base_uint.h:117
static constexpr std::size_t size()
Definition base_uint.h:530
static std::size_t messageSize(::google::protobuf::Message const &message)
Definition Message.cpp:48
std::chrono::time_point< NetClock > time_point
Definition chrono.h:46
std::chrono::duration< rep, period > duration
Definition chrono.h:45
Child(OverlayImpl &overlay)
void forEach(UnaryFunc &&f) const
std::shared_mutex mutex_
Definition PeerImp.h:202
std::uint64_t rollingAvgBytes_
Definition PeerImp.h:207
clock_type::time_point intervalStart_
Definition PeerImp.h:204
boost::circular_buffer< std::uint64_t > rollingAvg_
Definition PeerImp.h:203
void addMessage(std::uint64_t bytes)
Definition PeerImp.cpp:3556
std::uint64_t averageBytes() const
Definition PeerImp.cpp:3580
std::uint64_t totalBytes() const
Definition PeerImp.cpp:3587
std::uint64_t accumBytes_
Definition PeerImp.h:206
std::uint64_t totalBytes_
Definition PeerImp.h:205
void checkTracking(std::uint32_t validationSeq)
Check if the peer is tracking.
Definition PeerImp.cpp:1998
void onTimer(boost::system::error_code const &ec)
Definition PeerImp.cpp:697
std::optional< std::chrono::milliseconds > latency_
Definition PeerImp.h:94
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
Definition PeerImp.cpp:347
std::string getVersion() const
Return the version of xrpld that the peer is running, if reported.
Definition PeerImp.cpp:412
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
Definition PeerImp.cpp:1093
ProtocolVersion protocol_
Definition PeerImp.h:74
bool txReduceRelayEnabled_
Definition PeerImp.h:176
void checkTransaction(HashRouterFlags flags, bool checkSignature, std::shared_ptr< STTx const > const &stx, bool batch)
Definition PeerImp.cpp:2864
void setTimer()
Definition PeerImp.cpp:657
std::shared_ptr< PeerFinder::Slot > const & slot()
Definition PeerImp.h:257
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
Definition PeerImp.cpp:1285
http_request_type request_
Definition PeerImp.h:158
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
Definition PeerImp.cpp:362
beast::WrappedSink sink_
Definition PeerImp.h:54
std::string name() const
Definition PeerImp.cpp:857
bool txReduceRelayEnabled() const override
Definition PeerImp.h:420
std::shared_ptr< PeerFinder::Slot > const slot_
Definition PeerImp.h:156
compression::Compressed Compressed
Definition PeerImp.h:48
boost::beast::http::fields const & headers_
Definition PeerImp.h:160
Compressed compressionEnabled_
Definition PeerImp.h:169
std::chrono::steady_clock clock_type
Definition PeerImp.h:40
boost::system::error_code error_code
Definition PeerImp.h:41
static Resource::Charge computeGetObjectByHashFee(int const requested, int const found)
Compute the per-message resource charge for a TMGetObjectByHash request based on how much work was ac...
Definition PeerImp.cpp:3481
LedgerIndex minLedger_
Definition PeerImp.h:86
std::string prefix_
Definition PeerImp.h:53
void addLedger(uint256 const &hash, std::scoped_lock< std::mutex > const &lockedRecentLock)
Definition PeerImp.cpp:2766
boost::beast::multi_buffer readBuffer_
Definition PeerImp.h:157
id_t const id_
Definition PeerImp.h:51
std::string const & fingerprint() const override
Definition PeerImp.h:504
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
Definition PeerImp.cpp:371
bool ledgerReplayEnabled_
Definition PeerImp.h:178
void sendTxQueue() override
Send aggregated transactions' hashes.
Definition PeerImp.cpp:331
struct xrpl::PeerImp::@337373043150231020277011015352151251117171316327 metrics_
std::unique_ptr< stream_type > streamPtr_
Definition PeerImp.h:58
uint256 closedLedgerHash_
Definition PeerImp.h:88
reduce_relay::Squelch< UptimeClock > squelch_
Definition PeerImp.h:99
stream_type & stream_
Definition PeerImp.h:60
PeerImp(PeerImp const &)=delete
void checkValidation(std::shared_ptr< STValidation > const &val, uint256 const &key, std::shared_ptr< protocol::TMValidation > const &packet)
Definition PeerImp.cpp:3045
void cycleStatus() override
Definition PeerImp.cpp:581
bool gracefulClose_
Definition PeerImp.h:162
std::shared_mutex nameMutex_
Definition PeerImp.h:82
socket_type & socket_
Definition PeerImp.h:59
std::string domain() const
Definition PeerImp.cpp:864
std::atomic< Tracking > tracking_
Definition PeerImp.h:76
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
Definition PeerImp.cpp:565
clock_type::duration uptime() const
Definition PeerImp.h:348
LedgerIndex maxLedger_
Definition PeerImp.h:87
beast::WrappedSink pSink_
Definition PeerImp.h:55
Application & app_
Definition PeerImp.h:50
PublicKey const publicKey_
Definition PeerImp.h:80
void processGetObjectByHash(std::shared_ptr< protocol::TMGetObjectByHash > const &m)
Process a generic-query TMGetObjectByHash message.
Definition PeerImp.cpp:2573
bool cluster() const override
Returns true if this connection is a member of the cluster.
Definition PeerImp.cpp:406
std::queue< std::shared_ptr< Message > > sendQueue_
Definition PeerImp.h:161
void checkPropose(bool isTrusted, std::shared_ptr< protocol::TMProposeSet > const &packet, RCLCxPeerPos peerPos)
Definition PeerImp.cpp:2997
virtual void run()
Definition PeerImp.cpp:198
OverlayImpl & overlay_
Definition PeerImp.h:70
void sendLedgerBase(std::shared_ptr< Ledger const > const &ledger, protocol::TMLedgerData &ledgerData)
Definition PeerImp.cpp:3136
Peer::id_t id() const override
Definition PeerImp.h:315
beast::Journal const pJournal_
Definition PeerImp.h:57
friend class OverlayImpl
Definition PeerImp.h:181
static std::string makePrefix(std::string const &fingerprint)
Definition PeerImp.cpp:689
ChargeWithContext fee_
Definition PeerImp.h:149
void onShutdown(error_code ec)
Definition PeerImp.cpp:757
void handleHaveTransactions(std::shared_ptr< protocol::TMHaveTransactions > const &m)
Handle protocol message with hashes of transactions that have not been relayed by an upstream node do...
Definition PeerImp.cpp:2653
void onMessageUnknown(std::uint16_t type)
Definition PeerImp.cpp:1040
~PeerImp() override
Definition PeerImp.cpp:175
void gracefulClose()
Definition PeerImp.cpp:642
void processLedgerRequest(std::shared_ptr< protocol::TMGetLedger > const &m)
Definition PeerImp.cpp:3289
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
Definition PeerImp.h:37
protocol::TMStatusChange lastStatus_
Definition PeerImp.h:147
clock_type::time_point const creationTime_
Definition PeerImp.h:97
LedgerReplayMsgHandler ledgerReplayMsgHandler_
Definition PeerImp.h:179
void fail(std::string const &reason)
Definition PeerImp.cpp:617
std::unique_ptr< LoadEvent > loadEvent_
Definition PeerImp.h:164
void send(std::shared_ptr< Message > const &m) override
Definition PeerImp.cpp:269
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressedSize, bool isCompressed)
Definition PeerImp.cpp:1046
void doFetchPack(std::shared_ptr< protocol::TMGetObjectByHash > const &packet)
Definition PeerImp.cpp:2779
bool supportsFeature(ProtocolFeature f) const override
Definition PeerImp.cpp:534
void doProtocolStart()
Definition PeerImp.cpp:874
int largeSendq_
Definition PeerImp.h:163
std::shared_ptr< Ledger const > getLedger(std::shared_ptr< protocol::TMGetLedger > const &m)
Definition PeerImp.cpp:3173
bool const inbound_
Definition PeerImp.h:71
boost::asio::strand< boost::asio::executor > strand_
Definition PeerImp.h:61
waitable_timer timer_
Definition PeerImp.h:62
clock_type::time_point lastPingTime_
Definition PeerImp.h:96
boost::circular_buffer< uint256 > recentLedgers_
Definition PeerImp.h:91
int getScore(bool haveItem) const override
Definition PeerImp.cpp:3507
std::string name_
Definition PeerImp.h:81
bool hasTxSet(uint256 const &hash) const override
Definition PeerImp.cpp:574
boost::circular_buffer< uint256 > recentTxSets_
Definition PeerImp.h:92
clock_type::time_point trackingTime_
Definition PeerImp.h:77
beast::Journal const journal_
Definition PeerImp.h:56
std::string fingerprint_
Definition PeerImp.h:52
bool isHighLatency() const override
Definition PeerImp.cpp:3549
void cancelTimer() noexcept
Definition PeerImp.cpp:674
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
Definition PeerImp.cpp:591
std::shared_ptr< SHAMap const > getTxSet(std::shared_ptr< protocol::TMGetLedger > const &m) const
Definition PeerImp.cpp:3257
uint256 previousLedgerHash_
Definition PeerImp.h:89
Resource::Consumer usage_
Definition PeerImp.h:148
std::optional< std::uint32_t > lastPingSeq_
Definition PeerImp.h:95
void onWriteMessage(error_code ec, std::size_t bytesTransferred)
Definition PeerImp.cpp:986
bool detaching_
Definition PeerImp.h:78
bool crawl() const
Returns true if this connection will publicly share its IP address.
Definition PeerImp.cpp:397
beast::IP::Endpoint const remoteAddress_
Definition PeerImp.h:66
void stop() override
Definition PeerImp.cpp:256
void onValidatorListMessage(std::string const &messageType, std::string const &manifest, std::uint32_t version, std::vector< ValidatorBlobInfo > const &blobs)
Definition PeerImp.cpp:2063
void doTransactions(std::shared_ptr< protocol::TMGetObjectByHash > const &packet)
Process peer's request to send missing transactions.
Definition PeerImp.cpp:2812
void doAccept()
Definition PeerImp.cpp:783
void onReadMessage(error_code ec, std::size_t bytesTransferred)
Definition PeerImp.cpp:912
hash_map< PublicKey, std::size_t > publisherListSequences_
Definition PeerImp.h:167
json::Value json() override
Definition PeerImp.cpp:420
std::mutex recentLock_
Definition PeerImp.h:146
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
Definition PeerImp.cpp:551
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
Definition PeerImp.h:47
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
Definition PeerImp.cpp:1086
std::uint32_t id_t
Uniquely identifies a peer.
A public key.
Definition PublicKey.h:42
Slice slice() const noexcept
Definition PublicKey.h:103
A peer's signed, proposed position for use in RCLConsensus.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
ConsensusProposal< NodeID, uint256, uint256 > Proposal
bool checkSign() const
Verify the signing hash of the proposal.
A consumption charge.
Definition Charge.h:9
An endpoint that consumes resources.
Definition Consumer.h:15
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition SHAMap.h:77
bool getNodeFat(SHAMapNodeID const &wanted, std::vector< std::pair< SHAMapNodeID, Blob > > &data, bool fatLeaves, std::uint32_t depth) const
SHAMapHash getHash() const
Definition SHAMap.cpp:836
void const * getDataPtr() const
Definition Serializer.h:197
int getLength() const
Definition Serializer.h:207
std::size_t size() const noexcept
Definition Serializer.h:50
void const * data() const noexcept
Definition Serializer.h:56
An immutable linear range of bytes.
Definition Slice.h:26
static Category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static time_point now()
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
static std::vector< ValidatorBlobInfo > parseBlobs(std::uint32_t version, json::Value const &body)
Pull the blob/signature/manifest information out of the appropriate Json body fields depending on the...
T duration_cast(T... args)
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T find(T... args)
T for_each(T... args)
T get(T... args)
T lock(T... args)
T make_shared(T... args)
T max(T... args)
T min(T... args)
unsigned int UInt
@ Object
object value (collection of name/value pairs).
Definition json_value.h:26
STL namespace.
Charge const kFeeRequestNoReply
Charge const kFeeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const kFeeTrivialPeer
Charge const kFeeInvalidData
Charge const kFeeHeavyBurdenPeer
Charge const kFeeModerateBurdenPeer
Charge const kFeeInvalidSignature
Charge const kFeeUselessData
static constexpr auto kCostBandMedium
static constexpr auto kCostBandLarge
constexpr std::size_t kReadBufferBytes
Size of buffer used to read from the socket.
static constexpr auto kSoftMaxReplyNodes
The soft cap on the number of ledger entries in a single reply.
static constexpr auto kBandMediumMax
static constexpr auto kSendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
static constexpr auto kCostPerLookupHit
Cost of one cache-hit lookup.
static constexpr auto kFreeObjectsPerRequest
TMGetObjectByHash differential pricing.
static constexpr auto kTargetSendQueue
How many messages we consider reasonable sustained on a send queue.
static constexpr auto kCostPerLookupMiss
Cost of one node-store miss, in units of kCostPerLookupHit.
static constexpr auto kCostBandSmall
Size-band surcharges.
static constexpr auto kHardMaxReplyNodes
The hard cap on the number of ledger entries in a single reply.
static constexpr auto kMaxQueryDepth
The maximum number of levels to search.
static constexpr std::uint32_t kDivergedLedgerLimit
How many ledgers off a server has to be before we consider it diverged.
static constexpr auto kBandSmallMax
Cutoffs that decide which size band a request falls into.
static constexpr auto kDropSendQueue
How many messages on a send queue before we refuse queries.
static constexpr std::uint32_t kConvergedLedgerLimit
How many ledgers off a server can be and we will still consider it converged.
static constexpr auto kSendQueueLogFreq
How often to log send queue size.
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
Definition PerfLog.h:162
static constexpr std::size_t kMaxTxQueueSize
static constexpr auto kIdled
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
std::string base64Decode(std::string_view data)
constexpr FlagValue tfInnerBatchTxn
Definition TxFlags.h:43
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
@ Valid
Signature and local checks are good / passed.
Definition apply.h:25
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
Definition digest.h:204
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::optional< AccountID > parseBase58(std::string const &s)
Parse AccountID from checked, base58 string.
std::uint32_t LedgerIndex
A ledger index.
Definition Protocol.h:259
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:94
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:10
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules)
Checks transaction signature and local checks.
Definition apply.cpp:37
@ UnsupportedVersion
List version is not supported.
@ Expired
List is expired, but has the largest non-pending sequence seen so far.
@ SameSequence
Same sequence as current list.
@ KnownSequence
Future sequence already seen.
@ Pending
List will be valid in the future.
@ Accepted
List is valid.
@ Invalid
Invalid format or signature.
@ Untrusted
List signed by untrusted publisher key.
@ Stale
Trusted publisher key, but seq is too old.
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safeCast(Src s) noexcept
Definition safe_cast.h:21
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:93
std::enable_if_t< std::is_integral_v< Integral >, Integral > randInt()
static constexpr char kFeatureLedgerReplay[]
Definition Handshake.h:124
Number root(Number f, unsigned d)
Definition Number.cpp:1201
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
Definition PeerImp.cpp:3111
std::string to_string(BaseUInt< Bits, Tag > const &a)
Definition base_uint.h:633
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
static constexpr char kFeatureTxrr[]
Definition Handshake.h:122
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
@ JtPack
Definition Job.h:24
@ JtLedgerReq
Definition Job.h:40
@ JtValidationUt
Definition Job.h:35
@ JtTransaction
Definition Job.h:43
@ JtProposalUt
Definition Job.h:41
@ JtReplayReq
Definition Job.h:39
@ JtRequestedTxn
Definition Job.h:45
@ JtTxnData
Definition Job.h:50
@ JtValidationT
Definition Job.h:52
@ JtMissingTxn
Definition Job.h:44
@ JtManifest
Definition Job.h:36
@ JtProposalT
Definition Job.h:55
@ JtPeer
Definition Job.h:61
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address publicIp, beast::IP::Address remoteIp, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
constexpr ProtocolVersion makeProtocol(std::uint16_t major, std::uint16_t minor)
std::string getFingerprint(beast::IP::Endpoint const &address, std::optional< PublicKey > const &publicKey=std::nullopt, std::optional< std::string > const &id=std::nullopt)
Definition PublicKey.h:242
HashRouterFlags
Definition HashRouter.h:14
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
bool peerFeatureEnabled(Headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
Definition Handshake.h:171
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
std::pair< std::uint16_t, std::uint16_t > ProtocolVersion
Represents a particular version of the peer-to-peer protocol.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:12
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
Definition apply.cpp:112
static bool stringIsUInt256Sized(std::string const &pBuffStr)
Definition PeerImp.cpp:192
static constexpr char kFeatureCompr[]
Definition Handshake.h:118
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:810
BaseUInt< 256 > uint256
Definition base_uint.h:562
static constexpr char kFeatureVprr[]
Definition Handshake.h:120
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
Definition PeerImp.cpp:3087
std::enable_if_t< std::is_same_v< T, char >||std::is_same_v< T, unsigned char >, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:215
constexpr bool any(HashRouterFlags flags)
Definition HashRouter.h:63
T nth_element(T... args)
T push_back(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)
Information about the notional ledger backing the view.
Describes a single consumer.
Definition Gossip.h:16
beast::IP::Endpoint address
Definition Gossip.h:20
Data format for exchanging consumption information across peers.
Definition Gossip.h:11
std::vector< Item > items
Definition Gossip.h:23
T tie(T... args)
T to_string(T... args)
T what(T... args)