rippled
Loading...
Searching...
No Matches
RCLConsensus.cpp
1#include <xrpld/app/consensus/RCLConsensus.h>
2#include <xrpld/app/consensus/RCLValidations.h>
3#include <xrpld/app/ledger/BuildLedger.h>
4#include <xrpld/app/ledger/InboundLedgers.h>
5#include <xrpld/app/ledger/InboundTransactions.h>
6#include <xrpld/app/ledger/Ledger.h>
7#include <xrpld/app/ledger/LedgerMaster.h>
8#include <xrpld/app/ledger/LocalTxs.h>
9#include <xrpld/app/ledger/OpenLedger.h>
10#include <xrpld/app/misc/AmendmentTable.h>
11#include <xrpld/app/misc/HashRouter.h>
12#include <xrpld/app/misc/LoadFeeTrack.h>
13#include <xrpld/app/misc/NegativeUNLVote.h>
14#include <xrpld/app/misc/NetworkOPs.h>
15#include <xrpld/app/misc/TxQ.h>
16#include <xrpld/app/misc/ValidatorKeys.h>
17#include <xrpld/app/misc/ValidatorList.h>
18#include <xrpld/consensus/LedgerTiming.h>
19#include <xrpld/overlay/Overlay.h>
20#include <xrpld/overlay/predicates.h>
21
22#include <xrpl/basics/random.h>
23#include <xrpl/beast/core/LexicalCast.h>
24#include <xrpl/beast/utility/instrumentation.h>
25#include <xrpl/protocol/BuildInfo.h>
26#include <xrpl/protocol/Feature.h>
27#include <xrpl/protocol/digest.h>
28
29#include <algorithm>
30#include <iomanip>
31#include <mutex>
32
33namespace ripple {
34
36 Application& app,
39 LocalTxs& localTxs,
40 InboundTransactions& inboundTransactions,
42 ValidatorKeys const& validatorKeys,
43 beast::Journal journal)
44 : adaptor_(
45 app,
46 std::move(feeVote),
48 localTxs,
49 inboundTransactions,
50 validatorKeys,
51 journal)
52 , consensus_(clock, adaptor_, journal)
53 , j_(journal)
54{
55}
56
58 Application& app,
61 LocalTxs& localTxs,
62 InboundTransactions& inboundTransactions,
63 ValidatorKeys const& validatorKeys,
64 beast::Journal journal)
65 : app_(app)
66 , feeVote_(std::move(feeVote))
67 , ledgerMaster_(ledgerMaster)
68 , localTxs_(localTxs)
69 , inboundTransactions_{inboundTransactions}
70 , j_(journal)
71 , validatorKeys_(validatorKeys)
72 , valCookie_(
73 1 +
76 std::numeric_limits<std::uint64_t>::max() - 1))
77 , nUnlVote_(validatorKeys_.nodeID, j_)
78{
79 XRPL_ASSERT(
80 valCookie_, "ripple::RCLConsensus::Adaptor::Adaptor : nonzero cookie");
81
82 JLOG(j_.info()) << "Consensus engine started (cookie: " +
84
85 if (validatorKeys_.nodeID != beast::zero && validatorKeys_.keys)
86 {
88
89 JLOG(j_.info()) << "Validator identity: "
90 << toBase58(
92 validatorKeys_.keys->masterPublicKey);
93
94 if (validatorKeys_.keys->masterPublicKey !=
95 validatorKeys_.keys->publicKey)
96 {
97 JLOG(j_.debug())
98 << "Validator ephemeral signing key: "
99 << toBase58(
101 << " (seq: " << std::to_string(validatorKeys_.sequence) << ")";
102 }
103 }
104}
105
108{
109 // we need to switch the ledger we're working from
110 auto built = ledgerMaster_.getLedgerByHash(hash);
111 if (!built)
112 {
113 if (acquiringLedger_ != hash)
114 {
115 // need to start acquiring the correct consensus LCL
116 JLOG(j_.warn()) << "Need consensus ledger " << hash;
117
118 // Tell the ledger acquire system that we need the consensus ledger
119 acquiringLedger_ = hash;
120
121 app_.getJobQueue().addJob(
122 jtADVANCE,
123 "getConsensusLedger1",
124 [id = hash, &app = app_, this]() {
125 JLOG(j_.debug())
126 << "JOB advanceLedger getConsensusLedger1 started";
127 app.getInboundLedgers().acquireAsync(
129 });
130 }
131 return std::nullopt;
132 }
133
134 XRPL_ASSERT(
135 !built->open() && built->isImmutable(),
136 "ripple::RCLConsensus::Adaptor::acquireLedger : valid ledger state");
137 XRPL_ASSERT(
138 built->info().hash == hash,
139 "ripple::RCLConsensus::Adaptor::acquireLedger : ledger hash match");
140
141 // Notify inbound transactions of the new ledger sequence number
142 inboundTransactions_.newRound(built->info().seq);
143
144 return RCLCxLedger(built);
145}
146
147void
149{
150 protocol::TMProposeSet prop;
151
152 auto const& proposal = peerPos.proposal();
153
154 prop.set_proposeseq(proposal.proposeSeq());
155 prop.set_closetime(proposal.closeTime().time_since_epoch().count());
156
157 prop.set_currenttxhash(
158 proposal.position().begin(), proposal.position().size());
159 prop.set_previousledger(
160 proposal.prevLedger().begin(), proposal.position().size());
161
162 auto const pk = peerPos.publicKey().slice();
163 prop.set_nodepubkey(pk.data(), pk.size());
164
165 auto const sig = peerPos.signature();
166 prop.set_signature(sig.data(), sig.size());
167
168 app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
169}
170
171void
173{
174 // If we didn't relay this transaction recently, relay it to all peers
175 if (app_.getHashRouter().shouldRelay(tx.id()))
176 {
177 JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
178 auto const slice = tx.tx_->slice();
179 protocol::TMTransaction msg;
180 msg.set_rawtransaction(slice.data(), slice.size());
181 msg.set_status(protocol::tsNEW);
182 msg.set_receivetimestamp(
183 app_.timeKeeper().now().time_since_epoch().count());
184 static std::set<Peer::id_t> skip{};
185 app_.overlay().relay(tx.id(), msg, skip);
186 }
187 else
188 {
189 JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id();
190 }
191}
192void
194{
195 JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
196 << ripple::to_string(proposal.prevLedger()) << " -> "
197 << ripple::to_string(proposal.position());
198
199 protocol::TMProposeSet prop;
200
201 prop.set_currenttxhash(
202 proposal.position().begin(), proposal.position().size());
203 prop.set_previousledger(
204 proposal.prevLedger().begin(), proposal.prevLedger().size());
205 prop.set_proposeseq(proposal.proposeSeq());
206 prop.set_closetime(proposal.closeTime().time_since_epoch().count());
207
208 if (!validatorKeys_.keys)
209 {
210 JLOG(j_.warn()) << "RCLConsensus::Adaptor::propose: ValidatorKeys "
211 "not set: \n";
212 return;
213 }
214
215 auto const& keys = *validatorKeys_.keys;
216
217 prop.set_nodepubkey(keys.publicKey.data(), keys.publicKey.size());
218
219 auto sig =
220 signDigest(keys.publicKey, keys.secretKey, proposal.signingHash());
221
222 prop.set_signature(sig.data(), sig.size());
223
224 auto const suppression = proposalUniqueId(
225 proposal.position(),
226 proposal.prevLedger(),
227 proposal.proposeSeq(),
228 proposal.closeTime(),
229 keys.publicKey,
230 sig);
231
232 app_.getHashRouter().addSuppression(suppression);
233
234 app_.overlay().broadcast(prop);
235}
236
237void
239{
240 inboundTransactions_.giveSet(txns.id(), txns.map_, false);
241}
242
245{
246 if (auto txns = inboundTransactions_.getSet(setId, true))
247 {
248 return RCLTxSet{std::move(txns)};
249 }
250 return std::nullopt;
251}
252
253bool
255{
256 return !app_.openLedger().empty();
257}
258
261{
262 return app_.getValidations().numTrustedForLedger(h);
263}
264
267 RCLCxLedger const& ledger,
268 LedgerHash const& h) const
269{
270 RCLValidations& vals = app_.getValidations();
271 return vals.getNodesAfter(
272 RCLValidatedLedger(ledger.ledger_, vals.adaptor().journal()), h);
273}
274
277 uint256 ledgerID,
278 RCLCxLedger const& ledger,
280{
281 RCLValidations& vals = app_.getValidations();
282 uint256 netLgr = vals.getPreferred(
283 RCLValidatedLedger{ledger.ledger_, vals.adaptor().journal()},
284 ledgerMaster_.getValidLedgerIndex());
285
286 if (netLgr != ledgerID)
287 {
289 app_.getOPs().consensusViewChange();
290
291 JLOG(j_.debug()) << Json::Compact(app_.getValidations().getJsonTrie());
292 }
293
294 return netLgr;
295}
296
297auto
299 RCLCxLedger const& ledger,
300 NetClock::time_point const& closeTime,
302{
303 bool const wrongLCL = mode == ConsensusMode::wrongLedger;
305
306 notify(protocol::neCLOSING_LEDGER, ledger, !wrongLCL);
307
308 auto const& prevLedger = ledger.ledger_;
309
310 ledgerMaster_.applyHeldTransactions();
311 // Tell the ledger master not to acquire the ledger we're probably building
312 ledgerMaster_.setBuildingLedger(prevLedger->info().seq + 1);
313
314 auto initialLedger = app_.openLedger().current();
315
316 auto initialSet =
318 initialSet->setUnbacked();
319
320 // Build SHAMap containing all transactions in our open ledger
321 for (auto const& tx : initialLedger->txs)
322 {
323 JLOG(j_.trace()) << "Adding open ledger TX "
324 << tx.first->getTransactionID();
325 Serializer s(2048);
326 tx.first->add(s);
327 initialSet->addItem(
329 make_shamapitem(tx.first->getTransactionID(), s.slice()));
330 }
331
332 // Add pseudo-transactions to the set
333 if (app_.config().standalone() || (proposing && !wrongLCL))
334 {
335 if (prevLedger->isFlagLedger())
336 {
337 // previous ledger was flag ledger, add fee and amendment
338 // pseudo-transactions
339 auto validations = app_.validators().negativeUNLFilter(
340 app_.getValidations().getTrustedForLedger(
341 prevLedger->info().parentHash, prevLedger->seq() - 1));
342 if (validations.size() >= app_.validators().quorum())
343 {
344 feeVote_->doVoting(prevLedger, validations, initialSet);
345 app_.getAmendmentTable().doVoting(
346 prevLedger, validations, initialSet, j_);
347 }
348 }
349 else if (
350 prevLedger->isVotingLedger() &&
351 prevLedger->rules().enabled(featureNegativeUNL))
352 {
353 // previous ledger was a voting ledger,
354 // so the current consensus session is for a flag ledger,
355 // add negative UNL pseudo-transactions
356 nUnlVote_.doVoting(
357 prevLedger,
358 app_.validators().getTrustedMasterKeys(),
359 app_.getValidations(),
360 initialSet);
361 }
362 }
363
364 // Now we need an immutable snapshot
365 initialSet = initialSet->snapShot(false);
366
367 if (!wrongLCL)
368 {
369 LedgerIndex const seq = prevLedger->info().seq + 1;
371
372 initialSet->visitLeaves(
373 [&proposed,
374 seq](boost::intrusive_ptr<SHAMapItem const> const& item) {
375 proposed.emplace_back(item->key(), seq);
376 });
377
378 censorshipDetector_.propose(std::move(proposed));
379 }
380
381 // Needed because of the move below.
382 auto const setHash = initialSet->getHash().as_uint256();
383
384 return Result{
385 std::move(initialSet),
387 initialLedger->info().parentHash,
389 setHash,
390 closeTime,
391 app_.timeKeeper().closeTime(),
392 validatorKeys_.nodeID}};
393}
394
395void
397 Result const& result,
398 RCLCxLedger const& prevLedger,
399 NetClock::duration const& closeResolution,
400 ConsensusCloseTimes const& rawCloseTimes,
401 ConsensusMode const& mode,
402 Json::Value&& consensusJson)
403{
404 doAccept(
405 result,
406 prevLedger,
407 closeResolution,
408 rawCloseTimes,
409 mode,
410 std::move(consensusJson));
411}
412
413void
415 Result const& result,
416 RCLCxLedger const& prevLedger,
417 NetClock::duration const& closeResolution,
418 ConsensusCloseTimes const& rawCloseTimes,
419 ConsensusMode const& mode,
420 Json::Value&& consensusJson,
421 bool const validating)
422{
423 app_.getJobQueue().addJob(
424 jtACCEPT,
425 "acceptLedger",
426 [=, this, cj = std::move(consensusJson)]() mutable {
427 // Note that no lock is held or acquired during this job.
428 // This is because generic Consensus guarantees that once a ledger
429 // is accepted, the consensus results and capture by reference state
430 // will not change until startRound is called (which happens via
431 // endConsensus).
432 RclConsensusLogger clog("onAccept", validating, j_);
433 this->doAccept(
434 result,
435 prevLedger,
436 closeResolution,
437 rawCloseTimes,
438 mode,
439 std::move(cj));
440 this->app_.getOPs().endConsensus(clog.ss());
441 });
442}
443
444void
446 Result const& result,
447 RCLCxLedger const& prevLedger,
448 NetClock::duration closeResolution,
449 ConsensusCloseTimes const& rawCloseTimes,
450 ConsensusMode const& mode,
451 Json::Value&& consensusJson)
452{
453 prevProposers_ = result.proposers;
454 prevRoundTime_ = result.roundTime.read();
455
456 bool closeTimeCorrect;
457
459 bool const haveCorrectLCL = mode != ConsensusMode::wrongLedger;
460 bool const consensusFail = result.state == ConsensusState::MovedOn;
461
462 auto consensusCloseTime = result.position.closeTime();
463
464 if (consensusCloseTime == NetClock::time_point{})
465 {
466 // We agreed to disagree on the close time
467 using namespace std::chrono_literals;
468 consensusCloseTime = prevLedger.closeTime() + 1s;
469 closeTimeCorrect = false;
470 }
471 else
472 {
473 // We agreed on a close time
474 consensusCloseTime = effCloseTime(
475 consensusCloseTime, closeResolution, prevLedger.closeTime());
476 closeTimeCorrect = true;
477 }
478
479 JLOG(j_.debug()) << "Report: Prop=" << (proposing ? "yes" : "no")
480 << " val=" << (validating_ ? "yes" : "no")
481 << " corLCL=" << (haveCorrectLCL ? "yes" : "no")
482 << " fail=" << (consensusFail ? "yes" : "no");
483 JLOG(j_.debug()) << "Report: Prev = " << prevLedger.id() << ":"
484 << prevLedger.seq();
485
486 //--------------------------------------------------------------------------
487 std::set<TxID> failed;
488
489 // We want to put transactions in an unpredictable but deterministic order:
490 // we use the hash of the set.
491 //
492 // FIXME: Use a std::vector and a custom sorter instead of CanonicalTXSet?
493 CanonicalTXSet retriableTxs{result.txns.map_->getHash().as_uint256()};
494
495 JLOG(j_.debug()) << "Building canonical tx set: " << retriableTxs.key();
496
497 for (auto const& item : *result.txns.map_)
498 {
499 try
500 {
501 retriableTxs.insert(
503 JLOG(j_.debug()) << " Tx: " << item.key();
504 }
505 catch (std::exception const& ex)
506 {
507 failed.insert(item.key());
508 JLOG(j_.warn())
509 << " Tx: " << item.key() << " throws: " << ex.what();
510 }
511 }
512
513 auto built = buildLCL(
514 prevLedger,
515 retriableTxs,
516 consensusCloseTime,
517 closeTimeCorrect,
518 closeResolution,
519 result.roundTime.read(),
520 failed);
521
522 auto const newLCLHash = built.id();
523 JLOG(j_.debug()) << "Built ledger #" << built.seq() << ": " << newLCLHash;
524
525 // Tell directly connected peers that we have a new LCL
526 notify(protocol::neACCEPTED_LEDGER, built, haveCorrectLCL);
527
528 // As long as we're in sync with the network, attempt to detect attempts
529 // at censorship of transaction by tracking which ones don't make it in
530 // after a period of time.
531 if (haveCorrectLCL && result.state == ConsensusState::Yes)
532 {
534
535 result.txns.map_->visitLeaves(
536 [&accepted](boost::intrusive_ptr<SHAMapItem const> const& item) {
537 accepted.push_back(item->key());
538 });
539
540 // Track all the transactions which failed or were marked as retriable
541 for (auto const& r : retriableTxs)
542 failed.insert(r.first.getTXID());
543
544 censorshipDetector_.check(
545 std::move(accepted),
546 [curr = built.seq(),
547 j = app_.journal("CensorshipDetector"),
548 &failed](uint256 const& id, LedgerIndex seq) {
549 if (failed.count(id))
550 return true;
551
552 auto const wait = curr - seq;
553
554 if (wait && (wait % censorshipWarnInternal == 0))
555 {
557 ss << "Potential Censorship: Eligible tx " << id
558 << ", which we are tracking since ledger " << seq
559 << " has not been included as of ledger " << curr << ".";
560
561 JLOG(j.warn()) << ss.str();
562 }
563
564 return false;
565 });
566 }
567
568 if (validating_)
569 validating_ = ledgerMaster_.isCompatible(
570 *built.ledger_, j_.warn(), "Not validating");
571
572 if (validating_ && !consensusFail &&
573 app_.getValidations().canValidateSeq(built.seq()))
574 {
575 validate(built, result.txns, proposing);
576 JLOG(j_.info()) << "CNF Val " << newLCLHash;
577 }
578 else
579 JLOG(j_.info()) << "CNF buildLCL " << newLCLHash;
580
581 // See if we can accept a ledger as fully-validated
582 ledgerMaster_.consensusBuilt(
583 built.ledger_, result.txns.id(), std::move(consensusJson));
584
585 //-------------------------------------------------------------------------
586 {
587 // Apply disputed transactions that didn't get in
588 //
589 // The first crack of transactions to get into the new
590 // open ledger goes to transactions proposed by a validator
591 // we trust but not included in the consensus set.
592 //
593 // These are done first because they are the most likely
594 // to receive agreement during consensus. They are also
595 // ordered logically "sooner" than transactions not mentioned
596 // in the previous consensus round.
597 //
598 bool anyDisputes = false;
599 for (auto const& [_, dispute] : result.disputes)
600 {
601 (void)_;
602 if (!dispute.getOurVote())
603 {
604 // we voted NO
605 try
606 {
607 JLOG(j_.debug())
608 << "Test applying disputed transaction that did"
609 << " not get in " << dispute.tx().id();
610
611 SerialIter sit(dispute.tx().tx_->slice());
612 auto txn = std::make_shared<STTx const>(sit);
613
614 // Disputed pseudo-transactions that were not accepted
615 // can't be successfully applied in the next ledger
616 if (isPseudoTx(*txn))
617 continue;
618
619 retriableTxs.insert(txn);
620
621 anyDisputes = true;
622 }
623 catch (std::exception const& ex)
624 {
625 JLOG(j_.debug()) << "Failed to apply transaction we voted "
626 "NO on. Exception: "
627 << ex.what();
628 }
629 }
630 }
631
632 // Build new open ledger
633 std::unique_lock lock{app_.getMasterMutex(), std::defer_lock};
634 std::unique_lock sl{ledgerMaster_.peekMutex(), std::defer_lock};
635 std::lock(lock, sl);
636
637 auto const lastVal = ledgerMaster_.getValidatedLedger();
639 if (lastVal)
640 rules = makeRulesGivenLedger(*lastVal, app_.config().features);
641 else
642 rules.emplace(app_.config().features);
643 app_.openLedger().accept(
644 app_,
645 *rules,
646 built.ledger_,
647 localTxs_.getTxSet(),
648 anyDisputes,
649 retriableTxs,
650 tapNONE,
651 "consensus",
652 [&](OpenView& view, beast::Journal j) {
653 // Stuff the ledger with transactions from the queue.
654 return app_.getTxQ().accept(app_, view);
655 });
656
657 // Signal a potential fee change to subscribers after the open ledger
658 // is created
659 app_.getOPs().reportFeeChange();
660 }
661
662 //-------------------------------------------------------------------------
663 {
664 ledgerMaster_.switchLCL(built.ledger_);
665
666 // Do these need to exist?
667 XRPL_ASSERT(
668 ledgerMaster_.getClosedLedger()->info().hash == built.id(),
669 "ripple::RCLConsensus::Adaptor::doAccept : ledger hash match");
670 XRPL_ASSERT(
671 app_.openLedger().current()->info().parentHash == built.id(),
672 "ripple::RCLConsensus::Adaptor::doAccept : parent hash match");
673 }
674
675 //-------------------------------------------------------------------------
676 // we entered the round with the network,
677 // see how close our close time is to other node's
678 // close time reports, and update our clock.
681 !consensusFail)
682 {
683 auto closeTime = rawCloseTimes.self;
684
685 JLOG(j_.info()) << "We closed at "
686 << closeTime.time_since_epoch().count();
688 usec64_t closeTotal =
689 std::chrono::duration_cast<usec64_t>(closeTime.time_since_epoch());
690 int closeCount = 1;
691
692 for (auto const& [t, v] : rawCloseTimes.peers)
693 {
694 JLOG(j_.info()) << std::to_string(v) << " time votes for "
695 << std::to_string(t.time_since_epoch().count());
696 closeCount += v;
697 closeTotal +=
698 std::chrono::duration_cast<usec64_t>(t.time_since_epoch()) * v;
699 }
700
701 closeTotal += usec64_t(closeCount / 2); // for round to nearest
702 closeTotal /= closeCount;
703
704 // Use signed times since we are subtracting
707 auto offset = time_point{closeTotal} -
708 std::chrono::time_point_cast<duration>(closeTime);
709 JLOG(j_.info()) << "Our close offset is estimated at " << offset.count()
710 << " (" << closeCount << ")";
711
712 app_.timeKeeper().adjustCloseTime(offset);
713 }
714}
715
716void
718 protocol::NodeEvent ne,
719 RCLCxLedger const& ledger,
720 bool haveCorrectLCL)
721{
722 protocol::TMStatusChange s;
723
724 if (!haveCorrectLCL)
725 s.set_newevent(protocol::neLOST_SYNC);
726 else
727 s.set_newevent(ne);
728
729 s.set_ledgerseq(ledger.seq());
730 s.set_networktime(app_.timeKeeper().now().time_since_epoch().count());
731 s.set_ledgerhashprevious(
732 ledger.parentID().begin(),
733 std::decay_t<decltype(ledger.parentID())>::bytes);
734 s.set_ledgerhash(
735 ledger.id().begin(), std::decay_t<decltype(ledger.id())>::bytes);
736
737 std::uint32_t uMin, uMax;
738 if (!ledgerMaster_.getFullValidatedRange(uMin, uMax))
739 {
740 uMin = 0;
741 uMax = 0;
742 }
743 else
744 {
745 // Don't advertise ledgers we're not willing to serve
746 uMin = std::max(uMin, ledgerMaster_.getEarliestFetch());
747 }
748 s.set_firstseq(uMin);
749 s.set_lastseq(uMax);
750 app_.overlay().foreach(
751 send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
752 JLOG(j_.trace()) << "send status change to peer";
753}
754
757 RCLCxLedger const& previousLedger,
758 CanonicalTXSet& retriableTxs,
759 NetClock::time_point closeTime,
760 bool closeTimeCorrect,
761 NetClock::duration closeResolution,
763 std::set<TxID>& failedTxs)
764{
765 std::shared_ptr<Ledger> built = [&]() {
766 if (auto const replayData = ledgerMaster_.releaseReplay())
767 {
768 XRPL_ASSERT(
769 replayData->parent()->info().hash == previousLedger.id(),
770 "ripple::RCLConsensus::Adaptor::buildLCL : parent hash match");
771 return buildLedger(*replayData, tapNONE, app_, j_);
772 }
773 return buildLedger(
774 previousLedger.ledger_,
775 closeTime,
776 closeTimeCorrect,
777 closeResolution,
778 app_,
779 retriableTxs,
780 failedTxs,
781 j_);
782 }();
783
784 // Update fee computations based on accepted txs
785 using namespace std::chrono_literals;
786 app_.getTxQ().processClosedLedger(app_, *built, roundTime > 5s);
787
788 // And stash the ledger in the ledger master
789 if (ledgerMaster_.storeLedger(built))
790 JLOG(j_.debug()) << "Consensus built ledger we already had";
791 else if (app_.getInboundLedgers().find(built->info().hash))
792 JLOG(j_.debug()) << "Consensus built ledger we were acquiring";
793 else
794 JLOG(j_.debug()) << "Consensus built new ledger";
795 return RCLCxLedger{std::move(built)};
796}
797
798void
800 RCLCxLedger const& ledger,
801 RCLTxSet const& txns,
802 bool proposing)
803{
804 using namespace std::chrono_literals;
805
806 auto validationTime = app_.timeKeeper().closeTime();
807 if (validationTime <= lastValidationTime_)
808 validationTime = lastValidationTime_ + 1s;
809 lastValidationTime_ = validationTime;
810
811 if (!validatorKeys_.keys)
812 {
813 JLOG(j_.warn()) << "RCLConsensus::Adaptor::validate: ValidatorKeys "
814 "not set\n";
815 return;
816 }
817
818 auto const& keys = *validatorKeys_.keys;
819
821 lastValidationTime_,
822 keys.publicKey,
823 keys.secretKey,
824 validatorKeys_.nodeID,
825 [&](STValidation& v) {
826 v.setFieldH256(sfLedgerHash, ledger.id());
827 v.setFieldH256(sfConsensusHash, txns.id());
828
829 v.setFieldU32(sfLedgerSequence, ledger.seq());
830
831 if (proposing)
832 v.setFlag(vfFullValidation);
833
834 if (ledger.ledger_->rules().enabled(featureHardenedValidations))
835 {
836 // Attest to the hash of what we consider to be the last fully
837 // validated ledger. This may be the hash of the ledger we are
838 // validating here, and that's fine.
839 if (auto const vl = ledgerMaster_.getValidatedLedger())
840 v.setFieldH256(sfValidatedHash, vl->info().hash);
841
842 v.setFieldU64(sfCookie, valCookie_);
843
844 // Report our server version every flag ledger:
845 if (ledger.ledger_->isVotingLedger())
846 v.setFieldU64(
847 sfServerVersion, BuildInfo::getEncodedVersion());
848 }
849
850 // Report our load
851 {
852 auto const& ft = app_.getFeeTrack();
853 auto const fee = std::max(ft.getLocalFee(), ft.getClusterFee());
854 if (fee > ft.getLoadBase())
855 v.setFieldU32(sfLoadFee, fee);
856 }
857
858 // If the next ledger is a flag ledger, suggest fee changes and
859 // new features:
860 if (ledger.ledger_->isVotingLedger())
861 {
862 // Fees:
863 feeVote_->doValidation(
864 ledger.ledger_->fees(), ledger.ledger_->rules(), v);
865
866 // Amendments
867 // FIXME: pass `v` and have the function insert the array
868 // directly?
869 auto const amendments = app_.getAmendmentTable().doValidation(
870 getEnabledAmendments(*ledger.ledger_));
871
872 if (!amendments.empty())
873 v.setFieldV256(
874 sfAmendments, STVector256(sfAmendments, amendments));
875 }
876 });
877
878 auto const serialized = v->getSerialized();
879
880 // suppress it if we receive it
881 app_.getHashRouter().addSuppression(sha512Half(makeSlice(serialized)));
882
883 handleNewValidation(app_, v, "local");
884
885 // Broadcast to all our peers:
886 protocol::TMValidation val;
887 val.set_validation(serialized.data(), serialized.size());
888 app_.overlay().broadcast(val);
889
890 // Publish to all our subscribers:
891 app_.getOPs().pubValidation(v);
892}
893
894void
896{
897 JLOG(j_.info()) << "Consensus mode change before=" << to_string(before)
898 << ", after=" << to_string(after);
899
900 // If we were proposing but aren't any longer, we need to reset the
901 // censorship tracking to avoid bogus warnings.
902 if ((before == ConsensusMode::proposing ||
903 before == ConsensusMode::observing) &&
904 before != after)
905 censorshipDetector_.reset();
906
907 mode_ = after;
908}
909
911RCLConsensus::getJson(bool full) const
912{
913 Json::Value ret;
914 {
916 ret = consensus_.getJson(full);
917 }
918 ret["validating"] = adaptor_.validating();
919 return ret;
920}
921
922void
924 NetClock::time_point const& now,
926{
927 try
928 {
930 consensus_.timerEntry(now, clog);
931 }
932 catch (SHAMapMissingNode const& mn)
933 {
934 // This should never happen
936 ss << "During consensus timerEntry: " << mn.what();
937 JLOG(j_.error()) << ss.str();
938 CLOG(clog) << ss.str();
939 Rethrow();
940 }
941}
942
943void
945{
946 try
947 {
949 consensus_.gotTxSet(now, txSet);
950 }
951 catch (SHAMapMissingNode const& mn)
952 {
953 // This should never happen
954 JLOG(j_.error()) << "During consensus gotTxSet: " << mn.what();
955 Rethrow();
956 }
957}
958
960
961void
963 NetClock::time_point const& now,
965{
967 consensus_.simulate(now, consensusDelay);
968}
969
970bool
972 NetClock::time_point const& now,
973 RCLCxPeerPos const& newProposal)
974{
976 return consensus_.peerProposal(now, newProposal);
977}
978
979bool
981 RCLCxLedger const& prevLgr,
982 hash_set<NodeID> const& nowTrusted)
983{
984 // We have a key, we do not want out of sync validations after a restart
985 // and are not amendment blocked.
986 validating_ = validatorKeys_.keys &&
987 prevLgr.seq() >= app_.getMaxDisallowedLedger() &&
988 !app_.getOPs().isBlocked();
989
990 // If we are not running in standalone mode and there's a configured UNL,
991 // check to make sure that it's not expired.
992 if (validating_ && !app_.config().standalone() && app_.validators().count())
993 {
994 auto const when = app_.validators().expires();
995
996 if (!when || *when < app_.timeKeeper().now())
997 {
998 JLOG(j_.error()) << "Voluntarily bowing out of consensus process "
999 "because of an expired validator list.";
1000 validating_ = false;
1001 }
1002 }
1003
1004 bool const synced = app_.getOPs().getOperatingMode() == OperatingMode::FULL;
1005
1006 if (validating_)
1007 {
1008 JLOG(j_.info()) << "Entering consensus process, validating, synced="
1009 << (synced ? "yes" : "no");
1010 }
1011 else
1012 {
1013 // Otherwise we just want to monitor the validation process.
1014 JLOG(j_.info()) << "Entering consensus process, watching, synced="
1015 << (synced ? "yes" : "no");
1016 }
1017
1018 // Notify inbound ledgers that we are starting a new round
1019 inboundTransactions_.newRound(prevLgr.seq());
1020
1021 // Notify NegativeUNLVote that new validators are added
1022 if (prevLgr.ledger_->rules().enabled(featureNegativeUNL) &&
1023 !nowTrusted.empty())
1024 nUnlVote_.newValidators(prevLgr.seq() + 1, nowTrusted);
1025
1026 // propose only if we're in sync with the network (and validating)
1027 return validating_ && synced;
1028}
1029
1030bool
1032{
1033 return ledgerMaster_.haveValidated();
1034}
1035
1038{
1039 return ledgerMaster_.getValidLedgerIndex();
1040}
1041
1044{
1045 return app_.validators().getQuorumKeys();
1046}
1047
1050 Ledger_t::Seq const seq,
1052{
1053 return app_.getValidations().laggards(seq, trustedKeys);
1054}
1055
1056bool
1058{
1059 return validatorKeys_.keys.has_value();
1060}
1061
1062void
1064{
1065 if (!positions && app_.getOPs().isFull())
1066 app_.getOPs().setMode(OperatingMode::CONNECTED);
1067}
1068
1069void
1071 NetClock::time_point const& now,
1072 RCLCxLedger::ID const& prevLgrId,
1073 RCLCxLedger const& prevLgr,
1074 hash_set<NodeID> const& nowUntrusted,
1075 hash_set<NodeID> const& nowTrusted,
1077{
1079 consensus_.startRound(
1080 now,
1081 prevLgrId,
1082 prevLgr,
1083 nowUntrusted,
1084 adaptor_.preStartRound(prevLgr, nowTrusted),
1085 clog);
1086}
1087
1089 char const* label,
1090 bool const validating,
1092 : j_(j)
1093{
1094 if (!validating && !j.info())
1095 return;
1098 header_ = "ConsensusLogger ";
1099 header_ += label;
1100 header_ += ": ";
1101}
1102
1104{
1105 if (!ss_)
1106 return;
1107 auto const duration = std::chrono::duration_cast<std::chrono::milliseconds>(
1109 std::stringstream outSs;
1110 outSs << header_ << "duration " << (duration.count() / 1000) << '.'
1111 << std::setw(3) << std::setfill('0') << (duration.count() % 1000)
1112 << "s. " << ss_->str();
1114}
1115
1116} // namespace ripple
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:130
virtual void writeAlways(Severity level, std::string const &text)=0
Bypass filter and write text to the sink at the specified severity.
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Sink & sink() const
Returns the Sink associated with this Journal.
Definition Journal.h:278
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
Holds transactions which were deferred to the next pass of consensus.
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
std::chrono::milliseconds read() const
Manages the acquisition and lifetime of transaction sets.
Writable ledger view that accumulates state and tx changes.
Definition OpenView.h:46
Slice slice() const noexcept
Definition PublicKey.h:104
void propose(RCLCxPeerPos::Proposal const &proposal)
Propose the given position to my peers.
ValidatorKeys const & validatorKeys_
beast::Journal const j_
std::atomic< ConsensusMode > mode_
Result onClose(RCLCxLedger const &ledger, NetClock::time_point const &closeTime, ConsensusMode mode)
Close the open ledger and return initial consensus position.
void share(RCLCxPeerPos const &peerPos)
Share the given proposal with all peers.
void doAccept(Result const &result, RCLCxLedger const &prevLedger, NetClock::duration closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson)
Accept a new ledger based on the given transactions.
Adaptor(Application &app, std::unique_ptr< FeeVote > &&feeVote, LedgerMaster &ledgerMaster, LocalTxs &localTxs, InboundTransactions &inboundTransactions, ValidatorKeys const &validatorKeys, beast::Journal journal)
void onModeChange(ConsensusMode before, ConsensusMode after)
Notified of change in consensus mode.
std::size_t laggards(Ledger_t::Seq const seq, hash_set< NodeKey_t > &trustedKeys) const
RCLCensorshipDetector< TxID, LedgerIndex > censorshipDetector_
void onForceAccept(Result const &result, RCLCxLedger const &prevLedger, NetClock::duration const &closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson)
Process the accepted ledger that was a result of simulation/force accept.
bool validator() const
Whether I am a validator.
RCLCxLedger buildLCL(RCLCxLedger const &previousLedger, CanonicalTXSet &retriableTxs, NetClock::time_point closeTime, bool closeTimeCorrect, NetClock::duration closeResolution, std::chrono::milliseconds roundTime, std::set< TxID > &failedTxs)
Build the new last closed ledger.
std::optional< RCLCxLedger > acquireLedger(LedgerHash const &hash)
Attempt to acquire a specific ledger.
LedgerIndex getValidLedgerIndex() const
std::size_t proposersFinished(RCLCxLedger const &ledger, LedgerHash const &h) const
Number of proposers that have validated a ledger descended from requested ledger.
std::optional< RCLTxSet > acquireTxSet(RCLTxSet::ID const &setId)
Acquire the transaction set associated with a proposal.
bool hasOpenTransactions() const
Whether the open ledger has any transactions.
void onAccept(Result const &result, RCLCxLedger const &prevLedger, NetClock::duration const &closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson, bool const validating)
Process the accepted ledger.
void validate(RCLCxLedger const &ledger, RCLTxSet const &txns, bool proposing)
Validate the given ledger and share with peers as necessary.
std::uint64_t const valCookie_
bool preStartRound(RCLCxLedger const &prevLedger, hash_set< NodeID > const &nowTrusted)
Called before kicking off a new consensus round.
uint256 getPrevLedger(uint256 ledgerID, RCLCxLedger const &ledger, ConsensusMode mode)
Get the ID of the previous ledger/last closed ledger(LCL) on the network.
std::size_t proposersValidated(LedgerHash const &h) const
Number of proposers that have validated the given ledger.
void notify(protocol::NodeEvent ne, RCLCxLedger const &ledger, bool haveCorrectLCL)
Notify peers of a consensus state change.
void updateOperatingMode(std::size_t const positions) const
Update operating mode based on current peer positions.
std::pair< std::size_t, hash_set< NodeKey_t > > getQuorumKeys() const
void gotTxSet(NetClock::time_point const &now, RCLTxSet const &txSet)
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
bool validating() const
Whether we are validating consensus ledgers.
RCLConsensus(Application &app, std::unique_ptr< FeeVote > &&feeVote, LedgerMaster &ledgerMaster, LocalTxs &localTxs, InboundTransactions &inboundTransactions, Consensus< Adaptor >::clock_type const &clock, ValidatorKeys const &validatorKeys, beast::Journal journal)
Constructor.
ConsensusMode mode() const
static constexpr unsigned int censorshipWarnInternal
Warn for transactions that haven't been included every so many ledgers.
std::recursive_mutex mutex_
Consensus< Adaptor > consensus_
bool peerProposal(NetClock::time_point const &now, RCLCxPeerPos const &newProposal)
void startRound(NetClock::time_point const &now, RCLCxLedger::ID const &prevLgrId, RCLCxLedger const &prevLgr, hash_set< NodeID > const &nowUntrusted, hash_set< NodeID > const &nowTrusted, std::unique_ptr< std::stringstream > const &clog)
Adjust the set of trusted validators and kick-off the next round of consensus.
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Json::Value getJson(bool full) const
beast::Journal const j_
Represents a ledger in RCLConsensus.
Definition RCLCxLedger.h:17
Seq const & seq() const
Sequence number of the ledger.
Definition RCLCxLedger.h:42
ID const & id() const
Unique identifier (hash) of this ledger.
Definition RCLCxLedger.h:49
std::shared_ptr< Ledger const > ledger_
The ledger instance.
NetClock::time_point closeTime() const
The close time of this ledger.
Definition RCLCxLedger.h:77
ID const & parentID() const
Unique identifier (hash) of this ledger's parent.
Definition RCLCxLedger.h:56
A peer's signed, proposed position for use in RCLConsensus.
Proposal const & proposal() const
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Slice signature() const
Signature of the proposal (not necessarily verified)
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
Represents a transaction in RCLConsensus.
Definition RCLCxTx.h:14
ID const & id() const
The unique identifier/hash of the transaction.
Definition RCLCxTx.h:29
boost::intrusive_ptr< SHAMapItem const > tx_
The SHAMapItem that represents the transaction.
Definition RCLCxTx.h:35
Represents a set of transactions in RCLConsensus.
Definition RCLCxTx.h:44
ID id() const
The unique ID/hash of the transaction set.
Definition RCLCxTx.h:134
std::shared_ptr< SHAMap > map_
The SHAMap representing the transactions.
Definition RCLCxTx.h:169
Wraps a ledger instance for use in generic Validations LedgerTrie.
beast::Journal journal() const
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
RclConsensusLogger(char const *label, bool validating, beast::Journal j)
std::chrono::steady_clock::time_point start_
std::unique_ptr< std::stringstream > ss_
Slice slice() const noexcept
Definition Serializer.h:47
std::optional< std::pair< Seq, ID > > getPreferred(Ledger const &curr)
Return the sequence number and ID of the preferred working ledger.
Adaptor const & adaptor() const
Return the adaptor instance.
std::size_t getNodesAfter(Ledger const &ledger, ID const &ledgerID)
Count the number of current trusted validators working on a ledger after the specified one.
Validator keys and manifest as set in configuration file.
std::optional< Keys > keys
std::uint32_t sequence
iterator begin()
Definition base_uint.h:117
T count(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T insert(T... args)
T is_same_v
T lock(T... args)
T max(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:95
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
std::enable_if_t< std::is_integral< Integral >::value, Integral > rand_int()
std::chrono::time_point< Clock, Duration > effCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > resolution, std::chrono::time_point< Clock, Duration > priorCloseTime)
Calculate the effective ledger close time.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:851
ConsensusMode
Represents how a node currently participates in Consensus.
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
@ proposing
We are normal participant in consensus and propose our position.
@ observing
We are observing peer positions, but not proposing our position.
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
@ CONNECTED
convinced we are talking to the network
@ FULL
we have the ledger and can even validate
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
@ MovedOn
The network has consensus without us.
@ Yes
We have consensus along with the network.
@ accepted
Manifest is valid.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:225
std::shared_ptr< Ledger > buildLedger(std::shared_ptr< Ledger const > const &parent, NetClock::time_point closeTime, bool const closeTimeCorrect, NetClock::duration closeResolution, Application &app, CanonicalTXSet &txns, std::set< TxID > &failedTxs, beast::Journal j)
Build a new ledger by applying consensus transactions.
Buffer signDigest(PublicKey const &pk, SecretKey const &sk, uint256 const &digest)
Generate a signature for a message digest.
boost::intrusive_ptr< SHAMapItem > make_shamapitem(uint256 const &tag, Slice data)
Definition SHAMapItem.h:142
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
Definition ReadView.cpp:50
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
bool after(NetClock::time_point now, std::uint32_t mark)
Has the specified time passed?
Definition View.cpp:3247
@ tapNONE
Definition ApplyView.h:12
@ ledgerMaster
ledger master data for signing
@ proposal
proposal for signing
void Rethrow()
Rethrow the exception currently being handled.
Definition contract.h:29
@ jtACCEPT
Definition Job.h:54
@ jtADVANCE
Definition Job.h:48
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
Definition digest.h:205
STL namespace.
T setfill(T... args)
T setw(T... args)
T str(T... args)
Stores the set of initial close times.
std::map< NetClock::time_point, int > peers
Close time estimates, keep ordered for predictable traverse.
NetClock::time_point self
Our close time estimate.
Encapsulates the result of consensus.
hash_map< typename Tx_t::ID, Dispute_t > disputes
Transactions which are under dispute with our peers.
TxSet_t txns
The set of transactions consensus agrees go in the ledger.
Proposal_t position
Our proposed position on transactions/close time.
Sends a message to all peers.
Definition predicates.h:13
T to_string(T... args)
T what(T... args)