rippled
Loading...
Searching...
No Matches
InboundLedger.cpp
1#include <xrpld/app/ledger/AccountStateSF.h>
2#include <xrpld/app/ledger/InboundLedger.h>
3#include <xrpld/app/ledger/InboundLedgers.h>
4#include <xrpld/app/ledger/LedgerMaster.h>
5#include <xrpld/app/ledger/TransactionStateSF.h>
6#include <xrpld/app/main/Application.h>
7#include <xrpld/overlay/Overlay.h>
8
9#include <xrpl/core/JobQueue.h>
10#include <xrpl/protocol/HashPrefix.h>
11#include <xrpl/protocol/jss.h>
12#include <xrpl/resource/Fees.h>
13#include <xrpl/shamap/SHAMapNodeID.h>
14
15#include <boost/iterator/function_output_iterator.hpp>
16
17#include <algorithm>
18#include <random>
19
20namespace xrpl {
21
22using namespace std::chrono_literals;
23
24enum {
25 // Number of peers to start with
27
28 // Number of peers to add on a timeout
29 ,
30 peerCountAdd = 3
31
32 // how many timeouts before we give up
33 ,
35
36 // how many timeouts before we get aggressive
37 ,
39
40 // Number of nodes to find initially
41 ,
43
44 // Number of nodes to request for a reply
45 ,
46 reqNodesReply = 128
47
48 // Number of nodes to request blindly
49 ,
50 reqNodes = 12
51};
52
// Timeout interval for each ledger acquisition attempt, in milliseconds
54auto constexpr ledgerAcquireTimeout = 3000ms;
55
57 Application& app,
58 uint256 const& hash,
59 std::uint32_t seq,
60 Reason reason,
61 clock_type& clock,
64 app,
65 hash,
67 {jtLEDGER_DATA, "InboundLedger", 5},
68 app.getJournal("InboundLedger"))
69 , m_clock(clock)
70 , mSeq(seq)
71 , mReason(reason)
72 , mPeerSet(std::move(peerSet))
73{
74 JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
75 touch();
76}
77
78void
80{
82 collectionLock.unlock();
83
85 if (failed_)
86 return;
87
88 if (!complete_)
89 {
90 addPeers();
91 queueJob(sl);
92 return;
93 }
94
95 JLOG(journal_.debug()) << "Acquiring ledger we already have in "
96 << " local store. " << hash_;
97 XRPL_ASSERT(
98 mLedger->header().seq < XRP_LEDGER_EARLIEST_FEES || mLedger->read(keylet::fees()),
99 "xrpl::InboundLedger::init : valid ledger fees");
100 mLedger->setImmutable();
101
103 return;
104
106
107 // Check if this could be a newer fully-validated ledger
110}
111
114{
115 auto const& peerIds = mPeerSet->getPeerIds();
116 return std::count_if(peerIds.begin(), peerIds.end(), [this](auto id) {
117 return (app_.getOverlay().findPeerByShortID(id) != nullptr);
118 });
119}
120
121void
123{
124 ScopedLockType const sl(mtx_);
125
126 // If we didn't know the sequence number, but now do, save it
127 if ((seq != 0) && (mSeq == 0))
128 mSeq = seq;
129
130 // Prevent this from being swept
131 touch();
132}
133
134bool
136{
137 ScopedLockType const sl(mtx_);
138 if (!isDone())
139 {
140 if (mLedger)
141 {
142 tryDB(mLedger->stateMap().family().db());
143 }
144 else
145 {
147 }
148 if (failed_ || complete_)
149 {
150 done();
151 return true;
152 }
153 }
154 return false;
155}
156
158{
159 // Save any received AS data not processed. It could be useful
160 // for populating a different ledger
161 for (auto& entry : mReceivedData)
162 {
163 if (entry.second->type() == protocol::liAS_NODE)
164 app_.getInboundLedgers().gotStaleData(entry.second);
165 }
166 if (!isDone())
167 {
168 JLOG(journal_.debug()) << "Acquire " << hash_ << " abort "
169 << ((timeouts_ == 0) ? std::string()
170 : (std::string("timeouts:") +
172 << mStats.get();
173 }
174}
175
177neededHashes(uint256 const& root, SHAMap& map, int max, SHAMapSyncFilter* filter)
178{
180
181 if (!root.isZero())
182 {
183 if (map.getHash().isZero())
184 {
185 ret.push_back(root);
186 }
187 else
188 {
189 auto mn = map.getMissingNodes(max, filter);
190 ret.reserve(mn.size());
191 for (auto const& n : mn)
192 ret.push_back(n.second);
193 }
194 }
195
196 return ret;
197}
198
201{
202 return neededHashes(mLedger->header().txHash, mLedger->txMap(), max, filter);
203}
204
207{
208 return neededHashes(mLedger->header().accountHash, mLedger->stateMap(), max, filter);
209}
210
211// See how much of the ledger data is stored locally
212// Data found in a fetch pack will be stored
213void
215{
216 if (!mHaveHeader)
217 {
218 auto makeLedger = [&, this](Blob const& data) {
219 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
220 Rules const rules{app_.config().features};
223 if (mLedger->header().hash != hash_ || (mSeq != 0 && mSeq != mLedger->header().seq))
224 {
225 // We know for a fact the ledger can never be acquired
226 JLOG(journal_.warn())
227 << "hash " << hash_ << " seq " << std::to_string(mSeq) << " cannot be a ledger";
228 mLedger.reset();
229 failed_ = true;
230 }
231 };
232
233 // Try to fetch the ledger header from the DB
234 if (auto nodeObject = srcDB.fetchNodeObject(hash_, mSeq))
235 {
236 JLOG(journal_.trace()) << "Ledger header found in local store";
237
238 makeLedger(nodeObject->getData());
239 if (failed_)
240 return;
241
242 // Store the ledger header if the source and destination differ
243 auto& dstDB{mLedger->stateMap().family().db()};
244 if (std::addressof(dstDB) != std::addressof(srcDB))
245 {
246 Blob blob{nodeObject->getData()};
247 dstDB.store(hotLEDGER, std::move(blob), hash_, mLedger->header().seq);
248 }
249 }
250 else
251 {
252 // Try to fetch the ledger header from a fetch pack
253 auto data = app_.getLedgerMaster().getFetchPack(hash_);
254 if (!data)
255 return;
256
257 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
258
259 makeLedger(*data);
260 if (failed_)
261 return;
262
263 // Store the ledger header in the ledger's database
264 mLedger->stateMap().family().db().store(
265 hotLEDGER, std::move(*data), hash_, mLedger->header().seq);
266 }
267
268 if (mSeq == 0)
269 mSeq = mLedger->header().seq;
270 mLedger->stateMap().setLedgerSeq(mSeq);
271 mLedger->txMap().setLedgerSeq(mSeq);
272 mHaveHeader = true;
273 }
274
276 {
277 if (mLedger->header().txHash.isZero())
278 {
279 JLOG(journal_.trace()) << "No TXNs to fetch";
280 mHaveTransactions = true;
281 }
282 else
283 {
284 TransactionStateSF filter(mLedger->txMap().family().db(), app_.getLedgerMaster());
285 if (mLedger->txMap().fetchRoot(SHAMapHash{mLedger->header().txHash}, &filter))
286 {
287 if (neededTxHashes(1, &filter).empty())
288 {
289 JLOG(journal_.trace()) << "Had full txn map locally";
290 mHaveTransactions = true;
291 }
292 }
293 }
294 }
295
296 if (!mHaveState)
297 {
298 if (mLedger->header().accountHash.isZero())
299 {
300 JLOG(journal_.fatal()) << "We are acquiring a ledger with a zero account hash";
301 failed_ = true;
302 return;
303 }
304 AccountStateSF filter(mLedger->stateMap().family().db(), app_.getLedgerMaster());
305 if (mLedger->stateMap().fetchRoot(SHAMapHash{mLedger->header().accountHash}, &filter))
306 {
307 if (neededStateHashes(1, &filter).empty())
308 {
309 JLOG(journal_.trace()) << "Had full AS map locally";
310 mHaveState = true;
311 }
312 }
313 }
314
316 {
317 JLOG(journal_.debug()) << "Had everything locally";
318 complete_ = true;
319 XRPL_ASSERT(
320 mLedger->header().seq < XRP_LEDGER_EARLIEST_FEES || mLedger->read(keylet::fees()),
321 "xrpl::InboundLedger::tryDB : valid ledger fees");
322 mLedger->setImmutable();
323 }
324}
325
328void
330{
331 mRecentNodes.clear();
332
333 if (isDone())
334 {
335 JLOG(journal_.info()) << "Already done " << hash_;
336 return;
337 }
338
340 {
341 if (mSeq != 0)
342 {
343 JLOG(journal_.warn()) << timeouts_ << " timeouts for ledger " << mSeq;
344 }
345 else
346 {
347 JLOG(journal_.warn()) << timeouts_ << " timeouts for ledger " << hash_;
348 }
349 failed_ = true;
350 done();
351 return;
352 }
353
354 if (!wasProgress)
355 {
356 checkLocal();
357
358 mByHash = true;
359
360 std::size_t const pc = getPeerCount();
361 JLOG(journal_.debug()) << "No progress(" << pc << ") for ledger " << hash_;
362
363 // addPeers triggers if the reason is not HISTORY
364 // So if the reason IS HISTORY, need to trigger after we add
365 // otherwise, we need to trigger before we add
366 // so each peer gets triggered once
369 addPeers();
372 }
373}
374
376void
378{
379 mPeerSet->addPeers(
381 [this](auto peer) { return peer->hasLedger(hash_, mSeq); },
382 [this](auto peer) {
383 // For historical nodes, do not trigger too soon
384 // since a fetch pack is probably coming
387 });
388}
389
395
396void
398{
399 if (mSignaled)
400 return;
401
402 mSignaled = true;
403 touch();
404
405 JLOG(journal_.debug()) << "Acquire " << hash_ << (failed_ ? " fail " : " ")
406 << ((timeouts_ == 0)
407 ? std::string()
408 : (std::string("timeouts:") + std::to_string(timeouts_) + " "))
409 << mStats.get();
410
411 XRPL_ASSERT(complete_ || failed_, "xrpl::InboundLedger::done : complete or failed");
412
413 if (complete_ && !failed_ && mLedger)
414 {
415 XRPL_ASSERT(
416 mLedger->header().seq < XRP_LEDGER_EARLIEST_FEES || mLedger->read(keylet::fees()),
417 "xrpl::InboundLedger::done : valid ledger fees");
418 mLedger->setImmutable();
419 switch (mReason)
420 {
421 case Reason::HISTORY:
423 break;
424 default:
426 break;
427 }
428 }
429
430 // We hold the PeerSet lock, so must dispatch
431 app_.getJobQueue().addJob(jtLEDGER_DATA, "AcqDone", [self = shared_from_this()]() {
432 if (self->complete_ && !self->failed_)
433 {
434 self->app_.getLedgerMaster().checkAccept(self->getLedger());
435 self->app_.getLedgerMaster().tryAdvance();
436 }
437 else
438 {
439 self->app_.getInboundLedgers().logFailure(self->hash_, self->mSeq);
440 }
441 });
442}
443
446void
448{
450
451 if (isDone())
452 {
453 JLOG(journal_.debug()) << "Trigger on ledger: " << hash_ << (complete_ ? " completed" : "")
454 << (failed_ ? " failed" : "");
455 return;
456 }
457
458 if (auto stream = journal_.debug())
459 {
461 ss << "Trigger acquiring ledger " << hash_;
462 if (peer)
463 ss << " from " << peer;
464
465 if (complete_ || failed_)
466 {
467 ss << " complete=" << complete_ << " failed=" << failed_;
468 }
469 else
470 {
471 ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions << " as=" << mHaveState;
472 }
473 stream << ss.str();
474 }
475
476 if (!mHaveHeader)
477 {
479 if (failed_)
480 {
481 JLOG(journal_.warn()) << " failed local for " << hash_;
482 return;
483 }
484 }
485
486 protocol::TMGetLedger tmGL;
487 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
488
489 if (timeouts_ != 0)
490 {
491 // Be more aggressive if we've timed out at least once
492 tmGL.set_querytype(protocol::qtINDIRECT);
493
495 {
496 auto need = getNeededHashes();
497
498 if (!need.empty())
499 {
500 protocol::TMGetObjectByHash tmBH;
501 bool typeSet = false;
502 tmBH.set_query(true);
503 tmBH.set_ledgerhash(hash_.begin(), hash_.size());
504 for (auto const& p : need)
505 {
506 JLOG(journal_.debug()) << "Want: " << p.second;
507
508 if (!typeSet)
509 {
510 tmBH.set_type(p.first);
511 typeSet = true;
512 }
513
514 if (p.first == tmBH.type())
515 {
516 protocol::TMIndexedObject* io = tmBH.add_objects();
517 io->set_hash(p.second.begin(), p.second.size());
518 if (mSeq != 0)
519 io->set_ledgerseq(mSeq);
520 }
521 }
522
523 auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
524 auto const& peerIds = mPeerSet->getPeerIds();
525 std::for_each(peerIds.begin(), peerIds.end(), [this, &packet](auto id) {
526 if (auto p = app_.getOverlay().findPeerByShortID(id))
527 {
528 mByHash = false;
529 p->send(packet);
530 }
531 });
532 }
533 else
534 {
535 JLOG(journal_.info()) << "getNeededHashes says acquire is complete";
536 mHaveHeader = true;
537 mHaveTransactions = true;
538 mHaveState = true;
539 complete_ = true;
540 }
541 }
542 }
543
544 // We can't do much without the header data because we don't know the
545 // state or transaction root hashes.
546 if (!mHaveHeader && !failed_)
547 {
548 tmGL.set_itype(protocol::liBASE);
549 if (mSeq != 0)
550 tmGL.set_ledgerseq(mSeq);
551 JLOG(journal_.trace()) << "Sending header request to "
552 << (peer ? "selected peer" : "all peers");
553 mPeerSet->sendRequest(tmGL, peer);
554 return;
555 }
556
557 if (mLedger)
558 tmGL.set_ledgerseq(mLedger->header().seq);
559
560 if (reason != TriggerReason::reply)
561 {
562 // If we're querying blind, don't query deep
563 tmGL.set_querydepth(0);
564 }
565 else if (peer && peer->isHighLatency())
566 {
567 // If the peer has high latency, query extra deep
568 tmGL.set_querydepth(2);
569 }
570 else
571 {
572 tmGL.set_querydepth(1);
573 }
574
575 // Get the state data first because it's the most likely to be useful
576 // if we wind up abandoning this fetch.
577 if (mHaveHeader && !mHaveState && !failed_)
578 {
579 XRPL_ASSERT(
580 mLedger,
581 "xrpl::InboundLedger::trigger : non-null ledger to read state "
582 "from");
583
584 if (!mLedger->stateMap().isValid())
585 {
586 failed_ = true;
587 }
588 else if (mLedger->stateMap().getHash().isZero())
589 {
590 // we need the root node
591 tmGL.set_itype(protocol::liAS_NODE);
592 *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
593 JLOG(journal_.trace())
594 << "Sending AS root request to " << (peer ? "selected peer" : "all peers");
595 mPeerSet->sendRequest(tmGL, peer);
596 return;
597 }
598 else
599 {
600 AccountStateSF filter(mLedger->stateMap().family().db(), app_.getLedgerMaster());
601
602 // Release the lock while we process the large state map
603 sl.unlock();
604 auto nodes = mLedger->stateMap().getMissingNodes(missingNodesFind, &filter);
605 sl.lock();
606
607 // Make sure nothing happened while we released the lock
608 if (!failed_ && !complete_ && !mHaveState)
609 {
610 if (nodes.empty())
611 {
612 if (!mLedger->stateMap().isValid())
613 {
614 failed_ = true;
615 }
616 else
617 {
618 mHaveState = true;
619
620 if (mHaveTransactions)
621 complete_ = true;
622 }
623 }
624 else
625 {
626 filterNodes(nodes, reason);
627
628 if (!nodes.empty())
629 {
630 tmGL.set_itype(protocol::liAS_NODE);
631 for (auto const& id : nodes)
632 {
633 *(tmGL.add_nodeids()) = id.first.getRawString();
634 }
635
636 JLOG(journal_.trace()) << "Sending AS node request (" << nodes.size()
637 << ") to " << (peer ? "selected peer" : "all peers");
638 mPeerSet->sendRequest(tmGL, peer);
639 return;
640 }
641
642 JLOG(journal_.trace()) << "All AS nodes filtered";
643 }
644 }
645 }
646 }
647
648 if (mHaveHeader && !mHaveTransactions && !failed_)
649 {
650 XRPL_ASSERT(
651 mLedger,
652 "xrpl::InboundLedger::trigger : non-null ledger to read "
653 "transactions from");
654
655 if (!mLedger->txMap().isValid())
656 {
657 failed_ = true;
658 }
659 else if (mLedger->txMap().getHash().isZero())
660 {
661 // we need the root node
662 tmGL.set_itype(protocol::liTX_NODE);
663 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
664 JLOG(journal_.trace())
665 << "Sending TX root request to " << (peer ? "selected peer" : "all peers");
666 mPeerSet->sendRequest(tmGL, peer);
667 return;
668 }
669 else
670 {
671 TransactionStateSF filter(mLedger->txMap().family().db(), app_.getLedgerMaster());
672
673 auto nodes = mLedger->txMap().getMissingNodes(missingNodesFind, &filter);
674
675 if (nodes.empty())
676 {
677 if (!mLedger->txMap().isValid())
678 {
679 failed_ = true;
680 }
681 else
682 {
683 mHaveTransactions = true;
684
685 if (mHaveState)
686 complete_ = true;
687 }
688 }
689 else
690 {
691 filterNodes(nodes, reason);
692
693 if (!nodes.empty())
694 {
695 tmGL.set_itype(protocol::liTX_NODE);
696 for (auto const& n : nodes)
697 {
698 *(tmGL.add_nodeids()) = n.first.getRawString();
699 }
700 JLOG(journal_.trace()) << "Sending TX node request (" << nodes.size() << ") to "
701 << (peer ? "selected peer" : "all peers");
702 mPeerSet->sendRequest(tmGL, peer);
703 return;
704 }
705
706 JLOG(journal_.trace()) << "All TX nodes filtered";
707 }
708 }
709 }
710
711 if (complete_ || failed_)
712 {
713 JLOG(journal_.debug()) << "Done:" << (complete_ ? " complete" : "")
714 << (failed_ ? " failed " : " ") << mLedger->header().seq;
715 sl.unlock();
716 done();
717 }
718}
719
720void
721InboundLedger::filterNodes(
723 TriggerReason reason)
724{
725 // Sort nodes so that the ones we haven't recently
726 // requested come before the ones we have.
727 auto dup = std::stable_partition(nodes.begin(), nodes.end(), [this](auto const& item) {
728 return mRecentNodes.count(item.second) == 0;
729 });
730
731 // If everything is a duplicate we don't want to send
732 // any query at all except on a timeout where we need
733 // to query everyone:
734 if (dup == nodes.begin())
735 {
736 JLOG(journal_.trace()) << "filterNodes: all duplicates";
737
738 if (reason != TriggerReason::timeout)
739 {
740 nodes.clear();
741 return;
742 }
743 }
744 else
745 {
746 JLOG(journal_.trace()) << "filterNodes: pruning duplicates";
747
748 nodes.erase(dup, nodes.end());
749 }
750
751 std::size_t const limit = (reason == TriggerReason::reply) ? reqNodesReply : reqNodes;
752
753 if (nodes.size() > limit)
754 nodes.resize(limit);
755
756 for (auto const& n : nodes)
757 mRecentNodes.insert(n.second);
758}
759
763// data must not have hash prefix
764bool
765InboundLedger::takeHeader(std::string const& data)
766{
767 // Return value: true=normal, false=bad data
768 JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;
769
770 if (complete_ || failed_ || mHaveHeader)
771 return true;
772
773 auto* f = &app_.getNodeFamily();
774 Rules const rules{app_.config().features};
775 mLedger = std::make_shared<Ledger>(deserializeHeader(makeSlice(data)), rules, *f);
776 if (mLedger->header().hash != hash_ || (mSeq != 0 && mSeq != mLedger->header().seq))
777 {
778 JLOG(journal_.warn()) << "Acquire hash mismatch: " << mLedger->header().hash
779 << "!=" << hash_;
780 mLedger.reset();
781 return false;
782 }
783 if (mSeq == 0)
784 mSeq = mLedger->header().seq;
785 mLedger->stateMap().setLedgerSeq(mSeq);
786 mLedger->txMap().setLedgerSeq(mSeq);
787 mHaveHeader = true;
788
789 Serializer s(data.size() + 4);
790 s.add32(HashPrefix::ledgerMaster);
791 s.addRaw(data.data(), data.size());
792 f->db().store(hotLEDGER, std::move(s.modData()), hash_, mSeq);
793
794 if (mLedger->header().txHash.isZero())
795 mHaveTransactions = true;
796
797 if (mLedger->header().accountHash.isZero())
798 mHaveState = true;
799
800 mLedger->txMap().setSynching();
801 mLedger->stateMap().setSynching();
802
803 return true;
804}
805
809void
810InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
811{
812 if (!mHaveHeader)
813 {
814 JLOG(journal_.warn()) << "Missing ledger header";
815 san.incInvalid();
816 return;
817 }
818 if (packet.type() == protocol::liTX_NODE)
819 {
820 if (mHaveTransactions || failed_)
821 {
822 san.incDuplicate();
823 return;
824 }
825 }
826 else if (mHaveState || failed_)
827 {
828 san.incDuplicate();
829 return;
830 }
831
832 auto [map, rootHash, filter] =
834 if (packet.type() == protocol::liTX_NODE)
835 {
836 return {
837 mLedger->txMap(),
838 SHAMapHash{mLedger->header().txHash},
840 mLedger->txMap().family().db(), app_.getLedgerMaster())};
841 }
842 return {
843 mLedger->stateMap(),
844 SHAMapHash{mLedger->header().accountHash},
846 mLedger->stateMap().family().db(), app_.getLedgerMaster())};
847 }();
848
849 try
850 {
851 auto const f = filter.get();
852
853 for (auto const& node : packet.nodes())
854 {
855 auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
856
857 if (!nodeID)
858 throw std::runtime_error("data does not properly deserialize");
859
860 if (nodeID->isRoot())
861 {
862 san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
863 }
864 else
865 {
866 san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
867 }
868
869 if (!san.isGood())
870 {
871 JLOG(journal_.warn()) << "Received bad node data";
872 return;
873 }
874 }
875 }
876 catch (std::exception const& e)
877 {
878 JLOG(journal_.error()) << "Received bad node data: " << e.what();
879 san.incInvalid();
880 return;
881 }
882
883 if (!map.isSynching())
884 {
885 if (packet.type() == protocol::liTX_NODE)
886 {
887 mHaveTransactions = true;
888 }
889 else
890 {
891 mHaveState = true;
892 }
893
894 if (mHaveTransactions && mHaveState)
895 {
896 complete_ = true;
897 done();
898 }
899 }
900}
901
905bool
906InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
907{
908 if (failed_ || mHaveState)
909 {
910 san.incDuplicate();
911 return true;
912 }
913
914 if (!mHaveHeader)
915 {
916 // LCOV_EXCL_START
917 UNREACHABLE("xrpl::InboundLedger::takeAsRootNode : no ledger header");
918 return false;
919 // LCOV_EXCL_STOP
920 }
921
922 AccountStateSF filter(mLedger->stateMap().family().db(), app_.getLedgerMaster());
923 san +=
924 mLedger->stateMap().addRootNode(SHAMapHash{mLedger->header().accountHash}, data, &filter);
925 return san.isGood();
926}
927
931bool
932InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
933{
934 if (failed_ || mHaveTransactions)
935 {
936 san.incDuplicate();
937 return true;
938 }
939
940 if (!mHaveHeader)
941 {
942 // LCOV_EXCL_START
943 UNREACHABLE("xrpl::InboundLedger::takeTxRootNode : no ledger header");
944 return false;
945 // LCOV_EXCL_STOP
946 }
947
948 TransactionStateSF filter(mLedger->txMap().family().db(), app_.getLedgerMaster());
949 san += mLedger->txMap().addRootNode(SHAMapHash{mLedger->header().txHash}, data, &filter);
950 return san.isGood();
951}
952
954InboundLedger::getNeededHashes()
955{
957
958 if (!mHaveHeader)
959 {
960 ret.push_back(std::make_pair(protocol::TMGetObjectByHash::otLEDGER, hash_));
961 return ret;
962 }
963
964 if (!mHaveState)
965 {
966 AccountStateSF filter(mLedger->stateMap().family().db(), app_.getLedgerMaster());
967 for (auto const& h : neededStateHashes(4, &filter))
968 {
969 ret.push_back(std::make_pair(protocol::TMGetObjectByHash::otSTATE_NODE, h));
970 }
971 }
972
973 if (!mHaveTransactions)
974 {
975 TransactionStateSF filter(mLedger->txMap().family().db(), app_.getLedgerMaster());
976 for (auto const& h : neededTxHashes(4, &filter))
977 {
978 ret.push_back(std::make_pair(protocol::TMGetObjectByHash::otTRANSACTION_NODE, h));
979 }
980 }
981
982 return ret;
983}
984
988bool
989InboundLedger::gotData(
992{
993 std::lock_guard const sl(mReceivedDataLock);
994
995 if (isDone())
996 return false;
997
998 mReceivedData.emplace_back(peer, data);
999
1000 if (mReceiveDispatched)
1001 return false;
1002
1003 mReceiveDispatched = true;
1004 return true;
1005}
1006
1010// VFALCO NOTE, it is not necessary to pass the entire Peer,
1011// we can get away with just a Resource::Consumer endpoint.
1012//
1013// TODO Change peer to Consumer
1014//
/** Process a TMLedgerData reply from a peer.

    Handles header (liBASE) replies — where node 0 is the header and nodes
    1/2 may piggy-back the AS/TX roots — and node (liTX_NODE / liAS_NODE)
    replies. Malformed or invalid data causes the peer to be charged.

    @param peer The peer that sent the data.
    @param packet The ledger data message.
    @return Number of good nodes processed, or -1 on bad data.
*/
int
InboundLedger::processData(std::shared_ptr<Peer> peer, protocol::TMLedgerData& packet)
{
    if (packet.type() == protocol::liBASE)
    {
        if (packet.nodes().empty())
        {
            JLOG(journal_.warn()) << peer->id() << ": empty header data";
            peer->charge(Resource::feeMalformedRequest, "ledger_data empty header");
            return -1;
        }

        SHAMapAddNode san;

        ScopedLockType const sl(mtx_);

        try
        {
            if (!mHaveHeader)
            {
                if (!takeHeader(packet.nodes(0).nodedata()))
                {
                    JLOG(journal_.warn()) << "Got invalid header data";
                    peer->charge(Resource::feeMalformedRequest, "ledger_data invalid header");
                    return -1;
                }

                san.incUseful();
            }

            // Optional piggy-backed roots: a bad root is logged but not
            // fatal to the whole reply.
            if (!mHaveState && (packet.nodes().size() > 1) &&
                !takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
            {
                JLOG(journal_.warn()) << "Included AS root invalid";
            }

            if (!mHaveTransactions && (packet.nodes().size() > 2) &&
                !takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
            {
                JLOG(journal_.warn()) << "Included TX root invalid";
            }
        }
        catch (std::exception const& ex)
        {
            JLOG(journal_.warn()) << "Included AS/TX root invalid: " << ex.what();
            using namespace std::string_literals;
            peer->charge(Resource::feeInvalidData, "ledger_data "s + ex.what());
            return -1;
        }

        if (san.isUseful())
            progress_ = true;

        mStats += san;
        return san.getGood();
    }

    if ((packet.type() == protocol::liTX_NODE) || (packet.type() == protocol::liAS_NODE))
    {
        if (packet.nodes().empty())
        {
            JLOG(journal_.info()) << peer->id() << ": response with no nodes";
            peer->charge(Resource::feeMalformedRequest, "ledger_data no nodes");
            return -1;
        }

        ScopedLockType const sl(mtx_);

        // Verify node IDs and data are complete
        for (auto const& node : packet.nodes())
        {
            if (!node.has_nodeid() || !node.has_nodedata())
            {
                JLOG(journal_.warn()) << "Got bad node";
                peer->charge(Resource::feeMalformedRequest, "ledger_data bad node");
                return -1;
            }
        }

        SHAMapAddNode san;
        receiveNode(packet, san);

        JLOG(journal_.debug()) << "Ledger "
                               << ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS")
                               << " node stats: " << san.get();

        if (san.isUseful())
            progress_ = true;

        mStats += san;
        return san.getGood();
    }

    // Unrecognized packet type.
    return -1;
}
1110
1111namespace detail {
1112// Track the amount of useful data that each peer returns
1114{
1115 // Map from peer to amount of useful the peer returned
1117 // The largest amount of useful data that any peer returned
1118 int maxCount = 0;
1119
1120 // Update the data count for a peer
1121 void
1122 update(std::shared_ptr<Peer>&& peer, int dataCount)
1123 {
1124 if (dataCount <= 0)
1125 return;
1126 maxCount = std::max(maxCount, dataCount);
1127 auto i = counts.find(peer);
1128 if (i == counts.end())
1129 {
1130 counts.emplace(std::move(peer), dataCount);
1131 return;
1132 }
1133 i->second = std::max(i->second, dataCount);
1134 }
1135
1136 // Prune all the peers that didn't return enough data.
1137 void
1139 {
1140 // Remove all the peers that didn't return at least half as much data as
1141 // the best peer
1142 auto const thresh = maxCount / 2;
1143 auto i = counts.begin();
1144 while (i != counts.end())
1145 {
1146 if (i->second < thresh)
1147 {
1148 i = counts.erase(i);
1149 }
1150 else
1151 {
1152 ++i;
1153 }
1154 }
1155 }
1156
1157 // call F with the `peer` parameter with a random sample of at most n values
1158 // of the counts vector.
1159 template <class F>
1160 void
1162 {
1163 if (counts.empty())
1164 return;
1165
1166 auto outFunc = [&f](auto&& v) { f(v.first); };
1168#if _MSC_VER
1170 s.reserve(n);
1171 std::sample(counts.begin(), counts.end(), std::back_inserter(s), n, rng);
1172 for (auto& v : s)
1173 {
1174 outFunc(v);
1175 }
1176#else
1178 counts.begin(), counts.end(), boost::make_function_output_iterator(outFunc), n, rng);
1179#endif
1180 }
1181};
1182} // namespace detail
1183
/** Drain and process the queue of received TMLedgerData messages,
    then re-trigger the peers that returned the most useful data. */
void
InboundLedger::runData()
{
    // Maximum number of peers to request data from
    constexpr std::size_t maxUsefulPeers = 6;

    decltype(mReceivedData) data;

    // Reserve some memory so the first couple iterations don't reallocate
    data.reserve(8);

    detail::PeerDataCounts dataCounts;

    for (;;)
    {
        data.clear();

        {
            std::lock_guard const sl(mReceivedDataLock);

            if (mReceivedData.empty())
            {
                // Queue drained: clear the flag so the next arrival in
                // gotData() dispatches a fresh processing job.
                mReceiveDispatched = false;
                break;
            }

            // Take the whole batch and process it outside the lock.
            data.swap(mReceivedData);
        }

        for (auto& entry : data)
        {
            // Skip peers that have since disconnected (dead weak_ptr).
            if (auto peer = entry.first.lock())
            {
                int const count = processData(peer, *(entry.second));
                dataCounts.update(std::move(peer), count);
            }
        }
    }

    // Select a random sample of the peers that gives us the most nodes that are
    // useful
    dataCounts.prune();
    dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr<Peer> const& peer) {
        trigger(peer, TriggerReason::reply);
    });
}
1233
1235InboundLedger::getJson(int)
1236{
1238
1239 ScopedLockType const sl(mtx_);
1240
1241 ret[jss::hash] = to_string(hash_);
1242
1243 if (complete_)
1244 ret[jss::complete] = true;
1245
1246 if (failed_)
1247 ret[jss::failed] = true;
1248
1249 if (!complete_ && !failed_)
1250 ret[jss::peers] = static_cast<int>(mPeerSet->getPeerIds().size());
1251
1252 ret[jss::have_header] = mHaveHeader;
1253
1254 if (mHaveHeader)
1255 {
1256 ret[jss::have_state] = mHaveState;
1257 ret[jss::have_transactions] = mHaveTransactions;
1258 }
1259
1260 ret[jss::timeouts] = timeouts_;
1261
1262 if (mHaveHeader && !mHaveState)
1263 {
1265 for (auto const& h : neededStateHashes(16, nullptr))
1266 {
1267 hv.append(to_string(h));
1268 }
1269 ret[jss::needed_state_hashes] = hv;
1270 }
1271
1272 if (mHaveHeader && !mHaveTransactions)
1273 {
1275 for (auto const& h : neededTxHashes(16, nullptr))
1276 {
1277 hv.append(to_string(h));
1278 }
1279 ret[jss::needed_transaction_hashes] = hv;
1280 }
1281
1282 return ret;
1283}
1284
1285} // namespace xrpl
T addressof(T... args)
T back_inserter(T... args)
T begin(T... args)
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
ValueType type() const
Stream fatal() const
Definition Journal.h:325
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
virtual Config & config()=0
std::unordered_set< uint256, beast::uhash<> > features
Definition Config.h:261
virtual NodeStore::Database & db()=0
std::vector< uint256 > neededTxHashes(int max, SHAMapSyncFilter *filter) const
void trigger(std::shared_ptr< Peer > const &, TriggerReason)
Request more nodes, perhaps from a specific peer.
InboundLedger(Application &app, uint256 const &hash, std::uint32_t seq, Reason reason, clock_type &, std::unique_ptr< PeerSet > peerSet)
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
void tryDB(NodeStore::Database &srcDB)
std::size_t getPeerCount() const
std::vector< uint256 > neededStateHashes(int max, SHAMapSyncFilter *filter) const
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Called with a lock by the PeerSet when the timer expires.
SHAMapAddNode mStats
void addPeers()
Add more peers to the set, if possible.
std::shared_ptr< Ledger > mLedger
std::set< uint256 > mRecentNodes
void init(ScopedLockType &collectionLock)
void update(std::uint32_t seq)
std::vector< std::pair< std::weak_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData > > > mReceivedData
std::vector< neededHash_t > getNeededHashes()
std::unique_ptr< PeerSet > mPeerSet
virtual void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet)=0
virtual void onLedgerFetched()=0
Called when a complete ledger is obtained.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:147
bool storeLedger(std::shared_ptr< Ledger const > ledger)
std::optional< Blob > getFetchPack(uint256 const &hash) override
Retrieves partial ledger data of the corresponding hash from peers.
void checkAccept(std::shared_ptr< Ledger const > const &ledger)
Persistency layer for NodeObject.
Definition Database.h:31
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:209
Rules controlling protocol behavior.
Definition Rules.h:18
std::string get() const
bool isUseful() const
bool isZero() const
Definition SHAMapHash.h:34
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition SHAMap.h:77
std::vector< std::pair< SHAMapNodeID, uint256 > > getMissingNodes(int maxNodes, SHAMapSyncFilter *filter)
Check for nodes in the SHAMap not available.
SHAMapHash getHash() const
Definition SHAMap.cpp:813
int addRaw(Blob const &vector)
virtual JobQueue & getJobQueue()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual Family & getNodeFamily()=0
An immutable linear range of bytes.
Definition Slice.h:26
This class is an "active" object.
std::recursive_mutex mtx_
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
bool progress_
Whether forward progress has been made.
beast::Journal journal_
iterator begin()
Definition base_uint.h:112
static constexpr std::size_t size()
Definition base_uint.h:499
T count_if(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T for_each(T... args)
T is_same_v
T make_pair(T... args)
T max(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
Keylet const & fees() noexcept
The (fixed) index of the object containing the ledger fees.
Definition Indexes.cpp:200
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_FEES
The XRP Ledger mainnet's earliest ledger with a FeeSettings object.
auto constexpr ledgerAcquireTimeout
Number root(Number f, unsigned d)
Definition Number.cpp:958
@ hotLEDGER
Definition NodeObject.h:14
@ ledgerBecomeAggressiveThreshold
@ peerCountStart
@ ledgerTimeoutRetriesMax
@ reqNodesReply
@ missingNodesFind
LedgerHeader deserializeHeader(Slice data, bool hasHash=false)
Deserialize a ledger header from a byte array.
@ jtLEDGER_DATA
Definition Job.h:45
static std::vector< uint256 > neededHashes(uint256 const &root, SHAMap &map, int max, SHAMapSyncFilter *filter)
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:215
LedgerHeader deserializePrefixedHeader(Slice data, bool hasHash=false)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
T push_back(T... args)
T reserve(T... args)
T reset(T... args)
T sample(T... args)
T stable_partition(T... args)
T str(T... args)
std::unordered_map< std::shared_ptr< Peer >, int > counts
void update(std::shared_ptr< Peer > &&peer, int dataCount)
void sampleN(std::size_t n, F &&f)
T to_string(T... args)
T unlock(T... args)
T what(T... args)