xrpld
Loading...
Searching...
No Matches
InboundLedger.cpp
1#include <xrpld/app/ledger/InboundLedger.h>
2
3#include <xrpld/app/ledger/AccountStateSF.h>
4#include <xrpld/app/ledger/InboundLedgers.h>
5#include <xrpld/app/ledger/LedgerMaster.h>
6#include <xrpld/app/ledger/TransactionStateSF.h>
7#include <xrpld/app/ledger/detail/TimeoutCounter.h>
8#include <xrpld/app/main/Application.h>
9#include <xrpld/overlay/Message.h>
10#include <xrpld/overlay/Overlay.h>
11#include <xrpld/overlay/PeerSet.h>
12
13#include <xrpl/basics/Blob.h>
14#include <xrpl/basics/Log.h>
15#include <xrpl/basics/Slice.h>
16#include <xrpl/basics/base_uint.h>
17#include <xrpl/beast/utility/instrumentation.h>
18#include <xrpl/core/Job.h>
19#include <xrpl/core/JobQueue.h>
20#include <xrpl/json/json_value.h>
21#include <xrpl/nodestore/Database.h>
22#include <xrpl/nodestore/NodeObject.h>
23#include <xrpl/protocol/HashPrefix.h>
24#include <xrpl/protocol/Indexes.h>
25#include <xrpl/protocol/LedgerHeader.h>
26#include <xrpl/protocol/Rules.h>
27#include <xrpl/protocol/Serializer.h>
28#include <xrpl/protocol/SystemParameters.h>
29#include <xrpl/protocol/jss.h>
30#include <xrpl/resource/Fees.h>
31#include <xrpl/shamap/SHAMapNodeID.h>
32#include <xrpl/shamap/SHAMapSyncFilter.h>
33
34#include <boost/iterator/function_output_iterator.hpp>
35
36#include <xrpl.pb.h>
37
38#include <algorithm>
39#include <chrono>
40#include <cstddef>
41#include <cstdint>
42#include <exception>
43#include <memory>
44#include <mutex>
45#include <random>
46#include <sstream>
47#include <stdexcept>
48#include <string>
49#include <tuple>
50#include <unordered_map>
51#include <utility>
52#include <vector>
53
54namespace xrpl {
55
using namespace std::chrono_literals;

// --- Peer selection -------------------------------------------------------

// Number of peers to start with
static constexpr int kPeerCountStart = 5;

// Number of peers to add on a timeout
static constexpr int kPeerCountAdd = 3;

// --- Timeout policy -------------------------------------------------------

// How many timeouts before we give up
static constexpr int kLedgerTimeoutRetriesMax = 6;

// How many timeouts before we get aggressive
static constexpr int kLedgerBecomeAggressiveThreshold = 4;

// Milliseconds for each ledger timeout
constexpr std::chrono::milliseconds kLedgerAcquireTimeout = 3000ms;

// --- Request sizing -------------------------------------------------------

// Number of nodes to find initially
static constexpr int kMissingNodesFind = 256;

// Number of nodes to request for a reply
static constexpr int kReqNodesReply = 128;

// Number of nodes to request blindly
static constexpr int kReqNodes = 12;

69
71 Application& app,
72 uint256 const& hash,
73 std::uint32_t seq,
74 Reason reason,
75 clock_type& clock,
78 app,
79 hash,
81 {.jobType = JtLedgerData, .jobName = "InboundLedger", .jobLimit = 5},
82 app.getJournal("InboundLedger"))
83 , clock_(clock)
84 , seq_(seq)
85 , reason_(reason)
86 , peerSet_(std::move(peerSet))
87{
88 JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
89 touch();
90}
91
92void
94{
96 collectionLock.unlock();
97
98 tryDB(app_.getNodeFamily().db());
99 if (failed_)
100 return;
101
102 if (!complete_)
103 {
104 addPeers();
105 queueJob(sl);
106 return;
107 }
108
109 JLOG(journal_.debug()) << "Acquiring ledger we already have in "
110 << " local store. " << hash_;
111 XRPL_ASSERT(
112 ledger_->header().seq < kXrpLedgerEarliestFees || ledger_->read(keylet::feeSettings()),
113 "xrpl::InboundLedger::init : valid ledger fees");
114 ledger_->setImmutable();
115
117 return;
118
119 app_.getLedgerMaster().storeLedger(ledger_);
120
121 // Check if this could be a newer fully-validated ledger
123 app_.getLedgerMaster().checkAccept(ledger_);
124}
125
128{
129 auto const& peerIds = peerSet_->getPeerIds();
131 peerIds, [this](auto id) { return (app_.getOverlay().findPeerByShortID(id) != nullptr); });
132}
133
134void
136{
137 ScopedLockType const sl(mtx_);
138
139 // If we didn't know the sequence number, but now do, save it
140 if ((seq != 0) && (seq_ == 0))
141 seq_ = seq;
142
143 // Prevent this from being swept
144 touch();
145}
146
147bool
149{
150 ScopedLockType const sl(mtx_);
151 if (!isDone())
152 {
153 if (ledger_)
154 {
155 tryDB(ledger_->stateMap().family().db());
156 }
157 else
158 {
159 tryDB(app_.getNodeFamily().db());
160 }
161 if (failed_ || complete_)
162 {
163 done();
164 return true;
165 }
166 }
167 return false;
168}
169
171{
172 // Save any received AS data not processed. It could be useful
173 // for populating a different ledger
174 for (auto& entry : receivedData_)
175 {
176 if (entry.second->type() == protocol::liAS_NODE)
177 app_.getInboundLedgers().gotStaleData(entry.second);
178 }
179 if (!isDone())
180 {
181 JLOG(journal_.debug()) << "Acquire " << hash_ << " abort "
182 << ((timeouts_ == 0) ? std::string()
183 : (std::string("timeouts:") +
185 << stats_.get();
186 }
187}
188
190neededHashes(uint256 const& root, SHAMap& map, int max, SHAMapSyncFilter const* filter)
191{
193
194 if (!root.isZero())
195 {
196 if (map.getHash().isZero())
197 {
198 ret.push_back(root);
199 }
200 else
201 {
202 auto mn = map.getMissingNodes(max, filter);
203 ret.reserve(mn.size());
204 for (auto const& n : mn)
205 ret.push_back(n.second);
206 }
207 }
208
209 return ret;
210}
211
214{
215 return neededHashes(ledger_->header().txHash, ledger_->txMap(), max, filter);
216}
217
220{
221 return neededHashes(ledger_->header().accountHash, ledger_->stateMap(), max, filter);
222}
223
224// See how much of the ledger data is stored locally
225// Data found in a fetch pack will be stored
226void
228{
229 if (!haveHeader_)
230 {
231 auto makeLedger = [&, this](Blob const& data) {
232 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
233 Rules const rules{app_.config().features};
235 deserializePrefixedHeader(makeSlice(data)), rules, app_.getNodeFamily());
236 if (ledger_->header().hash != hash_ || (seq_ != 0 && seq_ != ledger_->header().seq))
237 {
238 // We know for a fact the ledger can never be acquired
239 JLOG(journal_.warn())
240 << "hash " << hash_ << " seq " << std::to_string(seq_) << " cannot be a ledger";
241 ledger_.reset();
242 failed_ = true;
243 }
244 };
245
246 // Try to fetch the ledger header from the DB
247 if (auto nodeObject = srcDB.fetchNodeObject(hash_, seq_))
248 {
249 JLOG(journal_.trace()) << "Ledger header found in local store";
250
251 makeLedger(nodeObject->getData());
252 if (failed_)
253 return;
254
255 // Store the ledger header if the source and destination differ
256 auto& dstDB{ledger_->stateMap().family().db()};
257 if (std::addressof(dstDB) != std::addressof(srcDB))
258 {
259 Blob blob{nodeObject->getData()};
260 dstDB.store(NodeObjectType::Ledger, std::move(blob), hash_, ledger_->header().seq);
261 }
262 }
263 else
264 {
265 // Try to fetch the ledger header from a fetch pack
266 auto data = app_.getLedgerMaster().getFetchPack(hash_);
267 if (!data)
268 return;
269
270 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
271
272 makeLedger(*data);
273 if (failed_)
274 return;
275
276 // Store the ledger header in the ledger's database
277 ledger_->stateMap().family().db().store(
278 NodeObjectType::Ledger, std::move(*data), hash_, ledger_->header().seq);
279 }
280
281 if (seq_ == 0)
282 seq_ = ledger_->header().seq;
283 ledger_->stateMap().setLedgerSeq(seq_);
284 ledger_->txMap().setLedgerSeq(seq_);
285 haveHeader_ = true;
286 }
287
289 {
290 if (ledger_->header().txHash.isZero())
291 {
292 JLOG(journal_.trace()) << "No TXNs to fetch";
293 haveTransactions_ = true;
294 }
295 else
296 {
297 TransactionStateSF filter(ledger_->txMap().family().db(), app_.getLedgerMaster());
298 if (ledger_->txMap().fetchRoot(SHAMapHash{ledger_->header().txHash}, &filter))
299 {
300 if (neededTxHashes(1, &filter).empty())
301 {
302 JLOG(journal_.trace()) << "Had full txn map locally";
303 haveTransactions_ = true;
304 }
305 }
306 }
307 }
308
309 if (!haveState_)
310 {
311 if (ledger_->header().accountHash.isZero())
312 {
313 JLOG(journal_.fatal()) << "We are acquiring a ledger with a zero account hash";
314 failed_ = true;
315 return;
316 }
317 AccountStateSF filter(ledger_->stateMap().family().db(), app_.getLedgerMaster());
318 if (ledger_->stateMap().fetchRoot(SHAMapHash{ledger_->header().accountHash}, &filter))
319 {
320 if (neededStateHashes(1, &filter).empty())
321 {
322 JLOG(journal_.trace()) << "Had full AS map locally";
323 haveState_ = true;
324 }
325 }
326 }
327
329 {
330 JLOG(journal_.debug()) << "Had everything locally";
331 complete_ = true;
332 XRPL_ASSERT(
333 ledger_->header().seq < kXrpLedgerEarliestFees || ledger_->read(keylet::feeSettings()),
334 "xrpl::InboundLedger::tryDB : valid ledger fees");
335 ledger_->setImmutable();
336 }
337}
338
341void
343{
344 recentNodes_.clear();
345
346 if (isDone())
347 {
348 JLOG(journal_.info()) << "Already done " << hash_;
349 return;
350 }
351
353 {
354 if (seq_ != 0)
355 {
356 JLOG(journal_.warn()) << timeouts_ << " timeouts for ledger " << seq_;
357 }
358 else
359 {
360 JLOG(journal_.warn()) << timeouts_ << " timeouts for ledger " << hash_;
361 }
362 failed_ = true;
363 done();
364 return;
365 }
366
367 if (!wasProgress)
368 {
369 checkLocal();
370
371 byHash_ = true;
372
373 std::size_t const pc = getPeerCount();
374 JLOG(journal_.debug()) << "No progress(" << pc << ") for ledger " << hash_;
375
376 // addPeers triggers if the reason is not HISTORY
377 // So if the reason IS HISTORY, need to trigger after we add
378 // otherwise, we need to trigger before we add
379 // so each peer gets triggered once
382 addPeers();
385 }
386}
387
389void
391{
392 peerSet_->addPeers(
394 [this](auto peer) { return peer->hasLedger(hash_, seq_); },
395 [this](auto peer) {
396 // For historical nodes, do not trigger too soon
397 // since a fetch pack is probably coming
400 });
401}
402
408
409void
411{
412 if (signaled_)
413 return;
414
415 signaled_ = true;
416 touch();
417
418 JLOG(journal_.debug()) << "Acquire " << hash_ << (failed_ ? " fail " : " ")
419 << ((timeouts_ == 0)
420 ? std::string()
421 : (std::string("timeouts:") + std::to_string(timeouts_) + " "))
422 << stats_.get();
423
424 XRPL_ASSERT(complete_ || failed_, "xrpl::InboundLedger::done : complete or failed");
425
426 if (complete_ && !failed_ && ledger_)
427 {
428 XRPL_ASSERT(
429 ledger_->header().seq < kXrpLedgerEarliestFees || ledger_->read(keylet::feeSettings()),
430 "xrpl::InboundLedger::done : valid ledger fees");
431 ledger_->setImmutable();
432 switch (reason_)
433 {
434 case Reason::HISTORY:
435 app_.getInboundLedgers().onLedgerFetched();
436 break;
437 default:
438 app_.getLedgerMaster().storeLedger(ledger_);
439 break;
440 }
441 }
442
443 // We hold the PeerSet lock, so must dispatch
444 app_.getJobQueue().addJob(JtLedgerData, "AcqDone", [self = shared_from_this()]() {
445 if (self->complete_ && !self->failed_)
446 {
447 self->app_.getLedgerMaster().checkAccept(self->getLedger());
448 self->app_.getLedgerMaster().tryAdvance();
449 }
450 else
451 {
452 self->app_.getInboundLedgers().logFailure(self->hash_, self->seq_);
453 }
454 });
455}
456
459void
461{
463
464 if (isDone())
465 {
466 JLOG(journal_.debug()) << "Trigger on ledger: " << hash_ << (complete_ ? " completed" : "")
467 << (failed_ ? " failed" : "");
468 return;
469 }
470
471 if (auto stream = journal_.debug())
472 {
474 ss << "Trigger acquiring ledger " << hash_;
475 if (peer)
476 ss << " from " << peer;
477
478 if (complete_ || failed_)
479 {
480 ss << " complete=" << complete_ << " failed=" << failed_;
481 }
482 else
483 {
484 ss << " header=" << haveHeader_ << " tx=" << haveTransactions_ << " as=" << haveState_;
485 }
486 stream << ss.str();
487 }
488
489 if (!haveHeader_)
490 {
491 tryDB(app_.getNodeFamily().db());
492 if (failed_)
493 {
494 JLOG(journal_.warn()) << " failed local for " << hash_;
495 return;
496 }
497 }
498
499 protocol::TMGetLedger tmGL;
500 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
501
502 if (timeouts_ != 0)
503 {
504 // Be more aggressive if we've timed out at least once
505 tmGL.set_querytype(protocol::qtINDIRECT);
506
508 {
509 auto need = getNeededHashes();
510
511 if (!need.empty())
512 {
513 protocol::TMGetObjectByHash tmBH;
514 bool typeSet = false;
515 tmBH.set_query(true);
516 tmBH.set_ledgerhash(hash_.begin(), hash_.size());
517 for (auto const& p : need)
518 {
519 JLOG(journal_.debug()) << "Want: " << p.second;
520
521 if (!typeSet)
522 {
523 tmBH.set_type(p.first);
524 typeSet = true;
525 }
526
527 if (p.first == tmBH.type())
528 {
529 protocol::TMIndexedObject* io = tmBH.add_objects();
530 io->set_hash(p.second.begin(), p.second.size());
531 if (seq_ != 0)
532 io->set_ledgerseq(seq_);
533 }
534 }
535
536 auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
537 auto const& peerIds = peerSet_->getPeerIds();
538 std::ranges::for_each(peerIds, [this, &packet](auto id) {
539 if (auto p = app_.getOverlay().findPeerByShortID(id))
540 {
541 byHash_ = false;
542 p->send(packet);
543 }
544 });
545 }
546 else
547 {
548 JLOG(journal_.info()) << "getNeededHashes says acquire is complete";
549 haveHeader_ = true;
550 haveTransactions_ = true;
551 haveState_ = true;
552 complete_ = true;
553 }
554 }
555 }
556
557 // We can't do much without the header data because we don't know the
558 // state or transaction root hashes.
559 if (!haveHeader_ && !failed_)
560 {
561 tmGL.set_itype(protocol::liBASE);
562 if (seq_ != 0)
563 tmGL.set_ledgerseq(seq_);
564 JLOG(journal_.trace()) << "Sending header request to "
565 << (peer ? "selected peer" : "all peers");
566 peerSet_->sendRequest(tmGL, peer);
567 return;
568 }
569
570 if (ledger_)
571 tmGL.set_ledgerseq(ledger_->header().seq);
572
573 if (reason != TriggerReason::Reply)
574 {
575 // If we're querying blind, don't query deep
576 tmGL.set_querydepth(0);
577 }
578 else if (peer && peer->isHighLatency())
579 {
580 // If the peer has high latency, query extra deep
581 tmGL.set_querydepth(2);
582 }
583 else
584 {
585 tmGL.set_querydepth(1);
586 }
587
588 // Get the state data first because it's the most likely to be useful
589 // if we wind up abandoning this fetch.
590 if (haveHeader_ && !haveState_ && !failed_)
591 {
592 XRPL_ASSERT(
593 ledger_,
594 "xrpl::InboundLedger::trigger : non-null ledger to read state "
595 "from");
596
597 if (!ledger_->stateMap().isValid())
598 {
599 failed_ = true;
600 }
601 else if (ledger_->stateMap().getHash().isZero())
602 {
603 // we need the root node
604 tmGL.set_itype(protocol::liAS_NODE);
605 *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
606 JLOG(journal_.trace())
607 << "Sending AS root request to " << (peer ? "selected peer" : "all peers");
608 peerSet_->sendRequest(tmGL, peer);
609 return;
610 }
611 else
612 {
613 AccountStateSF filter(ledger_->stateMap().family().db(), app_.getLedgerMaster());
614
615 // Release the lock while we process the large state map
616 sl.unlock();
617 auto nodes = ledger_->stateMap().getMissingNodes(kMissingNodesFind, &filter);
618 sl.lock();
619
620 // Make sure nothing happened while we released the lock
621 if (!failed_ && !complete_ && !haveState_)
622 {
623 if (nodes.empty())
624 {
625 if (!ledger_->stateMap().isValid())
626 {
627 failed_ = true;
628 }
629 else
630 {
631 haveState_ = true;
632
634 complete_ = true;
635 }
636 }
637 else
638 {
639 filterNodes(nodes, reason);
640
641 if (!nodes.empty())
642 {
643 tmGL.set_itype(protocol::liAS_NODE);
644 for (auto const& id : nodes)
645 {
646 *(tmGL.add_nodeids()) = id.first.getRawString();
647 }
648
649 JLOG(journal_.trace()) << "Sending AS node request (" << nodes.size()
650 << ") to " << (peer ? "selected peer" : "all peers");
651 peerSet_->sendRequest(tmGL, peer);
652 return;
653 }
654
655 JLOG(journal_.trace()) << "All AS nodes filtered";
656 }
657 }
658 }
659 }
660
662 {
663 XRPL_ASSERT(
664 ledger_,
665 "xrpl::InboundLedger::trigger : non-null ledger to read "
666 "transactions from");
667
668 if (!ledger_->txMap().isValid())
669 {
670 failed_ = true;
671 }
672 else if (ledger_->txMap().getHash().isZero())
673 {
674 // we need the root node
675 tmGL.set_itype(protocol::liTX_NODE);
676 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
677 JLOG(journal_.trace())
678 << "Sending TX root request to " << (peer ? "selected peer" : "all peers");
679 peerSet_->sendRequest(tmGL, peer);
680 return;
681 }
682 else
683 {
684 TransactionStateSF filter(ledger_->txMap().family().db(), app_.getLedgerMaster());
685
686 auto nodes = ledger_->txMap().getMissingNodes(kMissingNodesFind, &filter);
687
688 if (nodes.empty())
689 {
690 if (!ledger_->txMap().isValid())
691 {
692 failed_ = true;
693 }
694 else
695 {
696 haveTransactions_ = true;
697
698 if (haveState_)
699 complete_ = true;
700 }
701 }
702 else
703 {
704 filterNodes(nodes, reason);
705
706 if (!nodes.empty())
707 {
708 tmGL.set_itype(protocol::liTX_NODE);
709 for (auto const& n : nodes)
710 {
711 *(tmGL.add_nodeids()) = n.first.getRawString();
712 }
713 JLOG(journal_.trace()) << "Sending TX node request (" << nodes.size() << ") to "
714 << (peer ? "selected peer" : "all peers");
715 peerSet_->sendRequest(tmGL, peer);
716 return;
717 }
718
719 JLOG(journal_.trace()) << "All TX nodes filtered";
720 }
721 }
722 }
723
724 if (complete_ || failed_)
725 {
726 JLOG(journal_.debug()) << "Done:" << (complete_ ? " complete" : "")
727 << (failed_ ? " failed " : " ") << ledger_->header().seq;
728 sl.unlock();
729 done();
730 }
731}
732
733void
736 TriggerReason reason)
737{
738 // Sort nodes so that the ones we haven't recently
739 // requested come before the ones we have.
741 nodes, [this](auto const& item) { return recentNodes_.count(item.second) == 0; });
742
743 // If everything is a duplicate we don't want to send
744 // any query at all except on a timeout where we need
745 // to query everyone:
746 if (dup.begin() == nodes.begin())
747 {
748 JLOG(journal_.trace()) << "filterNodes: all duplicates";
749
750 if (reason != TriggerReason::Timeout)
751 {
752 nodes.clear();
753 return;
754 }
755 }
756 else
757 {
758 JLOG(journal_.trace()) << "filterNodes: pruning duplicates";
759
760 nodes.erase(dup.begin(), dup.end());
761 }
762
763 std::size_t const limit = (reason == TriggerReason::Reply) ? kReqNodesReply : kReqNodes;
764
765 if (nodes.size() > limit)
766 nodes.resize(limit);
767
768 for (auto const& n : nodes)
769 recentNodes_.insert(n.second);
770}
771
775// data must not have hash prefix
776bool
778{
779 // Return value: true=normal, false=bad data
780 JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;
781
782 if (complete_ || failed_ || haveHeader_)
783 return true;
784
785 auto* f = &app_.getNodeFamily();
786 Rules const rules{app_.config().features};
788 if (ledger_->header().hash != hash_ || (seq_ != 0 && seq_ != ledger_->header().seq))
789 {
790 JLOG(journal_.warn()) << "Acquire hash mismatch: " << ledger_->header().hash
791 << "!=" << hash_;
792 ledger_.reset();
793 return false;
794 }
795 if (seq_ == 0)
796 seq_ = ledger_->header().seq;
797 ledger_->stateMap().setLedgerSeq(seq_);
798 ledger_->txMap().setLedgerSeq(seq_);
799 haveHeader_ = true;
800
801 Serializer s(data.size() + 4);
803 s.addRaw(data.data(), data.size());
804 f->db().store(NodeObjectType::Ledger, std::move(s.modData()), hash_, seq_);
805
806 if (ledger_->header().txHash.isZero())
807 haveTransactions_ = true;
808
809 if (ledger_->header().accountHash.isZero())
810 haveState_ = true;
811
812 ledger_->txMap().setSynching();
813 ledger_->stateMap().setSynching();
814
815 return true;
816}
817
821void
822InboundLedger::receiveNode(protocol::TMLedgerData const& packet, SHAMapAddNode& san)
823{
824 if (!haveHeader_)
825 {
826 JLOG(journal_.warn()) << "Missing ledger header";
827 san.incInvalid();
828 return;
829 }
830 if (packet.type() == protocol::liTX_NODE)
831 {
833 {
834 san.incDuplicate();
835 return;
836 }
837 }
838 else if (haveState_ || failed_)
839 {
840 san.incDuplicate();
841 return;
842 }
843
844 auto [map, rootHash, filter] =
846 if (packet.type() == protocol::liTX_NODE)
847 {
848 return {
849 ledger_->txMap(),
850 SHAMapHash{ledger_->header().txHash},
852 ledger_->txMap().family().db(), app_.getLedgerMaster())};
853 }
854 return {
855 ledger_->stateMap(),
856 SHAMapHash{ledger_->header().accountHash},
858 ledger_->stateMap().family().db(), app_.getLedgerMaster())};
859 }();
860
861 try
862 {
863 auto const f = filter.get();
864
865 for (auto const& node : packet.nodes())
866 {
867 auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
868
869 if (!nodeID)
870 throw std::runtime_error("data does not properly deserialize");
871
872 if (nodeID->isRoot())
873 {
874 san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
875 }
876 else
877 {
878 san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
879 }
880
881 if (!san.isGood())
882 {
883 JLOG(journal_.warn()) << "Received bad node data";
884 return;
885 }
886 }
887 }
888 catch (std::exception const& e)
889 {
890 JLOG(journal_.error()) << "Received bad node data: " << e.what();
891 san.incInvalid();
892 return;
893 }
894
895 if (!map.isSynching())
896 {
897 if (packet.type() == protocol::liTX_NODE)
898 {
899 haveTransactions_ = true;
900 }
901 else
902 {
903 haveState_ = true;
904 }
905
907 {
908 complete_ = true;
909 done();
910 }
911 }
912}
913
917bool
919{
920 if (failed_ || haveState_)
921 {
922 san.incDuplicate();
923 return true;
924 }
925
926 if (!haveHeader_)
927 {
928 // LCOV_EXCL_START
929 UNREACHABLE("xrpl::InboundLedger::takeAsRootNode : no ledger header");
930 return false;
931 // LCOV_EXCL_STOP
932 }
933
934 AccountStateSF filter(ledger_->stateMap().family().db(), app_.getLedgerMaster());
935 san +=
936 ledger_->stateMap().addRootNode(SHAMapHash{ledger_->header().accountHash}, data, &filter);
937 return san.isGood();
938}
939
943bool
945{
947 {
948 san.incDuplicate();
949 return true;
950 }
951
952 if (!haveHeader_)
953 {
954 // LCOV_EXCL_START
955 UNREACHABLE("xrpl::InboundLedger::takeTxRootNode : no ledger header");
956 return false;
957 // LCOV_EXCL_STOP
958 }
959
960 TransactionStateSF filter(ledger_->txMap().family().db(), app_.getLedgerMaster());
961 san += ledger_->txMap().addRootNode(SHAMapHash{ledger_->header().txHash}, data, &filter);
962 return san.isGood();
963}
964
967{
969
970 if (!haveHeader_)
971 {
972 ret.emplace_back(protocol::TMGetObjectByHash::otLEDGER, hash_);
973 return ret;
974 }
975
976 if (!haveState_)
977 {
978 AccountStateSF filter(ledger_->stateMap().family().db(), app_.getLedgerMaster());
979 for (auto const& h : neededStateHashes(4, &filter))
980 {
981 ret.emplace_back(protocol::TMGetObjectByHash::otSTATE_NODE, h);
982 }
983 }
984
986 {
987 TransactionStateSF filter(ledger_->txMap().family().db(), app_.getLedgerMaster());
988 for (auto const& h : neededTxHashes(4, &filter))
989 {
990 ret.emplace_back(protocol::TMGetObjectByHash::otTRANSACTION_NODE, h);
991 }
992 }
993
994 return ret;
995}
996
1000bool
1004{
1006
1007 if (isDone())
1008 return false;
1009
1010 receivedData_.emplace_back(peer, data);
1011
1013 return false;
1014
1015 receiveDispatched_ = true;
1016 return true;
1017}
1018
1022// VFALCO NOTE, it is not necessary to pass the entire Peer,
1023// we can get away with just a Resource::Consumer endpoint.
1024//
1025// TODO Change peer to Consumer
1026//
1027int
1028InboundLedger::processData(std::shared_ptr<Peer> peer, protocol::TMLedgerData const& packet)
1029{
1030 if (packet.type() == protocol::liBASE)
1031 {
1032 if (packet.nodes().empty())
1033 {
1034 JLOG(journal_.warn()) << peer->id() << ": empty header data";
1035 peer->charge(Resource::kFeeMalformedRequest, "ledger_data empty header");
1036 return -1;
1037 }
1038
1039 SHAMapAddNode san;
1040
1041 ScopedLockType const sl(mtx_);
1042
1043 try
1044 {
1045 if (!haveHeader_)
1046 {
1047 if (!takeHeader(packet.nodes(0).nodedata()))
1048 {
1049 JLOG(journal_.warn()) << "Got invalid header data";
1050 peer->charge(Resource::kFeeMalformedRequest, "ledger_data invalid header");
1051 return -1;
1052 }
1053
1054 san.incUseful();
1055 }
1056
1057 if (!haveState_ && (packet.nodes().size() > 1) &&
1058 !takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
1059 {
1060 JLOG(journal_.warn()) << "Included AS root invalid";
1061 }
1062
1063 if (!haveTransactions_ && (packet.nodes().size() > 2) &&
1064 !takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
1065 {
1066 JLOG(journal_.warn()) << "Included TX root invalid";
1067 }
1068 }
1069 catch (std::exception const& ex)
1070 {
1071 JLOG(journal_.warn()) << "Included AS/TX root invalid: " << ex.what();
1072 using namespace std::string_literals;
1073 peer->charge(Resource::kFeeInvalidData, "ledger_data "s + ex.what());
1074 return -1;
1075 }
1076
1077 if (san.isUseful())
1078 progress_ = true;
1079
1080 stats_ += san;
1081 return san.getGood();
1082 }
1083
1084 if ((packet.type() == protocol::liTX_NODE) || (packet.type() == protocol::liAS_NODE))
1085 {
1086 if (packet.nodes().empty())
1087 {
1088 JLOG(journal_.info()) << peer->id() << ": response with no nodes";
1089 peer->charge(Resource::kFeeMalformedRequest, "ledger_data no nodes");
1090 return -1;
1091 }
1092
1093 ScopedLockType const sl(mtx_);
1094
1095 // Verify node IDs and data are complete
1096 for (auto const& node : packet.nodes())
1097 {
1098 if (!node.has_nodeid() || !node.has_nodedata())
1099 {
1100 JLOG(journal_.warn()) << "Got bad node";
1101 peer->charge(Resource::kFeeMalformedRequest, "ledger_data bad node");
1102 return -1;
1103 }
1104 }
1105
1106 SHAMapAddNode san;
1107 receiveNode(packet, san);
1108
1109 JLOG(journal_.debug()) << "Ledger "
1110 << ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS")
1111 << " node stats: " << san.get();
1112
1113 if (san.isUseful())
1114 progress_ = true;
1115
1116 stats_ += san;
1117 return san.getGood();
1118 }
1119
1120 return -1;
1121}
1122
1123namespace detail {
1124// Track the amount of useful data that each peer returns
1126{
1127 // Map from peer to the amount of useful data the peer returned
1129 // The largest amount of useful data that any peer returned
1130 int maxCount = 0;
1131
1132 // Update the data count for a peer
1133 void
1134 update(std::shared_ptr<Peer>&& peer, int dataCount)
1135 {
1136 if (dataCount <= 0)
1137 return;
1138 maxCount = std::max(maxCount, dataCount);
1139 auto i = counts.find(peer);
1140 if (i == counts.end())
1141 {
1142 counts.emplace(std::move(peer), dataCount);
1143 return;
1144 }
1145 i->second = std::max(i->second, dataCount);
1146 }
1147
1148 // Prune all the peers that didn't return enough data.
1149 void
1151 {
1152 // Remove all the peers that didn't return at least half as much data as
1153 // the best peer
1154 auto const thresh = maxCount / 2;
1155 auto i = counts.begin();
1156 while (i != counts.end())
1157 {
1158 if (i->second < thresh)
1159 {
1160 i = counts.erase(i);
1161 }
1162 else
1163 {
1164 ++i;
1165 }
1166 }
1167 }
1168
1169 // call F with the `peer` parameter with a random sample of at most n values
1170 // of the counts vector.
1171 template <class F>
1172 void
1174 {
1175 if (counts.empty())
1176 return;
1177
1178 auto outFunc = [&f](auto&& v) { f(v.first); };
1180#if _MSC_VER
1182 s.reserve(n);
1183 std::sample(counts.begin(), counts.end(), std::back_inserter(s), n, rng);
1184 for (auto& v : s)
1185 {
1186 outFunc(v);
1187 }
1188#else
1190 counts.begin(), counts.end(), boost::make_function_output_iterator(outFunc), n, rng);
1191#endif
1192 }
1193};
1194} // namespace detail
1195
1199void
1201{
1202 // Maximum number of peers to request data from
1203 static constexpr std::size_t kMaxUsefulPeers = 6;
1204
1205 decltype(receivedData_) data;
1206
1207 // Reserve some memory so the first couple iterations don't reallocate
1208 data.reserve(8);
1209
1210 detail::PeerDataCounts dataCounts;
1211
1212 for (;;)
1213 {
1214 data.clear();
1215
1216 {
1218
1219 if (receivedData_.empty())
1220 {
1221 receiveDispatched_ = false;
1222 break;
1223 }
1224
1225 data.swap(receivedData_);
1226 }
1227
1228 for (auto& entry : data)
1229 {
1230 if (auto peer = entry.first.lock())
1231 {
1232 int const count = processData(peer, *(entry.second));
1233 dataCounts.update(std::move(peer), count);
1234 }
1235 }
1236 }
1237
1238 // Select a random sample of the peers that gives us the most nodes that are
1239 // useful
1240 dataCounts.prune();
1241 dataCounts.sampleN(kMaxUsefulPeers, [&](std::shared_ptr<Peer> const& peer) {
1243 });
1244}
1245
1248{
1250
1251 ScopedLockType const sl(mtx_);
1252
1253 ret[jss::hash] = to_string(hash_);
1254
1255 if (complete_)
1256 ret[jss::complete] = true;
1257
1258 if (failed_)
1259 ret[jss::failed] = true;
1260
1261 if (!complete_ && !failed_)
1262 ret[jss::peers] = static_cast<int>(peerSet_->getPeerIds().size());
1263
1264 ret[jss::have_header] = haveHeader_;
1265
1266 if (haveHeader_)
1267 {
1268 ret[jss::have_state] = haveState_;
1269 ret[jss::have_transactions] = haveTransactions_;
1270 }
1271
1272 ret[jss::timeouts] = timeouts_;
1273
1274 if (haveHeader_ && !haveState_)
1275 {
1277 for (auto const& h : neededStateHashes(16, nullptr))
1278 {
1279 hv.append(to_string(h));
1280 }
1281 ret[jss::needed_state_hashes] = hv;
1282 }
1283
1285 {
1287 for (auto const& h : neededTxHashes(16, nullptr))
1288 {
1289 hv.append(to_string(h));
1290 }
1291 ret[jss::needed_transaction_hashes] = hv;
1292 }
1293
1294 return ret;
1295}
1296
1297} // namespace xrpl
T addressof(T... args)
T back_inserter(T... args)
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
ValueType type() const
json::Value getJson(int)
Return a json::ValueType::Object.
void trigger(std::shared_ptr< Peer > const &, TriggerReason)
Request more nodes, perhaps from a specific peer.
InboundLedger(Application &app, uint256 const &hash, std::uint32_t seq, Reason reason, clock_type &, std::unique_ptr< PeerSet > peerSet)
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
void runData()
Process pending TMLedgerData, then query a random sample of the 'best' peers.
void tryDB(NodeStore::Database &srcDB)
std::size_t getPeerCount() const
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Called with a lock by the PeerSet when the timer expires.
std::vector< uint256 > neededStateHashes(int max, SHAMapSyncFilter const *filter) const
SHAMapAddNode stats_
int processData(std::shared_ptr< Peer > peer, protocol::TMLedgerData const &data)
Process one TMLedgerData. Returns the number of useful nodes.
void filterNodes(std::vector< std::pair< SHAMapNodeID, uint256 > > &nodes, TriggerReason reason)
std::shared_ptr< Ledger > ledger_
bool takeAsRootNode(Slice const &data, SHAMapAddNode &)
Process AS root node received from a peer. Call with a lock.
std::vector< std::pair< std::weak_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData > > > receivedData_
bool takeHeader(std::string const &data)
Take ledger header data. Call with a lock.
std::mutex receivedDataLock_
bool takeTxRootNode(Slice const &data, SHAMapAddNode &)
Process TX root node received from a peer. Call with a lock.
std::unique_ptr< PeerSet > peerSet_
void addPeers()
Add more peers to the set, if possible.
beast::AbstractClock< std::chrono::steady_clock > clock_type
void init(ScopedLockType &collectionLock)
void update(std::uint32_t seq)
void receiveNode(protocol::TMLedgerData const &packet, SHAMapAddNode &)
Process node data received from a peer. Call with a lock.
std::vector< uint256 > neededTxHashes(int max, SHAMapSyncFilter const *filter) const
std::set< uint256 > recentNodes_
bool gotData(std::weak_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData > const &)
Stash a TMLedgerData received from a peer for later processing. Returns 'true' if we need to dispatch.
std::vector< neededHash_t > getNeededHashes()
Persistency layer for NodeObject.
Definition Database.h:32
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::Synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:231
Rules controlling protocol behavior.
Definition Rules.h:33
std::string get() const
bool isUseful() const
bool isZero() const
Definition SHAMapHash.h:34
Identifies a node inside a SHAMap.
std::string getRawString() const
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition SHAMap.h:77
std::vector< std::pair< SHAMapNodeID, uint256 > > getMissingNodes(int maxNodes, SHAMapSyncFilter const *filter)
Check for nodes in the SHAMap not available.
SHAMapHash getHash() const
Definition SHAMap.cpp:836
int addRaw(Blob const &vector)
An immutable linear range of bytes.
Definition Slice.h:26
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
std::recursive_mutex mtx_
std::unique_lock< std::recursive_mutex > ScopedLockType
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
bool progress_
Whether forward progress has been made.
beast::Journal journal_
T count_if(T... args)
T emplace_back(T... args)
T for_each(T... args)
T lock(T... args)
T make_shared(T... args)
T make_unique(T... args)
T max(T... args)
@ Array
array value (ordered list)
Definition json_value.h:25
@ Object
object value (collection of name/value pairs).
Definition json_value.h:26
Charge const kFeeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const kFeeInvalidData
Keylet const & feeSettings() noexcept
The (fixed) index of the object containing the ledger fees.
Definition Indexes.cpp:221
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
static constexpr auto kReqNodesReply
Number root(Number f, unsigned d)
Definition Number.cpp:1201
LedgerHeader deserializeHeader(Slice data, bool hasHash=false)
Deserialize a ledger header from a byte array.
std::string to_string(BaseUInt< Bits, Tag > const &a)
Definition base_uint.h:633
static constexpr std::uint32_t kXrpLedgerEarliestFees
The XRP Ledger mainnet's earliest ledger with a FeeSettings object.
static constexpr auto kMissingNodesFind
@ JtLedgerData
Definition Job.h:47
static constexpr auto kLedgerBecomeAggressiveThreshold
static std::vector< uint256 > neededHashes(uint256 const &root, SHAMap &map, int max, SHAMapSyncFilter const *filter)
static constexpr auto kPeerCountAdd
static constexpr auto kLedgerTimeoutRetriesMax
static constexpr auto kPeerCountStart
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
@ LedgerMaster
ledger master data for signing
Definition HashPrefix.h:48
constexpr auto kLedgerAcquireTimeout
std::vector< unsigned char > Blob
Storage for linear binary data.
Definition Blob.h:10
static constexpr auto kReqNodes
BaseUInt< 256 > uint256
Definition base_uint.h:562
LedgerHeader deserializePrefixedHeader(Slice data, bool hasHash=false)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
std::enable_if_t< std::is_same_v< T, char >||std::is_same_v< T, unsigned char >, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:215
T push_back(T... args)
T reserve(T... args)
T sample(T... args)
T stable_partition(T... args)
T str(T... args)
std::unordered_map< std::shared_ptr< Peer >, int > counts
void update(std::shared_ptr< Peer > &&peer, int dataCount)
void sampleN(std::size_t n, F &&f)
T to_string(T... args)
T unlock(T... args)
T what(T... args)