rippled
Loading...
Searching...
No Matches
InboundLedger.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/ledger/AccountStateSF.h>
21#include <xrpld/app/ledger/InboundLedger.h>
22#include <xrpld/app/ledger/InboundLedgers.h>
23#include <xrpld/app/ledger/LedgerMaster.h>
24#include <xrpld/app/ledger/TransactionStateSF.h>
25#include <xrpld/app/main/Application.h>
26#include <xrpld/core/JobQueue.h>
27#include <xrpld/overlay/Overlay.h>
28
29#include <xrpl/basics/Log.h>
30#include <xrpl/protocol/HashPrefix.h>
31#include <xrpl/protocol/jss.h>
32#include <xrpl/resource/Fees.h>
33#include <xrpl/shamap/SHAMapNodeID.h>
34
35#include <boost/iterator/function_output_iterator.hpp>
36
37#include <algorithm>
38#include <random>
39
40namespace ripple {
41
42using namespace std::chrono_literals;
43
44enum {
45 // Number of peers to start with
47
48 // Number of peers to add on a timeout
49 ,
50 peerCountAdd = 3
51
52 // how many timeouts before we give up
53 ,
55
56 // how many timeouts before we get aggressive
57 ,
59
60 // Number of nodes to find initially
61 ,
63
64 // Number of nodes to request for a reply
65 ,
66 reqNodesReply = 128
67
68 // Number of nodes to request blindly
69 ,
70 reqNodes = 12
71};
72
73// millisecond for each ledger timeout
74auto constexpr ledgerAcquireTimeout = 3000ms;
75
77 Application& app,
78 uint256 const& hash,
79 std::uint32_t seq,
80 Reason reason,
81 clock_type& clock,
84 app,
85 hash,
87 {jtLEDGER_DATA, "InboundLedger", 5},
88 app.journal("InboundLedger"))
89 , m_clock(clock)
90 , mHaveHeader(false)
91 , mHaveState(false)
92 , mHaveTransactions(false)
93 , mSignaled(false)
94 , mByHash(true)
95 , mSeq(seq)
96 , mReason(reason)
97 , mReceiveDispatched(false)
98 , mPeerSet(std::move(peerSet))
99{
100 JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
101 touch();
102}
103
104void
106{
108 collectionLock.unlock();
109
111 if (failed_)
112 return;
113
114 if (!complete_)
115 {
116 addPeers();
117 queueJob(sl);
118 return;
119 }
120
121 JLOG(journal_.debug()) << "Acquiring ledger we already have in "
122 << " local store. " << hash_;
123 XRPL_ASSERT(
124 mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
125 mLedger->read(keylet::fees()),
126 "ripple::InboundLedger::init : valid ledger fees");
127 mLedger->setImmutable();
128
130 return;
131
133
134 // Check if this could be a newer fully-validated ledger
137}
138
141{
142 auto const& peerIds = mPeerSet->getPeerIds();
143 return std::count_if(peerIds.begin(), peerIds.end(), [this](auto id) {
144 return (app_.overlay().findPeerByShortID(id) != nullptr);
145 });
146}
147
148void
150{
152
153 // If we didn't know the sequence number, but now do, save it
154 if ((seq != 0) && (mSeq == 0))
155 mSeq = seq;
156
157 // Prevent this from being swept
158 touch();
159}
160
161bool
163{
165 if (!isDone())
166 {
167 if (mLedger)
168 tryDB(mLedger->stateMap().family().db());
169 else
171 if (failed_ || complete_)
172 {
173 done();
174 return true;
175 }
176 }
177 return false;
178}
179
181{
182 // Save any received AS data not processed. It could be useful
183 // for populating a different ledger
184 for (auto& entry : mReceivedData)
185 {
186 if (entry.second->type() == protocol::liAS_NODE)
187 app_.getInboundLedgers().gotStaleData(entry.second);
188 }
189 if (!isDone())
190 {
191 JLOG(journal_.debug())
192 << "Acquire " << hash_ << " abort "
193 << ((timeouts_ == 0) ? std::string()
194 : (std::string("timeouts:") +
196 << mStats.get();
197 }
198}
199
202 uint256 const& root,
203 SHAMap& map,
204 int max,
205 SHAMapSyncFilter* filter)
206{
208
209 if (!root.isZero())
210 {
211 if (map.getHash().isZero())
212 ret.push_back(root);
213 else
214 {
215 auto mn = map.getMissingNodes(max, filter);
216 ret.reserve(mn.size());
217 for (auto const& n : mn)
218 ret.push_back(n.second);
219 }
220 }
221
222 return ret;
223}
224
227{
228 return neededHashes(mLedger->info().txHash, mLedger->txMap(), max, filter);
229}
230
233{
234 return neededHashes(
235 mLedger->info().accountHash, mLedger->stateMap(), max, filter);
236}
237
238// See how much of the ledger data is stored locally
239// Data found in a fetch pack will be stored
240void
242{
243 if (!mHaveHeader)
244 {
245 auto makeLedger = [&, this](Blob const& data) {
246 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
249 app_.config(),
251 if (mLedger->info().hash != hash_ ||
252 (mSeq != 0 && mSeq != mLedger->info().seq))
253 {
254 // We know for a fact the ledger can never be acquired
255 JLOG(journal_.warn())
256 << "hash " << hash_ << " seq " << std::to_string(mSeq)
257 << " cannot be a ledger";
258 mLedger.reset();
259 failed_ = true;
260 }
261 };
262
263 // Try to fetch the ledger header from the DB
264 if (auto nodeObject = srcDB.fetchNodeObject(hash_, mSeq))
265 {
266 JLOG(journal_.trace()) << "Ledger header found in local store";
267
268 makeLedger(nodeObject->getData());
269 if (failed_)
270 return;
271
272 // Store the ledger header if the source and destination differ
273 auto& dstDB{mLedger->stateMap().family().db()};
274 if (std::addressof(dstDB) != std::addressof(srcDB))
275 {
276 Blob blob{nodeObject->getData()};
277 dstDB.store(
278 hotLEDGER, std::move(blob), hash_, mLedger->info().seq);
279 }
280 }
281 else
282 {
283 // Try to fetch the ledger header from a fetch pack
284 auto data = app_.getLedgerMaster().getFetchPack(hash_);
285 if (!data)
286 return;
287
288 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
289
290 makeLedger(*data);
291 if (failed_)
292 return;
293
294 // Store the ledger header in the ledger's database
295 mLedger->stateMap().family().db().store(
296 hotLEDGER, std::move(*data), hash_, mLedger->info().seq);
297 }
298
299 if (mSeq == 0)
300 mSeq = mLedger->info().seq;
301 mLedger->stateMap().setLedgerSeq(mSeq);
302 mLedger->txMap().setLedgerSeq(mSeq);
303 mHaveHeader = true;
304 }
305
307 {
308 if (mLedger->info().txHash.isZero())
309 {
310 JLOG(journal_.trace()) << "No TXNs to fetch";
311 mHaveTransactions = true;
312 }
313 else
314 {
315 TransactionStateSF filter(
316 mLedger->txMap().family().db(), app_.getLedgerMaster());
317 if (mLedger->txMap().fetchRoot(
318 SHAMapHash{mLedger->info().txHash}, &filter))
319 {
320 if (neededTxHashes(1, &filter).empty())
321 {
322 JLOG(journal_.trace()) << "Had full txn map locally";
323 mHaveTransactions = true;
324 }
325 }
326 }
327 }
328
329 if (!mHaveState)
330 {
331 if (mLedger->info().accountHash.isZero())
332 {
333 JLOG(journal_.fatal())
334 << "We are acquiring a ledger with a zero account hash";
335 failed_ = true;
336 return;
337 }
338 AccountStateSF filter(
339 mLedger->stateMap().family().db(), app_.getLedgerMaster());
340 if (mLedger->stateMap().fetchRoot(
341 SHAMapHash{mLedger->info().accountHash}, &filter))
342 {
343 if (neededStateHashes(1, &filter).empty())
344 {
345 JLOG(journal_.trace()) << "Had full AS map locally";
346 mHaveState = true;
347 }
348 }
349 }
350
352 {
353 JLOG(journal_.debug()) << "Had everything locally";
354 complete_ = true;
355 XRPL_ASSERT(
356 mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
357 mLedger->read(keylet::fees()),
358 "ripple::InboundLedger::tryDB : valid ledger fees");
359 mLedger->setImmutable();
360 }
361}
362
365void
367{
368 mRecentNodes.clear();
369
370 if (isDone())
371 {
372 JLOG(journal_.info()) << "Already done " << hash_;
373 return;
374 }
375
377 {
378 if (mSeq != 0)
379 {
380 JLOG(journal_.warn())
381 << timeouts_ << " timeouts for ledger " << mSeq;
382 }
383 else
384 {
385 JLOG(journal_.warn())
386 << timeouts_ << " timeouts for ledger " << hash_;
387 }
388 failed_ = true;
389 done();
390 return;
391 }
392
393 if (!wasProgress)
394 {
395 checkLocal();
396
397 mByHash = true;
398
400 JLOG(journal_.debug())
401 << "No progress(" << pc << ") for ledger " << hash_;
402
403 // addPeers triggers if the reason is not HISTORY
404 // So if the reason IS HISTORY, need to trigger after we add
405 // otherwise, we need to trigger before we add
406 // so each peer gets triggered once
409 addPeers();
412 }
413}
414
416void
418{
419 mPeerSet->addPeers(
421 [this](auto peer) { return peer->hasLedger(hash_, mSeq); },
422 [this](auto peer) {
423 // For historical nodes, do not trigger too soon
424 // since a fetch pack is probably coming
427 });
428}
429
435
436void
438{
439 if (mSignaled)
440 return;
441
442 mSignaled = true;
443 touch();
444
445 JLOG(journal_.debug()) << "Acquire " << hash_ << (failed_ ? " fail " : " ")
446 << ((timeouts_ == 0)
447 ? std::string()
448 : (std::string("timeouts:") +
450 << mStats.get();
451
452 XRPL_ASSERT(
454 "ripple::InboundLedger::done : complete or failed");
455
456 if (complete_ && !failed_ && mLedger)
457 {
458 XRPL_ASSERT(
459 mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
460 mLedger->read(keylet::fees()),
461 "ripple::InboundLedger::done : valid ledger fees");
462 mLedger->setImmutable();
463 switch (mReason)
464 {
465 case Reason::HISTORY:
467 break;
468 default:
470 break;
471 }
472 }
473
474 // We hold the PeerSet lock, so must dispatch
476 jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
477 if (self->complete_ && !self->failed_)
478 {
479 self->app_.getLedgerMaster().checkAccept(self->getLedger());
480 self->app_.getLedgerMaster().tryAdvance();
481 }
482 else
483 self->app_.getInboundLedgers().logFailure(
484 self->hash_, self->mSeq);
485 });
486}
487
490void
492{
494
495 if (isDone())
496 {
497 JLOG(journal_.debug())
498 << "Trigger on ledger: " << hash_ << (complete_ ? " completed" : "")
499 << (failed_ ? " failed" : "");
500 return;
501 }
502
503 if (auto stream = journal_.debug())
504 {
506 ss << "Trigger acquiring ledger " << hash_;
507 if (peer)
508 ss << " from " << peer;
509
510 if (complete_ || failed_)
511 ss << " complete=" << complete_ << " failed=" << failed_;
512 else
513 ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions
514 << " as=" << mHaveState;
515 stream << ss.str();
516 }
517
518 if (!mHaveHeader)
519 {
521 if (failed_)
522 {
523 JLOG(journal_.warn()) << " failed local for " << hash_;
524 return;
525 }
526 }
527
528 protocol::TMGetLedger tmGL;
529 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
530
531 if (timeouts_ != 0)
532 {
533 // Be more aggressive if we've timed out at least once
534 tmGL.set_querytype(protocol::qtINDIRECT);
535
536 if (!progress_ && !failed_ && mByHash &&
538 {
539 auto need = getNeededHashes();
540
541 if (!need.empty())
542 {
543 protocol::TMGetObjectByHash tmBH;
544 bool typeSet = false;
545 tmBH.set_query(true);
546 tmBH.set_ledgerhash(hash_.begin(), hash_.size());
547 for (auto const& p : need)
548 {
549 JLOG(journal_.debug()) << "Want: " << p.second;
550
551 if (!typeSet)
552 {
553 tmBH.set_type(p.first);
554 typeSet = true;
555 }
556
557 if (p.first == tmBH.type())
558 {
559 protocol::TMIndexedObject* io = tmBH.add_objects();
560 io->set_hash(p.second.begin(), p.second.size());
561 if (mSeq != 0)
562 io->set_ledgerseq(mSeq);
563 }
564 }
565
566 auto packet =
567 std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
568 auto const& peerIds = mPeerSet->getPeerIds();
570 peerIds.begin(), peerIds.end(), [this, &packet](auto id) {
571 if (auto p = app_.overlay().findPeerByShortID(id))
572 {
573 mByHash = false;
574 p->send(packet);
575 }
576 });
577 }
578 else
579 {
580 JLOG(journal_.info())
581 << "getNeededHashes says acquire is complete";
582 mHaveHeader = true;
583 mHaveTransactions = true;
584 mHaveState = true;
585 complete_ = true;
586 }
587 }
588 }
589
590 // We can't do much without the header data because we don't know the
591 // state or transaction root hashes.
592 if (!mHaveHeader && !failed_)
593 {
594 tmGL.set_itype(protocol::liBASE);
595 if (mSeq != 0)
596 tmGL.set_ledgerseq(mSeq);
597 JLOG(journal_.trace()) << "Sending header request to "
598 << (peer ? "selected peer" : "all peers");
599 mPeerSet->sendRequest(tmGL, peer);
600 return;
601 }
602
603 if (mLedger)
604 tmGL.set_ledgerseq(mLedger->info().seq);
605
606 if (reason != TriggerReason::reply)
607 {
608 // If we're querying blind, don't query deep
609 tmGL.set_querydepth(0);
610 }
611 else if (peer && peer->isHighLatency())
612 {
613 // If the peer has high latency, query extra deep
614 tmGL.set_querydepth(2);
615 }
616 else
617 tmGL.set_querydepth(1);
618
619 // Get the state data first because it's the most likely to be useful
620 // if we wind up abandoning this fetch.
621 if (mHaveHeader && !mHaveState && !failed_)
622 {
623 XRPL_ASSERT(
624 mLedger,
625 "ripple::InboundLedger::trigger : non-null ledger to read state "
626 "from");
627
628 if (!mLedger->stateMap().isValid())
629 {
630 failed_ = true;
631 }
632 else if (mLedger->stateMap().getHash().isZero())
633 {
634 // we need the root node
635 tmGL.set_itype(protocol::liAS_NODE);
636 *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
637 JLOG(journal_.trace()) << "Sending AS root request to "
638 << (peer ? "selected peer" : "all peers");
639 mPeerSet->sendRequest(tmGL, peer);
640 return;
641 }
642 else
643 {
644 AccountStateSF filter(
645 mLedger->stateMap().family().db(), app_.getLedgerMaster());
646
647 // Release the lock while we process the large state map
648 sl.unlock();
649 auto nodes =
650 mLedger->stateMap().getMissingNodes(missingNodesFind, &filter);
651 sl.lock();
652
653 // Make sure nothing happened while we released the lock
654 if (!failed_ && !complete_ && !mHaveState)
655 {
656 if (nodes.empty())
657 {
658 if (!mLedger->stateMap().isValid())
659 failed_ = true;
660 else
661 {
662 mHaveState = true;
663
664 if (mHaveTransactions)
665 complete_ = true;
666 }
667 }
668 else
669 {
670 filterNodes(nodes, reason);
671
672 if (!nodes.empty())
673 {
674 tmGL.set_itype(protocol::liAS_NODE);
675 for (auto const& id : nodes)
676 {
677 *(tmGL.add_nodeids()) = id.first.getRawString();
678 }
679
680 JLOG(journal_.trace())
681 << "Sending AS node request (" << nodes.size()
682 << ") to "
683 << (peer ? "selected peer" : "all peers");
684 mPeerSet->sendRequest(tmGL, peer);
685 return;
686 }
687 else
688 {
689 JLOG(journal_.trace()) << "All AS nodes filtered";
690 }
691 }
692 }
693 }
694 }
695
696 if (mHaveHeader && !mHaveTransactions && !failed_)
697 {
698 XRPL_ASSERT(
699 mLedger,
700 "ripple::InboundLedger::trigger : non-null ledger to read "
701 "transactions from");
702
703 if (!mLedger->txMap().isValid())
704 {
705 failed_ = true;
706 }
707 else if (mLedger->txMap().getHash().isZero())
708 {
709 // we need the root node
710 tmGL.set_itype(protocol::liTX_NODE);
711 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
712 JLOG(journal_.trace()) << "Sending TX root request to "
713 << (peer ? "selected peer" : "all peers");
714 mPeerSet->sendRequest(tmGL, peer);
715 return;
716 }
717 else
718 {
719 TransactionStateSF filter(
720 mLedger->txMap().family().db(), app_.getLedgerMaster());
721
722 auto nodes =
723 mLedger->txMap().getMissingNodes(missingNodesFind, &filter);
724
725 if (nodes.empty())
726 {
727 if (!mLedger->txMap().isValid())
728 failed_ = true;
729 else
730 {
731 mHaveTransactions = true;
732
733 if (mHaveState)
734 complete_ = true;
735 }
736 }
737 else
738 {
739 filterNodes(nodes, reason);
740
741 if (!nodes.empty())
742 {
743 tmGL.set_itype(protocol::liTX_NODE);
744 for (auto const& n : nodes)
745 {
746 *(tmGL.add_nodeids()) = n.first.getRawString();
747 }
748 JLOG(journal_.trace())
749 << "Sending TX node request (" << nodes.size()
750 << ") to " << (peer ? "selected peer" : "all peers");
751 mPeerSet->sendRequest(tmGL, peer);
752 return;
753 }
754 else
755 {
756 JLOG(journal_.trace()) << "All TX nodes filtered";
757 }
758 }
759 }
760 }
761
762 if (complete_ || failed_)
763 {
764 JLOG(journal_.debug())
765 << "Done:" << (complete_ ? " complete" : "")
766 << (failed_ ? " failed " : " ") << mLedger->info().seq;
767 sl.unlock();
768 done();
769 }
770}
771
772void
773InboundLedger::filterNodes(
775 TriggerReason reason)
776{
777 // Sort nodes so that the ones we haven't recently
778 // requested come before the ones we have.
779 auto dup = std::stable_partition(
780 nodes.begin(), nodes.end(), [this](auto const& item) {
781 return mRecentNodes.count(item.second) == 0;
782 });
783
784 // If everything is a duplicate we don't want to send
785 // any query at all except on a timeout where we need
786 // to query everyone:
787 if (dup == nodes.begin())
788 {
789 JLOG(journal_.trace()) << "filterNodes: all duplicates";
790
791 if (reason != TriggerReason::timeout)
792 {
793 nodes.clear();
794 return;
795 }
796 }
797 else
798 {
799 JLOG(journal_.trace()) << "filterNodes: pruning duplicates";
800
801 nodes.erase(dup, nodes.end());
802 }
803
804 std::size_t const limit =
805 (reason == TriggerReason::reply) ? reqNodesReply : reqNodes;
806
807 if (nodes.size() > limit)
808 nodes.resize(limit);
809
810 for (auto const& n : nodes)
811 mRecentNodes.insert(n.second);
812}
813
817// data must not have hash prefix
818bool
819InboundLedger::takeHeader(std::string const& data)
820{
821 // Return value: true=normal, false=bad data
822 JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;
823
824 if (complete_ || failed_ || mHaveHeader)
825 return true;
826
827 auto* f = &app_.getNodeFamily();
828 mLedger = std::make_shared<Ledger>(
829 deserializeHeader(makeSlice(data)), app_.config(), *f);
830 if (mLedger->info().hash != hash_ ||
831 (mSeq != 0 && mSeq != mLedger->info().seq))
832 {
833 JLOG(journal_.warn())
834 << "Acquire hash mismatch: " << mLedger->info().hash
835 << "!=" << hash_;
836 mLedger.reset();
837 return false;
838 }
839 if (mSeq == 0)
840 mSeq = mLedger->info().seq;
841 mLedger->stateMap().setLedgerSeq(mSeq);
842 mLedger->txMap().setLedgerSeq(mSeq);
843 mHaveHeader = true;
844
845 Serializer s(data.size() + 4);
846 s.add32(HashPrefix::ledgerMaster);
847 s.addRaw(data.data(), data.size());
848 f->db().store(hotLEDGER, std::move(s.modData()), hash_, mSeq);
849
850 if (mLedger->info().txHash.isZero())
851 mHaveTransactions = true;
852
853 if (mLedger->info().accountHash.isZero())
854 mHaveState = true;
855
856 mLedger->txMap().setSynching();
857 mLedger->stateMap().setSynching();
858
859 return true;
860}
861
865void
866InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
867{
868 if (!mHaveHeader)
869 {
870 JLOG(journal_.warn()) << "Missing ledger header";
871 san.incInvalid();
872 return;
873 }
874 if (packet.type() == protocol::liTX_NODE)
875 {
876 if (mHaveTransactions || failed_)
877 {
878 san.incDuplicate();
879 return;
880 }
881 }
882 else if (mHaveState || failed_)
883 {
884 san.incDuplicate();
885 return;
886 }
887
888 auto [map, rootHash, filter] = [&]()
890 if (packet.type() == protocol::liTX_NODE)
891 return {
892 mLedger->txMap(),
893 SHAMapHash{mLedger->info().txHash},
895 mLedger->txMap().family().db(), app_.getLedgerMaster())};
896 return {
897 mLedger->stateMap(),
898 SHAMapHash{mLedger->info().accountHash},
900 mLedger->stateMap().family().db(), app_.getLedgerMaster())};
901 }();
902
903 try
904 {
905 auto const f = filter.get();
906
907 for (auto const& node : packet.nodes())
908 {
909 auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
910
911 if (!nodeID)
912 throw std::runtime_error("data does not properly deserialize");
913
914 if (nodeID->isRoot())
915 {
916 san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
917 }
918 else
919 {
920 san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
921 }
922
923 if (!san.isGood())
924 {
925 JLOG(journal_.warn()) << "Received bad node data";
926 return;
927 }
928 }
929 }
930 catch (std::exception const& e)
931 {
932 JLOG(journal_.error()) << "Received bad node data: " << e.what();
933 san.incInvalid();
934 return;
935 }
936
937 if (!map.isSynching())
938 {
939 if (packet.type() == protocol::liTX_NODE)
940 mHaveTransactions = true;
941 else
942 mHaveState = true;
943
944 if (mHaveTransactions && mHaveState)
945 {
946 complete_ = true;
947 done();
948 }
949 }
950}
951
955bool
956InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
957{
958 if (failed_ || mHaveState)
959 {
960 san.incDuplicate();
961 return true;
962 }
963
964 if (!mHaveHeader)
965 {
966 // LCOV_EXCL_START
967 UNREACHABLE("ripple::InboundLedger::takeAsRootNode : no ledger header");
968 return false;
969 // LCOV_EXCL_STOP
970 }
971
972 AccountStateSF filter(
973 mLedger->stateMap().family().db(), app_.getLedgerMaster());
974 san += mLedger->stateMap().addRootNode(
975 SHAMapHash{mLedger->info().accountHash}, data, &filter);
976 return san.isGood();
977}
978
982bool
983InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
984{
985 if (failed_ || mHaveTransactions)
986 {
987 san.incDuplicate();
988 return true;
989 }
990
991 if (!mHaveHeader)
992 {
993 // LCOV_EXCL_START
994 UNREACHABLE("ripple::InboundLedger::takeTxRootNode : no ledger header");
995 return false;
996 // LCOV_EXCL_STOP
997 }
998
999 TransactionStateSF filter(
1000 mLedger->txMap().family().db(), app_.getLedgerMaster());
1001 san += mLedger->txMap().addRootNode(
1002 SHAMapHash{mLedger->info().txHash}, data, &filter);
1003 return san.isGood();
1004}
1005
1007InboundLedger::getNeededHashes()
1008{
1010
1011 if (!mHaveHeader)
1012 {
1013 ret.push_back(
1014 std::make_pair(protocol::TMGetObjectByHash::otLEDGER, hash_));
1015 return ret;
1016 }
1017
1018 if (!mHaveState)
1019 {
1020 AccountStateSF filter(
1021 mLedger->stateMap().family().db(), app_.getLedgerMaster());
1022 for (auto const& h : neededStateHashes(4, &filter))
1023 {
1024 ret.push_back(
1025 std::make_pair(protocol::TMGetObjectByHash::otSTATE_NODE, h));
1026 }
1027 }
1028
1029 if (!mHaveTransactions)
1030 {
1031 TransactionStateSF filter(
1032 mLedger->txMap().family().db(), app_.getLedgerMaster());
1033 for (auto const& h : neededTxHashes(4, &filter))
1034 {
1036 protocol::TMGetObjectByHash::otTRANSACTION_NODE, h));
1037 }
1038 }
1039
1040 return ret;
1041}
1042
1046bool
1047InboundLedger::gotData(
1050{
1051 std::lock_guard sl(mReceivedDataLock);
1052
1053 if (isDone())
1054 return false;
1055
1056 mReceivedData.emplace_back(peer, data);
1057
1058 if (mReceiveDispatched)
1059 return false;
1060
1061 mReceiveDispatched = true;
1062 return true;
1063}
1064
1068// VFALCO NOTE, it is not necessary to pass the entire Peer,
1069// we can get away with just a Resource::Consumer endpoint.
1070//
1071// TODO Change peer to Consumer
1072//
1073int
1074InboundLedger::processData(
1076 protocol::TMLedgerData& packet)
1077{
1078 if (packet.type() == protocol::liBASE)
1079 {
1080 if (packet.nodes().empty())
1081 {
1082 JLOG(journal_.warn()) << peer->id() << ": empty header data";
1083 peer->charge(
1084 Resource::feeMalformedRequest, "ledger_data empty header");
1085 return -1;
1086 }
1087
1088 SHAMapAddNode san;
1089
1090 ScopedLockType sl(mtx_);
1091
1092 try
1093 {
1094 if (!mHaveHeader)
1095 {
1096 if (!takeHeader(packet.nodes(0).nodedata()))
1097 {
1098 JLOG(journal_.warn()) << "Got invalid header data";
1099 peer->charge(
1100 Resource::feeMalformedRequest,
1101 "ledger_data invalid header");
1102 return -1;
1103 }
1104
1105 san.incUseful();
1106 }
1107
1108 if (!mHaveState && (packet.nodes().size() > 1) &&
1109 !takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
1110 {
1111 JLOG(journal_.warn()) << "Included AS root invalid";
1112 }
1113
1114 if (!mHaveTransactions && (packet.nodes().size() > 2) &&
1115 !takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
1116 {
1117 JLOG(journal_.warn()) << "Included TX root invalid";
1118 }
1119 }
1120 catch (std::exception const& ex)
1121 {
1122 JLOG(journal_.warn())
1123 << "Included AS/TX root invalid: " << ex.what();
1124 using namespace std::string_literals;
1125 peer->charge(Resource::feeInvalidData, "ledger_data "s + ex.what());
1126 return -1;
1127 }
1128
1129 if (san.isUseful())
1130 progress_ = true;
1131
1132 mStats += san;
1133 return san.getGood();
1134 }
1135
1136 if ((packet.type() == protocol::liTX_NODE) ||
1137 (packet.type() == protocol::liAS_NODE))
1138 {
1139 std::string type = packet.type() == protocol::liTX_NODE ? "liTX_NODE: "
1140 : "liAS_NODE: ";
1141
1142 if (packet.nodes().empty())
1143 {
1144 JLOG(journal_.info()) << peer->id() << ": response with no nodes";
1145 peer->charge(Resource::feeMalformedRequest, "ledger_data no nodes");
1146 return -1;
1147 }
1148
1149 ScopedLockType sl(mtx_);
1150
1151 // Verify node IDs and data are complete
1152 for (auto const& node : packet.nodes())
1153 {
1154 if (!node.has_nodeid() || !node.has_nodedata())
1155 {
1156 JLOG(journal_.warn()) << "Got bad node";
1157 peer->charge(
1158 Resource::feeMalformedRequest, "ledger_data bad node");
1159 return -1;
1160 }
1161 }
1162
1163 SHAMapAddNode san;
1164 receiveNode(packet, san);
1165
1166 JLOG(journal_.debug())
1167 << "Ledger "
1168 << ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS")
1169 << " node stats: " << san.get();
1170
1171 if (san.isUseful())
1172 progress_ = true;
1173
1174 mStats += san;
1175 return san.getGood();
1176 }
1177
1178 return -1;
1179}
1180
1181namespace detail {
1182// Track the amount of useful data that each peer returns
1184{
1185 // Map from peer to amount of useful the peer returned
1187 // The largest amount of useful data that any peer returned
1188 int maxCount = 0;
1189
1190 // Update the data count for a peer
1191 void
1192 update(std::shared_ptr<Peer>&& peer, int dataCount)
1193 {
1194 if (dataCount <= 0)
1195 return;
1196 maxCount = std::max(maxCount, dataCount);
1197 auto i = counts.find(peer);
1198 if (i == counts.end())
1199 {
1200 counts.emplace(std::move(peer), dataCount);
1201 return;
1202 }
1203 i->second = std::max(i->second, dataCount);
1204 }
1205
1206 // Prune all the peers that didn't return enough data.
1207 void
1209 {
1210 // Remove all the peers that didn't return at least half as much data as
1211 // the best peer
1212 auto const thresh = maxCount / 2;
1213 auto i = counts.begin();
1214 while (i != counts.end())
1215 {
1216 if (i->second < thresh)
1217 i = counts.erase(i);
1218 else
1219 ++i;
1220 }
1221 }
1222
1223 // call F with the `peer` parameter with a random sample of at most n values
1224 // of the counts vector.
1225 template <class F>
1226 void
1228 {
1229 if (counts.empty())
1230 return;
1231
1232 auto outFunc = [&f](auto&& v) { f(v.first); };
1234#if _MSC_VER
1236 s.reserve(n);
1238 counts.begin(), counts.end(), std::back_inserter(s), n, rng);
1239 for (auto& v : s)
1240 {
1241 outFunc(v);
1242 }
1243#else
1245 counts.begin(),
1246 counts.end(),
1247 boost::make_function_output_iterator(outFunc),
1248 n,
1249 rng);
1250#endif
1251 }
1252};
1253} // namespace detail
1254
1258void
1259InboundLedger::runData()
1260{
1261 // Maximum number of peers to request data from
1262 constexpr std::size_t maxUsefulPeers = 6;
1263
1264 decltype(mReceivedData) data;
1265
1266 // Reserve some memory so the first couple iterations don't reallocate
1267 data.reserve(8);
1268
1269 detail::PeerDataCounts dataCounts;
1270
1271 for (;;)
1272 {
1273 data.clear();
1274
1275 {
1276 std::lock_guard sl(mReceivedDataLock);
1277
1278 if (mReceivedData.empty())
1279 {
1280 mReceiveDispatched = false;
1281 break;
1282 }
1283
1284 data.swap(mReceivedData);
1285 }
1286
1287 for (auto& entry : data)
1288 {
1289 if (auto peer = entry.first.lock())
1290 {
1291 int count = processData(peer, *(entry.second));
1292 dataCounts.update(std::move(peer), count);
1293 }
1294 }
1295 }
1296
1297 // Select a random sample of the peers that gives us the most nodes that are
1298 // useful
1299 dataCounts.prune();
1300 dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr<Peer> const& peer) {
1301 trigger(peer, TriggerReason::reply);
1302 });
1303}
1304
1306InboundLedger::getJson(int)
1307{
1309
1310 ScopedLockType sl(mtx_);
1311
1312 ret[jss::hash] = to_string(hash_);
1313
1314 if (complete_)
1315 ret[jss::complete] = true;
1316
1317 if (failed_)
1318 ret[jss::failed] = true;
1319
1320 if (!complete_ && !failed_)
1321 ret[jss::peers] = static_cast<int>(mPeerSet->getPeerIds().size());
1322
1323 ret[jss::have_header] = mHaveHeader;
1324
1325 if (mHaveHeader)
1326 {
1327 ret[jss::have_state] = mHaveState;
1328 ret[jss::have_transactions] = mHaveTransactions;
1329 }
1330
1331 ret[jss::timeouts] = timeouts_;
1332
1333 if (mHaveHeader && !mHaveState)
1334 {
1336 for (auto const& h : neededStateHashes(16, nullptr))
1337 {
1338 hv.append(to_string(h));
1339 }
1340 ret[jss::needed_state_hashes] = hv;
1341 }
1342
1343 if (mHaveHeader && !mHaveTransactions)
1344 {
1346 for (auto const& h : neededTxHashes(16, nullptr))
1347 {
1348 hv.append(to_string(h));
1349 }
1350 ret[jss::needed_transaction_hashes] = hv;
1351 }
1352
1353 return ret;
1354}
1355
1356} // namespace ripple
T addressof(T... args)
T back_inserter(T... args)
T begin(T... args)
Represents a JSON value.
Definition json_value.h:149
Value & append(Value const &value)
Append value to array at the end.
ValueType type() const
Stream fatal() const
Definition Journal.h:352
Stream debug() const
Definition Journal.h:328
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
Stream warn() const
Definition Journal.h:340
virtual Config & config()=0
virtual JobQueue & getJobQueue()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual Family & getNodeFamily()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual NodeStore::Database & db()=0
std::size_t getPeerCount() const
void trigger(std::shared_ptr< Peer > const &, TriggerReason)
Request more nodes, perhaps from a specific peer.
void init(ScopedLockType &collectionLock)
std::set< uint256 > mRecentNodes
void addPeers()
Add more peers to the set, if possible.
std::shared_ptr< Ledger > mLedger
std::vector< uint256 > neededTxHashes(int max, SHAMapSyncFilter *filter) const
InboundLedger(Application &app, uint256 const &hash, std::uint32_t seq, Reason reason, clock_type &, std::unique_ptr< PeerSet > peerSet)
void tryDB(NodeStore::Database &srcDB)
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Called with a lock by the PeerSet when the timer expires.
std::vector< uint256 > neededStateHashes(int max, SHAMapSyncFilter *filter) const
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::vector< std::pair< std::weak_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData > > > mReceivedData
std::vector< neededHash_t > getNeededHashes()
void update(std::uint32_t seq)
std::unique_ptr< PeerSet > mPeerSet
virtual void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet)=0
virtual void onLedgerFetched()=0
Called when a complete ledger is obtained.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:168
void checkAccept(std::shared_ptr< Ledger const > const &ledger)
std::optional< Blob > getFetchPack(uint256 const &hash) override
Retrieves partial ledger data of the coresponding hash from peers.
bool storeLedger(std::shared_ptr< Ledger const > ledger)
Persistency layer for NodeObject.
Definition Database.h:51
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:240
std::string get() const
bool isZero() const
Definition SHAMapHash.h:54
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition SHAMap.h:97
SHAMapHash getHash() const
Definition SHAMap.cpp:890
std::vector< std::pair< SHAMapNodeID, uint256 > > getMissingNodes(int maxNodes, SHAMapSyncFilter *filter)
Check for nodes in the SHAMap not available.
int addRaw(Blob const &vector)
An immutable linear range of bytes.
Definition Slice.h:46
This class is an "active" object.
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
bool progress_
Whether forward progress has been made.
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
iterator begin()
Definition base_uint.h:136
static constexpr std::size_t size()
Definition base_uint.h:526
T count_if(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T for_each(T... args)
T is_same_v
T make_pair(T... args)
T max(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:44
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:45
Keylet const & fees() noexcept
The (fixed) index of the object containing the ledger fees.
Definition Indexes.cpp:222
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
LedgerHeader deserializePrefixedHeader(Slice data, bool hasHash=false)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
auto constexpr ledgerAcquireTimeout
@ hotLEDGER
Definition NodeObject.h:34
@ ledgerBecomeAggressiveThreshold
@ ledgerTimeoutRetriesMax
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_FEES
The XRP Ledger mainnet's earliest ledger with a FeeSettings object.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:244
Number root(Number f, unsigned d)
Definition Number.cpp:636
@ jtLEDGER_DATA
Definition Job.h:66
LedgerHeader deserializeHeader(Slice data, bool hasHash=false)
Deserialize a ledger header from a byte array.
static std::vector< uint256 > neededHashes(uint256 const &root, SHAMap &map, int max, SHAMapSyncFilter *filter)
T push_back(T... args)
T reserve(T... args)
T reset(T... args)
T sample(T... args)
T stable_partition(T... args)
T str(T... args)
std::unordered_map< std::shared_ptr< Peer >, int > counts
void sampleN(std::size_t n, F &&f)
void update(std::shared_ptr< Peer > &&peer, int dataCount)
T to_string(T... args)
T unlock(T... args)
T what(T... args)