rippled
Loading...
Searching...
No Matches
RCLConsensus.cpp
1#include <xrpld/app/consensus/RCLConsensus.h>
2#include <xrpld/app/consensus/RCLValidations.h>
3#include <xrpld/app/ledger/BuildLedger.h>
4#include <xrpld/app/ledger/InboundLedgers.h>
5#include <xrpld/app/ledger/InboundTransactions.h>
6#include <xrpld/app/ledger/LedgerMaster.h>
7#include <xrpld/app/ledger/LocalTxs.h>
8#include <xrpld/app/ledger/OpenLedger.h>
9#include <xrpld/app/misc/NegativeUNLVote.h>
10#include <xrpld/app/misc/TxQ.h>
11#include <xrpld/app/misc/ValidatorKeys.h>
12#include <xrpld/app/misc/ValidatorList.h>
13#include <xrpld/overlay/Overlay.h>
14#include <xrpld/overlay/predicates.h>
15
16#include <xrpl/basics/random.h>
17#include <xrpl/beast/core/LexicalCast.h>
18#include <xrpl/beast/utility/instrumentation.h>
19#include <xrpl/core/HashRouter.h>
20#include <xrpl/ledger/AmendmentTable.h>
21#include <xrpl/ledger/Ledger.h>
22#include <xrpl/ledger/LedgerTiming.h>
23#include <xrpl/protocol/BuildInfo.h>
24#include <xrpl/protocol/Feature.h>
25#include <xrpl/protocol/digest.h>
26#include <xrpl/server/LoadFeeTrack.h>
27#include <xrpl/server/NetworkOPs.h>
28
29#include <algorithm>
30#include <iomanip>
31#include <mutex>
32
33namespace xrpl {
34
36 Application& app,
39 LocalTxs& localTxs,
40 InboundTransactions& inboundTransactions,
42 ValidatorKeys const& validatorKeys,
43 beast::Journal journal)
44 : adaptor_(
45 app,
46 std::move(feeVote),
48 localTxs,
49 inboundTransactions,
50 validatorKeys,
51 journal)
52 , consensus_(clock, adaptor_, journal)
53 , j_(journal)
54{
55}
56
58 Application& app,
61 LocalTxs& localTxs,
62 InboundTransactions& inboundTransactions,
63 ValidatorKeys const& validatorKeys,
64 beast::Journal journal)
65 : app_(app)
66 , feeVote_(std::move(feeVote))
67 , ledgerMaster_(ledgerMaster)
68 , localTxs_(localTxs)
69 , inboundTransactions_{inboundTransactions}
70 , j_(journal)
71 , validatorKeys_(validatorKeys)
72 , valCookie_(1 + rand_int(crypto_prng(), std::numeric_limits<std::uint64_t>::max() - 1))
73 , nUnlVote_(validatorKeys_.nodeID, j_)
74{
75 XRPL_ASSERT(valCookie_, "xrpl::RCLConsensus::Adaptor::Adaptor : nonzero cookie");
76
77 JLOG(j_.info()) << "Consensus engine started (cookie: " + std::to_string(valCookie_) + ")";
78
79 if (validatorKeys_.nodeID != beast::zero && validatorKeys_.keys)
80 {
81 JLOG(j_.info()) << "Validator identity: "
83
84 if (validatorKeys_.keys->masterPublicKey != validatorKeys_.keys->publicKey)
85 {
86 JLOG(j_.debug()) << "Validator ephemeral signing key: "
88 << " (seq: " << std::to_string(validatorKeys_.sequence) << ")";
89 }
90 }
91}
92
95{
96 // we need to switch the ledger we're working from
97 auto built = ledgerMaster_.getLedgerByHash(hash);
98 if (!built)
99 {
100 if (acquiringLedger_ != hash)
101 {
102 // need to start acquiring the correct consensus LCL
103 JLOG(j_.warn()) << "Need consensus ledger " << hash;
104
105 // Tell the ledger acquire system that we need the consensus ledger
106 acquiringLedger_ = hash;
107
108 app_.getJobQueue().addJob(jtADVANCE, "GetConsL1", [id = hash, &app = app_, this]() {
109 JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger1 started";
110 app.getInboundLedgers().acquireAsync(id, 0, InboundLedger::Reason::CONSENSUS);
111 });
112 }
113 return std::nullopt;
114 }
115
116 XRPL_ASSERT(
117 !built->open() && built->isImmutable(),
118 "xrpl::RCLConsensus::Adaptor::acquireLedger : valid ledger state");
119 XRPL_ASSERT(
120 built->header().hash == hash,
121 "xrpl::RCLConsensus::Adaptor::acquireLedger : ledger hash match");
122
123 // Notify inbound transactions of the new ledger sequence number
124 inboundTransactions_.newRound(built->header().seq);
125
126 return RCLCxLedger(built);
127}
128
129void
131{
132 protocol::TMProposeSet prop;
133
134 auto const& proposal = peerPos.proposal();
135
136 prop.set_proposeseq(proposal.proposeSeq());
137 prop.set_closetime(proposal.closeTime().time_since_epoch().count());
138
139 prop.set_currenttxhash(proposal.position().begin(), proposal.position().size());
140 prop.set_previousledger(proposal.prevLedger().begin(), proposal.prevLedger().size());
141
142 auto const pk = peerPos.publicKey().slice();
143 prop.set_nodepubkey(pk.data(), pk.size());
144
145 auto const sig = peerPos.signature();
146 prop.set_signature(sig.data(), sig.size());
147
148 app_.getOverlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
149}
150
151void
153{
154 // If we didn't relay this transaction recently, relay it to all peers
155 if (app_.getHashRouter().shouldRelay(tx.id()))
156 {
157 JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
158 auto const slice = tx.tx_->slice();
159 protocol::TMTransaction msg;
160 msg.set_rawtransaction(slice.data(), slice.size());
161 msg.set_status(protocol::tsNEW);
162 msg.set_receivetimestamp(app_.getTimeKeeper().now().time_since_epoch().count());
163 static std::set<Peer::id_t> const skip{};
164 app_.getOverlay().relay(tx.id(), msg, skip);
165 }
166 else
167 {
168 JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id();
169 }
170}
171void
173{
174 JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
175 << xrpl::to_string(proposal.prevLedger()) << " -> "
176 << xrpl::to_string(proposal.position());
177
178 protocol::TMProposeSet prop;
179
180 prop.set_currenttxhash(proposal.position().begin(), proposal.position().size());
181 prop.set_previousledger(proposal.prevLedger().begin(), proposal.prevLedger().size());
182 prop.set_proposeseq(proposal.proposeSeq());
183 prop.set_closetime(proposal.closeTime().time_since_epoch().count());
184
185 if (!validatorKeys_.keys)
186 {
187 JLOG(j_.warn()) << "RCLConsensus::Adaptor::propose: ValidatorKeys "
188 "not set: \n";
189 return;
190 }
191
192 auto const& keys = *validatorKeys_.keys;
193
194 prop.set_nodepubkey(keys.publicKey.data(), keys.publicKey.size());
195
196 auto sig = signDigest(keys.publicKey, keys.secretKey, proposal.signingHash());
197
198 prop.set_signature(sig.data(), sig.size());
199
200 auto const suppression = proposalUniqueId(
201 proposal.position(),
202 proposal.prevLedger(),
203 proposal.proposeSeq(),
204 proposal.closeTime(),
205 keys.publicKey,
206 sig);
207
208 app_.getHashRouter().addSuppression(suppression);
209
210 app_.getOverlay().broadcast(prop);
211}
212
213void
215{
216 inboundTransactions_.giveSet(txns.id(), txns.map_, false);
217}
218
221{
222 if (auto txns = inboundTransactions_.getSet(setId, true))
223 {
224 return RCLTxSet{std::move(txns)};
225 }
226 return std::nullopt;
227}
228
229bool
231{
232 return !app_.getOpenLedger().empty();
233}
234
237{
238 return app_.getValidations().numTrustedForLedger(h);
239}
240
243{
244 RCLValidations& vals = app_.getValidations();
245 return vals.getNodesAfter(RCLValidatedLedger(ledger.ledger_, vals.adaptor().journal()), h);
246}
247
250 uint256 ledgerID,
251 RCLCxLedger const& ledger,
253{
254 RCLValidations& vals = app_.getValidations();
255 uint256 netLgr = vals.getPreferred(
256 RCLValidatedLedger{ledger.ledger_, vals.adaptor().journal()},
257 ledgerMaster_.getValidLedgerIndex());
258
259 if (netLgr != ledgerID)
260 {
262 app_.getOPs().consensusViewChange();
263
264 JLOG(j_.debug()) << Json::Compact(app_.getValidations().getJsonTrie());
265 }
266
267 return netLgr;
268}
269
270auto
272 RCLCxLedger const& ledger,
273 NetClock::time_point const& closeTime,
275{
276 bool const wrongLCL = mode == ConsensusMode::wrongLedger;
278
279 notify(protocol::neCLOSING_LEDGER, ledger, !wrongLCL);
280
281 auto const& prevLedger = ledger.ledger_;
282
283 ledgerMaster_.applyHeldTransactions();
284 // Tell the ledger master not to acquire the ledger we're probably building
285 ledgerMaster_.setBuildingLedger(prevLedger->header().seq + 1);
286
287 auto initialLedger = app_.getOpenLedger().current();
288
289 auto initialSet = std::make_shared<SHAMap>(SHAMapType::TRANSACTION, app_.getNodeFamily());
290 initialSet->setUnbacked();
291
292 // Build SHAMap containing all transactions in our open ledger
293 for (auto const& tx : initialLedger->txs)
294 {
295 JLOG(j_.trace()) << "Adding open ledger TX " << tx.first->getTransactionID();
296 Serializer s(2048);
297 tx.first->add(s);
298 initialSet->addItem(
300 make_shamapitem(tx.first->getTransactionID(), s.slice()));
301 }
302
303 // Add pseudo-transactions to the set
304 if (app_.config().standalone() || (proposing && !wrongLCL))
305 {
306 if (prevLedger->isFlagLedger())
307 {
308 // previous ledger was flag ledger, add fee and amendment
309 // pseudo-transactions
310 auto validations =
311 app_.getValidators().negativeUNLFilter(app_.getValidations().getTrustedForLedger(
312 prevLedger->header().parentHash, prevLedger->seq() - 1));
313 if (validations.size() >= app_.getValidators().quorum())
314 {
315 feeVote_->doVoting(prevLedger, validations, initialSet);
316 app_.getAmendmentTable().doVoting(prevLedger, validations, initialSet, j_);
317 }
318 }
319 else if (prevLedger->isVotingLedger())
320 {
321 // previous ledger was a voting ledger,
322 // so the current consensus session is for a flag ledger,
323 // add negative UNL pseudo-transactions
324 nUnlVote_.doVoting(
325 prevLedger,
326 app_.getValidators().getTrustedMasterKeys(),
327 app_.getValidations(),
328 initialSet);
329 }
330 }
331
332 // Now we need an immutable snapshot
333 initialSet = initialSet->snapShot(false);
334
335 if (!wrongLCL)
336 {
337 LedgerIndex const seq = prevLedger->header().seq + 1;
339
340 initialSet->visitLeaves(
341 [&proposed, seq](boost::intrusive_ptr<SHAMapItem const> const& item) {
342 proposed.emplace_back(item->key(), seq);
343 });
344
345 censorshipDetector_.propose(std::move(proposed));
346 }
347
348 // Needed because of the move below.
349 auto const setHash = initialSet->getHash().as_uint256();
350
351 return Result{
352 std::move(initialSet),
354 initialLedger->header().parentHash,
356 setHash,
357 closeTime,
358 app_.getTimeKeeper().closeTime(),
359 validatorKeys_.nodeID}};
360}
361
362void
364 Result const& result,
365 RCLCxLedger const& prevLedger,
366 NetClock::duration const& closeResolution,
367 ConsensusCloseTimes const& rawCloseTimes,
368 ConsensusMode const& mode,
369 Json::Value&& consensusJson)
370{
371 doAccept(result, prevLedger, closeResolution, rawCloseTimes, mode, std::move(consensusJson));
372}
373
374void
376 Result const& result,
377 RCLCxLedger const& prevLedger,
378 NetClock::duration const& closeResolution,
379 ConsensusCloseTimes const& rawCloseTimes,
380 ConsensusMode const& mode,
381 Json::Value&& consensusJson,
382 bool const validating)
383{
384 app_.getJobQueue().addJob(
385 jtACCEPT,
386 "AcceptLedger",
387 // NOLINTNEXTLINE(cppcoreguidelines-misleading-capture-default-by-value)
388 [=, this, cj = std::move(consensusJson)]() mutable {
389 // Note that no lock is held or acquired during this job.
390 // This is because generic Consensus guarantees that once a ledger
391 // is accepted, the consensus results and capture by reference state
392 // will not change until startRound is called (which happens via
393 // endConsensus).
394 RclConsensusLogger clog("onAccept", validating, j_);
395 this->doAccept(result, prevLedger, closeResolution, rawCloseTimes, mode, std::move(cj));
396 this->app_.getOPs().endConsensus(clog.ss());
397 });
398}
399
400void
402 Result const& result,
403 RCLCxLedger const& prevLedger,
404 NetClock::duration closeResolution,
405 ConsensusCloseTimes const& rawCloseTimes,
406 ConsensusMode const& mode,
407 Json::Value&& consensusJson)
408{
409 prevProposers_ = result.proposers;
410 prevRoundTime_ = result.roundTime.read();
411
412 bool closeTimeCorrect = false;
413
415 bool const haveCorrectLCL = mode != ConsensusMode::wrongLedger;
416 bool const consensusFail = result.state == ConsensusState::MovedOn;
417
418 auto consensusCloseTime = result.position.closeTime();
419
420 if (consensusCloseTime == NetClock::time_point{})
421 {
422 // We agreed to disagree on the close time
423 using namespace std::chrono_literals;
424 consensusCloseTime = prevLedger.closeTime() + 1s;
425 closeTimeCorrect = false;
426 }
427 else
428 {
429 // We agreed on a close time
430 consensusCloseTime =
431 effCloseTime(consensusCloseTime, closeResolution, prevLedger.closeTime());
432 closeTimeCorrect = true;
433 }
434
435 JLOG(j_.debug()) << "Report: Prop=" << (proposing ? "yes" : "no")
436 << " val=" << (validating_ ? "yes" : "no")
437 << " corLCL=" << (haveCorrectLCL ? "yes" : "no")
438 << " fail=" << (consensusFail ? "yes" : "no");
439 JLOG(j_.debug()) << "Report: Prev = " << prevLedger.id() << ":" << prevLedger.seq();
440
441 //--------------------------------------------------------------------------
442 std::set<TxID> failed;
443
444 // We want to put transactions in an unpredictable but deterministic order:
445 // we use the hash of the set.
446 //
447 // FIXME: Use a std::vector and a custom sorter instead of CanonicalTXSet?
448 CanonicalTXSet retriableTxs{result.txns.map_->getHash().as_uint256()};
449
450 JLOG(j_.debug()) << "Building canonical tx set: " << retriableTxs.key();
451
452 for (auto const& item : *result.txns.map_)
453 {
454 try
455 {
456 retriableTxs.insert(std::make_shared<STTx const>(SerialIter{item.slice()}));
457 JLOG(j_.debug()) << " Tx: " << item.key();
458 }
459 catch (std::exception const& ex)
460 {
461 failed.insert(item.key());
462 JLOG(j_.warn()) << " Tx: " << item.key() << " throws: " << ex.what();
463 }
464 }
465
466 auto built = buildLCL(
467 prevLedger,
468 retriableTxs,
469 consensusCloseTime,
470 closeTimeCorrect,
471 closeResolution,
472 result.roundTime.read(),
473 failed);
474
475 auto const newLCLHash = built.id();
476 JLOG(j_.debug()) << "Built ledger #" << built.seq() << ": " << newLCLHash;
477
478 // Tell directly connected peers that we have a new LCL
479 notify(protocol::neACCEPTED_LEDGER, built, haveCorrectLCL);
480
481 // As long as we're in sync with the network, attempt to detect attempts
482 // at censorship of transaction by tracking which ones don't make it in
483 // after a period of time.
484 if (haveCorrectLCL && result.state == ConsensusState::Yes)
485 {
487
488 result.txns.map_->visitLeaves(
489 [&accepted](boost::intrusive_ptr<SHAMapItem const> const& item) {
490 accepted.push_back(item->key());
491 });
492
493 // Track all the transactions which failed or were marked as retriable
494 for (auto const& r : retriableTxs)
495 failed.insert(r.first.getTXID());
496
497 censorshipDetector_.check(
498 std::move(accepted),
499 [curr = built.seq(), j = app_.getJournal("CensorshipDetector"), &failed](
500 uint256 const& id, LedgerIndex seq) {
501 if (failed.contains(id))
502 return true;
503
504 auto const wait = curr - seq;
505
506 if (wait && (wait % censorshipWarnInternal == 0))
507 {
509 ss << "Potential Censorship: Eligible tx " << id
510 << ", which we are tracking since ledger " << seq
511 << " has not been included as of ledger " << curr << ".";
512
513 JLOG(j.warn()) << ss.str();
514 }
515
516 return false;
517 });
518 }
519
520 if (validating_)
521 validating_ = ledgerMaster_.isCompatible(*built.ledger_, j_.warn(), "Not validating");
522
523 if (validating_ && !consensusFail && app_.getValidations().canValidateSeq(built.seq()))
524 {
525 validate(built, result.txns, proposing);
526 JLOG(j_.info()) << "CNF Val " << newLCLHash;
527 }
528 else
529 JLOG(j_.info()) << "CNF buildLCL " << newLCLHash;
530
531 // See if we can accept a ledger as fully-validated
532 ledgerMaster_.consensusBuilt(built.ledger_, result.txns.id(), std::move(consensusJson));
533
534 //-------------------------------------------------------------------------
535 {
536 // Apply disputed transactions that didn't get in
537 //
538 // The first crack of transactions to get into the new
539 // open ledger goes to transactions proposed by a validator
540 // we trust but not included in the consensus set.
541 //
542 // These are done first because they are the most likely
543 // to receive agreement during consensus. They are also
544 // ordered logically "sooner" than transactions not mentioned
545 // in the previous consensus round.
546 //
547 bool anyDisputes = false;
548 for (auto const& [_, dispute] : result.disputes)
549 {
550 (void)_;
551 if (!dispute.getOurVote())
552 {
553 // we voted NO
554 try
555 {
556 JLOG(j_.debug()) << "Test applying disputed transaction that did"
557 << " not get in " << dispute.tx().id();
558
559 SerialIter sit(dispute.tx().tx_->slice());
560 auto txn = std::make_shared<STTx const>(sit);
561
562 // Disputed pseudo-transactions that were not accepted
563 // can't be successfully applied in the next ledger
564 if (isPseudoTx(*txn))
565 continue;
566
567 retriableTxs.insert(txn);
568
569 anyDisputes = true;
570 }
571 catch (std::exception const& ex)
572 {
573 JLOG(j_.debug()) << "Failed to apply transaction we voted "
574 "NO on. Exception: "
575 << ex.what();
576 }
577 }
578 }
579
580 // Build new open ledger
581 std::unique_lock lock{app_.getMasterMutex(), std::defer_lock};
582 std::unique_lock sl{ledgerMaster_.peekMutex(), std::defer_lock};
583 std::lock(lock, sl);
584
585 auto const lastVal = ledgerMaster_.getValidatedLedger();
587 if (lastVal)
588 {
589 rules = makeRulesGivenLedger(*lastVal, app_.config().features);
590 }
591 else
592 {
593 rules.emplace(app_.config().features);
594 }
595 app_.getOpenLedger().accept(
596 app_,
597 *rules,
598 built.ledger_,
599 localTxs_.getTxSet(),
600 anyDisputes,
601 retriableTxs,
602 tapNONE,
603 "consensus",
604 [&](OpenView& view, beast::Journal j) {
605 // Stuff the ledger with transactions from the queue.
606 return app_.getTxQ().accept(app_, view);
607 });
608
609 // Signal a potential fee change to subscribers after the open ledger
610 // is created
611 app_.getOPs().reportFeeChange();
612 }
613
614 //-------------------------------------------------------------------------
615 {
616 ledgerMaster_.switchLCL(built.ledger_);
617
618 // Do these need to exist?
619 XRPL_ASSERT(
620 ledgerMaster_.getClosedLedger()->header().hash == built.id(),
621 "xrpl::RCLConsensus::Adaptor::doAccept : ledger hash match");
622 XRPL_ASSERT(
623 app_.getOpenLedger().current()->header().parentHash == built.id(),
624 "xrpl::RCLConsensus::Adaptor::doAccept : parent hash match");
625 }
626
627 //-------------------------------------------------------------------------
628 // we entered the round with the network,
629 // see how close our close time is to other node's
630 // close time reports, and update our clock.
631 if ((mode == ConsensusMode::proposing || mode == ConsensusMode::observing) && !consensusFail)
632 {
633 auto closeTime = rawCloseTimes.self;
634
635 JLOG(j_.info()) << "We closed at " << closeTime.time_since_epoch().count();
637 usec64_t closeTotal = std::chrono::duration_cast<usec64_t>(closeTime.time_since_epoch());
638 int closeCount = 1;
639
640 for (auto const& [t, v] : rawCloseTimes.peers)
641 {
642 JLOG(j_.info()) << std::to_string(v) << " time votes for "
643 << std::to_string(t.time_since_epoch().count());
644 closeCount += v;
645 closeTotal += std::chrono::duration_cast<usec64_t>(t.time_since_epoch()) * v;
646 }
647
648 closeTotal += usec64_t(closeCount / 2); // for round to nearest
649 closeTotal /= closeCount;
650
651 // Use signed times since we are subtracting
654 auto offset = time_point{closeTotal} - std::chrono::time_point_cast<duration>(closeTime);
655 JLOG(j_.info()) << "Our close offset is estimated at " << offset.count() << " ("
656 << closeCount << ")";
657
658 app_.getTimeKeeper().adjustCloseTime(offset);
659 }
660}
661
662void
664 protocol::NodeEvent ne,
665 RCLCxLedger const& ledger,
666 bool haveCorrectLCL)
667{
668 protocol::TMStatusChange s;
669
670 if (!haveCorrectLCL)
671 {
672 s.set_newevent(protocol::neLOST_SYNC);
673 }
674 else
675 {
676 s.set_newevent(ne);
677 }
678
679 s.set_ledgerseq(ledger.seq());
680 s.set_networktime(app_.getTimeKeeper().now().time_since_epoch().count());
681 s.set_ledgerhashprevious(
682 ledger.parentID().begin(), std::decay_t<decltype(ledger.parentID())>::bytes);
683 s.set_ledgerhash(ledger.id().begin(), std::decay_t<decltype(ledger.id())>::bytes);
684
685 std::uint32_t uMin = 0, uMax = 0;
686 if (!ledgerMaster_.getFullValidatedRange(uMin, uMax))
687 {
688 uMin = 0;
689 uMax = 0;
690 }
691 else
692 {
693 // Don't advertise ledgers we're not willing to serve
694 uMin = std::max(uMin, ledgerMaster_.getEarliestFetch());
695 }
696 s.set_firstseq(uMin);
697 s.set_lastseq(uMax);
698 app_.getOverlay().foreach(send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
699 JLOG(j_.trace()) << "send status change to peer";
700}
701
704 RCLCxLedger const& previousLedger,
705 CanonicalTXSet& retriableTxs,
706 NetClock::time_point closeTime,
707 bool closeTimeCorrect,
708 NetClock::duration closeResolution,
710 std::set<TxID>& failedTxs)
711{
712 std::shared_ptr<Ledger> built = [&]() {
713 if (auto const replayData = ledgerMaster_.releaseReplay())
714 {
715 XRPL_ASSERT(
716 replayData->parent()->header().hash == previousLedger.id(),
717 "xrpl::RCLConsensus::Adaptor::buildLCL : parent hash match");
718 return buildLedger(*replayData, tapNONE, app_, j_);
719 }
720 return buildLedger(
721 previousLedger.ledger_,
722 closeTime,
723 closeTimeCorrect,
724 closeResolution,
725 app_,
726 retriableTxs,
727 failedTxs,
728 j_);
729 }();
730
731 // Update fee computations based on accepted txs
732 using namespace std::chrono_literals;
733 app_.getTxQ().processClosedLedger(app_, *built, roundTime > 5s);
734
735 // And stash the ledger in the ledger master
736 if (ledgerMaster_.storeLedger(built))
737 {
738 JLOG(j_.debug()) << "Consensus built ledger we already had";
739 }
740 else if (app_.getInboundLedgers().find(built->header().hash))
741 {
742 JLOG(j_.debug()) << "Consensus built ledger we were acquiring";
743 }
744 else
745 JLOG(j_.debug()) << "Consensus built new ledger";
746 return RCLCxLedger{std::move(built)};
747}
748
749void
751{
752 using namespace std::chrono_literals;
753
754 auto validationTime = app_.getTimeKeeper().closeTime();
755 if (validationTime <= lastValidationTime_)
756 validationTime = lastValidationTime_ + 1s;
757 lastValidationTime_ = validationTime;
758
759 if (!validatorKeys_.keys)
760 {
761 JLOG(j_.warn()) << "RCLConsensus::Adaptor::validate: ValidatorKeys "
762 "not set\n";
763 return;
764 }
765
766 auto const& keys = *validatorKeys_.keys;
767
769 lastValidationTime_,
770 keys.publicKey,
771 keys.secretKey,
772 validatorKeys_.nodeID,
773 [&](STValidation& v) {
774 v.setFieldH256(sfLedgerHash, ledger.id());
775 v.setFieldH256(sfConsensusHash, txns.id());
776
777 v.setFieldU32(sfLedgerSequence, ledger.seq());
778
779 if (proposing)
780 v.setFlag(vfFullValidation);
781
782 // Attest to the hash of what we consider to be the last fully
783 // validated ledger. This may be the hash of the ledger we are
784 // validating here, and that's fine.
785 if (auto const vl = ledgerMaster_.getValidatedLedger())
786 v.setFieldH256(sfValidatedHash, vl->header().hash);
787
788 v.setFieldU64(sfCookie, valCookie_);
789
790 // Report our server version every flag ledger:
791 if (ledger.ledger_->isVotingLedger())
792 v.setFieldU64(sfServerVersion, BuildInfo::getEncodedVersion());
793
794 // Report our load
795 {
796 auto const& ft = app_.getFeeTrack();
797 auto const fee = std::max(ft.getLocalFee(), ft.getClusterFee());
798 if (fee > ft.getLoadBase())
799 v.setFieldU32(sfLoadFee, fee);
800 }
801
802 // If the next ledger is a flag ledger, suggest fee changes and
803 // new features:
804 if (ledger.ledger_->isVotingLedger())
805 {
806 // Fees:
807 feeVote_->doValidation(ledger.ledger_->fees(), ledger.ledger_->rules(), v);
808
809 // Amendments
810 // FIXME: pass `v` and have the function insert the array
811 // directly?
812 auto const amendments =
813 app_.getAmendmentTable().doValidation(getEnabledAmendments(*ledger.ledger_));
814
815 if (!amendments.empty())
816 v.setFieldV256(sfAmendments, STVector256(sfAmendments, amendments));
817 }
818 });
819
820 auto const serialized = v->getSerialized();
821
822 // suppress it if we receive it
823 app_.getHashRouter().addSuppression(sha512Half(makeSlice(serialized)));
824
825 handleNewValidation(app_, v, "local");
826
827 // Broadcast to all our peers:
828 protocol::TMValidation val;
829 val.set_validation(serialized.data(), serialized.size());
830 app_.getOverlay().broadcast(val);
831
832 // Publish to all our subscribers:
833 app_.getOPs().pubValidation(v);
834}
835
836void
838{
839 JLOG(j_.info()) << "Consensus mode change before=" << to_string(before)
840 << ", after=" << to_string(after);
841
842 // If we were proposing but aren't any longer, we need to reset the
843 // censorship tracking to avoid bogus warnings.
844 if ((before == ConsensusMode::proposing || before == ConsensusMode::observing) &&
845 before != after)
846 censorshipDetector_.reset();
847
848 mode_ = after;
849}
850
853{
854 Json::Value ret;
855 {
856 std::lock_guard const _{mutex_};
857 ret = consensus_.getJson(full);
858 }
859 ret["validating"] = adaptor_.validating();
860 return ret;
861}
862
863void
865 NetClock::time_point const& now,
867{
868 try
869 {
870 std::lock_guard const _{mutex_};
871 consensus_.timerEntry(now, clog);
872 }
873 catch (SHAMapMissingNode const& mn)
874 {
875 // This should never happen
877 ss << "During consensus timerEntry: " << mn.what();
878 JLOG(j_.error()) << ss.str();
879 CLOG(clog) << ss.str();
880 Rethrow();
881 }
882}
883
884void
886{
887 try
888 {
889 std::lock_guard const _{mutex_};
890 consensus_.gotTxSet(now, txSet);
891 }
892 catch (SHAMapMissingNode const& mn)
893 {
894 // This should never happen
895 JLOG(j_.error()) << "During consensus gotTxSet: " << mn.what();
896 Rethrow();
897 }
898}
899
901
902void
904 NetClock::time_point const& now,
906{
907 std::lock_guard const _{mutex_};
908 consensus_.simulate(now, consensusDelay);
909}
910
911bool
913{
914 std::lock_guard const _{mutex_};
915 return consensus_.peerProposal(now, newProposal);
916}
917
918bool
920{
921 // We have a key, we do not want out of sync validations after a restart
922 // and are not amendment blocked.
923 validating_ = validatorKeys_.keys && prevLgr.seq() >= app_.getMaxDisallowedLedger() &&
924 !app_.getOPs().isBlocked();
925
926 // If we are not running in standalone mode and there's a configured UNL,
927 // check to make sure that it's not expired.
928 if (validating_ && !app_.config().standalone() && (app_.getValidators().count() != 0u))
929 {
930 auto const when = app_.getValidators().expires();
931
932 if (!when || *when < app_.getTimeKeeper().now())
933 {
934 JLOG(j_.error()) << "Voluntarily bowing out of consensus process "
935 "because of an expired validator list.";
936 validating_ = false;
937 }
938 }
939
940 bool const synced = app_.getOPs().getOperatingMode() == OperatingMode::FULL;
941
942 if (validating_)
943 {
944 JLOG(j_.info()) << "Entering consensus process, validating, synced="
945 << (synced ? "yes" : "no");
946 }
947 else
948 {
949 // Otherwise we just want to monitor the validation process.
950 JLOG(j_.info()) << "Entering consensus process, watching, synced="
951 << (synced ? "yes" : "no");
952 }
953
954 // Notify inbound ledgers that we are starting a new round
955 inboundTransactions_.newRound(prevLgr.seq());
956
957 // Notify NegativeUNLVote that new validators are added
958 if (!nowTrusted.empty())
959 nUnlVote_.newValidators(prevLgr.seq() + 1, nowTrusted);
960
961 // propose only if we're in sync with the network (and validating)
962 return validating_ && synced;
963}
964
965bool
967{
968 return ledgerMaster_.haveValidated();
969}
970
973{
974 return ledgerMaster_.getValidLedgerIndex();
975}
976
979{
980 return app_.getValidators().getQuorumKeys();
981}
982
985 Ledger_t::Seq const seq,
987{
988 return app_.getValidations().laggards(seq, trustedKeys);
989}
990
991bool
993{
994 return validatorKeys_.keys.has_value();
995}
996
997void
999{
1000 if ((positions == 0u) && app_.getOPs().isFull())
1001 app_.getOPs().setMode(OperatingMode::CONNECTED);
1002}
1003
1004void
1006 NetClock::time_point const& now,
1007 RCLCxLedger::ID const& prevLgrId,
1008 RCLCxLedger const& prevLgr,
1009 hash_set<NodeID> const& nowUntrusted,
1010 hash_set<NodeID> const& nowTrusted,
1012{
1013 std::lock_guard const _{mutex_};
1014 consensus_.startRound(
1015 now, prevLgrId, prevLgr, nowUntrusted, adaptor_.preStartRound(prevLgr, nowTrusted), clog);
1016}
1017
1019 : j_(j)
1020{
1021 if (!validating && !j.info())
1022 return;
1025 header_ = "ConsensusLogger ";
1026 header_ += label;
1027 header_ += ": ";
1028}
1029
1031{
1032 if (!ss_)
1033 return;
1034 auto const duration = std::chrono::duration_cast<std::chrono::milliseconds>(
1036 std::stringstream outSs;
1037 outSs << header_ << "duration " << (duration.count() / 1000) << '.' << std::setw(3)
1038 << std::setfill('0') << (duration.count() % 1000) << "s. " << ss_->str();
1040}
1041
1042} // namespace xrpl
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:130
virtual void writeAlways(Severity level, std::string const &text)=0
Bypass filter and write text to the sink at the specified severity.
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Sink & sink() const
Returns the Sink associated with this Journal.
Definition Journal.h:270
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
Holds transactions which were deferred to the next pass of consensus.
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
std::chrono::milliseconds read() const
Manages the acquisition and lifetime of transaction sets.
Writable ledger view that accumulates state and tx changes.
Definition OpenView.h:45
Slice slice() const noexcept
Definition PublicKey.h:103
Result onClose(RCLCxLedger const &ledger, NetClock::time_point const &closeTime, ConsensusMode mode)
Close the open ledger and return initial consensus position.
bool preStartRound(RCLCxLedger const &prevLedger, hash_set< NodeID > const &nowTrusted)
Called before kicking off a new consensus round.
bool validator() const
Whether I am a validator.
LedgerIndex getValidLedgerIndex() const
Adaptor(Application &app, std::unique_ptr< FeeVote > &&feeVote, LedgerMaster &ledgerMaster, LocalTxs &localTxs, InboundTransactions &inboundTransactions, ValidatorKeys const &validatorKeys, beast::Journal journal)
void updateOperatingMode(std::size_t const positions) const
Update operating mode based on current peer positions.
std::size_t proposersFinished(RCLCxLedger const &ledger, LedgerHash const &h) const
Number of proposers that have validated a ledger descended from requested ledger.
void validate(RCLCxLedger const &ledger, RCLTxSet const &txns, bool proposing)
Validate the given ledger and share with peers as necessary.
void doAccept(Result const &result, RCLCxLedger const &prevLedger, NetClock::duration closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson)
Accept a new ledger based on the given transactions.
void propose(RCLCxPeerPos::Proposal const &proposal)
Propose the given position to my peers.
RCLCxLedger buildLCL(RCLCxLedger const &previousLedger, CanonicalTXSet &retriableTxs, NetClock::time_point closeTime, bool closeTimeCorrect, NetClock::duration closeResolution, std::chrono::milliseconds roundTime, std::set< TxID > &failedTxs)
Build the new last closed ledger.
std::size_t proposersValidated(LedgerHash const &h) const
Number of proposers that have validated the given ledger.
void notify(protocol::NodeEvent ne, RCLCxLedger const &ledger, bool haveCorrectLCL)
Notify peers of a consensus state change.
void onAccept(Result const &result, RCLCxLedger const &prevLedger, NetClock::duration const &closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson, bool const validating)
Process the accepted ledger.
std::atomic< ConsensusMode > mode_
std::pair< std::size_t, hash_set< NodeKey_t > > getQuorumKeys() const
std::optional< RCLTxSet > acquireTxSet(RCLTxSet::ID const &setId)
Acquire the transaction set associated with a proposal.
ValidatorKeys const & validatorKeys_
uint256 getPrevLedger(uint256 ledgerID, RCLCxLedger const &ledger, ConsensusMode mode)
Get the ID of the previous ledger/last closed ledger(LCL) on the network.
beast::Journal const j_
std::size_t laggards(Ledger_t::Seq const seq, hash_set< NodeKey_t > &trustedKeys) const
void onForceAccept(Result const &result, RCLCxLedger const &prevLedger, NetClock::duration const &closeResolution, ConsensusCloseTimes const &rawCloseTimes, ConsensusMode const &mode, Json::Value &&consensusJson)
Process the accepted ledger that was a result of simulation/force accept.
std::optional< RCLCxLedger > acquireLedger(LedgerHash const &hash)
Attempt to acquire a specific ledger.
bool hasOpenTransactions() const
Whether the open ledger has any transactions.
RCLCensorshipDetector< TxID, LedgerIndex > censorshipDetector_
void onModeChange(ConsensusMode before, ConsensusMode after)
Notified of change in consensus mode.
std::uint64_t const valCookie_
void share(RCLCxPeerPos const &peerPos)
Share the given proposal with all peers.
Consensus< Adaptor > consensus_
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
void startRound(NetClock::time_point const &now, RCLCxLedger::ID const &prevLgrId, RCLCxLedger const &prevLgr, hash_set< NodeID > const &nowUntrusted, hash_set< NodeID > const &nowTrusted, std::unique_ptr< std::stringstream > const &clog)
Adjust the set of trusted validators and kick-off the next round of consensus.
bool validating() const
Whether we are validating consensus ledgers.
beast::Journal const j_
bool peerProposal(NetClock::time_point const &now, RCLCxPeerPos const &newProposal)
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Json::Value getJson(bool full) const
std::recursive_mutex mutex_
static constexpr unsigned int censorshipWarnInternal
Warn for transactions that haven't been included every so many ledgers.
RCLConsensus(Application &app, std::unique_ptr< FeeVote > &&feeVote, LedgerMaster &ledgerMaster, LocalTxs &localTxs, InboundTransactions &inboundTransactions, Consensus< Adaptor >::clock_type const &clock, ValidatorKeys const &validatorKeys, beast::Journal journal)
Constructor.
ConsensusMode mode() const
void gotTxSet(NetClock::time_point const &now, RCLTxSet const &txSet)
Represents a ledger in RCLConsensus.
Definition RCLCxLedger.h:16
std::shared_ptr< Ledger const > ledger_
The ledger instance.
Seq const & seq() const
Sequence number of the ledger.
Definition RCLCxLedger.h:41
ID const & parentID() const
Unique identifier (hash) of this ledger's parent.
Definition RCLCxLedger.h:55
ID const & id() const
Unique identifier (hash) of this ledger.
Definition RCLCxLedger.h:48
NetClock::time_point closeTime() const
The close time of this ledger.
Definition RCLCxLedger.h:76
A peer's signed, proposed position for use in RCLConsensus.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Slice signature() const
Signature of the proposal (not necessarily verified)
Proposal const & proposal() const
Represents a transaction in RCLConsensus.
Definition RCLCxTx.h:13
boost::intrusive_ptr< SHAMapItem const > tx_
The SHAMapItem that represents the transaction.
Definition RCLCxTx.h:34
ID const & id() const
The unique identifier/hash of the transaction.
Definition RCLCxTx.h:28
Represents a set of transactions in RCLConsensus.
Definition RCLCxTx.h:43
std::shared_ptr< SHAMap > map_
The SHAMap representing the transactions.
Definition RCLCxTx.h:167
ID id() const
The unique ID/hash of the transaction set.
Definition RCLCxTx.h:132
Wraps a ledger instance for use in generic Validations LedgerTrie.
beast::Journal journal() const
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
std::chrono::steady_clock::time_point start_
RclConsensusLogger(char const *label, bool validating, beast::Journal j)
std::unique_ptr< std::stringstream > ss_
Slice slice() const noexcept
Definition Serializer.h:44
std::size_t getNodesAfter(Ledger const &ledger, ID const &ledgerID)
Count the number of current trusted validators working on a ledger after the specified one.
Adaptor const & adaptor() const
Return the adaptor instance.
std::optional< std::pair< Seq, ID > > getPreferred(Ledger const &curr)
Return the sequence number and ID of the preferred working ledger.
Validator keys and manifest as set in configuration file.
std::uint32_t sequence
std::optional< Keys > keys
iterator begin()
Definition base_uint.h:112
T contains(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T insert(T... args)
T is_same_v
T lock(T... args)
T max(T... args)
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::chrono::time_point< Clock, Duration > effCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > resolution, std::chrono::time_point< Clock, Duration > priorCloseTime)
Calculate the effective ledger close time.
ConsensusMode
Represents how a node currently participates in Consensus.
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
@ proposing
We are normal participant in consensus and propose our position.
@ observing
We are observing peer positions, but not proposing our position.
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
boost::intrusive_ptr< SHAMapItem > make_shamapitem(uint256 const &tag, Slice data)
Definition SHAMapItem.h:139
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
Definition digest.h:204
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
XRPL_NO_SANITIZE_ADDRESS void Rethrow()
Rethrow the exception currently being handled.
Definition contract.h:33
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
Definition ReadView.cpp:50
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:92
std::enable_if_t< std::is_integral< Integral >::value, Integral > rand_int()
@ MovedOn
The network has consensus without us.
@ Yes
We have consensus along with the network.
@ jtACCEPT
Definition Job.h:52
@ jtADVANCE
Definition Job.h:46
bool after(NetClock::time_point now, std::uint32_t mark)
Has the specified time passed?
Definition View.cpp:523
std::shared_ptr< Ledger > buildLedger(std::shared_ptr< Ledger const > const &parent, NetClock::time_point closeTime, bool const closeTimeCorrect, NetClock::duration closeResolution, Application &app, CanonicalTXSet &txns, std::set< TxID > &failedTxs, beast::Journal j)
Build a new ledger by applying consensus transactions.
Buffer signDigest(PublicKey const &pk, SecretKey const &sk, uint256 const &digest)
Generate a signature for a message digest.
@ tapNONE
Definition ApplyView.h:11
@ ledgerMaster
ledger master data for signing
@ proposal
proposal for signing
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:215
@ CONNECTED
convinced we are talking to the network
@ FULL
we have the ledger and can even validate
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:809
@ accepted
Manifest is valid.
T setfill(T... args)
T setw(T... args)
T str(T... args)
Stores the set of initial close times.
NetClock::time_point self
Our close time estimate.
std::map< NetClock::time_point, int > peers
Close time estimates, keep ordered for predictable traverse.
Encapsulates the result of consensus.
TxSet_t txns
The set of transactions consensus agrees go in the ledger.
hash_map< typename Tx_t::ID, Dispute_t > disputes
Transactions which are under dispute with our peers.
ConsensusTimer roundTime
Proposal_t position
Our proposed position on transactions/close time.
Sends a message to all peers.
Definition predicates.h:12
T to_string(T... args)
T what(T... args)